Skip to content

Threadpool Manipulation APIs #354

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/qt_threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ typedef enum {

// Create the global hardware thread pool with `num_threads` threads.
// The calling thread fills the role of pool thread 0, so only
// num_threads - 1 OS threads are spawned. Returns a status from the
// hw_pool_init_status enum above.
hw_pool_init_status hw_pool_init(uint32_t num_threads);
// Tear down the global pool: signal, join, and free all worker threads.
void hw_pool_destroy();
// Run func(arg) once on every thread of the global pool (including the
// caller, acting as thread 0); waits for all workers before returning.
void hw_pool_run_on_all(qt_threadpool_func_type func, void *arg);
// Like hw_pool_run_on_all, but targets the pool the calling thread is
// currently delegated to; runs func inline if the caller has no pool.
void run_on_current_pool(qt_threadpool_func_type func, void *arg);
// Number of threads available to the caller for parallel work
// (1 when the calling thread is not attached to any pool).
uint32_t get_num_delegated_threads();

#endif
124 changes: 89 additions & 35 deletions src/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ typedef struct {
#endif
} pool_header;

_Thread_local pool_header *delegated_pool;

typedef struct {
// 16 byte aligned to allow loading it in one atomic instruction
// on architectures where that makes sense (most of them).
Expand Down Expand Up @@ -252,7 +254,6 @@ static int pooled_thread_func(void *void_arg) {
API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) {
if unlikely (!num_threads) return POOL_INIT_NO_THREADS_SPECIFIED;
uint32_t old = 0u;
assert(num_threads < UINT32_MAX);
if unlikely (!atomic_compare_exchange_strong_explicit(&hw_pool.num_threads,
&old,
num_threads,
Expand Down Expand Up @@ -281,35 +282,38 @@ API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) {
while (i < num_threads) {
pooled_thread_control *thread_control =
(pooled_thread_control *)(buffer + alignment * (size_t)i);
// Initialize the thread control struct in two 128b atomic writes.
// TODO: It's possible to just do this in a single 256b atomic write on most
// x86 platforms. That may also require increasing the alignment constraints
// for the control_slice.
// TODO: also ifdef in an implementation for platforms that can't do
// lock-free 128b writes or that don't handle mixed-size atomic writes.
// TODO: making some kind of ifunc to handle this initialization is probably
// actually the right way to do it because it's hard to know enough about
// the CPU at compile-time.
init_thread_control(thread_control, i, &hw_pool);
int status;
if (i) {
int status;
#ifdef QPOOL_USE_PTHREADS
status = pthread_create(
&thread_control->thread, &attr, pooled_thread_func, thread_control);
if unlikely (status) goto cleanup_threads;
status = pthread_create(
&thread_control->thread, &attr, pooled_thread_func, thread_control);
if unlikely (status) goto cleanup_threads;
#else
status =
thrd_create(&thread_control->thread, pooled_thread_func, thread_control);
if unlikely (status != thrd_success) goto cleanup_threads;
status = thrd_create(
&thread_control->thread, pooled_thread_func, thread_control);
if unlikely (status != thrd_success) goto cleanup_threads;
#endif
}
// Leave the thread object uninitialized for thread 0.
// It needs to be there for the sake of alignment,
// but other than that it's unused.
++i;
}
#ifdef QPOOL_USE_PTHREADS
pthread_attr_destroy(&attr);
#endif
delegated_pool = &hw_pool;
return POOL_INIT_SUCCESS;
cleanup_threads:
if (i) {
// Last thread failed to launch, so no need to clean it up.
// If an error was raised it would have been at an iteration
// higher than 0 for the thread create loop since no thread is
// created at 0.
uint32_t j = --i;
// current thread does the work of worker zero so
// no need to signal or join for that one.
while (i) {
// TODO: fix deinit to match new layout and interrupt mechanism.
pooled_thread_control *thread_control =
Expand Down Expand Up @@ -348,21 +352,22 @@ API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) {
}

API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() {
delegated_pool = NULL;
uint32_t num_threads =
atomic_load_explicit(&hw_pool.num_threads, memory_order_relaxed);
char *buffer = atomic_load_explicit(&hw_pool.threads, memory_order_relaxed);
size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size());
uint32_t i = num_threads;
uint32_t i = num_threads - 1u;
// Current thread is thread 0 so no need to notify/join that one.
while (i) {
--i;
// TODO: fix deinit to match new layout and interrupt mechanism.
pooled_thread_control *thread_control =
(pooled_thread_control *)(buffer + alignment * (size_t)i);
notify_worker_of_termination(thread_control);
--i;
}
i = num_threads;
i = num_threads - 1u;
while (i) {
--i;
pooled_thread_control *thread_control =
(pooled_thread_control *)(buffer + alignment * (size_t)i);
// TODO: crash informatively if join fails somehow.
Expand All @@ -371,36 +376,85 @@ API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() {
#else
thrd_join(thread_control->thread, NULL);
#endif
--i;
}

atomic_store_explicit(&hw_pool.threads, NULL, memory_order_relaxed);
free(buffer);
atomic_store_explicit(&hw_pool.num_threads, 0, memory_order_release);
}

// Report how many threads the calling thread may use for parallel work.
// A thread with no delegated pool still has itself, so the floor is 1.
API_FUNC uint32_t get_num_delegated_threads() {
  if (!delegated_pool) return 1;
  return delegated_pool->num_threads;
}

// Note: current thread fills the role of thread zero in the pool.

API_FUNC void
pool_run_on_all(pool_header *pool, qt_threadpool_func_type func, void *arg) {
uint32_t num_threads =
atomic_load_explicit(&pool->num_threads, memory_order_relaxed);
assert(num_threads);
assert(num_threads < UINT32_MAX);
char *buffer =
(char *)atomic_load_explicit(&pool->threads, memory_order_relaxed);
atomic_store_explicit(
&pool->num_active_threads, num_threads, memory_order_relaxed);
init_main_sync(pool);
size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size());
for (uint32_t i = 0u;
i < atomic_load_explicit(&pool->num_threads, memory_order_relaxed);
i++) {
pooled_thread_control *thread_control =
(pooled_thread_control *)(buffer + alignment * (size_t)i);
launch_work_on_thread(thread_control, func, arg);
if (num_threads > 1u) {
char *buffer =
(char *)atomic_load_explicit(&pool->threads, memory_order_relaxed);
atomic_store_explicit(
&pool->num_active_threads, num_threads - 1u, memory_order_relaxed);
init_main_sync(pool);
size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size());
for (uint32_t i = 1u;
i < atomic_load_explicit(&pool->num_threads, memory_order_relaxed);
i++) {
pooled_thread_control *thread_control =
(pooled_thread_control *)(buffer + alignment * (size_t)i);
launch_work_on_thread(thread_control, func, arg);
}
}
uint32_t outer_index = context_index;
context_index = 0u;
pool_header *outer_delegated_pool = delegated_pool;
delegated_pool = NULL;
func(arg);
delegated_pool = outer_delegated_pool;
context_index = outer_index;
if (num_threads > 1u) {
// some loops may have threads that take dramatically longer
// so we still suspend, but it's potentially for much less time.
suspend_main_while_working(pool);
}
}

// Run func(arg) on every thread of the pool the calling thread is
// delegated to. A thread with no delegated pool acts as a pool of one:
// func runs inline on the current thread.
API_FUNC void run_on_current_pool(qt_threadpool_func_type func, void *arg) {
  if (delegated_pool) {
    // pool_run_on_all launches the work on the workers, runs the
    // thread-0 share inline, and suspends until the workers finish,
    // so no additional synchronization is needed here.
    pool_run_on_all(delegated_pool, func, arg);
  } else {
    // Present as context 0 while func runs so it observes a consistent
    // thread index, then restore the caller's context.
    uint32_t outer_index = context_index;
    context_index = 0;
    func(arg);
    context_index = outer_index;
  }
  // Fix: removed a stray `suspend_main_while_working(pool);` left over
  // from the pre-refactor hw_pool_run_on_all — `pool` is not declared in
  // this scope (compile error), and suspension is already handled inside
  // pool_run_on_all on the delegated path.
}

// Convenience wrapper: run func(arg) once on every thread of the global
// hardware pool.
API_FUNC void hw_pool_run_on_all(qt_threadpool_func_type func, void *arg) {
  pool_header *const target = &hw_pool;
  pool_run_on_all(target, func, arg);
}

// Split the current pool into `num_groups` sub-pools. Not yet implemented.
API_FUNC void divide_pool(uint32_t num_groups, ...) {
  (void)num_groups; // unimplemented stub; parameter intentionally unused
  // TODO: planned implementation sketch, for each group:
  //   - build a new threadpool header for the group;
  //   - wake the group's leader thread, which then:
  //       * swaps in its own thread-local pool pointer and index,
  //       * re-wakes its delegated workers and starts a new iteration
  //         loop on each (workers update their thread-local indices
  //         before entering their own loops),
  //       * waits for the group's threads to finish (busy or futex?),
  //       * restores its thread-locals,
  //       * signals completion to main via the outer pool's atomic;
  //   - main thread acts as leader for the first group;
  //   - main waits for all groups to finish (busy or futex?).
}

6 changes: 5 additions & 1 deletion test/internal/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@
#include "qt_threadpool.h"

// Worker callback for the pool tests: while inside a pool-run callback a
// thread's delegated pool is cleared, so it must see itself as a pool of
// one; print a marker so the harness can observe that each thread ran.
static int on_thread_test(void *arg) {
  (void)arg; // unused
  test_check(get_num_delegated_threads() == 1);
  fputs("hello from thread\n", stdout);
  return 0;
}

int main() {
test_check(get_num_delegated_threads() == 1);
hw_pool_init(2);
test_check(get_num_delegated_threads() == 2);
hw_pool_destroy();
test_check(get_num_delegated_threads() == 1);
hw_pool_init(2);
hw_pool_run_on_all(&on_thread_test, NULL);
run_on_current_pool(&on_thread_test, NULL);
hw_pool_destroy();
printf("exited successfully\n");
fflush(stdout);
Expand Down
Loading