Skip to content

Commit ad810c1

Browse files
committed
Merge branch 'feature-datasource-asynciterator' into develop
* feature-datasource-asynciterator: Pass datasource metadata as a property. Switch datasource to AsyncIterator.
2 parents d7530dc + 315cf87 commit ad810c1

21 files changed

+162
-176
lines changed

bin/generate-summary

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ function fromDataSource(uri, datasource, callback, end, chunksize) {
8585
var count = 0;
8686

8787
var stream = datasource.select({ limit: limit, offset: offset }, console.error)
88-
.on('metadata', function (metadata) {
88+
.getProperty('metadata', function (metadata) {
8989
var progress = Math.round((offset / metadata.totalCount) * 100);
9090
console.log(progress);
9191

lib/controllers/SummaryController.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ SummaryController.prototype._handleRequest = function (request, response, next)
4242

4343
// Render the summary
4444
var view = this._negotiateView('Summary', request, response);
45-
view.render({ prefixes: this._prefixes, resultStream: streamParser }, request, response);
45+
view.render({ prefixes: this._prefixes, results: streamParser }, request, response);
4646
}
4747
else
4848
next();

lib/controllers/TriplePatternFragmentsController.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ TriplePatternFragmentsController.prototype._handleRequest = function (request, r
3636
// Generate the query result
3737
var view = this._negotiateView('TriplePatternFragments', request, response),
3838
settings = this._createFragmentMetadata(request, query, datasourceSettings);
39-
settings.resultStream = datasourceSettings.datasource.select(query,
40-
function (error) { error && next(error); });
39+
settings.results = datasourceSettings.datasource.select(query,
40+
function (error) { error && next(error); });
4141

4242
// Execute the extensions and render the query result
4343
var extensions = this._extensions, extensionId = 0;

lib/datasources/CompositeDatasource.js

+62-66
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,16 @@ CompositeDatasource.prototype._getExactCount = function (datasource, query, call
5959
// Otherwise, count all the triples manually
6060
var emptyQuery = { offset: 0, subject: query.subject, predicate: query.predicate, object: query.object };
6161
var exactCount = 0;
62-
var countingTripleStream = {
63-
push: function (triple) {
64-
if (triple)
65-
exactCount++;
66-
else {
67-
// Cache large values; small ones are calculated fast anyway
68-
if (exactCount > 1000)
69-
cache.set(cacheKey, exactCount);
70-
callback(exactCount);
71-
}
62+
var triplesCounter = {
63+
_push: function (triple) { exactCount++; },
64+
close: function () {
65+
// Cache large values; small ones are calculated fast anyway
66+
if (exactCount > 1000)
67+
cache.set(cacheKey, exactCount);
68+
callback(exactCount);
7269
},
7370
};
74-
datasource._executeQuery(emptyQuery, countingTripleStream, noop);
71+
datasource._executeQuery(emptyQuery, triplesCounter, noop);
7572
};
7673

7774
// Recursively find all required datasource composition info to perform a query.
@@ -92,61 +89,65 @@ CompositeDatasource.prototype._getDatasourceInfo = function (query, absoluteOffs
9289
// We checked all datasources, return our accumulated information
9390
callback(chosenDatasource, chosenOffset, totalCount, hasExactCount);
9491
else {
95-
var emptyTripleStream = { push: noop };
9692
var datasource = self._getDatasourceById(datasourceIndex);
97-
datasource._executeQuery(emptyQuery, emptyTripleStream, function (metadata) {
98-
var count = metadata.totalCount;
99-
var exact = metadata.hasExactCount;
100-
// If we are still looking for an appropriate datasource, we need exact counts!
101-
if (offset > 0 && !exact) {
102-
self._getExactCount(datasource, query, function (exactCount) {
103-
count = exactCount;
104-
exact = true;
105-
continueRecursion();
106-
});
107-
}
108-
else
109-
continueRecursion();
110-
111-
function continueRecursion() {
112-
if (chosenDatasource < 0 && offset < count) {
113-
// We can start querying from this datasource
114-
setImmediate(function () {
115-
findRecursive(datasourceIndex + 1, offset - count, datasourceIndex, offset,
116-
totalCount + count, hasExactCount && exact);
93+
var metadataReader = {
94+
_push: noop,
95+
close: noop,
96+
setProperty: function (name, metadata) {
97+
if (name !== 'metadata') return;
98+
// If we are still looking for an appropriate datasource, we need exact counts
99+
var count = metadata.totalCount, exact = metadata.hasExactCount;
100+
if (offset > 0 && !exact) {
101+
self._getExactCount(datasource, query, function (exactCount) {
102+
count = exactCount;
103+
exact = true;
104+
continueRecursion();
117105
});
118106
}
119-
else {
120-
// We forward our accumulated information and go check the next datasource
121-
setImmediate(function () {
122-
findRecursive(datasourceIndex + 1, offset - count, chosenDatasource, chosenOffset,
123-
totalCount + count, hasExactCount && exact);
124-
});
107+
else
108+
continueRecursion();
109+
110+
function continueRecursion() {
111+
if (chosenDatasource < 0 && offset < count) {
112+
// We can start querying from this datasource
113+
setImmediate(function () {
114+
findRecursive(datasourceIndex + 1, offset - count, datasourceIndex, offset,
115+
totalCount + count, hasExactCount && exact);
116+
});
117+
}
118+
else {
119+
// We forward our accumulated information and go check the next datasource
120+
setImmediate(function () {
121+
findRecursive(datasourceIndex + 1, offset - count, chosenDatasource, chosenOffset,
122+
totalCount + count, hasExactCount && exact);
123+
});
124+
}
125125
}
126-
}
127-
});
126+
},
127+
};
128+
datasource._executeQuery(emptyQuery, metadataReader);
128129
}
129130
}
130131
};
131132

132133
// Writes the results of the query to the given triple stream
133-
CompositeDatasource.prototype._executeQuery = function (query, tripleStream, metadataCallback) {
134+
CompositeDatasource.prototype._executeQuery = function (query, destination) {
134135
var offset = query.offset || 0, limit = query.limit || Infinity;
135136
var self = this;
136137
this._getDatasourceInfo(query, offset, function (datasourceIndex, relativeOffset, totalCount, hasExactCount) {
137138
if (datasourceIndex < 0) {
138139
// No valid datasource has been found
139-
metadataCallback({ totalCount: totalCount, hasExactCount: hasExactCount });
140-
tripleStream.push(null);
140+
destination.setProperty('metadata', { totalCount: totalCount, hasExactCount: hasExactCount });
141+
destination.close();
141142
}
142143
else {
143144
// Send query to first applicable datasource and optionally emit triples from consecutive datasources
144-
metadataCallback({ totalCount: totalCount, hasExactCount: hasExactCount });
145-
var emitted = 0;
145+
destination.setProperty('metadata', { totalCount: totalCount, hasExactCount: hasExactCount });
146146

147147
// Modify our triple stream so that if all results from one datasource have arrived,
148148
// check if we haven't reached the limit and if so, trigger a new query for the next datasource.
149-
tripleStream.push = makeSmartPush(tripleStream, function (localEmittedCount) {
149+
var emitted = 0;
150+
countItems(destination, function (localEmittedCount) {
150151
// This is called after the last element has been pushed
151152

152153
// If we haven't reached our limit, try to fill it with other datasource query results.
@@ -156,36 +157,31 @@ CompositeDatasource.prototype._executeQuery = function (query, tripleStream, met
156157
var localLimit = limit - emitted;
157158
var subQuery = { offset: 0, limit: localLimit,
158159
subject: query.subject, predicate: query.predicate, object: query.object };
159-
self._getDatasourceById(datasourceIndex)._executeQuery(subQuery, tripleStream, noop);
160-
return true;
160+
self._getDatasourceById(datasourceIndex)._executeQuery(subQuery, destination, noop);
161+
return false;
161162
}
162163
else
163-
return false;
164+
return true;
164165
});
165166

166167
// Initiate query to the first datasource.
167168
var subQuery = { offset: relativeOffset, limit: limit,
168169
subject: query.subject, predicate: query.predicate, object: query.object };
169-
self._getDatasourceById(datasourceIndex)._executeQuery(subQuery, tripleStream, noop);
170+
self._getDatasourceById(datasourceIndex)._executeQuery(subQuery, destination, noop);
170171
}
171172
});
172173

173-
// Replaces a tripleStream.push
174-
// It takes the tripleStream as first argument and a callback as second argument.
175-
// The callback will be called when the push function is called with a falsy value.
176-
// Returning a falsy value inside the callback will delegate the falsy value to the original
177-
// push function anyways.
178-
function makeSmartPush(self, nullCb) {
179-
var count = 0, originalPush = self.push;
180-
return function (element) {
181-
if (!element) {
182-
if (!nullCb(count))
183-
originalPush.call(self, element);
184-
}
185-
else {
186-
count++;
187-
originalPush.call(self, element);
188-
}
174+
// Counts the number of triples and sends them through the callback,
175+
// only closing the iterator when the callback returns true.
176+
function countItems(destination, closeCallback) {
177+
var count = 0, originalPush = destination._push, originalClose = destination.close;
178+
destination._push = function (element) {
179+
count++;
180+
originalPush.call(destination, element);
181+
};
182+
destination.close = function () {
183+
if (closeCallback(count))
184+
originalClose.call(destination);
189185
};
190186
}
191187
};

lib/datasources/Datasource.js

+15-26
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
var fs = require('fs'),
55
_ = require('lodash'),
6-
Readable = require('stream').Readable,
6+
BufferedIterator = require('asynciterator').BufferedIterator,
77
EventEmitter = require('events');
88

99
// Creates a new Datasource
@@ -105,32 +105,24 @@ Datasource.prototype.select = function (query, onError) {
105105
if (query.object && query.object.indexOf(blankNodePrefix) === 0)
106106
(query = _.clone(query)).object = '_:' + query.object.substr(blankNodePrefixLength);
107107

108-
// Create the triple stream
109-
var tripleStream = new Readable({ objectMode: true });
110-
tripleStream._read = noop;
111-
tripleStream._push = tripleStream.push;
112-
tripleStream.push = function (triple) {
113-
// Translate blank nodes to IRIs
114-
if (triple) {
115-
if (triple.subject[0] === '_') triple.subject = blankNodePrefix + triple.subject.substr(2);
116-
if (triple.object[0] === '_') triple.object = blankNodePrefix + triple.object.substr(2);
117-
}
118-
this._push(triple);
119-
};
120-
onError && tripleStream.on('error', onError);
108+
// Translate blank nodes in the result to blank node IRIs
109+
var destination = new BufferedIterator(), outputTriples;
110+
outputTriples = destination.map(function (triple) {
111+
if (triple.subject[0] === '_') triple.subject = blankNodePrefix + triple.subject.substr(2);
112+
if (triple.object[0] === '_') triple.object = blankNodePrefix + triple.object.substr(2);
113+
return triple;
114+
});
115+
outputTriples.copyProperties(destination, ['metadata']);
116+
onError && outputTriples.on('error', onError);
121117

122118
// Execute the query
123-
try {
124-
this._executeQuery(query, tripleStream, function (metadata) {
125-
setImmediate(function () { tripleStream.emit('metadata', metadata); });
126-
});
127-
}
128-
catch (error) { tripleStream.emit('error', error); }
129-
return tripleStream;
119+
try { this._executeQuery(query, destination); }
120+
catch (error) { outputTriples.emit('error', error); }
121+
return outputTriples;
130122
};
131123

132-
// Writes the results of the query to the given triple stream
133-
Datasource.prototype._executeQuery = function (query, tripleStream, metadataCallback) {
124+
// Writes the results of the query to the given destination
125+
Datasource.prototype._executeQuery = function (query, destination) {
134126
throw new Error('_executeQuery has not been implemented');
135127
};
136128

@@ -165,7 +157,4 @@ Datasource.prototype.close = function (callback) {
165157
callback && callback();
166158
};
167159

168-
// The empty function
169-
function noop() {}
170-
171160
module.exports = Datasource;

lib/datasources/ExternalHdtDatasource.js

+6-6
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ ExternalHdtDatasource.prototype._initialize = function (done) {
3333
};
3434

3535
// Writes the results of the query to the given triple stream
36-
ExternalHdtDatasource.prototype._executeQuery = function (query, tripleStream, metadataCallback) {
36+
ExternalHdtDatasource.prototype._executeQuery = function (query, destination) {
3737
// Execute the external HDT utility
3838
var hdtFile = this._hdtFile, offset = query.offset || 0, limit = query.limit || Infinity,
3939
hdt = spawn(hdtUtility, ['--query', (query.subject || '?s') + ' ' +
@@ -45,15 +45,15 @@ ExternalHdtDatasource.prototype._executeQuery = function (query, tripleStream, m
4545
var parser = new N3Parser(), tripleCount = 0, estimatedTotalCount = 0, hasExactCount = true;
4646
parser.parse(hdt.stdout, function (error, triple) {
4747
if (error)
48-
tripleStream.emit('error', new Error('Invalid query result: ' + error.message));
48+
destination.emit('error', new Error('Invalid query result: ' + error.message));
4949
else if (triple)
50-
tripleCount++, tripleStream.push(triple);
50+
tripleCount++, destination._push(triple);
5151
else {
5252
// Ensure the estimated total count is as least as large as the number of triples
5353
if (tripleCount && estimatedTotalCount < offset + tripleCount)
5454
estimatedTotalCount = offset + (tripleCount < query.limit ? tripleCount : 2 * tripleCount);
55-
metadataCallback({ totalCount: estimatedTotalCount, hasExactCount: hasExactCount });
56-
tripleStream.push(null);
55+
destination.setProperty('metadata', { totalCount: estimatedTotalCount, hasExactCount: hasExactCount });
56+
destination.close();
5757
}
5858
});
5959
parser._prefixes._ = '_:'; // Ensure blank nodes are named consistently
@@ -66,7 +66,7 @@ ExternalHdtDatasource.prototype._executeQuery = function (query, tripleStream, m
6666

6767
// Report query errors
6868
hdt.on('exit', function (exitCode) {
69-
exitCode && tripleStream.emit('error', new Error('Could not query ' + hdtFile));
69+
exitCode && destination.emit('error', new Error('Could not query ' + hdtFile));
7070
});
7171
};
7272

lib/datasources/HdtDatasource.js

+6-6
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,20 @@ HdtDatasource.prototype._initialize = function (done) {
2828
};
2929

3030
// Writes the results of the query to the given triple stream
31-
HdtDatasource.prototype._executeQuery = function (query, tripleStream, metadataCallback) {
31+
HdtDatasource.prototype._executeQuery = function (query, destination) {
3232
this._hdtDocument.search(query.subject, query.predicate, query.object,
3333
{ limit: query.limit, offset: query.offset },
3434
function (error, triples, estimatedTotalCount, hasExactCount) {
35-
if (error) return tripleStream.emit('error', error);
35+
if (error) return destination.emit('error', error);
3636
// Ensure the estimated total count is as least as large as the number of triples
3737
var tripleCount = triples.length, offset = query.offset || 0;
3838
if (tripleCount && estimatedTotalCount < offset + tripleCount)
3939
estimatedTotalCount = offset + (tripleCount < query.limit ? tripleCount : 2 * tripleCount);
40-
metadataCallback({ totalCount: estimatedTotalCount, hasExactCount: hasExactCount });
41-
// Add the triples to the stream
40+
destination.setProperty('metadata', { totalCount: estimatedTotalCount, hasExactCount: hasExactCount });
41+
// Add the triples to the output
4242
for (var i = 0; i < tripleCount; i++)
43-
tripleStream.push(triples[i]);
44-
tripleStream.push(null);
43+
destination._push(triples[i]);
44+
destination.close();
4545
});
4646
};
4747

lib/datasources/MemoryDatasource.js

+5-5
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ MemoryDatasource.prototype._getAllTriples = function (addTriple, done) {
2424
};
2525

2626
// Writes the results of the query to the given triple stream
27-
MemoryDatasource.prototype._executeQuery = function (query, tripleStream, metadataCallback) {
28-
var offset = query.offset || 0, limit = query.limit || Infinity,
27+
MemoryDatasource.prototype._executeQuery = function (query, destination) {
28+
var offset = query.offset || 0, limit = query.limit || Infinity,
2929
triples = this._tripleStore.findByIRI(query.subject, query.predicate, query.object);
3030
// Send the metadata
31-
metadataCallback({ totalCount: triples.length, hasExactCount: true });
31+
destination.setProperty('metadata', { totalCount: triples.length, hasExactCount: true });
3232
// Send the requested subset of triples
3333
for (var i = offset, l = Math.min(offset + limit, triples.length); i < l; i++)
34-
tripleStream.push(triples[i]);
35-
tripleStream.push(null);
34+
destination._push(triples[i]);
35+
destination.close();
3636
};
3737

3838
module.exports = MemoryDatasource;

0 commit comments

Comments
 (0)