@@ -254,7 +254,6 @@ static int pooled_thread_func(void *void_arg) {
 API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) {
   if unlikely (!num_threads) return POOL_INIT_NO_THREADS_SPECIFIED;
   uint32_t old = 0u;
-  assert(num_threads < UINT32_MAX);
   if unlikely (!atomic_compare_exchange_strong_explicit(&hw_pool.num_threads,
                                                         &old,
                                                         num_threads,
@@ -284,16 +283,24 @@ API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) {
     pooled_thread_control *thread_control =
       (pooled_thread_control *)(buffer + alignment * (size_t)i);
     init_thread_control(thread_control, i, &hw_pool);
-    int status;
+    if (i) {
+      int status;
 #ifdef QPOOL_USE_PTHREADS
-    status = pthread_create(
-      &thread_control->thread, &attr, pooled_thread_func, thread_control);
-    if unlikely (status) goto cleanup_threads;
+      status = pthread_create(
+        &thread_control->thread, &attr, pooled_thread_func, thread_control);
+      if unlikely (status) goto cleanup_threads;
 #else
-    status =
-      thrd_create(&thread_control->thread, pooled_thread_func, thread_control);
-    if unlikely (status != thrd_success) goto cleanup_threads;
+      status = thrd_create(
+        &thread_control->thread, pooled_thread_func, thread_control);
+      if unlikely (status != thrd_success) goto cleanup_threads;
 #endif
+    } else {
+#ifndef NDEBUG
+      // Zero out the allocated thread handle to make extra sure we never
+      // access it. It still has to be there to satisfy alignment constraints.
+      memset(&thread_control->thread, 0, sizeof(thread_control->thread));
+#endif
+    }
     ++i;
   }
 #ifdef QPOOL_USE_PTHREADS
@@ -303,7 +310,13 @@ API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) {
   return POOL_INIT_SUCCESS;
 cleanup_threads:
   if (i) {
+    // The last thread failed to launch, so there is no need
+    // to clean it up. Any error must have come from an iteration
+    // greater than zero of the thread-create loop, since no
+    // thread is created at index 0.
     uint32_t j = --i;
+    // The current thread does the work of worker zero, so
+    // there is no need to signal or join that one.
     while (i) {
       // TODO: fix deinit to match new layout and interrupt mechanism.
       pooled_thread_control *thread_control =
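
Aside: the `buffer + alignment * (size_t)i` indexing used throughout this commit places each pooled_thread_control at the start of its own cache line. A minimal standalone sketch of that layout, assuming C11 aligned_alloc and a simplified stand-in for pooled_thread_control:

#include <stdint.h>
#include <stdlib.h>

typedef struct {
  uint32_t index; /* stand-in; the real struct also holds a thread handle */
} example_thread_control;

/* Allocate one cache-line-sized slot per worker and place a control
 * block at the start of each slot, mirroring the indexing above. */
static char *alloc_controls(size_t alignment, uint32_t num_threads) {
  char *buffer = aligned_alloc(alignment, alignment * (size_t)num_threads);
  if (!buffer) return NULL;
  for (uint32_t i = 0u; i < num_threads; i++) {
    example_thread_control *tc =
      (example_thread_control *)(buffer + alignment * (size_t)i);
    tc->index = i;
  }
  return buffer;
}

Giving each control block a full cache line keeps one worker's state changes from invalidating another worker's line (false sharing).
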
@@ -347,17 +360,17 @@ API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() {
     atomic_load_explicit(&hw_pool.num_threads, memory_order_relaxed);
   char *buffer = atomic_load_explicit(&hw_pool.threads, memory_order_relaxed);
   size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size());
-  uint32_t i = num_threads;
+  uint32_t i = num_threads - 1u;
+  // The current thread is thread 0, so there is no need to notify or join it.
   while (i) {
-    --i;
     // TODO: fix deinit to match new layout and interrupt mechanism.
     pooled_thread_control *thread_control =
       (pooled_thread_control *)(buffer + alignment * (size_t)i);
     notify_worker_of_termination(thread_control);
+    --i;
   }
-  i = num_threads;
+  i = num_threads - 1u;
   while (i) {
-    --i;
     pooled_thread_control *thread_control =
       (pooled_thread_control *)(buffer + alignment * (size_t)i);
     // TODO: crash informatively if join fails somehow.
@@ -366,6 +379,7 @@ API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() {
 #else
     thrd_join(thread_control->thread, NULL);
 #endif
+    --i;
   }

   atomic_store_explicit(&hw_pool.threads, NULL, memory_order_relaxed);
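
Aside: hw_pool_destroy notifies every worker before joining any of them, so all workers wind down concurrently instead of one at a time. A minimal sketch of that two-pass shutdown using C11 threads; example_worker, should_exit, and shutdown_workers are hypothetical names, and count is assumed to be at least 1:

#include <stdatomic.h>
#include <stdint.h>
#include <threads.h>

typedef struct {
  thrd_t thread;
  atomic_bool should_exit;
} example_worker;

/* count >= 1 is assumed; index 0 is the calling thread, so it is skipped. */
static void shutdown_workers(example_worker *workers, uint32_t count) {
  /* Pass 1: ask every worker to exit before joining any of them. */
  for (uint32_t i = count - 1u; i; i--)
    atomic_store(&workers[i].should_exit, true);
  /* Pass 2: join; the workers have been shutting down in parallel. */
  for (uint32_t i = count - 1u; i; i--)
    thrd_join(workers[i].thread, NULL);
}

The real code signals through notify_worker_of_termination rather than a bare atomic flag, but the ordering argument is the same.
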
@@ -379,31 +393,40 @@ API_FUNC uint32_t get_num_delegated_threads() {
   return 1;
 }

-// TODO: have the main thread fill the role of thread 0.
-// Instead of having the main thread wait/resume, swap in its thread-locals
-// then have it run the per-thread function.
-// This will avoid the suspend/resume OS overheads for at least that thread.
+// Note: the current thread fills the role of thread zero in the pool.

 API_FUNC void
 pool_run_on_all(pool_header *pool, qt_threadpool_func_type func, void *arg) {
   uint32_t num_threads =
     atomic_load_explicit(&pool->num_threads, memory_order_relaxed);
   assert(num_threads);
-  assert(num_threads < UINT32_MAX);
-  char *buffer =
-    (char *)atomic_load_explicit(&pool->threads, memory_order_relaxed);
-  atomic_store_explicit(
-    &pool->num_active_threads, num_threads, memory_order_relaxed);
-  init_main_sync(pool);
-  size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size());
-  for (uint32_t i = 0u;
-       i < atomic_load_explicit(&pool->num_threads, memory_order_relaxed);
-       i++) {
-    pooled_thread_control *thread_control =
-      (pooled_thread_control *)(buffer + alignment * (size_t)i);
-    launch_work_on_thread(thread_control, func, arg);
+  if (num_threads > 1u) {
+    char *buffer =
+      (char *)atomic_load_explicit(&pool->threads, memory_order_relaxed);
+    atomic_store_explicit(
+      &pool->num_active_threads, num_threads - 1u, memory_order_relaxed);
+    init_main_sync(pool);
+    size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size());
+    for (uint32_t i = 1u;
+         i < atomic_load_explicit(&pool->num_threads, memory_order_relaxed);
+         i++) {
+      pooled_thread_control *thread_control =
+        (pooled_thread_control *)(buffer + alignment * (size_t)i);
+      launch_work_on_thread(thread_control, func, arg);
+    }
+  }
+  uint32_t outer_index = context_index;
+  context_index = 0u;
+  pool_header *outer_delegated_pool = delegated_pool;
+  delegated_pool = NULL;
+  func(arg);
+  delegated_pool = outer_delegated_pool;
+  context_index = outer_index;
+  if (num_threads > 1u) {
+    // Some workers may take dramatically longer on some loops, so we
+    // still suspend here, but potentially for much less time.
+    suspend_main_while_working(pool);
  }
-  suspend_main_while_working(pool);
 }

 API_FUNC void run_on_current_pool(qt_threadpool_func_type func, void *arg) {
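
Aside: the core of this change is that pool_run_on_all no longer launches a worker for index 0; the calling thread saves its own thread-locals, runs func inline as worker zero, and then restores them. A standalone sketch of that save/run/restore pattern; worker_fn, ctx_index_tl, and run_as_worker_zero are hypothetical names standing in for the context_index/delegated_pool swap above:

#include <stdint.h>
#include <stdio.h>

static _Thread_local uint32_t ctx_index_tl = 42u; /* caller's usual identity */

static void worker_fn(void *arg) {
  printf("running as worker %u: %s\n", ctx_index_tl, (char *)arg);
}

/* The calling thread temporarily becomes worker zero, runs the work
 * function inline, then restores its own identity. */
static void run_as_worker_zero(void (*func)(void *), void *arg) {
  uint32_t outer_index = ctx_index_tl;
  ctx_index_tl = 0u;
  func(arg);
  ctx_index_tl = outer_index;
}

int main(void) {
  run_as_worker_zero(worker_fn, "hello");
  return 0;
}

Running the work inline avoids one suspend/resume round trip through the OS scheduler per pool_run_on_all call, which is the motivation the removed TODO comment described.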