Skip to content

Commit 1772f40

Browse files
authored
update canal to support batch later. (#93)
1 parent 4f21bc6 commit 1772f40

File tree

14 files changed

+226
-171
lines changed

14 files changed

+226
-171
lines changed

.gitignore

-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
var
21
bin

cmd/go-mysql-elasticsearch/main.go

+4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"syscall"
99

1010
"github.com/juju/errors"
11+
"github.com/ngaut/log"
1112
"github.com/siddontang/go-mysql-elasticsearch/river"
1213
)
1314

@@ -20,11 +21,14 @@ var data_dir = flag.String("data_dir", "", "path for go-mysql-elasticsearch to s
2021
var server_id = flag.Int("server_id", 0, "MySQL server id, as a pseudo slave")
2122
var flavor = flag.String("flavor", "", "flavor: mysql or mariadb")
2223
var execution = flag.String("exec", "", "mysqldump execution path")
24+
var logLevel = flag.String("log_level", "info", "log level")
2325

2426
func main() {
2527
runtime.GOMAXPROCS(runtime.NumCPU())
2628
flag.Parse()
2729

30+
log.SetLevelByString(*logLevel)
31+
2832
sc := make(chan os.Signal, 1)
2933
signal.Notify(sc,
3034
os.Kill,

etc/river.toml

+4-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ my_pass = ""
77
# Elasticsearch address
88
es_addr = "127.0.0.1:9200"
99

10-
# Path to store data, like master.info, and dump MySQL data
11-
data_dir = "./var"
10+
# Path to store data, like master.info. If not set or empty, the binlog position is not saved to disk;
11+
# set this to support breakpoint resume syncing (resuming from the last saved position after a restart).
12+
# TODO: support other storage, like etcd.
13+
data_dir = ""
1214

1315
# Inner Http status address
1416
stat_addr = "127.0.0.1:12800"

glide.lock

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

glide.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import:
77
- package: github.com/satori/go.uuid
88
version: ^1.1.0
99
- package: github.com/siddontang/go-mysql
10-
version: 3ca161ffa3b5844340e534d4b3cdc03b60f32610
10+
version: 1a46a5f32930348fc4f6050692c5ec571e53e8f1
1111
subpackages:
1212
- canal
1313
- client

river/master.go

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package river
2+
3+
import (
4+
"bytes"
5+
"os"
6+
"path"
7+
"sync"
8+
"time"
9+
10+
"github.com/BurntSushi/toml"
11+
"github.com/juju/errors"
12+
"github.com/ngaut/log"
13+
"github.com/siddontang/go-mysql/mysql"
14+
"github.com/siddontang/go/ioutil2"
15+
)
16+
17+
// masterInfo holds the current MySQL binlog position and, when a file path
// is configured, is serialized as TOML to that file so syncing can resume.
type masterInfo struct {
18+
// Embedded lock guarding Name and Pos: Save takes the write lock,
// Position takes the read lock.
sync.RWMutex
19+
20+
// Name is the binlog file name, stored under the TOML key "bin_name".
Name string `toml:"bin_name"`
21+
// Pos is the offset within that binlog file, stored under "bin_pos".
Pos uint32 `toml:"bin_pos"`
22+
23+
// filePath is the master.info location; it stays empty when no data
// directory was given, in which case Save never writes to disk.
filePath string
24+
// lastSaveTime is used by Save to limit file writes to one per second.
lastSaveTime time.Time
25+
}
26+
27+
func loadMasterInfo(dataDir string) (*masterInfo, error) {
28+
var m masterInfo
29+
30+
if len(dataDir) == 0 {
31+
return &m, nil
32+
}
33+
34+
m.filePath = path.Join(dataDir, "master.info")
35+
m.lastSaveTime = time.Now()
36+
37+
if err := os.MkdirAll(dataDir, 0755); err != nil {
38+
return nil, errors.Trace(err)
39+
}
40+
41+
f, err := os.Open(m.filePath)
42+
if err != nil && !os.IsNotExist(errors.Cause(err)) {
43+
return nil, errors.Trace(err)
44+
} else if os.IsNotExist(errors.Cause(err)) {
45+
return &m, nil
46+
}
47+
defer f.Close()
48+
49+
_, err = toml.DecodeReader(f, &m)
50+
return &m, errors.Trace(err)
51+
}
52+
53+
func (m *masterInfo) Save(pos mysql.Position) error {
54+
log.Infof("save position %s", pos)
55+
56+
m.Lock()
57+
defer m.Unlock()
58+
59+
m.Name = pos.Name
60+
m.Pos = pos.Pos
61+
62+
if len(m.filePath) == 0 {
63+
return nil
64+
}
65+
66+
n := time.Now()
67+
if n.Sub(m.lastSaveTime) < time.Second {
68+
return nil
69+
}
70+
71+
m.lastSaveTime = n
72+
var buf bytes.Buffer
73+
e := toml.NewEncoder(&buf)
74+
75+
e.Encode(m)
76+
77+
var err error
78+
if err = ioutil2.WriteFileAtomic(m.filePath, buf.Bytes(), 0644); err != nil {
79+
log.Errorf("canal save master info to file %s err %v", m.filePath, err)
80+
}
81+
82+
return errors.Trace(err)
83+
}
84+
85+
func (m *masterInfo) Position() mysql.Position {
86+
m.RLock()
87+
defer m.RUnlock()
88+
89+
return mysql.Position{
90+
m.Name,
91+
m.Pos,
92+
}
93+
}
94+
95+
func (m *masterInfo) Close() error {
96+
pos := m.Position()
97+
98+
return m.Save(pos)
99+
}

river/river.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type River struct {
2828
es *elastic.Client
2929

3030
st *stat
31+
32+
master *masterInfo
3133
}
3234

3335
func NewRiver(c *Config) (*River, error) {
@@ -40,6 +42,10 @@ func NewRiver(c *Config) (*River, error) {
4042
r.rules = make(map[string]*Rule)
4143

4244
var err error
45+
if r.master, err = loadMasterInfo(c.DataDir); err != nil {
46+
return nil, errors.Trace(err)
47+
}
48+
4349
if err = r.newCanal(); err != nil {
4450
return nil, errors.Trace(err)
4551
}
@@ -71,7 +77,6 @@ func (r *River) newCanal() error {
7177
cfg.User = r.c.MyUser
7278
cfg.Password = r.c.MyPassword
7379
cfg.Flavor = r.c.Flavor
74-
cfg.DataDir = r.c.DataDir
7580

7681
cfg.ServerID = r.c.ServerID
7782
cfg.Dump.ExecutionPath = r.c.DumpExec
@@ -105,7 +110,7 @@ func (r *River) prepareCanal() error {
105110
r.canal.AddDumpDatabases(keys...)
106111
}
107112

108-
r.canal.RegRowsEventHandler(&rowsEventHandler{r})
113+
r.canal.SetEventHandler(&eventHandler{r})
109114

110115
return nil
111116
}
@@ -235,7 +240,8 @@ func ruleKey(schema string, table string) string {
235240
}
236241

237242
func (r *River) Run() error {
238-
if err := r.canal.Start(); err != nil {
243+
pos := r.master.Position()
244+
if err := r.canal.StartFrom(pos); err != nil {
239245
log.Errorf("start canal err %v", err)
240246
return errors.Trace(err)
241247
}
@@ -249,5 +255,7 @@ func (r *River) Close() {
249255

250256
r.canal.Close()
251257

258+
r.master.Close()
259+
252260
r.wg.Wait()
253261
}

river/sync.go

+25-10
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import (
1010
"github.com/ngaut/log"
1111
"github.com/siddontang/go-mysql-elasticsearch/elastic"
1212
"github.com/siddontang/go-mysql/canal"
13+
"github.com/siddontang/go-mysql/mysql"
14+
"github.com/siddontang/go-mysql/replication"
1315
"github.com/siddontang/go-mysql/schema"
16+
"golang.org/x/net/context"
1417
)
1518

1619
const (
@@ -23,11 +26,28 @@ const (
2326
fieldTypeList = "list"
2427
)
2528

26-
type rowsEventHandler struct {
29+
// eventHandler implements the canal event callbacks (OnRotate, OnDDL, OnXID,
// OnRow) on behalf of a River.
type eventHandler struct {
2730
// r is the River whose rules and master position the callbacks use.
r *River
2831
}
2932

30-
func (h *rowsEventHandler) Do(e *canal.RowsEvent) error {
33+
func (h *eventHandler) OnRotate(_ context.Context, e *replication.RotateEvent) error {
34+
pos := mysql.Position{
35+
string(e.NextLogName),
36+
uint32(e.Position),
37+
}
38+
39+
return h.r.master.Save(pos)
40+
}
41+
42+
func (h *eventHandler) OnDDL(_ context.Context, nextPos mysql.Position, _ *replication.QueryEvent) error {
43+
return h.r.master.Save(nextPos)
44+
}
45+
46+
func (h *eventHandler) OnXID(_ context.Context, nextPos mysql.Position) error {
47+
return h.r.master.Save(nextPos)
48+
}
49+
50+
func (h *eventHandler) OnRow(_ context.Context, e *canal.RowsEvent) error {
3151
rule, ok := h.r.rules[ruleKey(e.Table.Schema, e.Table.Name)]
3252
if !ok {
3353
return nil
@@ -50,16 +70,11 @@ func (h *rowsEventHandler) Do(e *canal.RowsEvent) error {
5070
return errors.Errorf("make %s ES request err %v", e.Action, err)
5171
}
5272

53-
if err := h.r.doBulk(reqs); err != nil {
54-
log.Errorf("do ES bulks err %v, stop", err)
55-
return canal.ErrHandleInterrupted
56-
}
57-
58-
return nil
73+
return h.r.doBulk(reqs)
5974
}
6075

61-
func (h *rowsEventHandler) String() string {
62-
return "ESRiverRowsEventHandler"
76+
func (h *eventHandler) String() string {
77+
return "ESRiverEventHandler"
6378
}
6479

6580
// for insert and delete

0 commit comments

Comments
 (0)