2
2
use std:: cell:: { Cell , UnsafeCell } ;
3
3
use std:: iter;
4
4
#[ allow( unused) ]
5
- use std:: sync:: { Arc , RwLock , RwLockReadGuard , RwLockWriteGuard , TryLockError } ;
5
+ use std:: sync:: { Arc , Mutex , RwLock , RwLockReadGuard , RwLockWriteGuard , TryLockError } ;
6
6
#[ allow( unused) ]
7
7
use std:: thread;
8
8
use std:: time:: Duration ;
9
9
10
+ #[ cfg( feature = "with_client_implementation" ) ]
11
+ use fragile:: Sticky ;
12
+
10
13
#[ allow( unused) ]
11
14
use std:: sync:: atomic:: { AtomicBool , Ordering } ;
12
15
@@ -75,25 +78,16 @@ impl<F: FnOnce() -> I, I: IntoBreadcrumbs> IntoBreadcrumbs for F {
75
78
}
76
79
}
77
80
78
- #[ doc( hidden) ]
79
- #[ cfg( feature = "with_client_implementation" ) ]
80
- pub struct PendingProcessors (
81
- Vec < (
82
- Option < thread:: ThreadId > ,
83
- Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > > ,
84
- ) > ,
85
- ) ;
86
-
87
- // this is not actually sync but our uses of it are
88
- #[ cfg( feature = "with_client_implementation" ) ]
89
- unsafe impl Sync for PendingProcessors { }
90
81
#[ cfg( feature = "with_client_implementation" ) ]
91
- unsafe impl Send for PendingProcessors { }
82
+ enum PendingProcessor {
83
+ Send ( Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > + Send + Sync > ) ,
84
+ NonSend ( Sticky < Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > > > ) ,
85
+ }
92
86
93
87
#[ cfg( feature = "with_client_implementation" ) ]
94
88
struct HubImpl {
95
89
stack : RwLock < Stack > ,
96
- pending_processors : RwLock < PendingProcessors > ,
90
+ pending_processors : Mutex < Vec < PendingProcessor > > ,
97
91
has_pending_processors : AtomicBool ,
98
92
}
99
93
@@ -109,9 +103,9 @@ impl HubImpl {
109
103
f ( & mut * guard)
110
104
}
111
105
112
- fn with_processors_mut < F : FnOnce ( & mut PendingProcessors ) -> R , R > ( & self , f : F ) -> R {
106
+ fn with_processors_mut < F : FnOnce ( & mut Vec < PendingProcessor > ) -> R , R > ( & self , f : F ) -> R {
113
107
f ( & mut * self . pending_processors
114
- . write ( )
108
+ . lock ( )
115
109
. unwrap_or_else ( |x| x. into_inner ( ) ) )
116
110
}
117
111
@@ -160,7 +154,7 @@ impl Hub {
160
154
Hub {
161
155
inner : HubImpl {
162
156
stack : RwLock :: new ( Stack :: from_client_and_scope ( client, scope) ) ,
163
- pending_processors : RwLock :: new ( PendingProcessors ( vec ! [ ] ) ) ,
157
+ pending_processors : Mutex :: new ( vec ! [ ] ) ,
164
158
has_pending_processors : AtomicBool :: new ( false ) ,
165
159
} ,
166
160
}
@@ -423,7 +417,7 @@ impl Hub {
423
417
pub fn add_event_processor ( & self , f : Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > > ) {
424
418
with_client_impl ! { {
425
419
self . inner. with_processors_mut( |pending| {
426
- pending. 0 . push( ( Some ( thread :: current ( ) . id ( ) ) , f ) ) ;
420
+ pending. push( PendingProcessor :: NonSend ( Sticky :: new ( f ) ) ) ;
427
421
} ) ;
428
422
self . inner. has_pending_processors. store( true , Ordering :: Release ) ;
429
423
} }
@@ -438,12 +432,12 @@ impl Hub {
438
432
#[ allow( unused) ]
439
433
pub fn add_send_event_processor (
440
434
& self ,
441
- f : Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > + Send > ,
435
+ f : Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > + Send + Sync > ,
442
436
) {
443
437
with_client_impl ! { {
444
438
use std:: mem;
445
439
self . inner. with_processors_mut( |pending| {
446
- pending. 0 . push( ( None , unsafe { mem :: transmute ( f ) } ) ) ;
440
+ pending. push( PendingProcessor :: Send ( f ) ) ;
447
441
} ) ;
448
442
self . inner. has_pending_processors. store( true , Ordering :: Release ) ;
449
443
} }
@@ -455,25 +449,27 @@ impl Hub {
455
449
return ;
456
450
}
457
451
let mut new_processors = vec![ ] ;
458
- let this_thread = thread:: current( ) . id( ) ;
459
- let any_left = self . inner. with_processors_mut( |pending| {
460
- let vec = & mut pending. 0 ;
452
+ let any_left = self . inner. with_processors_mut( |vec| {
461
453
let mut i = 0 ;
462
454
while i < vec. len( ) {
463
- let is_safe_call = match vec[ i] . 0 {
464
- None => true ,
465
- Some ( thread_id ) => thread_id == this_thread ,
455
+ let is_safe_call = match vec[ i] {
456
+ PendingProcessor :: Send ( .. ) => true ,
457
+ PendingProcessor :: NonSend ( ref sticky ) => sticky . is_valid ( ) ,
466
458
} ;
467
- if is_safe_call {
468
- let mut item = vec. remove( i) ;
469
- let processor: & mut Box <FnMut ( ) -> Box <Fn ( & mut Event ) + Send + Sync >> = unsafe {
470
- use std:: mem;
471
- mem:: transmute( & mut item. 1 )
472
- } ;
473
- new_processors. push( processor( ) ) ;
474
- } else {
459
+ if !is_safe_call {
475
460
i += 1 ;
461
+ continue ;
476
462
}
463
+
464
+ let mut item = vec. remove( i) ;
465
+ let processor: & mut Box <FnMut ( ) -> Box <Fn ( & mut Event ) + Send + Sync >> = unsafe {
466
+ use std:: mem;
467
+ match item {
468
+ PendingProcessor :: Send ( ref mut func) => mem:: transmute( & mut * func) ,
469
+ PendingProcessor :: NonSend ( ref mut func) => mem:: transmute( func. get_mut( ) ) ,
470
+ }
471
+ } ;
472
+ new_processors. push( processor( ) ) ;
477
473
}
478
474
!vec. is_empty( )
479
475
} ) ;
0 commit comments