Commit 772331f

yinzhigang authored and siddontang committed
add support of elasticsearch pipeline (#224)
1 parent 4dae2e1 commit 772331f


4 files changed: +34 -3 lines


README.md (+15)

@@ -174,6 +174,21 @@ You can ignore these tables in the configuration like:
 skip_no_pk_table = true
 ```
 
+## Elasticsearch Pipeline
+You can use an [Ingest Node Pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html) to pre-process documents before indexing, like JSON string decoding, field merging, and more.
+
+```
+[[rule]]
+schema = "test"
+table = "t1"
+index = "t"
+type = "_doc"
+
+# pipeline id
+pipeline = "my-pipeline-id"
+```
+Note: you should [create the pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html) manually, and Elasticsearch >= 5.0 is required.
+
 ## Why not other rivers?
 
 Although there are some other MySQL rivers for Elasticsearch, like [elasticsearch-river-jdbc](https://github.com/jprante/elasticsearch-river-jdbc), [elasticsearch-river-mysql](https://github.com/scharron/elasticsearch-river-mysql), I still want to build a new one with Go, why?
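As the added README note says, the pipeline must exist before the river references it. A minimal sketch of creating `my-pipeline-id` through the put-pipeline API; the localhost address and the `set` processor body are illustrative assumptions, not part of this commit:

```go
// Sketch: create the ingest pipeline the README rule references.
// Assumes Elasticsearch >= 5.0 on localhost:9200; the processor is
// only an example of pre-processing, not part of this commit.
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// A trivial pipeline: tag every document that passes through it.
	body := []byte(`{
		"description": "tag rows synced from MySQL",
		"processors": [
			{"set": {"field": "synced_by", "value": "go-mysql-elasticsearch"}}
		]
	}`)

	req, err := http.NewRequest(http.MethodPut,
		"http://localhost:9200/_ingest/pipeline/my-pipeline-id",
		bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	out, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(out)) // expect 200 OK, {"acknowledged":true}
}
```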

elastic/client.go (+4)

@@ -79,6 +79,7 @@ type BulkRequest struct {
 	Type string
 	ID string
 	Parent string
+	Pipeline string
 
 	Data map[string]interface{}
 }
@@ -99,6 +100,9 @@ func (r *BulkRequest) bulk(buf *bytes.Buffer) error {
 	if len(r.Parent) > 0 {
 		metaData["_parent"] = r.Parent
 	}
+	if len(r.Pipeline) > 0 {
+		metaData["pipeline"] = r.Pipeline
+	}
 
 	meta[r.Action] = metaData
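With the new `Pipeline` field and the `metaData["pipeline"]` entry above, each bulk action's metadata line now carries the pipeline id. A standalone sketch of the metadata line that `bulk` ends up writing for an index action; the index, type, and id values are taken from the README example, and the key order comes from `json.Marshal` sorting map keys:

```go
// Sketch: the bulk metadata line produced once Pipeline is set on a
// request; values mirror the README example rule, not real sync data.
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	metaData := map[string]interface{}{
		"_index": "t",
		"_type":  "_doc",
		"_id":    "1",
	}

	// Mirrors the new branch in bulk(): only attach the pipeline when set.
	pipeline := "my-pipeline-id"
	if len(pipeline) > 0 {
		metaData["pipeline"] = pipeline
	}

	meta := map[string]interface{}{"index": metaData}
	line, _ := json.Marshal(meta)
	fmt.Println(string(line))
	// {"index":{"_id":"1","_index":"t","_type":"_doc","pipeline":"my-pipeline-id"}}
}
```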

river/rule.go (+4)

@@ -27,6 +27,10 @@ type Rule struct {
 
 	// only MySQL fields in filter will be synced, default sync all fields
 	Filter []string `toml:"filter"`
+
+	// Elasticsearch pipeline
+	// To pre-process documents before indexing
+	Pipeline string `toml:"pipeline"`
 }
 
 func newDefaultRule(schema string, table string) *Rule {
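The new `toml:"pipeline"` tag is what ties the README's `pipeline = "my-pipeline-id"` key to the rule. A trimmed-down sketch of that decoding, assuming the BurntSushi/toml package the river uses for its config file and keeping only the fields relevant here:

```go
// Sketch: decode the README's rule snippet into a cut-down Rule struct.
// The struct below is illustrative, not the river's full Rule type.
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

type Rule struct {
	Schema   string `toml:"schema"`
	Table    string `toml:"table"`
	Index    string `toml:"index"`
	Type     string `toml:"type"`
	Pipeline string `toml:"pipeline"`
}

func main() {
	data := `
schema = "test"
table = "t1"
index = "t"
type = "_doc"
pipeline = "my-pipeline-id"
`
	var rule Rule
	if _, err := toml.Decode(data, &rule); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", rule)
	// {Schema:test Table:t1 Index:t Type:_doc Pipeline:my-pipeline-id}
}
```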

river/sync.go (+11 -3)

@@ -183,7 +183,7 @@ func (r *River) makeRequest(rule *Rule, action string, rows [][]interface{}) ([]
 		}
 	}
 
-	req := &elastic.BulkRequest{Index: rule.Index, Type: rule.Type, ID: id, Parent: parentID}
+	req := &elastic.BulkRequest{Index: rule.Index, Type: rule.Type, ID: id, Parent: parentID, Pipeline: rule.Pipeline}
 
 	if action == canal.DeleteAction {
 		req.Action = elastic.ActionDelete
@@ -242,13 +242,21 @@ func (r *River) makeUpdateRequest(rule *Rule, rows [][]interface{}) ([]*elastic.
 		req.Action = elastic.ActionDelete
 		reqs = append(reqs, req)
 
-		req = &elastic.BulkRequest{Index: rule.Index, Type: rule.Type, ID: afterID, Parent: afterParentID}
+		req = &elastic.BulkRequest{Index: rule.Index, Type: rule.Type, ID: afterID, Parent: afterParentID, Pipeline: rule.Pipeline}
 		r.makeInsertReqData(req, rule, rows[i+1])
 
 		r.st.DeleteNum.Add(1)
 		r.st.InsertNum.Add(1)
 	} else {
-		r.makeUpdateReqData(req, rule, rows[i], rows[i+1])
+		if len(rule.Pipeline) > 0 {
+			// Pipelines can only be specified on index action
+			r.makeInsertReqData(req, rule, rows[i+1])
+			// Make sure action is index, not create
+			req.Action = elastic.ActionIndex
+			req.Pipeline = rule.Pipeline
+		} else {
+			r.makeUpdateReqData(req, rule, rows[i], rows[i+1])
+		}
 		r.st.UpdateNum.Add(1)
 	}
 
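The new branch exists because, as its comment says, a pipeline can only be specified on an index action: bulk `update` actions do not run ingest pipelines. So when a rule has a pipeline, an UPDATE row is re-sent as a full index request instead of a partial update. A sketch of that decision with a simplified request type; the type, field names, and values are illustrative, not the river's own:

```go
// Sketch of the branch above: with a pipeline the row is fully
// re-indexed so the pipeline runs; without one it stays a partial update.
package main

import "fmt"

type bulkRequest struct {
	Action   string
	Pipeline string
	Data     map[string]interface{}
}

// updateRequest mirrors the else-branch in makeUpdateRequest.
func updateRequest(pipeline string, row map[string]interface{}) bulkRequest {
	if len(pipeline) > 0 {
		// Pipelines only run on index actions, so send the whole new row.
		return bulkRequest{Action: "index", Pipeline: pipeline, Data: row}
	}
	// No pipeline: a partial update with just the changed fields is fine.
	return bulkRequest{Action: "update", Data: row}
}

func main() {
	row := map[string]interface{}{"id": 1, "name": "new-name"}

	fmt.Printf("%+v\n", updateRequest("my-pipeline-id", row))
	// {Action:index Pipeline:my-pipeline-id Data:map[id:1 name:new-name]}
	fmt.Printf("%+v\n", updateRequest("", row))
	// {Action:update Pipeline: Data:map[id:1 name:new-name]}
}
```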
