@@ -12,8 +12,8 @@ use graph::components::ethereum::{EthereumAdapter as EthereumAdapterTrait, *};
12
12
use graph:: prelude:: {
13
13
debug, err_msg, error, ethabi, format_err,
14
14
futures03:: { self , compat:: Future01CompatExt , FutureExt , StreamExt , TryStreamExt } ,
15
- hex, retry, stream, tiny_keccak, trace, warn, web3, ChainStore , DynTryFuture , Error ,
16
- EthereumCallCache , Logger , TimeoutError ,
15
+ hex, retry, stream, tiny_keccak, trace, warn, web3, ChainStore , CheapClone , DynTryFuture ,
16
+ Error , EthereumCallCache , Logger , TimeoutError ,
17
17
} ;
18
18
use web3:: api:: Web3 ;
19
19
use web3:: transports:: batch:: Batch ;
@@ -62,6 +62,15 @@ lazy_static! {
62
62
. expect( "invalid GRAPH_ETHEREUM_REQUEST_RETRIES env var" ) ;
63
63
}
64
64
65
+ impl < T : web3:: Transport > CheapClone for EthereumAdapter < T > {
66
+ fn cheap_clone ( & self ) -> Self {
67
+ Self {
68
+ web3 : self . web3 . cheap_clone ( ) ,
69
+ metrics : self . metrics . cheap_clone ( ) ,
70
+ }
71
+ }
72
+ }
73
+
65
74
impl < T > EthereumAdapter < T >
66
75
where
67
76
T : web3:: BatchTransport + Send + Sync + ' static ,
@@ -168,7 +177,7 @@ where
168
177
subgraph_metrics : Arc < SubgraphEthRpcMetrics > ,
169
178
from : u64 ,
170
179
to : u64 ,
171
- filter : EthGetLogsFilter ,
180
+ filter : Arc < EthGetLogsFilter > ,
172
181
too_many_logs_fingerprints : & ' static [ & ' static str ] ,
173
182
) -> impl Future < Item = Vec < Log > , Error = TimeoutError < web3:: error:: Error > > {
174
183
let eth_adapter = self . clone ( ) ;
@@ -276,7 +285,9 @@ where
276
285
}
277
286
278
287
// Collect all event sigs
279
- let eth = self . clone ( ) ;
288
+ let eth = self . cheap_clone ( ) ;
289
+ let filter = Arc :: new ( filter) ;
290
+
280
291
let step = match filter. contracts . is_empty ( ) {
281
292
// `to - from + 1` blocks will be scanned.
282
293
false => to - from,
@@ -287,10 +298,10 @@ where
287
298
// node returns an error that signifies the request is to heavy to process, the range will
288
299
// be broken down to smaller steps.
289
300
futures03:: stream:: try_unfold ( ( from, step) , move |( start, step) | {
290
- let logger = logger. clone ( ) ;
291
- let filter = filter. clone ( ) ;
292
- let eth = eth. clone ( ) ;
293
- let subgraph_metrics = subgraph_metrics. clone ( ) ;
301
+ let logger = logger. cheap_clone ( ) ;
302
+ let filter = filter. cheap_clone ( ) ;
303
+ let eth = eth. cheap_clone ( ) ;
304
+ let subgraph_metrics = subgraph_metrics. cheap_clone ( ) ;
294
305
295
306
async move {
296
307
if start > to {
@@ -305,10 +316,10 @@ where
305
316
let res = eth
306
317
. logs_with_sigs (
307
318
& logger,
308
- subgraph_metrics. clone ( ) ,
319
+ subgraph_metrics. cheap_clone ( ) ,
309
320
start,
310
321
end,
311
- filter. clone ( ) ,
322
+ filter. cheap_clone ( ) ,
312
323
TOO_MANY_LOGS_FINGERPRINTS ,
313
324
)
314
325
. compat ( )
@@ -1035,12 +1046,18 @@ where
1035
1046
to : u64 ,
1036
1047
log_filter : EthereumLogFilter ,
1037
1048
) -> DynTryFuture < ' static , Vec < Log > , Error > {
1038
- let eth: Self = self . clone ( ) ;
1049
+ let eth: Self = self . cheap_clone ( ) ;
1039
1050
let logger = logger. clone ( ) ;
1040
1051
1041
1052
futures03:: stream:: iter ( log_filter. eth_get_logs_filters ( ) . map ( move |filter| {
1042
- eth. clone ( )
1043
- . log_stream ( logger. clone ( ) , subgraph_metrics. clone ( ) , from, to, filter)
1053
+ eth. cheap_clone ( )
1054
+ . log_stream (
1055
+ logger. cheap_clone ( ) ,
1056
+ subgraph_metrics. cheap_clone ( ) ,
1057
+ from,
1058
+ to,
1059
+ filter,
1060
+ )
1044
1061
. into_stream ( )
1045
1062
} ) )
1046
1063
. flatten ( )
0 commit comments