@@ -5,7 +5,7 @@ use std::{
5
5
pin:: Pin ,
6
6
sync:: atomic:: { AtomicUsize , Ordering } ,
7
7
task:: { Context , Poll } ,
8
- thread,
8
+ thread, usize ,
9
9
} ;
10
10
11
11
use futures_core:: ready;
@@ -80,42 +80,79 @@ impl ArbiterHandle {
80
80
}
81
81
}
82
82
83
- /// An Arbiter represents a thread that provides an asynchronous execution environment for futures
84
- /// and functions.
85
- ///
86
- /// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
87
- #[ derive( Debug ) ]
88
- pub struct Arbiter {
89
- tx : mpsc:: UnboundedSender < ArbiterCommand > ,
90
- thread_handle : thread:: JoinHandle < ( ) > ,
83
+ /// A builder for configuring and spawning a new [Arbiter] thread.
84
+ pub struct ArbiterBuilder {
85
+ name_factory : Option < Box < dyn Fn ( usize , usize ) -> String + ' static > > ,
86
+ #[ cfg( not( all( target_os = "linux" , feature = "io-uring" ) ) ) ]
87
+ runtime_factory : Option < Box < dyn Fn ( ) -> tokio:: runtime:: Runtime + Send + ' static > > ,
91
88
}
92
89
93
- impl Arbiter {
94
- /// Spawn a new Arbiter thread and start its event loop.
95
- ///
96
- /// # Panics
97
- /// Panics if a [System] is not registered on the current thread.
98
- #[ cfg( not( all( target_os = "linux" , feature = "io-uring" ) ) ) ]
90
+ impl ArbiterBuilder {
91
+ /// Create a new [ArbiterBuilder].
99
92
#[ allow( clippy:: new_without_default) ]
100
- pub fn new ( ) -> Arbiter {
101
- Self :: with_tokio_rt ( || {
102
- crate :: runtime:: default_tokio_runtime ( ) . expect ( "Cannot create new Arbiter's Runtime." )
103
- } )
93
+ pub fn new ( ) -> Self {
94
+ Self {
95
+ name_factory : None ,
96
+ #[ cfg( not( all( target_os = "linux" , feature = "io-uring" ) ) ) ]
97
+ runtime_factory : None ,
98
+ }
104
99
}
105
100
106
- /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
101
+ /// Specify a factory function for generating the name of the Arbiter thread.
102
+ ///
103
+ /// Defaults to `actix-rt|system:<system_id>|arbiter:<arb_id>`
104
+ ///
105
+ /// # Example
106
+ ///
107
+ /// ```no_run
108
+ /// let _ = actix_rt::System::new();
109
+ /// actix_rt::ArbiterBuilder::new()
110
+ /// .name(|system_id, arb_id| {
111
+ /// format!("some-prefix|system:{}|arbiter:{}", system_id, arb_id)
112
+ /// })
113
+ /// .build();
114
+ /// ```
115
+ pub fn name < N > ( mut self , name_factory : N ) -> Self
116
+ where
117
+ N : Fn ( usize , usize ) -> String + ' static ,
118
+ {
119
+ self . name_factory = Some ( Box :: new ( name_factory) ) ;
120
+ self
121
+ }
122
+
123
+ /// Specify a factory function for generating the [Tokio Runtime](tokio-runtime) used by the Arbiter.
107
124
///
108
125
/// [tokio-runtime]: tokio::runtime::Runtime
109
126
#[ cfg( not( all( target_os = "linux" , feature = "io-uring" ) ) ) ]
110
- pub fn with_tokio_rt < F > ( runtime_factory : F ) -> Arbiter
127
+ pub fn runtime < R > ( mut self , runtime_factory : R ) -> Self
111
128
where
112
- F : Fn ( ) -> tokio:: runtime:: Runtime + Send + ' static ,
129
+ R : Fn ( ) -> tokio:: runtime:: Runtime + Send + ' static ,
113
130
{
131
+ self . runtime_factory = Some ( Box :: new ( runtime_factory) ) ;
132
+ self
133
+ }
134
+
135
+ /// Spawn a new Arbiter thread and start its event loop.
136
+ ///
137
+ /// # Panics
138
+ /// Panics if a [System] is not registered on the current thread.
139
+ #[ cfg( not( all( target_os = "linux" , feature = "io-uring" ) ) ) ]
140
+ pub fn build ( self ) -> Arbiter {
114
141
let sys = System :: current ( ) ;
115
142
let system_id = sys. id ( ) ;
116
143
let arb_id = COUNT . fetch_add ( 1 , Ordering :: Relaxed ) ;
117
144
118
- let name = format ! ( "actix-rt|system:{}|arbiter:{}" , system_id, arb_id) ;
145
+ let name = self . name_factory . unwrap_or_else ( || {
146
+ Box :: new ( |system_id, arb_id| {
147
+ format ! ( "actix-rt|system:{}|arbiter:{}" , system_id, arb_id)
148
+ } )
149
+ } ) ( system_id, arb_id) ;
150
+ let runtime_factory = self . runtime_factory . unwrap_or_else ( || {
151
+ Box :: new ( || {
152
+ crate :: runtime:: default_tokio_runtime ( )
153
+ . expect ( "Cannot create new Arbiter's Runtime." )
154
+ } )
155
+ } ) ;
119
156
let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
120
157
121
158
let ( ready_tx, ready_rx) = std:: sync:: mpsc:: channel :: < ( ) > ( ) ;
@@ -160,13 +197,16 @@ impl Arbiter {
160
197
/// # Panics
161
198
/// Panics if a [System] is not registered on the current thread.
162
199
#[ cfg( all( target_os = "linux" , feature = "io-uring" ) ) ]
163
- #[ allow( clippy:: new_without_default) ]
164
- pub fn new ( ) -> Arbiter {
200
+ pub fn build ( self ) -> Arbiter {
165
201
let sys = System :: current ( ) ;
166
202
let system_id = sys. id ( ) ;
167
203
let arb_id = COUNT . fetch_add ( 1 , Ordering :: Relaxed ) ;
168
204
169
- let name = format ! ( "actix-rt|system:{}|arbiter:{}" , system_id, arb_id) ;
205
+ let name = self . name_factory . unwrap_or_else ( || {
206
+ Box :: new ( |system_id, arb_id| {
207
+ format ! ( "actix-rt|system:{}|arbiter:{}" , system_id, arb_id)
208
+ } )
209
+ } ) ( system_id, arb_id) ;
170
210
let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
171
211
172
212
let ( ready_tx, ready_rx) = std:: sync:: mpsc:: channel :: < ( ) > ( ) ;
@@ -204,6 +244,54 @@ impl Arbiter {
204
244
205
245
Arbiter { tx, thread_handle }
206
246
}
247
+ }
248
+
249
+ /// An Arbiter represents a thread that provides an asynchronous execution environment for futures
250
+ /// and functions.
251
+ ///
252
+ /// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
253
+ #[ derive( Debug ) ]
254
+ pub struct Arbiter {
255
+ tx : mpsc:: UnboundedSender < ArbiterCommand > ,
256
+ thread_handle : thread:: JoinHandle < ( ) > ,
257
+ }
258
+
259
+ impl Arbiter {
260
+ /// Create an [ArbiterBuilder] to configure and spawn a new Arbiter thread.
261
+ pub fn builder ( ) -> ArbiterBuilder {
262
+ ArbiterBuilder :: new ( )
263
+ }
264
+
265
+ /// Spawn a new Arbiter thread and start its event loop.
266
+ ///
267
+ /// # Panics
268
+ /// Panics if a [System] is not registered on the current thread.
269
+ #[ cfg( not( all( target_os = "linux" , feature = "io-uring" ) ) ) ]
270
+ #[ allow( clippy:: new_without_default) ]
271
+ pub fn new ( ) -> Arbiter {
272
+ ArbiterBuilder :: new ( ) . build ( )
273
+ }
274
+
275
+ /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
276
+ ///
277
+ /// [tokio-runtime]: tokio::runtime::Runtime
278
+ #[ cfg( not( all( target_os = "linux" , feature = "io-uring" ) ) ) ]
279
+ pub fn with_tokio_rt < F > ( runtime_factory : F ) -> Arbiter
280
+ where
281
+ F : Fn ( ) -> tokio:: runtime:: Runtime + Send + ' static ,
282
+ {
283
+ ArbiterBuilder :: new ( ) . runtime ( runtime_factory) . build ( )
284
+ }
285
+
286
+ /// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
287
+ ///
288
+ /// # Panics
289
+ /// Panics if a [System] is not registered on the current thread.
290
+ #[ cfg( all( target_os = "linux" , feature = "io-uring" ) ) ]
291
+ #[ allow( clippy:: new_without_default) ]
292
+ pub fn new ( ) -> Arbiter {
293
+ ArbiterBuilder :: new ( ) . build ( )
294
+ }
207
295
208
296
/// Sets up an Arbiter runner in a new System using the environment's local set.
209
297
pub ( crate ) fn in_new_system ( ) -> ArbiterHandle {
0 commit comments