-
Notifications
You must be signed in to change notification settings - Fork 153
/
Copy pathmysql.go
124 lines (113 loc) · 4.6 KB
/
mysql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package ddbuilder_mysql
import (
"context"
"fmt"
"log/slog"
mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
"github.com/nucleuscloud/neosync/backend/pkg/sqlmanager"
sqlmanager_mysql "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/mysql"
sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager"
destdb_shared "github.com/nucleuscloud/neosync/internal/destination-database-builder/shared"
)
// MysqlDestinationDatabaseBuilderService prepares a MySQL destination database
// for a sync: it can replay schema init statements read from the source
// database and truncate destination tables, depending on the destination
// options it was built with.
type MysqlDestinationDatabaseBuilderService struct {
	// logger receives the informational and error messages emitted by the
	// service's methods.
	logger *slog.Logger
	// sqlmanagerclient is the client the SQL connections were opened with.
	sqlmanagerclient sqlmanager.SqlManagerClient

	// sourceConnection and destinationConnection are the connection
	// definitions that sourcedb and destdb were opened from.
	sourceConnection      *mgmtv1alpha1.Connection
	destinationConnection *mgmtv1alpha1.Connection
	// destOpts toggles the optional steps: InitializeSchema consults
	// GetInitTableSchema and TruncateData consults
	// GetTruncateTable().GetTruncateBeforeInsert().
	destOpts *mgmtv1alpha1.MysqlDestinationConnectionOptions

	// destdb is the destination SQL connection; all statements are executed
	// against it.
	destdb *sqlmanager.SqlConnection
	// sourcedb is the source SQL connection; schema init statements are read
	// from it.
	sourcedb *sqlmanager.SqlConnection
}
// NewMysqlDestinationDatabaseBuilderService opens SQL connections for the
// source and destination connection definitions using the given session and
// returns a service holding both.
//
// The source connection is intentionally left open: InitializeSchema reads the
// schema init statements from it after construction. Callers must invoke
// CloseConnections when they are done with the service. If the destination
// connection cannot be opened, the already-opened source connection is closed
// before the error is returned.
func NewMysqlDestinationDatabaseBuilderService(
	ctx context.Context,
	logger *slog.Logger,
	session connectionmanager.SessionInterface,
	sqlmanagerclient sqlmanager.SqlManagerClient,
	sourceConnection *mgmtv1alpha1.Connection,
	destinationConnection *mgmtv1alpha1.Connection,
	destOpts *mgmtv1alpha1.MysqlDestinationConnectionOptions,
) (*MysqlDestinationDatabaseBuilderService, error) {
	sourcedb, err := sqlmanagerclient.NewSqlConnection(ctx, session, sourceConnection, logger)
	if err != nil {
		return nil, fmt.Errorf("unable to create new sql db: %w", err)
	}

	destdb, err := sqlmanagerclient.NewSqlConnection(ctx, session, destinationConnection, logger)
	if err != nil {
		// Construction failed, so nobody will ever call CloseConnections;
		// release the source connection here instead of leaking it.
		sourcedb.Db().Close()
		return nil, fmt.Errorf("unable to create new sql db: %w", err)
	}

	return &MysqlDestinationDatabaseBuilderService{
		logger:                logger,
		sqlmanagerclient:      sqlmanagerclient,
		sourceConnection:      sourceConnection,
		destinationConnection: destinationConnection,
		destOpts:              destOpts,
		destdb:                destdb,
		sourcedb:              sourcedb,
	}, nil
}

// InitializeSchema replays the source database's schema init statements for
// the given tables against the destination database.
//
// uniqueTables is a set of table keys; each key is split into schema and table
// with sqlmanager_shared.SplitTableKey. When schema initialization is not
// enabled in the destination options, nothing is executed and an empty slice
// is returned.
//
// Statements are executed block by block. If a block fails and it is not the
// block labelled sqlmanager_mysql.SchemasLabel, the error is returned
// immediately. For the SchemasLabel block, each statement is retried
// individually and every statement that still fails is reported in the
// returned slice instead of aborting.
func (d *MysqlDestinationDatabaseBuilderService) InitializeSchema(ctx context.Context, uniqueTables map[string]struct{}) ([]*destdb_shared.InitSchemaError, error) {
	initErrors := []*destdb_shared.InitSchemaError{}
	if !d.destOpts.GetInitTableSchema() {
		d.logger.Info("skipping schema init as it is not enabled")
		return initErrors, nil
	}

	tables := make([]*sqlmanager_shared.SchemaTable, 0, len(uniqueTables))
	for tableKey := range uniqueTables {
		schema, table := sqlmanager_shared.SplitTableKey(tableKey)
		tables = append(tables, &sqlmanager_shared.SchemaTable{Schema: schema, Table: table})
	}

	initblocks, err := d.sourcedb.Db().GetSchemaInitStatements(ctx, tables)
	if err != nil {
		return nil, err
	}

	for _, block := range initblocks {
		d.logger.Info(fmt.Sprintf("[%s] found %d statements to execute during schema initialization", block.Label, len(block.Statements)))
		if len(block.Statements) == 0 {
			continue
		}

		execErr := d.destdb.Db().BatchExec(ctx, destdb_shared.BatchSizeConst, block.Statements, &sqlmanager_shared.BatchExecOpts{})
		if execErr == nil {
			continue
		}

		d.logger.Error(fmt.Sprintf("unable to exec mysql %s statements: %s", block.Label, execErr.Error()))
		if block.Label != sqlmanager_mysql.SchemasLabel {
			return nil, fmt.Errorf("unable to exec mysql %s statements: %w", block.Label, execErr)
		}

		// The batch failed for the schemas block: fall back to running the
		// statements one at a time so that individual failures can be
		// collected rather than failing the whole initialization.
		for _, stmt := range block.Statements {
			stmtErr := d.destdb.Db().BatchExec(ctx, 1, []string{stmt}, &sqlmanager_shared.BatchExecOpts{})
			if stmtErr != nil {
				initErrors = append(initErrors, &destdb_shared.InitSchemaError{
					Statement: stmt,
					Error:     stmtErr.Error(),
				})
			}
		}
	}
	return initErrors, nil
}

// TruncateData truncates every table in uniqueTables on the destination
// database when truncate-before-insert is enabled in the destination options;
// otherwise it does nothing.
//
// One truncate statement is built per table key and the statements are
// executed in batches with sqlmanager_shared.DisableForeignKeyChecks as the
// batch prefix. uniqueSchemas is accepted but not used by this implementation.
func (d *MysqlDestinationDatabaseBuilderService) TruncateData(ctx context.Context, uniqueTables map[string]struct{}, uniqueSchemas []string) error {
	if !d.destOpts.GetTruncateTable().GetTruncateBeforeInsert() {
		d.logger.Info("skipping truncate as it is not enabled")
		return nil
	}

	tableTruncate := make([]string, 0, len(uniqueTables))
	for tableKey := range uniqueTables {
		schema, table := sqlmanager_shared.SplitTableKey(tableKey)
		stmt, err := sqlmanager_mysql.BuildMysqlTruncateStatement(schema, table)
		if err != nil {
			return err
		}
		tableTruncate = append(tableTruncate, stmt)
	}

	d.logger.Info(fmt.Sprintf("executing %d sql statements that will truncate tables", len(tableTruncate)))
	disableFkChecks := sqlmanager_shared.DisableForeignKeyChecks
	err := d.destdb.Db().BatchExec(ctx, destdb_shared.BatchSizeConst, tableTruncate, &sqlmanager_shared.BatchExecOpts{Prefix: &disableFkChecks})
	if err != nil {
		return err
	}
	return nil
}

// CloseConnections closes both the destination and the source SQL connections
// held by the service. It should be called once the service is no longer
// needed.
//
// NOTE(review): both connections were opened with the same session in the
// constructor; confirm that closing each of them independently is safe for the
// connection manager.
func (d *MysqlDestinationDatabaseBuilderService) CloseConnections() {
	d.destdb.Db().Close()
	d.sourcedb.Db().Close()
}