86
86
87
87
/// Handles the next batch index in the batch index queue, pushing the future in the pipeline
88
88
/// futures.
89
- fn handle_next_batch <
90
- F : FnMut ( & mut FuturesOrdered < DerivationPipelineFuture > , DerivationPipelineFuture ) ,
91
- > (
92
- & mut self ,
93
- mut queue_fut : F ,
94
- ) {
89
+ fn handle_next_batch ( & mut self ) -> Option < DerivationPipelineFuture > {
95
90
let database = self . database . clone ( ) ;
96
91
let provider = self . l1_provider . clone ( ) ;
97
92
@@ -105,8 +100,9 @@ where
105
100
106
101
derive ( batch, provider) . await . map_err ( |err| ( index, err) )
107
102
} ) ;
108
- queue_fut ( & mut self . pipeline_futures , fut) ;
103
+ return Some ( fut) ;
109
104
}
105
+ None
110
106
}
111
107
}
112
108
@@ -133,7 +129,9 @@ where
133
129
134
130
// if the futures can still grow, handle the next batch.
135
131
if this. pipeline_futures . len ( ) < MAX_CONCURRENT_DERIVATION_PIPELINE_FUTS {
136
- this. handle_next_batch ( |queue, fut| queue. push_back ( fut) ) ;
132
+ if let Some ( fut) = this. handle_next_batch ( ) {
133
+ this. pipeline_futures . push_back ( fut)
134
+ }
137
135
}
138
136
139
137
// poll the futures and handle result.
@@ -147,7 +145,9 @@ where
147
145
tracing:: error!( target: "scroll::node::derivation_pipeline" , ?index, ?err, "failed to derive payload attributes for batch" ) ;
148
146
// retry polling the same batch index.
149
147
this. batch_index_queue . push_front ( index) ;
150
- this. handle_next_batch ( |queue, fut| queue. push_front ( fut) ) ;
148
+ if let Some ( fut) = this. handle_next_batch ( ) {
149
+ this. pipeline_futures . push_front ( fut)
150
+ }
151
151
}
152
152
}
153
153
}
0 commit comments