@@ -157,7 +157,7 @@ pub(crate) struct LdkLiteEventQueue<K: Deref>
157
157
where
158
158
K :: Target : KVStorePersister ,
159
159
{
160
- queue : Mutex < EventQueueSerWrapper > ,
160
+ queue : Mutex < VecDeque < Arc < Event > > > ,
161
161
notifier : Condvar ,
162
162
persister : K ,
163
163
}
@@ -167,15 +167,15 @@ where
167
167
K :: Target : KVStorePersister ,
168
168
{
169
169
pub ( crate ) fn new ( persister : K ) -> Self {
170
- let queue: Mutex < EventQueueSerWrapper > = Mutex :: new ( EventQueueSerWrapper ( VecDeque :: new ( ) ) ) ;
170
+ let queue: Mutex < VecDeque < Arc < Event > > > = Mutex :: new ( VecDeque :: new ( ) ) ;
171
171
let notifier = Condvar :: new ( ) ;
172
172
Self { queue, notifier, persister }
173
173
}
174
174
pub ( crate ) fn add_event ( & self , event : LdkLiteEvent ) -> Result < ( ) , Error > {
175
175
{
176
176
let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
177
- locked_queue. 0 . push_back ( Arc :: new ( event) ) ;
178
- self . persister . persist ( EVENTS_PERSISTENCE_KEY , & * locked_queue) ?;
177
+ locked_queue. push_back ( Arc :: new ( event) ) ;
178
+ self . persist_queue ( & * locked_queue) ?;
179
179
}
180
180
181
181
self . notifier . notify_one ( ) ;
@@ -185,20 +185,27 @@ where
185
185
pub ( crate ) fn next_event ( & self ) -> Arc < LdkLiteEvent > {
186
186
let locked_queue = self
187
187
. notifier
188
- . wait_while ( self . queue . lock ( ) . unwrap ( ) , |queue| queue. 0 . is_empty ( ) )
188
+ . wait_while ( self . queue . lock ( ) . unwrap ( ) , |queue| queue. is_empty ( ) )
189
189
. unwrap ( ) ;
190
- Arc :: clone ( & locked_queue. 0 . front ( ) . unwrap ( ) )
190
+ Arc :: clone ( & locked_queue. front ( ) . unwrap ( ) )
191
191
}
192
192
193
193
pub ( crate ) fn event_handled ( & self ) -> Result < ( ) , Error > {
194
194
{
195
195
let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
196
- locked_queue. 0 . pop_front ( ) ;
197
- self . persister . persist ( EVENTS_PERSISTENCE_KEY , & * locked_queue) ?;
196
+ locked_queue. pop_front ( ) ;
197
+ self . persist_queue ( & * locked_queue) ?;
198
198
}
199
199
self . notifier . notify_one ( ) ;
200
200
Ok ( ( ) )
201
201
}
202
+
203
+ fn persist_queue ( & self , locked_queue : & VecDeque < Arc < Event > > ) -> Result < ( ) , Error > {
204
+ self . persister
205
+ . persist ( EVENTS_PERSISTENCE_KEY , & EventQueueSerWrapper ( locked_queue) )
206
+ . map_err ( |_| Error :: PersistenceFailed ) ?;
207
+ Ok ( ( ) )
208
+ }
202
209
}
203
210
204
211
impl < K : Deref > ReadableArgs < K > for LdkLiteEventQueue < K >
@@ -209,15 +216,16 @@ where
209
216
fn read < R : lightning:: io:: Read > (
210
217
reader : & mut R , persister : K ,
211
218
) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
212
- let queue: Mutex < EventQueueSerWrapper > = Mutex :: new ( Readable :: read ( reader) ?) ;
219
+ let read_queue: EventQueueDeserWrapper = Readable :: read ( reader) ?;
220
+ let queue: Mutex < VecDeque < Arc < Event > > > = Mutex :: new ( read_queue. 0 ) ;
213
221
let notifier = Condvar :: new ( ) ;
214
222
Ok ( Self { queue, notifier, persister } )
215
223
}
216
224
}
217
225
218
- struct EventQueueSerWrapper ( VecDeque < Arc < LdkLiteEvent > > ) ;
226
+ struct EventQueueDeserWrapper ( VecDeque < Arc < LdkLiteEvent > > ) ;
219
227
220
- impl Readable for EventQueueSerWrapper {
228
+ impl Readable for EventQueueDeserWrapper {
221
229
fn read < R : lightning:: io:: Read > (
222
230
reader : & mut R ,
223
231
) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
@@ -226,11 +234,13 @@ impl Readable for EventQueueSerWrapper {
226
234
for _ in 0 ..len {
227
235
queue. push_back ( Arc :: new ( Readable :: read ( reader) ?) ) ;
228
236
}
229
- Ok ( EventQueueSerWrapper ( queue) )
237
+ Ok ( Self ( queue) )
230
238
}
231
239
}
232
240
233
- impl Writeable for EventQueueSerWrapper {
241
+ struct EventQueueSerWrapper < ' a > ( & ' a VecDeque < Arc < Event > > ) ;
242
+
243
+ impl Writeable for EventQueueSerWrapper < ' _ > {
234
244
fn write < W : Writer > ( & self , writer : & mut W ) -> Result < ( ) , lightning:: io:: Error > {
235
245
( self . 0 . len ( ) as u16 ) . write ( writer) ?;
236
246
for e in self . 0 . iter ( ) {
@@ -601,7 +611,7 @@ mod tests {
601
611
602
612
// Check we get the expected event and that it is returned until we mark it handled.
603
613
for _ in 0 ..5 {
604
- assert_eq ! ( event_queue. next_event( ) , expected_event) ;
614
+ assert_eq ! ( * event_queue. next_event( ) , expected_event) ;
605
615
assert_eq ! ( false , test_persister. get_and_clear_pending_persist( ) ) ;
606
616
}
607
617
0 commit comments