From 7dfd04266a8aed5255c9419166c1971a1779e959 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Fri, 16 Nov 2018 19:09:56 +0800
Subject: [PATCH 01/13] Update sync.go

---
 river/sync.go | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/river/sync.go b/river/sync.go
index 3b3854ee..8965e8f4 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -514,6 +514,15 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
 			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
 				fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat))
 			}
+		} else {
+		    if col.Type == schema.TYPE_STRING {
+		        col.Type = schema.TYPE_DATETIME
+		        v := r.makeReqColumnData(col, value)
+                        str, _ := v.(string)
+		        stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local)  
+		        t := int64(stamp.Unix())
+			fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat))
+		    }
 		}
 	}
 

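Note on the hunk in PATCH 01: it converts a string column to a datetime by parsing it with the reference layout "2006-01-02 03:04:05". In Go layouts, 03 is the 12-hour hour token, so any value with an hour of 13 or later fails to parse, and since the error is discarded, stamp silently falls back to the zero time. A minimal sketch, standard library only and independent of the patch, contrasting that layout with the 24-hour layout "2006-01-02 15:04:05" that matches MySQL DATETIME strings:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const twelveHour = "2006-01-02 03:04:05" // layout used in the hunk above (12-hour clock)
	const twentyFour = "2006-01-02 15:04:05" // 24-hour layout matching MySQL DATETIME strings

	for _, s := range []string{"2018-11-20 10:10:10", "2018-11-20 15:10:10"} {
		if t, err := time.ParseInLocation(twelveHour, s, time.Local); err != nil {
			fmt.Printf("%q with 12-hour layout: %v\n", s, err) // e.g. "hour out of range" for 15:10:10
		} else {
			fmt.Printf("%q with 12-hour layout: %v\n", s, t)
		}
		t, _ := time.ParseInLocation(twentyFour, s, time.Local)
		fmt.Printf("%q with 24-hour layout: %v\n", s, t)
	}
}
```
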
From 7350dff007cacb8b31543bbb489c081d5110e624 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Tue, 20 Nov 2018 11:30:16 +0800
Subject: [PATCH 02/13] Update README.md

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 4992d48b..b2d4e7c5 100644
--- a/README.md
+++ b/README.md
@@ -203,7 +203,7 @@ Although there are some other MySQL rivers for Elasticsearch, like [elasticsearc
 ## Todo
 
 + MySQL 8
-+ ES 6
++ ES 6 (now supported, verified against version 6.4.2; delete and update work)
 + Statistic.
 
 ## Donate

From c31ee199f2f28702c9b0d800e35d616bfd09422b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Tue, 20 Nov 2018 11:56:42 +0800
Subject: [PATCH 03/13] Update river.toml

---
 etc/river.toml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/etc/river.toml b/etc/river.toml
index a5db6b7a..d69da4fe 100644
--- a/etc/river.toml
+++ b/etc/river.toml
@@ -105,6 +105,8 @@ id="es_id"
 tags="es_tags,list"
 # Map column `keywords` to ES with array type
 keywords=",list"
+# Map column `time` (MySQL type varchar(255), e.g. "2018-11-20 10:10:10") to ES with date type
+time="time,date"
 
 # Filter rule 
 #

From 99e1dd0930bab454d4f4c9b6e72456725bfadaa1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Wed, 21 Nov 2018 15:58:58 +0800
Subject: [PATCH 04/13] Update sync.go

---
 river/sync.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/river/sync.go b/river/sync.go
index 8965e8f4..885b39f4 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -519,7 +519,10 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
 		        col.Type = schema.TYPE_DATETIME
 		        v := r.makeReqColumnData(col, value)
                         str, _ := v.(string)
-		        stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local)  
+			  
+		        //stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) 
+			// time.RFC3339 解析的时间格式是 2018-10-30T01:45:00Z
+			stamp, _ := time.ParseInLocation(time.RFC3339, str, time.Local)
 		        t := int64(stamp.Unix())
 			fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat))
 		    }

From d2c9fb4f0c604de4e1e0927dd046e5152905f2f0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Wed, 21 Nov 2018 18:24:57 +0800
Subject: [PATCH 05/13] Update sync.go

---
 river/sync.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/river/sync.go b/river/sync.go
index 885b39f4..4b4604c7 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -521,7 +521,7 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
                         str, _ := v.(string)
 			  
 		        //stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) 
-			// time.RFC3339 解析的时间格式是 2018-10-30T01:45:00Z
+			// time.RFC3339 time style is 2018-10-30T01:45:00Z
 			stamp, _ := time.ParseInLocation(time.RFC3339, str, time.Local)
 		        t := int64(stamp.Unix())
 			fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat))

From a23070da213b109c3e574b7692e0cb213d661172 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Wed, 21 Nov 2018 18:52:31 +0800
Subject: [PATCH 06/13] Update river.toml

---
 etc/river.toml | 136 +++++++++++++++++++------------------------------
 1 file changed, 51 insertions(+), 85 deletions(-)

diff --git a/etc/river.toml b/etc/river.toml
index d69da4fe..cc14b97d 100644
--- a/etc/river.toml
+++ b/etc/river.toml
@@ -1,17 +1,17 @@
 # MySQL address, user and password
 # user must have replication privilege in MySQL.
-my_addr = "127.0.0.1:3306"
+my_addr = "mysqlhost:3306"
 my_user = "root"
-my_pass = ""
+my_pass = "root"
 my_charset = "utf8"
 
 # Set true when elasticsearch use https
 #es_https = false
 # Elasticsearch address
-es_addr = "127.0.0.1:9200"
+es_addr = "eshost:9200"
 # Elasticsearch user and password, maybe set by shield, nginx, or x-pack
-es_user = ""
-es_pass = ""
+#es_user = ""
+#es_pass = ""
 
 # Path to store data, like master.info, if not set or empty,
 # we must use this to support breakpoint resume syncing. 
@@ -22,10 +22,10 @@ data_dir = "./var"
 stat_addr = "127.0.0.1:12800"
 
 # pseudo server id like a slave 
-server_id = 1001
+server_id = 123454
 
 # mysql or mariadb
-flavor = "mysql"
+flavor = "mariadb"
 
 # mysqldump execution path
 # if not set or empty, ignore mysqldump.
@@ -46,13 +46,13 @@ skip_no_pk_table = false
 
 # MySQL data source
 [[source]]
-schema = "test"
+schema = "nfvofcaps"
 
 # Only below tables will be synced into Elasticsearch.
 # "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
 # I don't think it is necessary to sync all tables in a database.
-tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
-
+#tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
+tables = ["FMALARM"]
 # Below is for special rule mapping
 
 # Very simple example
@@ -67,84 +67,50 @@ tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
 # 
 # The table `t` will be synced to ES index `test` and type `t`.
 [[rule]]
-schema = "test"
-table = "t"
-index = "test"
-type = "t"
-
-# Wildcard table rule, the wildcard table must be in source tables 
-# All tables which match the wildcard format will be synced to ES index `test` and type `t`.
-# In this example, all tables must have same schema with above table `t`;
-[[rule]]
-schema = "test"
-table = "t_[0-9]{4}"
-index = "test"
-type = "t"
-
-# Simple field rule 
-#
-# desc tfield;
-# +----------+--------------+------+-----+---------+-------+
-# | Field    | Type         | Null | Key | Default | Extra |
-# +----------+--------------+------+-----+---------+-------+
-# | id       | int(11)      | NO   | PRI | NULL    |       |
-# | tags     | varchar(256) | YES  |     | NULL    |       |
-# | keywords | varchar(256) | YES  |     | NULL    |       |
-# +----------+--------------+------+-----+---------+-------+
-#
-[[rule]]
-schema = "test"
-table = "tfield"
-index = "test"
-type = "tfield"
+schema = "nfvofcaps"
+table = "FMALARM"
+index = "nfvomysql"
+type = "mysqltable"
+
+# The es doc's id will be `ID`:`ALARMID`
+# It is useful for merging multiple tables into one type when these tables share the same PK
+id = ["ID","ALARMID"]
+
 
 [rule.field]
 # Map column `id` to ES field `es_id`
-id="es_id"
+#id="es_id"
 # Map column `tags` to ES field `es_tags` with array type 
-tags="es_tags,list"
+#tags="es_tags,list"
 # Map column `keywords` to ES with array type
-keywords=",list"
-# Map column `time` (MySQL type varchar(255), e.g. "2018-11-20 10:10:10") to ES with date type
-time="time,date"
+#keywords=",list"
+
+
+ORIGIN="origin"
+ID="id"
+ALARMID="alarmid"
+ALARMTITLE="alarmtitle"
+ALARMSTATUS="alarmstatus"
+ORIGSEVERITY="origseverity"
+ALARMTYPE="alarmtype"
+EVENTTIME="eventtime,date"
+MSGSEQ="msgseq"
+CLEARTIME="cleartime,date"
+CLEARFLAG="clearflag"
+CLEARMSGSEQ="clearmsgseq"
+SPECIFICPROBLEMID="specificproblemid"
+SPECIFICPROBLEM="specificproblem"
+NEUID="neuid"
+NENAME="nename"
+NETYPE="netype"
+OBJECTUID="objectuid"
+OBJECTNAME="objectname"
+OBJECTTYPE="objecttype"
+LOCATIONINFO="locationinfo"
+ADDINFO="addinfo"
+PVFLAG="pvflag"
+CONFIRMFLAG="confirmflag"
+CONFIRMTIME="confirmtime,date"
+REMARK="remark"
+REMARKTIME="remarktime,date"
 
-# Filter rule 
-#
-# desc tfilter;
-# +-------+--------------+------+-----+---------+-------+
-# | Field | Type         | Null | Key | Default | Extra |
-# +-------+--------------+------+-----+---------+-------+
-# | id    | int(11)      | NO   | PRI | NULL    |       |
-# | c1    | int(11)      | YES  |     | 0       |       |
-# | c2    | int(11)      | YES  |     | 0       |       |
-# | name  | varchar(256) | YES  |     | NULL    |       |
-# +-------+--------------+------+-----+---------+-------+
-#
-[[rule]]
-schema = "test"
-table = "tfilter"
-index = "test"
-type = "tfilter"
-
-# Only sync following columns
-filter = ["id", "name"]
-
-# id rule
-#
-# desc tid_[0-9]{4};
-# +----------+--------------+------+-----+---------+-------+
-# | Field    | Type         | Null | Key | Default | Extra |
-# +----------+--------------+------+-----+---------+-------+
-# | id       | int(11)      | NO   | PRI | NULL    |       |
-# | tag      | varchar(256) | YES  |     | NULL    |       |
-# | desc     | varchar(256) | YES  |     | NULL    |       |
-# +----------+--------------+------+-----+---------+-------+
-#
-[[rule]]
-schema = "test"
-table = "tid_[0-9]{4}"
-index = "test"
-type = "t"
-# The es doc's id will be `id`:`tag`
-# It is useful for merge muliple table into one type while theses tables have same PK 
-id = ["id", "tag"]

From df184c69b9354d2b1b8540af374d2a5f469f100d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Wed, 21 Nov 2018 19:12:00 +0800
Subject: [PATCH 07/13] Update sync.go

---
 river/sync.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/river/sync.go b/river/sync.go
index 4b4604c7..1829cccf 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -522,7 +522,7 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
 			  
 		        //stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) 
 			// time.RFC3339 time style is 2018-10-30T01:45:00Z
-			stamp, _ := time.ParseInLocation(time.RFC3339, str, time.Local)
+			stamp, _ := time.Parse(time.RFC3339, str)
 		        t := int64(stamp.Unix())
 			fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat))
 		    }

From 7fd71572cc56f584549be0af9ff6b9abd6025c91 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Wed, 21 Nov 2018 19:27:56 +0800
Subject: [PATCH 08/13] Update sync.go

---
 river/sync.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/river/sync.go b/river/sync.go
index 1829cccf..b56bddd0 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -522,7 +522,7 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
 			  
 		        //stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) 
 			// time.RFC3339 time style is 2018-10-30T01:45:00Z
-			stamp, _ := time.Parse(time.RFC3339, str)
+			stamp, _ := time.ParseInLocation("2006-01-02T15:04:05Z", str, time.Local) 
 		        t := int64(stamp.Unix())
 			fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat))
 		    }

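Note on PATCH 07 and PATCH 08: they switch between time.Parse with time.RFC3339 and time.ParseInLocation with the layout "2006-01-02T15:04:05Z". Two standard-library details are easy to miss here: in a custom layout a bare Z is matched as a literal character rather than a zone marker, and when the input itself carries zone information (as every RFC3339 value does) Parse and ParseInLocation yield the same instant. A small self-contained sketch, independent of the patches, showing the resulting difference:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const input = "2018-10-30T01:45:00Z"

	// time.RFC3339 ("2006-01-02T15:04:05Z07:00") treats the trailing Z as a zone
	// token: the value is parsed as UTC, and the location passed to
	// ParseInLocation would be ignored because the input already carries a zone.
	utc, _ := time.Parse(time.RFC3339, input)

	// In the custom layout the bare Z is only a literal character, so the string
	// carries no zone information and ParseInLocation interprets it in the
	// supplied location (here the machine's local zone).
	local, _ := time.ParseInLocation("2006-01-02T15:04:05Z", input, time.Local)

	fmt.Println(utc.Unix(), local.Unix()) // equal only when the local zone is UTC
}
```
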
From 94620e3debe5dad22014f2e77e6884f438c1f4f3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Wed, 21 Nov 2018 19:45:19 +0800
Subject: [PATCH 09/13] Update sync.go

---
 river/sync.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/river/sync.go b/river/sync.go
index b56bddd0..a307f882 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -520,7 +520,7 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
 		        v := r.makeReqColumnData(col, value)
                         str, _ := v.(string)
 			  
-		        //stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) 
+		        stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) 
 			// time.RFC3339 time style is 2018-10-30T01:45:00Z
 			stamp, _ := time.ParseInLocation("2006-01-02T15:04:05Z", str, time.Local) 
 		        t := int64(stamp.Unix())

From 1e4a7ab619ddc55bb826f3b64594ae9bf32081d3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Thu, 22 Nov 2018 00:14:09 +0800
Subject: [PATCH 10/13] Update sync.go

---
 river/sync.go | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/river/sync.go b/river/sync.go
index a307f882..66325abd 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -515,18 +515,15 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
 				fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat))
 			}
 		} else {
-		    if col.Type == schema.TYPE_STRING {
-		        col.Type = schema.TYPE_DATETIME
-		        v := r.makeReqColumnData(col, value)
+                    if col.Type == schema.TYPE_STRING {
+                        v := r.makeReqColumnData(col, value)
                         str, _ := v.(string)
-			  
-		        stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) 
-			// time.RFC3339 time style is 2018-10-30T01:45:00Z
-			stamp, _ := time.ParseInLocation("2006-01-02T15:04:05Z", str, time.Local) 
-		        t := int64(stamp.Unix())
-			fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat))
-		    }
-		}
+                        stamp, _ := time.ParseInLocation(time.RFC3339 , str, time.Local) 
+                        t := int64(stamp.Unix())
+                        col.Type = schema.TYPE_DATETIME
+                        fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat))
+                    }
+                }
 	}
 
 	if fieldValue == nil {

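Note on PATCH 10: it consolidates the earlier attempts into one path: parse the string column with time.RFC3339, take the Unix timestamp, mark the column as DATETIME, and re-format the value through makeReqColumnData. The sketch below walks through the same conversion outside the river; mysqlTimeFormat and toMySQLTime are local stand-ins (the constant is assumed to match go-mysql's mysql.TimeFormat layout), not identifiers from the patch:

```go
package main

import (
	"fmt"
	"time"
)

// mysqlTimeFormat stands in for the mysql.TimeFormat constant referenced by the
// patch; "2006-01-02 15:04:05" is assumed here for the sake of a runnable example.
const mysqlTimeFormat = "2006-01-02 15:04:05"

// toMySQLTime mirrors the conversion PATCH 10 settles on: parse the string value
// as RFC3339, take the Unix timestamp, and re-format it in the MySQL DATETIME layout.
func toMySQLTime(s string) (string, error) {
	stamp, err := time.ParseInLocation(time.RFC3339, s, time.Local)
	if err != nil {
		return "", err
	}
	return time.Unix(stamp.Unix(), 0).Format(mysqlTimeFormat), nil
}

func main() {
	out, err := toMySQLTime("2018-10-30T01:45:00Z")
	fmt.Println(out, err) // printed in the machine's local zone, e.g. 2018-10-30 09:45:00 for UTC+8
}
```
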
From c2db5d70cfb7fe1a675ffa3fbc551936e203e64f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Thu, 22 Nov 2018 00:15:08 +0800
Subject: [PATCH 11/13] Update river.toml

---
 etc/river.toml | 110 +++++--------------------------------------------
 1 file changed, 10 insertions(+), 100 deletions(-)

diff --git a/etc/river.toml b/etc/river.toml
index cc14b97d..bdb8e7db 100644
--- a/etc/river.toml
+++ b/etc/river.toml
@@ -1,116 +1,26 @@
-# MySQL address, user and password
-# user must have replication privilege in MySQL.
 my_addr = "mysqlhost:3306"
 my_user = "root"
 my_pass = "root"
 my_charset = "utf8"
-
-# Set true when elasticsearch use https
-#es_https = false
-# Elasticsearch address
 es_addr = "eshost:9200"
-# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
-#es_user = ""
-#es_pass = ""
-
-# Path to store data, like master.info, if not set or empty,
-# we must use this to support breakpoint resume syncing. 
-# TODO: support other storage, like etcd. 
 data_dir = "./var"
-
-# Inner Http status address
 stat_addr = "127.0.0.1:12800"
-
-# pseudo server id like a slave 
-server_id = 123454
-
-# mysql or mariadb
-flavor = "mariadb"
-
-# mysqldump execution path
-# if not set or empty, ignore mysqldump.
+server_id = 1234
+flavor = "mysql"
 mysqldump = "mysqldump"
-
-# if we have no privilege to use mysqldump with --master-data,
-# we must skip it.
-#skip_master_data = false
-
-# minimal items to be inserted in one bulk
 bulk_size = 128
-
-# force flush the pending requests if we don't have enough items >= bulk_size
 flush_bulk_time = "200ms"
-
-# Ignore table without primary key
 skip_no_pk_table = false
-
-# MySQL data source
 [[source]]
-schema = "nfvofcaps"
-
-# Only below tables will be synced into Elasticsearch.
-# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
-# I don't think it is necessary to sync all tables in a database.
-#tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
-tables = ["FMALARM"]
-# Below is for special rule mapping
-
-# Very simple example
-# 
-# desc t;
-# +-------+--------------+------+-----+---------+-------+
-# | Field | Type         | Null | Key | Default | Extra |
-# +-------+--------------+------+-----+---------+-------+
-# | id    | int(11)      | NO   | PRI | NULL    |       |
-# | name  | varchar(256) | YES  |     | NULL    |       |
-# +-------+--------------+------+-----+---------+-------+
-# 
-# The table `t` will be synced to ES index `test` and type `t`.
+schema = "RUNOOB"
+tables = ["runoob_tbl"]
 [[rule]]
-schema = "nfvofcaps"
-table = "FMALARM"
-index = "nfvomysql"
-type = "mysqltable"
-
-# The es doc's id will be `ID`:`ALARMID`
-# It is useful for merging multiple tables into one type when these tables share the same PK
-id = ["ID","ALARMID"]
+schema = "RUNOOB"
+table = "runoob_tbl"
+index = "gomysql110"
+type = "go"
+id = ["runoob_id"]
 
 
 [rule.field]
-# Map column `id` to ES field `es_id`
-#id="es_id"
-# Map column `tags` to ES field `es_tags` with array type 
-#tags="es_tags,list"
-# Map column `keywords` to ES with array type
-#keywords=",list"
-
-
-ORIGIN="origin"
-ID="id"
-ALARMID="alarmid"
-ALARMTITLE="alarmtitle"
-ALARMSTATUS="alarmstatus"
-ORIGSEVERITY="origseverity"
-ALARMTYPE="alarmtype"
-EVENTTIME="eventtime,date"
-MSGSEQ="msgseq"
-CLEARTIME="cleartime,date"
-CLEARFLAG="clearflag"
-CLEARMSGSEQ="clearmsgseq"
-SPECIFICPROBLEMID="specificproblemid"
-SPECIFICPROBLEM="specificproblem"
-NEUID="neuid"
-NENAME="nename"
-NETYPE="netype"
-OBJECTUID="objectuid"
-OBJECTNAME="objectname"
-OBJECTTYPE="objecttype"
-LOCATIONINFO="locationinfo"
-ADDINFO="addinfo"
-PVFLAG="pvflag"
-CONFIRMFLAG="confirmflag"
-CONFIRMTIME="confirmtime,date"
-REMARK="remark"
-REMARKTIME="remarktime,date"
-
+runoob_title="runoob_title,date"

From d8f6d36047a8760e8abfd097b5630991fab67ba1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Thu, 22 Nov 2018 00:25:51 +0800
Subject: [PATCH 12/13] Update river.toml

---
 etc/river.toml | 109 ++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 99 insertions(+), 10 deletions(-)

diff --git a/etc/river.toml b/etc/river.toml
index bdb8e7db..0817690d 100644
--- a/etc/river.toml
+++ b/etc/river.toml
@@ -1,26 +1,115 @@
+# MySQL address, user and password
+# user must have replication privilege in MySQL.
 my_addr = "mysqlhost:3306"
 my_user = "root"
 my_pass = "root"
 my_charset = "utf8"
+
+# Set true when elasticsearch use https
+#es_https = false
+# Elasticsearch address
 es_addr = "eshost:9200"
+# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
+#es_user = ""
+#es_pass = ""
+
+# Path to store data, like master.info, if not set or empty,
+# we must use this to support breakpoint resume syncing. 
+# TODO: support other storage, like etcd. 
 data_dir = "./var"
+
+# Inner Http status address
 stat_addr = "127.0.0.1:12800"
-server_id = 1234
-flavor = "mysql"
+
+# pseudo server id like a slave 
+server_id = 1001
+
+# mysql or mariadb
+flavor = "mariadb"
+
+# mysqldump execution path
+# if not set or empty, ignore mysqldump.
 mysqldump = "mysqldump"
+
+# if we have no privilege to use mysqldump with --master-data,
+# we must skip it.
+#skip_master_data = false
+
+# minimal items to be inserted in one bulk
 bulk_size = 128
+
+# force flush the pending requests if we don't have enough items >= bulk_size
 flush_bulk_time = "200ms"
+
+# Ignore table without primary key
 skip_no_pk_table = false
+
+# MySQL data source
 [[source]]
-schema = "RUNOOB"
-tables = ["runoob_tbl"]
+schema = "nfvofcaps"
+
+# Only below tables will be synced into Elasticsearch.
+# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
+# I don't think it is necessary to sync all tables in a database.
+#tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
+tables = ["FMALARM"]
+# Below is for special rule mapping
+
+# Very simple example
+# 
+# desc t;
+# +-------+--------------+------+-----+---------+-------+
+# | Field | Type         | Null | Key | Default | Extra |
+# +-------+--------------+------+-----+---------+-------+
+# | id    | int(11)      | NO   | PRI | NULL    |       |
+# | name  | varchar(256) | YES  |     | NULL    |       |
+# +-------+--------------+------+-----+---------+-------+
+# 
+# The table `t` will be synced to ES index `test` and type `t`.
 [[rule]]
-schema = "RUNOOB"
-table = "runoob_tbl"
-index = "gomysql110"
-type = "go"
-id = ["runoob_id"]
+schema = "nfvofcaps"
+table = "FMALARM"
+index = "nfvomysql"
+type = "mysqltable"
+
+# The es doc's id will be `ID`:`ALARMID`
+# It is useful for merging multiple tables into one type when these tables share the same PK
+id = ["ID","ALARMID"]
 
 
 [rule.field]
-runoob_title="runoob_title,date"
+# Map column `id` to ES field `es_id`
+#id="es_id"
+# Map column `tags` to ES field `es_tags` with array type 
+#tags="es_tags,list"
+# Map column `keywords` to ES with array type
+#keywords=",list"
+
+
+ORIGIN="origin"
+ID="id"
+ALARMID="alarmid"
+ALARMTITLE="alarmtitle"
+ALARMSTATUS="alarmstatus"
+ORIGSEVERITY="origseverity"
+ALARMTYPE="alarmtype"
+EVENTTIME="eventtime,date"
+MSGSEQ="msgseq"
+CLEARTIME="cleartime,date"
+CLEARFLAG="clearflag"
+CLEARMSGSEQ="clearmsgseq"
+SPECIFICPROBLEMID="specificproblemid"
+SPECIFICPROBLEM="specificproblem"
+NEUID="neuid"
+NENAME="nename"
+NETYPE="netype"
+OBJECTUID="objectuid"
+OBJECTNAME="objectname"
+OBJECTTYPE="objecttype"
+LOCATIONINFO="locationinfo"
+ADDINFO="addinfo"
+PVFLAG="pvflag"
+CONFIRMFLAG="confirmflag"
+CONFIRMTIME="confirmtime,date"
+REMARK="remark"
+REMARKTIME="remarktime,date"

From 3086cd2c971c903238a9a1753cbcb260031ac543 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= <tiankonghewo@gmail.com>
Date: Wed, 9 Jan 2019 17:12:35 +0800
Subject: [PATCH 13/13] Update sync.go

---
 river/sync.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/river/sync.go b/river/sync.go
index 66325abd..d0c57bfc 100644
--- a/river/sync.go
+++ b/river/sync.go
@@ -514,15 +514,14 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
 			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
 				fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat))
 			}
-		} else {
-                    if col.Type == schema.TYPE_STRING {
+		} else if col.Type == schema.TYPE_STRING {
                         v := r.makeReqColumnData(col, value)
                         str, _ := v.(string)
                         stamp, _ := time.ParseInLocation(time.RFC3339 , str, time.Local) 
                         t := int64(stamp.Unix())
                         col.Type = schema.TYPE_DATETIME
                         fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat))
-                    }
+                 
                 }
 	}