Commit 2e43546

First commit
1 parent 4fbb86a commit 2e43546

10 files changed: +207 -0 lines changed

.env

+1
RDS_URL=mysql://admin:[email protected]:3306/magento

Gemfile

+1
gem 'poseidon'

README.md

+27
# Mysql-kafka-replication

Demonstration of using the MySQL replication protocol to stream MySQL changes into Kafka topics, then
read from those topics and process the changes in a Node.js script.

## Installation

    pip install -r requirements.txt
    npm install

Oh, and go set up [Kafka](http://kafka.apache.org/) and get it running on `localhost:9092`.

## Running

Set `RDS_URL` in your environment, or add it to a local `.env` file and run with `foreman`.

First, run the MySQL replication listener:

    python mysql-replicate-to-kafka.py

and then run the Node.js app:

    node consumer.js

Now go make some changes to your database and see them appear in the console of the Node app.
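Running under `foreman` requires a Procfile, which this commit does not include. A minimal sketch, assuming the two commands from the Running section (the process names `replicator` and `consumer` are made up for illustration):

    # Procfile (hypothetical; not part of this commit)
    replicator: python mysql-replicate-to-kafka.py
    consumer: node consumer.js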

config.js

+5
module.exports = {
  databases: {
    db1: 'postgres://localhost/herokuconnect'
  }
}

consumer.js

+45
var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    knex = require('knex'),
    glob = require("glob"),
    path = require("path"),
    client = new kafka.Client();

// Build a knex connection for each configured database; these end up on the
// config object, which is passed to handlers as their `context` argument.
var config = require('./config.js');
for (var key in config.databases) {
  config[key] = knex({client: 'pg', connection: config.databases[key], debug: true});
}

var handlers = {};
var topics = [];

// Each module in ./scripts maps Kafka topics to handler functions: either an
// object keyed by topic name, or a single function named after the file.
glob("./scripts/*.js", function(er, files) {
  console.log(files);
  files.forEach(function(f) {
    var mod = require(f);
    if (typeof(mod) == "object") {
      for (var topic in mod) {
        topics.push({topic: topic, partition: 0});
        handlers[topic] = mod[topic];
      }
    } else {
      var topic = path.basename(f, '.js');
      topics.push({topic: topic, partition: 0});
      handlers[topic] = mod;
    }
  });

  console.log("Topics ", topics);

  // Subscribe to every discovered topic and dispatch each message to its handler.
  var consumer = new Consumer(client, topics);

  consumer.on('message', function(message) {
    try {
      handlers[message.topic](config, JSON.parse(message.value));
    } catch (e) {
      console.log(e);
    }
  });
});

consumer.rb

+11
require 'poseidon'

consumer = Poseidon::PartitionConsumer.new("my_test_consumer", "localhost", 9092,
                                           "herokutest.tutors", 0, :earliest_offset)

loop do
  messages = consumer.fetch
  messages.each do |m|
    puts m.value
  end
end

mysql-replicate-to-kafka.py

+83
import os
import json
import time
import urlparse
from datetime import datetime, date
from decimal import Decimal

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import BINLOG
from kafka import SimpleProducer, KafkaClient
from kafka.common import LeaderNotAvailableError


def json_serial(obj):
    """JSON serializer for objects not serializable by default json code"""

    if isinstance(obj, (datetime, date)):
        serial = obj.isoformat()
        return serial
    if isinstance(obj, Decimal):
        return float(obj)
    else:
        print "Type '{}' for '{}' not serializable".format(obj.__class__, obj)
        return None

def build_message(binlog_evt):
    schema = {'table': getattr(binlog_evt, 'schema', '') + "." + getattr(binlog_evt, 'table', '')}

    if binlog_evt.event_type == BINLOG.WRITE_ROWS_EVENT_V2:
        # Insert
        return {'event': 'INSERT', 'headers': schema, 'data': binlog_evt.rows[0]['values']}

    elif binlog_evt.event_type == BINLOG.UPDATE_ROWS_EVENT_V2:
        # Update
        return {'event': 'UPDATE', 'headers': schema, 'data': binlog_evt.rows[0]['after_values']}
    elif binlog_evt.event_type == BINLOG.DELETE_ROWS_EVENT_V2:
        # Delete
        return {'event': 'DELETE', 'headers': schema, 'data': binlog_evt.rows[0]['values']}

    else:
        return None


kafka = KafkaClient("localhost:9092")

producer = SimpleProducer(kafka)
producer.send_messages("test", "test msg")

# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
#                         a local log before sending response
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
#                            by all in sync replicas before sending a response
producer = SimpleProducer(kafka, async=False,
                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                          ack_timeout=2000)


conf = urlparse.urlparse(os.environ['RDS_URL'])
mysql_settings = {'host': conf.hostname,
                  'port': conf.port,
                  'user': conf.username,
                  'passwd': conf.password}

# Connect to Mysql replication stream
print "Connecting to Mysql at {}...".format(mysql_settings['host'])
stream = BinLogStreamReader(connection_settings=mysql_settings, server_id=100, resume_stream=False,
                            blocking=True)
print "connected. Listening for changes..."

for evt in stream:
    evt.dump()
    msg = build_message(evt)
    if msg:
        try:
            response = producer.send_messages(msg['headers']['table'], json.dumps(msg, default=json_serial))
        except LeaderNotAvailableError:
            time.sleep(1)
            response = producer.send_messages(msg['headers']['table'], json.dumps(msg, default=json_serial))
        # TODO: Test response.error
        # TODO: Store replication stream pos

stream.close()

package.json

+20
{
  "name": "kafka-data-listener",
  "version": "1.0.0",
  "description": "Simple Mysql replication stream consumer which publishes change events to Kafka.",
  "main": "consumer.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "express": "^4.12.3",
    "glob": "^5.0.5",
    "kafka-node": "^0.2.26",
    "knex": "^0.8.3",
    "pg": "^4.3.0",
    "pg-hstore": "^2.3.2",
    "socket.io": "^1.3.5"
  }
}

requirements.txt

+2
kafka-python==0.9.3
mysql-replication==0.5

scripts/magento.core_session.js

+12
module.exports = {
  'magento.core_session': function(context, msg) {
    console.log("Session hit: id: " + msg.data.session_id + ", expires: " + new Date(msg.data.session_expires * 1000));
  },
  'magento.sales_flat_quote_item': function(context, msg) {
    console.log("Added to cart: " + msg.data.name);
  },
  'magento.log_url': function(context, msg) {
    console.log("Page hit on url " + msg.data.url_id + " by visitor " + msg.data.visitor_id + " at " +
                msg.data.visit_time);
  }
}
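The handlers above only log to the console and ignore the `context` argument. In consumer.js, `context` is the config object after a knex client has been attached for each entry in `config.databases`, so `context.db1` is a Postgres connection built from the URL in config.js. A minimal sketch of an alternative `magento.log_url` handler that uses it; the `scripts/magento.example.js` filename and the `page_hits` table are assumptions, not part of this commit:

// scripts/magento.example.js (hypothetical; not part of this commit)
// Persists page-hit change events using the knex connection that
// consumer.js builds from config.js and passes in as `context.db1`.
module.exports = {
  'magento.log_url': function(context, msg) {
    context.db1('page_hits')   // assumes a page_hits table already exists
      .insert({url_id: msg.data.url_id, visitor_id: msg.data.visitor_id})
      .then(function() {
        console.log("Stored page hit for url " + msg.data.url_id);
      });
  }
};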
