13
13
//!
14
14
//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
15
15
16
+ use crate :: sync:: Mutex ;
16
17
use alloc:: sync:: Arc ;
17
18
use core:: mem;
18
- use crate :: sync:: Mutex ;
19
19
20
20
#[ allow( unused_imports) ]
21
21
use crate :: prelude:: * ;
@@ -26,9 +26,8 @@ use crate::sync::Condvar;
26
26
use std:: time:: Duration ;
27
27
28
28
use core:: future:: Future as StdFuture ;
29
- use core:: task:: { Context , Poll } ;
30
29
use core:: pin:: Pin ;
31
-
30
+ use core :: task :: { Context , Poll } ;
32
31
33
32
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
34
33
pub ( crate ) struct Notifier {
@@ -37,9 +36,7 @@ pub(crate) struct Notifier {
37
36
38
37
impl Notifier {
39
38
pub ( crate ) fn new ( ) -> Self {
40
- Self {
41
- notify_pending : Mutex :: new ( ( false , None ) ) ,
42
- }
39
+ Self { notify_pending : Mutex :: new ( ( false , None ) ) }
43
40
}
44
41
45
42
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
@@ -198,7 +195,9 @@ impl Future {
198
195
if state. complete {
199
196
state. callbacks_made = true ;
200
197
true
201
- } else { false }
198
+ } else {
199
+ false
200
+ }
202
201
}
203
202
}
204
203
@@ -251,11 +250,8 @@ impl Sleeper {
251
250
// Note that this is the common case - a ChannelManager, a ChainMonitor, and an
252
251
// OnionMessenger.
253
252
pub fn from_three_futures ( fut_a : & Future , fut_b : & Future , fut_c : & Future ) -> Self {
254
- let notifiers = vec ! [
255
- Arc :: clone( & fut_a. state) ,
256
- Arc :: clone( & fut_b. state) ,
257
- Arc :: clone( & fut_c. state)
258
- ] ;
253
+ let notifiers =
254
+ vec ! [ Arc :: clone( & fut_a. state) , Arc :: clone( & fut_b. state) , Arc :: clone( & fut_c. state) ] ;
259
255
Self { notifiers }
260
256
}
261
257
/// Constructs a new sleeper on many futures, allowing blocking on all at once.
@@ -289,8 +285,11 @@ impl Sleeper {
289
285
/// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed.
290
286
pub fn wait ( & self ) {
291
287
let ( cv, notified_fut_mtx) = self . setup_wait ( ) ;
292
- let notified_fut = cv. wait_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , |fut_opt| fut_opt. is_none ( ) )
293
- . unwrap ( ) . take ( ) . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ;
288
+ let notified_fut = cv
289
+ . wait_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , |fut_opt| fut_opt. is_none ( ) )
290
+ . unwrap ( )
291
+ . take ( )
292
+ . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ;
294
293
notified_fut. lock ( ) . unwrap ( ) . callbacks_made = true ;
295
294
}
296
295
@@ -300,10 +299,13 @@ impl Sleeper {
300
299
pub fn wait_timeout ( & self , max_wait : Duration ) -> bool {
301
300
let ( cv, notified_fut_mtx) = self . setup_wait ( ) ;
302
301
let notified_fut =
303
- match cv. wait_timeout_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , max_wait, |fut_opt| fut_opt. is_none ( ) ) {
302
+ match cv. wait_timeout_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , max_wait, |fut_opt| {
303
+ fut_opt. is_none ( )
304
+ } ) {
304
305
Ok ( ( _, e) ) if e. timed_out ( ) => return false ,
305
- Ok ( ( mut notified_fut, _) ) =>
306
- notified_fut. take ( ) . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ,
306
+ Ok ( ( mut notified_fut, _) ) => notified_fut
307
+ . take ( )
308
+ . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ,
307
309
Err ( _) => panic ! ( "Previous panic while a lock was held led to a lock panic" ) ,
308
310
} ;
309
311
notified_fut. lock ( ) . unwrap ( ) . callbacks_made = true ;
@@ -314,8 +316,8 @@ impl Sleeper {
314
316
#[ cfg( test) ]
315
317
mod tests {
316
318
use super :: * ;
317
- use core:: sync:: atomic:: { AtomicBool , Ordering } ;
318
319
use core:: future:: Future as FutureTrait ;
320
+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
319
321
use core:: task:: { RawWaker , RawWakerVTable } ;
320
322
321
323
#[ test]
@@ -328,7 +330,9 @@ mod tests {
328
330
329
331
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
330
332
let callback_ref = Arc :: clone ( & callback) ;
331
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
333
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
334
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
335
+ } ) ) ;
332
336
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
333
337
}
334
338
@@ -343,15 +347,19 @@ mod tests {
343
347
// a second `notify`.
344
348
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
345
349
let callback_ref = Arc :: clone ( & callback) ;
346
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
350
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
351
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
352
+ } ) ) ;
347
353
assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
348
354
349
355
notifier. notify ( ) ;
350
356
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
351
357
352
358
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
353
359
let callback_ref = Arc :: clone ( & callback) ;
354
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
360
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
361
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
362
+ } ) ) ;
355
363
assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
356
364
357
365
notifier. notify ( ) ;
@@ -365,12 +373,16 @@ mod tests {
365
373
366
374
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
367
375
let callback_ref = Arc :: clone ( & callback) ;
368
- future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
376
+ future. register_callback ( Box :: new ( move || {
377
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
378
+ } ) ) ;
369
379
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
370
380
371
381
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
372
382
let callback_ref = Arc :: clone ( & callback) ;
373
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
383
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
384
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
385
+ } ) ) ;
374
386
assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
375
387
}
376
388
@@ -384,12 +396,16 @@ mod tests {
384
396
385
397
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
386
398
let callback_ref = Arc :: clone ( & callback) ;
387
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
399
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
400
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
401
+ } ) ) ;
388
402
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
389
403
390
404
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
391
405
let callback_ref = Arc :: clone ( & callback) ;
392
- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
406
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
407
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
408
+ } ) ) ;
393
409
assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
394
410
395
411
notifier. notify ( ) ;
@@ -407,12 +423,10 @@ mod tests {
407
423
408
424
let exit_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
409
425
let exit_thread_clone = exit_thread. clone ( ) ;
410
- thread:: spawn ( move || {
411
- loop {
412
- thread_notifier. notify ( ) ;
413
- if exit_thread_clone. load ( Ordering :: SeqCst ) {
414
- break
415
- }
426
+ thread:: spawn ( move || loop {
427
+ thread_notifier. notify ( ) ;
428
+ if exit_thread_clone. load ( Ordering :: SeqCst ) {
429
+ break ;
416
430
}
417
431
} ) ;
418
432
@@ -423,7 +437,7 @@ mod tests {
423
437
// available.
424
438
loop {
425
439
if persistence_notifier. get_future ( ) . wait_timeout ( Duration :: from_millis ( 100 ) ) {
426
- break
440
+ break ;
427
441
}
428
442
}
429
443
@@ -433,7 +447,7 @@ mod tests {
433
447
// are available.
434
448
loop {
435
449
if !persistence_notifier. get_future ( ) . wait_timeout ( Duration :: from_millis ( 100 ) ) {
436
- break
450
+ break ;
437
451
}
438
452
}
439
453
}
@@ -493,7 +507,9 @@ mod tests {
493
507
} ;
494
508
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
495
509
let callback_ref = Arc :: clone ( & callback) ;
496
- future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
510
+ future. register_callback ( Box :: new ( move || {
511
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
512
+ } ) ) ;
497
513
498
514
assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
499
515
complete_future ( & future. state ) ;
@@ -518,7 +534,9 @@ mod tests {
518
534
519
535
let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
520
536
let callback_ref = Arc :: clone ( & callback) ;
521
- future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
537
+ future. register_callback ( Box :: new ( move || {
538
+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
539
+ } ) ) ;
522
540
523
541
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
524
542
assert ! ( future. state. lock( ) . unwrap( ) . callbacks. is_empty( ) ) ;
@@ -529,17 +547,27 @@ mod tests {
529
547
// compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
530
548
// waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
531
549
const WAKER_V_TABLE : RawWakerVTable = RawWakerVTable :: new ( waker_clone, wake, wake_by_ref, drop) ;
532
- unsafe fn wake_by_ref ( ptr : * const ( ) ) { let p = ptr as * const Arc < AtomicBool > ; assert ! ( !( * p) . fetch_or( true , Ordering :: SeqCst ) ) ; }
533
- unsafe fn drop ( ptr : * const ( ) ) { let p = ptr as * mut Arc < AtomicBool > ; let _freed = Box :: from_raw ( p) ; }
534
- unsafe fn wake ( ptr : * const ( ) ) { wake_by_ref ( ptr) ; drop ( ptr) ; }
550
+ unsafe fn wake_by_ref ( ptr : * const ( ) ) {
551
+ let p = ptr as * const Arc < AtomicBool > ;
552
+ assert ! ( !( * p) . fetch_or( true , Ordering :: SeqCst ) ) ;
553
+ }
554
+ unsafe fn drop ( ptr : * const ( ) ) {
555
+ let p = ptr as * mut Arc < AtomicBool > ;
556
+ let _freed = Box :: from_raw ( p) ;
557
+ }
558
+ unsafe fn wake ( ptr : * const ( ) ) {
559
+ wake_by_ref ( ptr) ;
560
+ drop ( ptr) ;
561
+ }
535
562
unsafe fn waker_clone ( ptr : * const ( ) ) -> RawWaker {
536
563
let p = ptr as * const Arc < AtomicBool > ;
537
564
RawWaker :: new ( Box :: into_raw ( Box :: new ( Arc :: clone ( & * p) ) ) as * const ( ) , & WAKER_V_TABLE )
538
565
}
539
566
540
567
fn create_waker ( ) -> ( Arc < AtomicBool > , Waker ) {
541
568
let a = Arc :: new ( AtomicBool :: new ( false ) ) ;
542
- let waker = unsafe { Waker :: from_raw ( waker_clone ( ( & a as * const Arc < AtomicBool > ) as * const ( ) ) ) } ;
569
+ let waker =
570
+ unsafe { Waker :: from_raw ( waker_clone ( ( & a as * const Arc < AtomicBool > ) as * const ( ) ) ) } ;
543
571
( a, waker)
544
572
}
545
573
@@ -563,14 +591,20 @@ mod tests {
563
591
assert ! ( !woken. load( Ordering :: SeqCst ) ) ;
564
592
565
593
let ( second_woken, second_waker) = create_waker ( ) ;
566
- assert_eq ! ( Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) , Poll :: Pending ) ;
594
+ assert_eq ! (
595
+ Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) ,
596
+ Poll :: Pending
597
+ ) ;
567
598
assert ! ( !second_woken. load( Ordering :: SeqCst ) ) ;
568
599
569
600
complete_future ( & future. state ) ;
570
601
assert ! ( woken. load( Ordering :: SeqCst ) ) ;
571
602
assert ! ( second_woken. load( Ordering :: SeqCst ) ) ;
572
603
assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Ready ( ( ) ) ) ;
573
- assert_eq ! ( Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) , Poll :: Ready ( ( ) ) ) ;
604
+ assert_eq ! (
605
+ Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) ,
606
+ Poll :: Ready ( ( ) )
607
+ ) ;
574
608
}
575
609
576
610
#[ test]
@@ -713,8 +747,12 @@ mod tests {
713
747
let callback_b = Arc :: new ( AtomicBool :: new ( false ) ) ;
714
748
let callback_a_ref = Arc :: clone ( & callback_a) ;
715
749
let callback_b_ref = Arc :: clone ( & callback_b) ;
716
- notifier_a. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_a_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
717
- notifier_b. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_b_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
750
+ notifier_a. get_future ( ) . register_callback ( Box :: new ( move || {
751
+ assert ! ( !callback_a_ref. fetch_or( true , Ordering :: SeqCst ) )
752
+ } ) ) ;
753
+ notifier_b. get_future ( ) . register_callback ( Box :: new ( move || {
754
+ assert ! ( !callback_b_ref. fetch_or( true , Ordering :: SeqCst ) )
755
+ } ) ) ;
718
756
assert ! ( callback_a. load( Ordering :: SeqCst ) ^ callback_b. load( Ordering :: SeqCst ) ) ;
719
757
720
758
// If we now notify both notifiers again, the other callback will fire, completing the
@@ -739,14 +777,23 @@ mod tests {
739
777
740
778
// Test that simply polling a future twice doesn't result in two pending `Waker`s.
741
779
let mut future_a = notifier. get_future ( ) ;
742
- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
780
+ assert_eq ! (
781
+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
782
+ Poll :: Pending
783
+ ) ;
743
784
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
744
- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
785
+ assert_eq ! (
786
+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
787
+ Poll :: Pending
788
+ ) ;
745
789
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
746
790
747
791
// If we poll a second future, however, that will store a second `Waker`.
748
792
let mut future_b = notifier. get_future ( ) ;
749
- assert_eq ! ( Pin :: new( & mut future_b) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
793
+ assert_eq ! (
794
+ Pin :: new( & mut future_b) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
795
+ Poll :: Pending
796
+ ) ;
750
797
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 2 ) ;
751
798
752
799
// but when we drop the `Future`s, the pending Wakers will also be dropped.
@@ -757,13 +804,22 @@ mod tests {
757
804
758
805
// Further, after polling a future twice, if the notifier is woken all Wakers are dropped.
759
806
let mut future_a = notifier. get_future ( ) ;
760
- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
807
+ assert_eq ! (
808
+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
809
+ Poll :: Pending
810
+ ) ;
761
811
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
762
- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
812
+ assert_eq ! (
813
+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
814
+ Poll :: Pending
815
+ ) ;
763
816
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
764
817
notifier. notify ( ) ;
765
818
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 0 ) ;
766
- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Ready ( ( ) ) ) ;
819
+ assert_eq ! (
820
+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
821
+ Poll :: Ready ( ( ) )
822
+ ) ;
767
823
assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 0 ) ;
768
824
}
769
825
}
0 commit comments