@@ -89,60 +89,64 @@ CompositeDatasource.prototype._getDatasourceInfo = function (query, absoluteOffs
89
89
// We checked all datasources, return our accumulated information
90
90
callback ( chosenDatasource , chosenOffset , totalCount , hasExactCount ) ;
91
91
else {
92
- var ignoreTriples = { _push : noop , close : noop } ;
93
92
var datasource = self . _getDatasourceById ( datasourceIndex ) ;
94
- datasource . _executeQuery ( emptyQuery , ignoreTriples , function ( metadata ) {
95
- var count = metadata . totalCount ;
96
- var exact = metadata . hasExactCount ;
97
- // If we are still looking for an appropriate datasource, we need exact counts!
98
- if ( offset > 0 && ! exact ) {
99
- self . _getExactCount ( datasource , query , function ( exactCount ) {
100
- count = exactCount ;
101
- exact = true ;
102
- continueRecursion ( ) ;
103
- } ) ;
104
- }
105
- else
106
- continueRecursion ( ) ;
107
-
108
- function continueRecursion ( ) {
109
- if ( chosenDatasource < 0 && offset < count ) {
110
- // We can start querying from this datasource
111
- setImmediate ( function ( ) {
112
- findRecursive ( datasourceIndex + 1 , offset - count , datasourceIndex , offset ,
113
- totalCount + count , hasExactCount && exact ) ;
93
+ var metadataReader = {
94
+ _push : noop ,
95
+ close : noop ,
96
+ setProperty : function ( name , metadata ) {
97
+ if ( name !== 'metadata' ) return ;
98
+ // If we are still looking for an appropriate datasource, we need exact counts
99
+ var count = metadata . totalCount , exact = metadata . hasExactCount ;
100
+ if ( offset > 0 && ! exact ) {
101
+ self . _getExactCount ( datasource , query , function ( exactCount ) {
102
+ count = exactCount ;
103
+ exact = true ;
104
+ continueRecursion ( ) ;
114
105
} ) ;
115
106
}
116
- else {
117
- // We forward our accumulated information and go check the next datasource
118
- setImmediate ( function ( ) {
119
- findRecursive ( datasourceIndex + 1 , offset - count , chosenDatasource , chosenOffset ,
120
- totalCount + count , hasExactCount && exact ) ;
121
- } ) ;
107
+ else
108
+ continueRecursion ( ) ;
109
+
110
+ function continueRecursion ( ) {
111
+ if ( chosenDatasource < 0 && offset < count ) {
112
+ // We can start querying from this datasource
113
+ setImmediate ( function ( ) {
114
+ findRecursive ( datasourceIndex + 1 , offset - count , datasourceIndex , offset ,
115
+ totalCount + count , hasExactCount && exact ) ;
116
+ } ) ;
117
+ }
118
+ else {
119
+ // We forward our accumulated information and go check the next datasource
120
+ setImmediate ( function ( ) {
121
+ findRecursive ( datasourceIndex + 1 , offset - count , chosenDatasource , chosenOffset ,
122
+ totalCount + count , hasExactCount && exact ) ;
123
+ } ) ;
124
+ }
122
125
}
123
- }
124
- } ) ;
126
+ } ,
127
+ } ;
128
+ datasource . _executeQuery ( emptyQuery , metadataReader ) ;
125
129
}
126
130
}
127
131
} ;
128
132
129
133
// Writes the results of the query to the given triple stream
130
- CompositeDatasource . prototype . _executeQuery = function ( query , destination , metadataCallback ) {
134
+ CompositeDatasource . prototype . _executeQuery = function ( query , destination ) {
131
135
var offset = query . offset || 0 , limit = query . limit || Infinity ;
132
136
var self = this ;
133
137
this . _getDatasourceInfo ( query , offset , function ( datasourceIndex , relativeOffset , totalCount , hasExactCount ) {
134
138
if ( datasourceIndex < 0 ) {
135
139
// No valid datasource has been found
136
- metadataCallback ( { totalCount : totalCount , hasExactCount : hasExactCount } ) ;
140
+ destination . setProperty ( 'metadata' , { totalCount : totalCount , hasExactCount : hasExactCount } ) ;
137
141
destination . close ( ) ;
138
142
}
139
143
else {
140
144
// Send query to first applicable datasource and optionally emit triples from consecutive datasources
141
- metadataCallback ( { totalCount : totalCount , hasExactCount : hasExactCount } ) ;
142
- var emitted = 0 ;
145
+ destination . setProperty ( 'metadata' , { totalCount : totalCount , hasExactCount : hasExactCount } ) ;
143
146
144
147
// Modify our triple stream so that if all results from one datasource have arrived,
145
148
// check if we haven't reached the limit and if so, trigger a new query for the next datasource.
149
+ var emitted = 0 ;
146
150
countItems ( destination , function ( localEmittedCount ) {
147
151
// This is called after the last element has been pushed
148
152
0 commit comments