Skip to content

Commit 548a699

Browse files
wj32numtel
authored andcommitted
Better caching that fixes two major bugs
Bugs: LiveMysql._resultsBuffer[query] never gets invalidated when no LiveMysqlSelect objects for that query are active; Even slightly large values of minInterval drastically slow down binlog event processing when there are many LiveMysqlSelect objects active
1 parent e3f9d0a commit 548a699

File tree

4 files changed

+250
-159
lines changed

4 files changed

+250
-159
lines changed

lib/LiveMysql.js

Lines changed: 55 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ var mysql = require('mysql');
77
var ZONGJI_INIT_TIMEOUT = 1500;
88

99
var LiveMysqlSelect = require('./LiveMysqlSelect');
10+
var QueryCache = require('./QueryCache');
1011

1112
function LiveMysql(settings, callback){
1213
var self = this;
@@ -16,8 +17,7 @@ function LiveMysql(settings, callback){
1617
self.zongji = null;
1718
self.db = db;
1819
self._select = [];
19-
// Cache query results for any new, duplicate SELECT statements
20-
self._resultsBuffer = {};
20+
self._queryCache = {};
2121
self._schemaCache = {};
2222

2323
self.zongjiSettings = {
@@ -34,36 +34,16 @@ function LiveMysql(settings, callback){
3434

3535
zongji.on('binlog', function(event) {
3636
if(event.getEventName() === 'tablemap') return;
37-
if(self._select.length === 0) return;
38-
39-
// Cache query results within this update event
40-
var eventResults = {};
41-
42-
// Update each select statement if matches event
43-
function _nextSelect(index){
44-
var select;
45-
if(index < self._select.length){
46-
select = self._select[index];
47-
if(select.matchRowEvent(event)){
48-
if(select.query in eventResults){
49-
select._setRows(eventResults[select.query]);
50-
_nextSelect(index + 1);
51-
}else{
52-
select.update(function(error, rows){
53-
if(error === undefined){
54-
eventResults[select.query] = rows;
55-
}
56-
_nextSelect(index + 1);
57-
});
58-
}
59-
}else{
60-
_nextSelect(index + 1);
61-
}
37+
38+
for(var query in self._queryCache){
39+
if (!self._queryCache.hasOwnProperty(query)){
40+
continue;
41+
}
42+
var queryCache = self._queryCache[query];
43+
if(!queryCache.canSkipRowEvent() && queryCache.matchRowEvent(event)){
44+
queryCache.invalidate();
6245
}
6346
}
64-
65-
_nextSelect(0);
66-
6747
});
6848

6949
// Wait for Zongji to be ready before executing callback
@@ -107,12 +87,56 @@ LiveMysql.prototype.select = function(query, triggers){
10787
includeSchema[triggerDatabase].push(triggers[i].table);
10888
}
10989
}
90+
91+
query = self._escapeQueryFun(query);
92+
93+
var queryCache;
94+
if(self._queryCache.hasOwnProperty(query)){
95+
queryCache = self._queryCache[query];
96+
}else{
97+
queryCache = new QueryCache(query, this);
98+
self._queryCache[query] = queryCache;
99+
}
110100

111-
var newSelect = new LiveMysqlSelect(query, triggers, this);
101+
var newSelect = new LiveMysqlSelect(queryCache, triggers, this);
112102
self._select.push(newSelect);
113103
return newSelect;
114104
};
115105

106+
LiveMysql.prototype._escapeQueryFun = function(query){
107+
var self = this;
108+
if(typeof query === 'function'){
109+
var escId = self.db.escapeId;
110+
var esc = self.db.escape.bind(self.db);
111+
return query(esc, escId);
112+
}
113+
return query;
114+
};
115+
116+
LiveMysql.prototype._removeSelect = function(select){
117+
var self = this;
118+
var index = self._select.indexOf(select);
119+
if(index !== -1){
120+
// Remove the select object from our list
121+
self._select.splice(index, 1);
122+
123+
var queryCache = select.queryCache;
124+
var queryCacheIndex = queryCache.selects.indexOf(select);
125+
if(queryCacheIndex !== -1){
126+
// Remove the select object from the query cache's list and remove the
127+
// query cache if no select objects are using it.
128+
queryCache.selects.splice(queryCacheIndex, 1);
129+
if(queryCache.selects.length === 0){
130+
delete self._queryCache[queryCache.query];
131+
}
132+
}
133+
134+
return true;
135+
}else{
136+
return false;
137+
}
138+
}
139+
116140
LiveMysql.prototype.pause = function(){
117141
var self = this;
118142
self.zongjiSettings.includeSchema = {};

lib/LiveMysqlSelect.js

Lines changed: 34 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
var EventEmitter = require('events').EventEmitter;
44
var util = require('util');
55

6-
function LiveMysqlSelect(query, triggers, base){
7-
if(!query)
8-
throw new Error('query required');
6+
function LiveMysqlSelect(queryCache, triggers, base){
7+
if(!queryCache)
8+
throw new Error('queryCache required');
99
if(!(triggers instanceof Array))
1010
throw new Error('triggers array required');
1111
if(typeof base !== 'object')
@@ -15,32 +15,44 @@ function LiveMysqlSelect(query, triggers, base){
1515
EventEmitter.call(self);
1616
self.triggers = triggers;
1717
self.base = base;
18-
self.lastUpdate = 0;
19-
self.query = self._escapeQueryFun(query);
2018
self.data = [];
21-
self.initialized = false;
19+
self.queryCache = queryCache;
20+
queryCache.selects.push(self);
21+
22+
if(queryCache.initialized){
23+
var refLastUpdate = queryCache.lastUpdate;
24+
25+
// Trigger events for existing data
26+
setTimeout(function() {
27+
if(queryCache.lastUpdate !== refLastUpdate){
28+
// Query cache has been updated since this select object was created;
29+
// our data would've been updated already.
30+
return;
31+
}
32+
33+
self.emit('update', queryCache.data);
34+
35+
if(queryCache.data.length !== 0 && !self.base.settings.skipDiff){
36+
var diff = queryCache.data.map(function(row, index) {
37+
return [ 'added', row, index ];
38+
});
2239

23-
if(self.query in base._resultsBuffer){
24-
setTimeout(function(){
25-
self._setRows(base._resultsBuffer[self.query]);
26-
}, 1);
40+
diff.forEach(function(evt){
41+
self.emit.apply(self, evt);
42+
// New row added to end
43+
self.data[evt[2]] = evt[1];
44+
});
45+
// Output all difference events in a single event
46+
self.emit('diff', diff);
47+
}
48+
}, 50);
2749
}else{
28-
self.update();
50+
queryCache.invalidate();
2951
}
3052
}
3153

3254
util.inherits(LiveMysqlSelect, EventEmitter);
3355

34-
LiveMysqlSelect.prototype._escapeQueryFun = function(query){
35-
var self = this;
36-
if(typeof query === 'function'){
37-
var escId = self.base.db.escapeId;
38-
var esc = self.base.db.escape.bind(self.base.db);
39-
return query(esc, escId);
40-
}
41-
return query;
42-
};
43-
4456
LiveMysqlSelect.prototype.matchRowEvent = function(event){
4557
var self = this;
4658
var tableMap = event.tableMap[event.tableId];
@@ -72,114 +84,9 @@ LiveMysqlSelect.prototype.matchRowEvent = function(event){
7284
return false;
7385
};
7486

75-
LiveMysqlSelect.prototype._setRows = function(rows){
76-
var self = this;
77-
var diff = [];
78-
79-
// Determine what changes before updating cache in order to
80-
// be able to skip all event emissions if no change
81-
// TODO update this algorithm to use less data
82-
rows.forEach(function(row, index){
83-
if(self.data.length - 1 < index){
84-
diff.push([ 'added', row, index ]);
85-
}else if(JSON.stringify(self.data[index]) !== JSON.stringify(row)){
86-
diff.push([ 'changed', self.data[index], row, index ]);
87-
}
88-
});
89-
90-
if(self.data.length > rows.length){
91-
for(var i = self.data.length - 1; i >= rows.length; i--){
92-
diff.push([ 'removed', self.data[i], i ]);
93-
}
94-
}
95-
96-
if(diff.length !== 0){
97-
self.emit('update', rows);
98-
99-
diff.forEach(function(evt){
100-
if(!self.base.settings.skipDiff){
101-
self.emit.apply(self, evt);
102-
}
103-
switch(evt[0]){
104-
case 'added':
105-
// New row added to end
106-
self.data[evt[2]] = evt[1];
107-
break;
108-
case 'changed':
109-
// Update row data reference
110-
self.data[evt[3]] = evt[2];
111-
break;
112-
case 'removed':
113-
// Remove extra rows off the end
114-
self.data.splice(evt[2], 1);
115-
break;
116-
}
117-
});
118-
if(!self.base.settings.skipDiff){
119-
// Output all difference events in a single event
120-
self.emit('diff', diff);
121-
}
122-
}else if(self.initialized === false){
123-
// If the result set initializes to 0 rows, it still needs to output an
124-
// update event.
125-
self.emit('update', rows);
126-
}
127-
128-
self.initialized = true;
129-
130-
self.lastUpdate = Date.now();
131-
};
132-
133-
LiveMysqlSelect.prototype.update = function(callback){
134-
var self = this;
135-
136-
function _update(){
137-
self.base.db.query(self.query, function(error, rows){
138-
if(error){
139-
self.emit('error', error);
140-
callback && callback.call(self, error);
141-
}else{
142-
self.base._resultsBuffer[self.query] = rows;
143-
self._setRows(rows);
144-
callback && callback.call(self, undefined, rows);
145-
}
146-
});
147-
}
148-
149-
if(self.base.settings.minInterval === undefined){
150-
_update();
151-
}else if(self.lastUpdate + self.base.settings.minInterval < Date.now()){
152-
_update();
153-
}else{ // Before minInterval
154-
if(!self._updateTimeout){
155-
self._updateTimeout = setTimeout(function(){
156-
delete self._updateTimeout;
157-
_update();
158-
}, self.lastUpdate + self.base.settings.minInterval - Date.now());
159-
}
160-
}
161-
};
162-
16387
LiveMysqlSelect.prototype.stop = function(){
16488
var self = this;
165-
166-
var index = self.base._select.indexOf(self);
167-
if(index !== -1){
168-
self.base._select.splice(index, 1);
169-
170-
// If no other instance of the same query string, remove the resultsBuffer
171-
var sameCount = self.base._select.filter(function(select) {
172-
return select.query === self.query;
173-
}).length;
174-
175-
if(sameCount === 0) {
176-
delete self.base._resultsBuffer[self.query];
177-
}
178-
179-
return true;
180-
}else{
181-
return false;
182-
}
89+
return self.base._removeSelect(self);
18390
};
18491

18592
LiveMysqlSelect.prototype.active = function(){

0 commit comments

Comments
 (0)