Skip to content

Commit 4d7bb34

Browse files
authored
update vendor (#171)
1 parent 74eb602 commit 4d7bb34

File tree

13 files changed

+262
-71
lines changed

13 files changed

+262
-71
lines changed

cmd/go-mysql-elasticsearch/main.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,11 @@ func main() {
8383
return
8484
}
8585

86-
r.Start()
86+
done := make(chan struct{}, 1)
87+
go func() {
88+
r.Run()
89+
done <- struct{}{}
90+
}()
8791

8892
select {
8993
case n := <-sc:
@@ -93,4 +97,5 @@ func main() {
9397
}
9498

9599
r.Close()
100+
<-done
96101
}

river/river.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import (
77
"sync"
88

99
"github.com/juju/errors"
10-
log "github.com/sirupsen/logrus"
1110
"github.com/siddontang/go-mysql-elasticsearch/elastic"
1211
"github.com/siddontang/go-mysql/canal"
12+
log "github.com/sirupsen/logrus"
1313
)
1414

1515
// In Elasticsearch, river is a pluggable service within Elasticsearch pulling data then indexing it into Elasticsearch.
@@ -249,7 +249,7 @@ func (r *River) prepareRule() error {
249249
log.Errorf("ignored table without a primary key: %s\n", rule.TableInfo.Name)
250250
}
251251
} else {
252-
rules[key] = rule;
252+
rules[key] = rule
253253
}
254254
}
255255
r.rules = rules
@@ -261,12 +261,13 @@ func ruleKey(schema string, table string) string {
261261
return fmt.Sprintf("%s:%s", schema, table)
262262
}
263263

264-
func (r *River) Start() error {
264+
// Run syncs the data from MySQL and inserts to ES.
265+
func (r *River) Run() error {
265266
r.wg.Add(1)
266267
go r.syncLoop()
267268

268269
pos := r.master.Position()
269-
if err := r.canal.StartFrom(pos); err != nil {
270+
if err := r.canal.RunFrom(pos); err != nil {
270271
log.Errorf("start canal err %v", err)
271272
return errors.Trace(err)
272273
}
@@ -290,7 +291,7 @@ func (r *River) Close() {
290291
r.wg.Wait()
291292
}
292293

293-
func isValidTables(tables[] string) bool {
294+
func isValidTables(tables []string) bool {
294295
if len(tables) > 1 {
295296
for _, table := range tables {
296297
if table == "*" {

river/river_extra_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (s *riverTestSuite) TestRiverWithParent(c *C) {
113113

114114
s.testPrepareExtraData(c)
115115

116-
river.Start()
116+
go func() { river.Run() }()
117117

118118
testWaitSyncDone(c, river)
119119

river/river_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func testWaitSyncDone(c *C, r *River) {
267267
func (s *riverTestSuite) TestRiver(c *C) {
268268
s.testPrepareData(c)
269269

270-
s.r.Start()
270+
go func() { s.r.Run() }()
271271

272272
testWaitSyncDone(c, s.r)
273273

@@ -356,7 +356,7 @@ func TestTableValidation(t *testing.T) {
356356
tables := []struct {
357357
Tables []string
358358
Expect bool
359-
} {
359+
}{
360360
{[]string{"*"}, true},
361361
{[]string{"table", "table2"}, true},
362362
{[]string{"*", "table"}, false},
@@ -371,9 +371,9 @@ func TestTableValidation(t *testing.T) {
371371

372372
func TestBuildTable(t *testing.T) {
373373
tables := []struct {
374-
Table string
374+
Table string
375375
Expect string
376-
} {
376+
}{
377377
{"*", ".*"},
378378
{"table2", "table2"},
379379
}

vendor/github.com/siddontang/go-mysql/canal/canal.go

+139-19
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/siddontang/go-mysql/canal/config.go

+11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)