From 0c7404b0200835a1c2407bd36c6c124482e9bb11 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Wed, 30 Apr 2025 16:13:02 -0500 Subject: [PATCH 01/21] stopping point --- src/conv-taskQ.cpp | 59 +++++++++++++++++++++++ src/conv-taskQ.h | 19 ++++++++ src/convcore.cpp | 15 +++++- src/converse_internal.h | 3 ++ src/scheduler.cpp | 20 ++++++-- src/taskqueue.h | 103 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 212 insertions(+), 7 deletions(-) create mode 100644 src/conv-taskQ.cpp create mode 100644 src/conv-taskQ.h create mode 100644 src/taskqueue.h diff --git a/src/conv-taskQ.cpp b/src/conv-taskQ.cpp new file mode 100644 index 0000000..b40bf1c --- /dev/null +++ b/src/conv-taskQ.cpp @@ -0,0 +1,59 @@ +#include "conv-taskQ.h" + +void StealTask() { +// start up timer if trace is enabled +#if CMK_TRACE_ENABLED + double _start = CmiWallTimer(); +#endif + + //steal from a random PE on the same node + int random_rank = CrnRand() % (CmiMyNodeSize()-1); + if (random_rank == CmiMyRank()) { + random_rank++; + if (random_rank >= CmiMyNodeSize()) { + random_rank = 0; // wrap around if our random_selected node is the same as our own + // and we are the last PE on our node + } + } + +#if CMK_TRACE_ENABLED + char s[10]; + snprintf( s, sizeof(s), "%d", random_rank ); + traceUserSuppliedBracketedNote(s, TASKQ_QUEUE_STEAL_EVENTID, _start, CmiWallTimer()); +#endif + void* msg = TaskQueueSteal((TaskQueue*)CpvAccessOther(task_q, random_rank)); + if (msg != NULL) { + TaskQueuePush((TaskQueue*)CpvAccess(task_q), msg); + } +#if CMK_TRACE_ENABLED + traceUserSuppliedBracketedNote(s, TASKQ_STEAL_EVENTID, _start, CmiWallTimer()); +#endif +} + +// this function is passed into CcdCallOnConditionKeep +static void TaskStealBeginIdle() { + // can discuss whether we need to add the isHelper csv variable that is in old converse. 
+ // not going to add it for now, because it's turned/left on by default in old converse + if (CmiMyNodeSize() > 1) { + StealTask(); + } +} + +// each pe will call this function because each pe has its own task queue +void CmiTaskQueueInit() { + // makes sure that the node has more than one PE, because we can only steal + // from other PE's that share the same node + if(CmiMyNodeSize() > 1) { + CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); + + CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); + } + +#if CMK_TRACE_ENABLED + traceRegisterUserEvent("taskq create work", TASKQ_CREATE_EVENTID); + traceRegisterUserEvent("taskq work", TASKQ_WORK_EVENTID); + traceRegisterUserEvent("taskq steal", TASKQ_STEAL_EVENTID); + traceRegisterUserEvent("taskq from queue steal", TASKQ_QUEUE_STEAL_EVENTID); +#endif + +} \ No newline at end of file diff --git a/src/conv-taskQ.h b/src/conv-taskQ.h new file mode 100644 index 0000000..8be6674 --- /dev/null +++ b/src/conv-taskQ.h @@ -0,0 +1,19 @@ +#ifndef _CK_TASKQ_H_ +#define _CK_TASKQ_H_ + +#include "src/converse_internal.h" +#include "include/converse.h" +#include "taskqueue.h" + +#if CMK_TRACE_ENABLED + #include "conv-trace.h" + #define TASKQ_CREATE_EVENTID 145 + #define TASKQ_WORK_EVENTID 147 + #define TASKQ_STEAL_EVENTID 149 + #define TASKQ_QUEUE_STEAL_EVENTID 151 +#endif + +void StealTask(); +void CmiTaskQueueInit(); + +#endif \ No newline at end of file diff --git a/src/convcore.cpp b/src/convcore.cpp index dfe7d1d..95df47c 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -1,6 +1,7 @@ //+pe threads, each running a scheduler #include "barrier.h" #include "converse_internal.h" +#include "conv-taskQ.h" #include "queue.h" #include "scheduler.h" @@ -77,8 +78,13 @@ void converseRunPe(int rank) { #endif Cmi_exitHandler = CmiRegisterHandler(CmiExitHandler); + + //initlialize the task queue for this PE + CpvInitialize(TaskQueue*, task_q); + CpvAccess(task_q) 
= TaskQueueCreate(); + + //initalize collective operations/arrays/handlers/etc collectiveInit(); - // Cmi_multicastHandler = CmiRegisterHandler(CmiMulticastHandler); // barrier to ensure all global structs are initialized CmiNodeBarrier(); @@ -89,6 +95,10 @@ void converseRunPe(int rank) { // call initial function and start scheduler Cmi_startfn(Cmi_argc, Cmi_argv); CsdScheduler(); + + // cleanup of threads? : destroy each threads task queue and reduction table struct(not the struct itself) + TaskQueueDestroy(CpvAccess(task_q)); + free(CpvAccess(_reduction_info)); } void CmiStartThreads() { @@ -186,7 +196,6 @@ void CmiInitState(int rank) { // allocate global entries ConverseQueue *queue = new ConverseQueue(); std::vector *handlerTable = new std::vector(); - Cmi_queues[Cmi_myrank] = queue; CmiHandlerTable[Cmi_myrank] = handlerTable; @@ -194,6 +203,8 @@ void CmiInitState(int rank) { CrnInit(); CcdModuleInit(); + + CmiTaskQueueInit(); } ConverseQueue *CmiGetQueue(int rank) { return Cmi_queues[rank]; } diff --git a/src/converse_internal.h b/src/converse_internal.h index a77e850..591e018 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -8,6 +8,7 @@ #include "converse.h" #include "converse_config.h" #include "queue.h" +#include "conv-taskQ.h" #include "comm_backend/comm_backend.h" #include "comm_backend/comm_backend_internal.h" @@ -116,6 +117,8 @@ CpvStaticDeclare(CmiReductionID, _reduction_counter); CpvStaticDeclare(CmiReduction **, _reduction_info); // an array of pointers to reduction structs +CpvStaticDeclare(TaskQueue*, task_q); + void CmiReductionsInit(void); // helper function to get the next reduction ID diff --git a/src/scheduler.cpp b/src/scheduler.cpp index c336657..412f03d 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -4,6 +4,7 @@ #include "queue.h" #include + /** * The main scheduler loop for the Charm++ runtime. 
*/ @@ -14,6 +15,11 @@ void CsdScheduler() { // get node level queue ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); + //get task level queue + TaskQueue* taskQueue = (TaskQueue*)CpvAccess(task_q); + + void* msg = NULL; + while (CmiStopFlag() == 0) { CcdRaiseCondition(CcdSCHEDLOOP); @@ -22,7 +28,7 @@ void CsdScheduler() { if (!nodeQueue->empty()) { QueueResult result = nodeQueue->pop(); if (result) { - void *msg = result.msg; + msg = result.msg; // process event CmiHandleMessage(msg); @@ -32,12 +38,16 @@ void CsdScheduler() { CcdRaiseCondition(CcdPROCESSOR_END_IDLE); } } - } + } else if (taskQueue && (msg = taskQueuePop(taskQueue))) { //taskqueue pop handles all possible queue cases arleady so we only need to check if it exists or not + CmiHandleMessage(msg); + // idle stuff + + - // poll thread queue - else if (!queue->empty()) { + + } else (!queue->empty()) { // poll thread queue // get next event (guaranteed to be there because only single consumer) - void *msg = queue->pop(); + msg = queue->pop(); // process event CmiHandleMessage(msg); diff --git a/src/taskqueue.h b/src/taskqueue.h new file mode 100644 index 0000000..c917c50 --- /dev/null +++ b/src/taskqueue.h @@ -0,0 +1,103 @@ +/** + * this header file defines functinos/declarations/definitions for the task queue system that belongs to converse + * specifically the work-stealing queue of Cilk (THE protocol). + * For more details: visit this link from the readthedocs from the old converse documentation: https://charm.readthedocs.io/en/latest/charm++/manual.html#stealable-tasks-for-within-node-load-balancing + * ALSO we will not be supporitng windows support currently, maybe print out an error message if someone tries to compile on windows? 
+ */ +#ifndef _CKTASKQUEUE_H +#define _CKTASKQUEUE_H + +#include +#include +#include + +#define TASKQUEUE_SIZE 1024 + +#if CMK_SMP + #define CmiMemoryWriteFence() __sync_synchronize() // This is a memory fence to ensure that writes are visible to other threads/cores +#else + #define CmiMemoryWriteFence() // No-op if not in SMP mode +#endif + + +typedef int taskq_idx; + +typedef struct TaskQueueStruct { + taskq_idx head; // This pointer indicates the first task in the queue + taskq_idx tail; // The tail indicates the array element next to the last available task in the queue. So, if head == tail, the queue is empty + void *data[TASKQUEUE_SIZE]; +} TaskQueue; + +// Function to create a new TaskQueue and initialize its members +inline static TaskQueue* TaskQueueCreate() { + TaskQueue* taskqueue = (TaskQueue*)malloc(sizeof(TaskQueue)); + taskqueue->head = 0; + taskqueue->tail = 0; + for (int i = 0; i < TASKQUEUE_SIZE; i++) { + taskqueue->data[i] = NULL; + } + return taskqueue; +} + +// Function to push a task onto the TaskQueue +inline static void TaskQueuePush(TaskQueue* queue, void* data) { + queue->data[queue->tail % TASKQUEUE_SIZE] = data; + CmiMemoryWriteFence(); //makes sure the data is fully written before updating the tail pointer + queue->tail++; + if (queue->tail >= TASKQUEUE_SIZE) { + fprintf(stderr, "TaskQueue overflow: possible corruption/overwrite possibility of data\n"); + } +} + +// Function to pop a task from the TaskQueue. Victims pop from the tail +inline static void* TaskQueuePop(TaskQueue* queue) { + queue->tail = queue->tail - 1; + CmiMemoryWriteFence(); + taskq_idx head = queue->head; + taskq_idx tail = queue->tail; + if (tail > head) { // there are more than two tasks in the queue, so it is safe to pop a task from the queue. + return queue->data[tail % TASKQUEUE_SIZE]; + } + + if (tail < head) { // The taskqueue is empty and the last task has been stolen by a thief. 
+ queue->tail = head; // reset the tail pointer to the head pointer + return NULL; + } + + // head==tail case: there is only one task so thieves and victim can try to obtain this task simultaneously. + queue->tail = head + 1; + if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the last task has already stolen. + return NULL; + } + return queue->data[tail % TASKQUEUE_SIZE]; +} + +// Function to steal a task from another TaskQueue. Other PEs/Threads steal from the head +inline static void* TaskQueueSteal(TaskQueue* queue) { + taskq_idx head, tail; + while (1) { + head = queue->head; + tail = queue->tail; + if (head >= tail) { + // The queue is empty + // or the last element has been stolen by other thieves + // or popped by the victim. + return NULL; + } + + if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the task this thief is trying to steal is still in the queue and not stolen by the other thieves. + continue; + } + return queue->data[head % TASKQUEUE_SIZE]; + } +} + +// Function to destroy the TaskQueue and free its memory +inline static void TaskQueueDestroy(TaskQueue* queue) { + if (queue != NULL) { + free(queue); + } +} + + +#endif \ No newline at end of file From d830a5d6d4d86a06843bdae07d910689222f508d Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Wed, 30 Apr 2025 21:55:13 -0500 Subject: [PATCH 02/21] resolved linker errors --- src/CMakeLists.txt | 2 +- src/conv-taskQ.cpp | 5 +++++ src/conv-taskQ.h | 10 ++++++---- src/convcore.cpp | 4 ---- src/converse_internal.h | 2 -- src/scheduler.cpp | 16 ++++++++++------ 6 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b29897a..12c6b09 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,7 +1,7 @@ target_include_directories(reconverse PRIVATE .) 
target_sources(reconverse PRIVATE conv-conds.cpp convcore.cpp queue.cpp random.cpp scheduler.cpp cpuaffinity.cpp collectives.cpp - comm_backend/comm_backend_internal.cpp threads.cpp cldb.none.cpp cldb.cpp) + comm_backend/comm_backend_internal.cpp threads.cpp cldb.none.cpp cldb.cpp conv-taskQ.cpp) target_include_directories( reconverse PRIVATE $ $) diff --git a/src/conv-taskQ.cpp b/src/conv-taskQ.cpp index b40bf1c..602c39d 100644 --- a/src/conv-taskQ.cpp +++ b/src/conv-taskQ.cpp @@ -43,6 +43,11 @@ static void TaskStealBeginIdle() { void CmiTaskQueueInit() { // makes sure that the node has more than one PE, because we can only steal // from other PE's that share the same node + + //initlialize the task queue for this PE + CpvInitialize(TaskQueue*, task_q); + CpvAccess(task_q) = TaskQueueCreate(); + if(CmiMyNodeSize() > 1) { CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); diff --git a/src/conv-taskQ.h b/src/conv-taskQ.h index 8be6674..fb80bc1 100644 --- a/src/conv-taskQ.h +++ b/src/conv-taskQ.h @@ -1,8 +1,8 @@ #ifndef _CK_TASKQ_H_ #define _CK_TASKQ_H_ -#include "src/converse_internal.h" -#include "include/converse.h" +#include "converse_internal.h" +#include "converse.h" #include "taskqueue.h" #if CMK_TRACE_ENABLED @@ -13,7 +13,9 @@ #define TASKQ_QUEUE_STEAL_EVENTID 151 #endif -void StealTask(); -void CmiTaskQueueInit(); +CpvStaticDeclare(TaskQueue*, task_q); + +void StealTask(void); +void CmiTaskQueueInit(void); #endif \ No newline at end of file diff --git a/src/convcore.cpp b/src/convcore.cpp index 95df47c..01ad7b6 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -79,10 +79,6 @@ void converseRunPe(int rank) { Cmi_exitHandler = CmiRegisterHandler(CmiExitHandler); - //initlialize the task queue for this PE - CpvInitialize(TaskQueue*, task_q); - CpvAccess(task_q) = TaskQueueCreate(); - //initalize collective operations/arrays/handlers/etc collectiveInit(); diff --git a/src/converse_internal.h b/src/converse_internal.h 
index 591e018..58c45ef 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -117,8 +117,6 @@ CpvStaticDeclare(CmiReductionID, _reduction_counter); CpvStaticDeclare(CmiReduction **, _reduction_info); // an array of pointers to reduction structs -CpvStaticDeclare(TaskQueue*, task_q); - void CmiReductionsInit(void); // helper function to get the next reduction ID diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 412f03d..c1ddd99 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -38,14 +38,19 @@ void CsdScheduler() { CcdRaiseCondition(CcdPROCESSOR_END_IDLE); } } - } else if (taskQueue && (msg = taskQueuePop(taskQueue))) { //taskqueue pop handles all possible queue cases arleady so we only need to check if it exists or not + } else if (taskQueue && (msg = TaskQueuePop(taskQueue))) { //taskqueue pop handles all possible queue cases arleady so we only need to check if it exists or not + //process event CmiHandleMessage(msg); - // idle stuff - + + // release idle if necessary + if (CmiGetIdle()) { + CmiSetIdle(false); + CcdRaiseCondition(CcdPROCESSOR_END_IDLE); + } + //do we want idle check to be here? 
- - } else (!queue->empty()) { // poll thread queue + } else if (!queue->empty()) { // poll thread queue // get next event (guaranteed to be there because only single consumer) msg = queue->pop(); @@ -58,7 +63,6 @@ void CsdScheduler() { CcdRaiseCondition(CcdPROCESSOR_END_IDLE); } } - // the processor is idle else { // if not already idle, set idle and raise condition From 9214c34d85a09f699b9189945481b23f7ee35c69 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Sun, 4 May 2025 18:25:35 -0500 Subject: [PATCH 03/21] consildate everything into just converse_itnernal and convcore --- src/CMakeLists.txt | 2 +- src/conv-taskQ.cpp | 64 ---------------------- src/conv-taskQ.h | 21 ------- src/convcore.cpp | 118 +++++++++++++++++++++++++++++++++++++++- src/converse_internal.h | 31 ++++++++++- src/scheduler.cpp | 8 ++- src/taskqueue.h | 103 ----------------------------------- temp.cpp | 48 ++++++++++++++++ 8 files changed, 201 insertions(+), 194 deletions(-) delete mode 100644 src/conv-taskQ.cpp delete mode 100644 src/conv-taskQ.h delete mode 100644 src/taskqueue.h create mode 100644 temp.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c6e0219..32bfad1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,7 +1,7 @@ target_include_directories(reconverse PRIVATE .) 
target_sources(reconverse PRIVATE conv-conds.cpp convcore.cpp random.cpp scheduler.cpp cpuaffinity.cpp collectives.cpp - comm_backend/comm_backend_internal.cpp threads.cpp cldb.none.cpp cldb.cpp conv-taskQ.cpp) + comm_backend/comm_backend_internal.cpp threads.cpp cldb.none.cpp cldb.cpp) target_include_directories( reconverse PRIVATE $ $) diff --git a/src/conv-taskQ.cpp b/src/conv-taskQ.cpp deleted file mode 100644 index 602c39d..0000000 --- a/src/conv-taskQ.cpp +++ /dev/null @@ -1,64 +0,0 @@ -#include "conv-taskQ.h" - -void StealTask() { -// start up timer if trace is enabled -#if CMK_TRACE_ENABLED - double _start = CmiWallTimer(); -#endif - - //steal from a random PE on the same node - int random_rank = CrnRand() % (CmiMyNodeSize()-1); - if (random_rank == CmiMyRank()) { - random_rank++; - if (random_rank >= CmiMyNodeSize()) { - random_rank = 0; // wrap around if our random_selected node is the same as our own - // and we are the last PE on our node - } - } - -#if CMK_TRACE_ENABLED - char s[10]; - snprintf( s, sizeof(s), "%d", random_rank ); - traceUserSuppliedBracketedNote(s, TASKQ_QUEUE_STEAL_EVENTID, _start, CmiWallTimer()); -#endif - void* msg = TaskQueueSteal((TaskQueue*)CpvAccessOther(task_q, random_rank)); - if (msg != NULL) { - TaskQueuePush((TaskQueue*)CpvAccess(task_q), msg); - } -#if CMK_TRACE_ENABLED - traceUserSuppliedBracketedNote(s, TASKQ_STEAL_EVENTID, _start, CmiWallTimer()); -#endif -} - -// this function is passed into CcdCallOnConditionKeep -static void TaskStealBeginIdle() { - // can discuss whether we need to add the isHelper csv variable that is in old converse. 
- // not going to add it for now, because it's turned/left on by default in old converse - if (CmiMyNodeSize() > 1) { - StealTask(); - } -} - -// each pe will call this function because each pe has its own task queue -void CmiTaskQueueInit() { - // makes sure that the node has more than one PE, because we can only steal - // from other PE's that share the same node - - //initlialize the task queue for this PE - CpvInitialize(TaskQueue*, task_q); - CpvAccess(task_q) = TaskQueueCreate(); - - if(CmiMyNodeSize() > 1) { - CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); - - CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); - } - -#if CMK_TRACE_ENABLED - traceRegisterUserEvent("taskq create work", TASKQ_CREATE_EVENTID); - traceRegisterUserEvent("taskq work", TASKQ_WORK_EVENTID); - traceRegisterUserEvent("taskq steal", TASKQ_STEAL_EVENTID); - traceRegisterUserEvent("taskq from queue steal", TASKQ_QUEUE_STEAL_EVENTID); -#endif - -} \ No newline at end of file diff --git a/src/conv-taskQ.h b/src/conv-taskQ.h deleted file mode 100644 index fb80bc1..0000000 --- a/src/conv-taskQ.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef _CK_TASKQ_H_ -#define _CK_TASKQ_H_ - -#include "converse_internal.h" -#include "converse.h" -#include "taskqueue.h" - -#if CMK_TRACE_ENABLED - #include "conv-trace.h" - #define TASKQ_CREATE_EVENTID 145 - #define TASKQ_WORK_EVENTID 147 - #define TASKQ_STEAL_EVENTID 149 - #define TASKQ_QUEUE_STEAL_EVENTID 151 -#endif - -CpvStaticDeclare(TaskQueue*, task_q); - -void StealTask(void); -void CmiTaskQueueInit(void); - -#endif \ No newline at end of file diff --git a/src/convcore.cpp b/src/convcore.cpp index 4976c9f..cd35328 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -1,7 +1,6 @@ //+pe threads, each running a scheduler #include "barrier.h" #include "converse_internal.h" -#include "conv-taskQ.h" #include "queue.h" #include "scheduler.h" @@ -207,6 +206,7 @@ void CmiInitState(int rank) 
{ CcdModuleInit(); CmiTaskQueueInit(); + printf("task queue created at pointer: %p\n", (void*)CpvAccess(task_q)); } ConverseQueue *CmiGetQueue(int rank) { return Cmi_queues[rank]; } @@ -887,3 +887,119 @@ void CmiLock(CmiNodeLock lock) { pthread_mutex_lock(lock); } void CmiUnlock(CmiNodeLock lock) { pthread_mutex_unlock(lock); } int CmiTryLock(CmiNodeLock lock) { return pthread_mutex_trylock(lock); } + + +//Task Queue Functions/Definitions +// Function to create a new TaskQueue and initialize its members +TaskQueue* TaskQueueCreate() { + TaskQueue* taskqueue = (TaskQueue*)malloc(sizeof(TaskQueue)); + taskqueue->head = 0; + taskqueue->tail = 0; + for (int i = 0; i < TASKQUEUE_SIZE; i++) { + taskqueue->data[i] = NULL; + } + return taskqueue; +} + +// Function to push a task onto the TaskQueue +void TaskQueuePush(TaskQueue* queue, void* data) { + queue->data[queue->tail % TASKQUEUE_SIZE] = data; + CmiMemoryWriteFence(); //makes sure the data is fully written before updating the tail pointer + queue->tail++; + if (queue->tail >= TASKQUEUE_SIZE) { + fprintf(stderr, "TaskQueue overflow: possible corruption/overwrite possibility of data\n"); + } +} + +// Function to pop a task from the TaskQueue. Victims pop from the tail +void* TaskQueuePop(TaskQueue* queue) { + queue->tail = queue->tail - 1; + CmiMemoryWriteFence(); + taskq_idx head = queue->head; + taskq_idx tail = queue->tail; + if (tail > head) { // there are more than two tasks in the queue, so it is safe to pop a task from the queue. + return queue->data[tail % TASKQUEUE_SIZE]; + } + + if (tail < head) { // The taskqueue is empty and the last task has been stolen by a thief. + queue->tail = head; // reset the tail pointer to the head pointer + return NULL; + } + + // head==tail case: there is only one task so thieves and victim can try to obtain this task simultaneously. + queue->tail = head + 1; + if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the last task has already stolen. 
+ return NULL; + } + return queue->data[tail % TASKQUEUE_SIZE]; +} + +// Function to steal a task from another TaskQueue. Other PEs/Threads steal from the head +void* TaskQueueSteal(TaskQueue* queue) { + taskq_idx head, tail; + while (1) { + head = queue->head; + tail = queue->tail; + if (head >= tail) { + // The queue is empty + // or the last element has been stolen by other thieves + // or popped by the victim. + return NULL; + } + + if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the task this thief is trying to steal is still in the queue and not stolen by the other thieves. + continue; + } + return queue->data[head % TASKQUEUE_SIZE]; + } +} + +// Function to destroy the TaskQueue and free its memory +void TaskQueueDestroy(TaskQueue* queue) { + if (queue != NULL) { + free(queue); + } +} + +void StealTask() { + // start up timer if trace is enabled + //steal from a random PE on the same node + int random_rank = CrnRand() % (CmiMyNodeSize()-1); + if (random_rank == CmiMyRank()) { + random_rank++; + if (random_rank >= CmiMyNodeSize()) { + random_rank = 0; // wrap around if our random_selected node is the same as our own + // and we are the last PE on our node + } + } + + void* msg = TaskQueueSteal((TaskQueue*)CpvAccessOther(task_q, random_rank)); + if (msg != NULL) { + TaskQueuePush((TaskQueue*)CpvAccess(task_q), msg); + } +} + + // this function is passed into CcdCallOnConditionKeep + static void TaskStealBeginIdle() { + // can discuss whether we need to add the isHelper csv variable that is in old converse. 
+ // not going to add it for now, because it's turned/left on by default in old converse + if (CmiMyNodeSize() > 1) { + StealTask(); + } + } + + // each pe will call this function because each pe has its own task queue + void CmiTaskQueueInit() { + // makes sure that the node has more than one PE, because we can only steal + // from other PE's that share the same node + + //initlialize the task queue for this PE + CpvInitialize(TaskQueue*, task_q); + CpvAccess(task_q) = TaskQueueCreate(); + + if(CmiMyNodeSize() > 1) { + CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); + + CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); + } + } \ No newline at end of file diff --git a/src/converse_internal.h b/src/converse_internal.h index 58c45ef..95e0c9c 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -8,7 +8,6 @@ #include "converse.h" #include "converse_config.h" #include "queue.h" -#include "conv-taskQ.h" #include "comm_backend/comm_backend.h" #include "comm_backend/comm_backend_internal.h" @@ -140,4 +139,34 @@ void CmiSetRedID(void *msg, CmiReductionID redID); void CmiSetBcastSource(void *msg, CmiBroadcastSource source); CmiBroadcastSource CmiGetBcastSource(void *msg); + +// TASK QUEUE RELATED FUNCTIONS/DEFINITIONS +#define TASKQUEUE_SIZE 1024 +#if CMK_SMP + #define CmiMemoryWriteFence() __sync_synchronize() // This is a memory fence to ensure that writes are visible to other threads/cores +#else + #define CmiMemoryWriteFence() // No-op if not in SMP mode +#endif + +typedef int taskq_idx; + +typedef struct TaskQueueStruct { + taskq_idx head; // This pointer indicates the first task in the queue + taskq_idx tail; // The tail indicates the array element next to the last available task in the queue. 
So, if head == tail, the queue is empty + void *data[TASKQUEUE_SIZE]; +} TaskQueue; + +CpvStaticDeclare(TaskQueue*, task_q); + +TaskQueue* TaskQueueCreate(); +void TaskQueuePush(TaskQueue* queue, void* data); +void* TaskQueuePop(TaskQueue* queue); +void* TaskQueueSteal(TaskQueue* queue); +void TaskQueueDestroy(TaskQueue* queue); + +void StealTask(void); +void CmiTaskQueueInit(void); + + + #endif diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 75991dd..979a3c4 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -39,6 +39,7 @@ void CsdScheduler() { } } } else if (taskQueue && (msg = TaskQueuePop(taskQueue))) { //taskqueue pop handles all possible queue cases arleady so we only need to check if it exists or not + assert(msg != NULL); //process event CmiHandleMessage(msg); @@ -48,8 +49,6 @@ void CsdScheduler() { CcdRaiseCondition(CcdPROCESSOR_END_IDLE); } - //do we want idle check to be here? - } else if (!queue->empty()) { // poll thread queue // get next event (guaranteed to be there because only single consumer) msg = queue->pop().value(); @@ -71,6 +70,8 @@ void CsdScheduler() { CmiSetIdleTime(CmiWallTimer()); CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); } + // at this point the condition should be raised and the pe should be called. + // if already idle, call still idle and (maybe) long idle else { CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); @@ -78,6 +79,8 @@ void CsdScheduler() { CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); } } + // at this point the condition should be raised and the pe should be called. + // poll the communication layer comm_backend::progress(); } @@ -87,5 +90,4 @@ void CsdScheduler() { // TODO: suspend? or spin? } } - // TODO: implement CsdEnqueue/Dequeue (why are these necessary?) 
diff --git a/src/taskqueue.h b/src/taskqueue.h deleted file mode 100644 index c917c50..0000000 --- a/src/taskqueue.h +++ /dev/null @@ -1,103 +0,0 @@ -/** - * this header file defines functinos/declarations/definitions for the task queue system that belongs to converse - * specifically the work-stealing queue of Cilk (THE protocol). - * For more details: visit this link from the readthedocs from the old converse documentation: https://charm.readthedocs.io/en/latest/charm++/manual.html#stealable-tasks-for-within-node-load-balancing - * ALSO we will not be supporitng windows support currently, maybe print out an error message if someone tries to compile on windows? - */ -#ifndef _CKTASKQUEUE_H -#define _CKTASKQUEUE_H - -#include -#include -#include - -#define TASKQUEUE_SIZE 1024 - -#if CMK_SMP - #define CmiMemoryWriteFence() __sync_synchronize() // This is a memory fence to ensure that writes are visible to other threads/cores -#else - #define CmiMemoryWriteFence() // No-op if not in SMP mode -#endif - - -typedef int taskq_idx; - -typedef struct TaskQueueStruct { - taskq_idx head; // This pointer indicates the first task in the queue - taskq_idx tail; // The tail indicates the array element next to the last available task in the queue. 
So, if head == tail, the queue is empty - void *data[TASKQUEUE_SIZE]; -} TaskQueue; - -// Function to create a new TaskQueue and initialize its members -inline static TaskQueue* TaskQueueCreate() { - TaskQueue* taskqueue = (TaskQueue*)malloc(sizeof(TaskQueue)); - taskqueue->head = 0; - taskqueue->tail = 0; - for (int i = 0; i < TASKQUEUE_SIZE; i++) { - taskqueue->data[i] = NULL; - } - return taskqueue; -} - -// Function to push a task onto the TaskQueue -inline static void TaskQueuePush(TaskQueue* queue, void* data) { - queue->data[queue->tail % TASKQUEUE_SIZE] = data; - CmiMemoryWriteFence(); //makes sure the data is fully written before updating the tail pointer - queue->tail++; - if (queue->tail >= TASKQUEUE_SIZE) { - fprintf(stderr, "TaskQueue overflow: possible corruption/overwrite possibility of data\n"); - } -} - -// Function to pop a task from the TaskQueue. Victims pop from the tail -inline static void* TaskQueuePop(TaskQueue* queue) { - queue->tail = queue->tail - 1; - CmiMemoryWriteFence(); - taskq_idx head = queue->head; - taskq_idx tail = queue->tail; - if (tail > head) { // there are more than two tasks in the queue, so it is safe to pop a task from the queue. - return queue->data[tail % TASKQUEUE_SIZE]; - } - - if (tail < head) { // The taskqueue is empty and the last task has been stolen by a thief. - queue->tail = head; // reset the tail pointer to the head pointer - return NULL; - } - - // head==tail case: there is only one task so thieves and victim can try to obtain this task simultaneously. - queue->tail = head + 1; - if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the last task has already stolen. - return NULL; - } - return queue->data[tail % TASKQUEUE_SIZE]; -} - -// Function to steal a task from another TaskQueue. 
Other PEs/Threads steal from the head -inline static void* TaskQueueSteal(TaskQueue* queue) { - taskq_idx head, tail; - while (1) { - head = queue->head; - tail = queue->tail; - if (head >= tail) { - // The queue is empty - // or the last element has been stolen by other thieves - // or popped by the victim. - return NULL; - } - - if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the task this thief is trying to steal is still in the queue and not stolen by the other thieves. - continue; - } - return queue->data[head % TASKQUEUE_SIZE]; - } -} - -// Function to destroy the TaskQueue and free its memory -inline static void TaskQueueDestroy(TaskQueue* queue) { - if (queue != NULL) { - free(queue); - } -} - - -#endif \ No newline at end of file diff --git a/temp.cpp b/temp.cpp new file mode 100644 index 0000000..92c7ba4 --- /dev/null +++ b/temp.cpp @@ -0,0 +1,48 @@ +#include "converse.h" +#include + +// number of tasks to fire +static const int N = 10000; + +// thread‐local counter of completed tasks +thread_local static std::atomic doneCount; + +// the handler called when a task runs +void TaskHandler(void *msg) { + // free our little “message” + free(msg); + + // bump and check if we’re done + if (doneCount.fetch_add(1, std::memory_order_relaxed) + 1 == N) { + // shut down scheduler + CsdExitScheduler(); + } +} + +// module‐init (called once, before scheduler starts) +void ModuleInit(char **argv) { + // register our handler + int h = CmiRegisterHandler(TaskHandler); + + // initialise counter + doneCount = 0; + + // enqueue N tasks + for (int i = 0; i < N; ++i) { + // allocate a “Converse message” header only + void *m = malloc(CmiMsgHeaderSizeBytes); + // tag it with our handler + CmiSetHandler(m, h); + // enqueue as a task + CsdTaskEnqueue(m); + } +} + +int main(int argc, char **argv) { + // register module init + CmiRegisterModuleInit(ModuleInit); + + // now start up the PEs/threads and scheduler + CmiStartThreads(); + return 0; +} 
\ No newline at end of file From 4439108a3ac3c81278ca158ab381094cf0b28a3d Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Mon, 5 May 2025 12:36:09 -0500 Subject: [PATCH 04/21] print statements --- src/convcore.cpp | 3 +++ src/scheduler.cpp | 1 + 2 files changed, 4 insertions(+) diff --git a/src/convcore.cpp b/src/convcore.cpp index cd35328..4af5f68 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -75,6 +75,7 @@ void converseRunPe(int rank) { #ifdef SET_CPU_AFFINITY CmiSetCPUAffinity(rank); #endif + printf("%d: taskqueue pointer: %p\n", __LINE__, CpvAccess(task_q)); Cmi_exitHandler = CmiRegisterHandler(CmiExitHandler); @@ -89,6 +90,8 @@ void converseRunPe(int rank) { // call initial function and start scheduler Cmi_startfn(Cmi_argc, Cmi_argv); + + printf("%d: taskqueue pointer: %p\n", __LINE__, CpvAccess(task_q)); CsdScheduler(); // cleanup of threads? : destroy each threads task queue and reduction table struct(not the struct itself) diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 979a3c4..f42bb3b 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -10,6 +10,7 @@ */ void CsdScheduler() { // get pthread level queue + printf("scheduler %d: taskq pointer %p", __LINE__, CpvAccess(task_q)); ConverseQueue *queue = CmiGetQueue(CmiMyRank()); // get node level queue From 0fd0f7bd896a674734dbbaa8ab539c5f58773701 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Mon, 5 May 2025 14:13:31 -0500 Subject: [PATCH 05/21] working with other examples now --- src/convcore.cpp | 64 +++++++++++++++++++++++++---------------- src/converse_internal.h | 13 ++++----- src/scheduler.cpp | 5 ++-- 3 files changed, 49 insertions(+), 33 deletions(-) diff --git a/src/convcore.cpp b/src/convcore.cpp index a6a4789..2b851b0 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -34,7 +34,8 @@ int CharmLibInterOperate; void CldModuleInit(char **); // PE LOCALS that need global access sometimes -static ConverseQueue **Cmi_queues; // array of queue pointers +ConverseQueue 
**Cmi_queues = nullptr; // array of queue pointers +TaskQueue** Cmi_taskqueues = nullptr; // PE LOCALS thread_local int Cmi_myrank; @@ -76,7 +77,6 @@ void converseRunPe(int rank) { #ifdef SET_CPU_AFFINITY CmiSetCPUAffinity(rank); #endif - printf("%d: taskqueue pointer: %p\n", __LINE__, CpvAccess(task_q)); Cmi_exitHandler = CmiRegisterHandler(CmiExitHandler); @@ -92,12 +92,12 @@ void converseRunPe(int rank) { // call initial function and start scheduler Cmi_startfn(Cmi_argc, Cmi_argv); - printf("%d: taskqueue pointer: %p\n", __LINE__, CpvAccess(task_q)); + //printf("%d: taskqueue pointer: %p\n", __LINE__, CsvAccess(task_q)[Cmi_myrank]); CsdScheduler(); // cleanup of threads? : destroy each threads task queue and reduction table struct(not the struct itself) - TaskQueueDestroy(CpvAccess(task_q)); - free(CpvAccess(_reduction_info)); + TaskQueueDestroy(Cmi_taskqueues[Cmi_myrank]); + //free(CpvAccess(_reduction_info)); } void CmiStartThreads() { @@ -106,6 +106,8 @@ void CmiStartThreads() { CmiHandlerTable = new std::vector *[Cmi_mynodesize]; CmiNodeQueue = new ConverseNodeQueue(); + Cmi_taskqueues = new TaskQueue*[Cmi_mynodesize]; + //CmiTaskQueueInit(); // make sure the queues are allocated before PEs start sending messages around comm_backend::barrier(); @@ -124,9 +126,12 @@ void CmiStartThreads() { delete[] Cmi_queues; delete CmiNodeQueue; delete[] CmiHandlerTable; + delete [] Cmi_taskqueues; + Cmi_queues = nullptr; CmiNodeQueue = nullptr; CmiHandlerTable = nullptr; + Cmi_taskqueues = nullptr; } // argument form: ./prog +pe @@ -205,13 +210,18 @@ void CmiInitState(int rank) { Cmi_queues[Cmi_myrank] = queue; CmiHandlerTable[Cmi_myrank] = handlerTable; + //task queue stuff + Cmi_taskqueues[Cmi_myrank] = TaskQueueCreate(); + //printf("task queue created at pointer: %p\n", Cmi_taskqueues[Cmi_myrank]); + if(CmiMyNodeSize() > 1) { + CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); + CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE, 
(CcdCondFn) TaskStealBeginIdle, NULL); + } + // random CrnInit(); CcdModuleInit(); - - CmiTaskQueueInit(); - printf("task queue created at pointer: %p\n", (void*)CpvAccess(task_q)); } ConverseQueue *CmiGetQueue(int rank) { return Cmi_queues[rank]; } @@ -980,14 +990,14 @@ void StealTask() { } } - void* msg = TaskQueueSteal((TaskQueue*)CpvAccessOther(task_q, random_rank)); + void* msg = TaskQueueSteal((TaskQueue*)(Cmi_taskqueues[random_rank])); if (msg != NULL) { - TaskQueuePush((TaskQueue*)CpvAccess(task_q), msg); + TaskQueuePush((TaskQueue*)(Cmi_taskqueues[Cmi_myrank]), msg); } } // this function is passed into CcdCallOnConditionKeep - static void TaskStealBeginIdle() { + void TaskStealBeginIdle() { // can discuss whether we need to add the isHelper csv variable that is in old converse. // not going to add it for now, because it's turned/left on by default in old converse if (CmiMyNodeSize() > 1) { @@ -995,18 +1005,24 @@ void StealTask() { } } - // each pe will call this function because each pe has its own task queue - void CmiTaskQueueInit() { - // makes sure that the node has more than one PE, because we can only steal - // from other PE's that share the same node + // // each pe will call this function because each pe has its own task queue + // void CmiTaskQueueInit() { + // // makes sure that the node has more than one PE, because we can only steal + // // from other PE's that share the same node - //initlialize the task queue for this PE - CpvInitialize(TaskQueue*, task_q); - CpvAccess(task_q) = TaskQueueCreate(); - - if(CmiMyNodeSize() > 1) { - CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); + // //initlialize the task queue for this PE + // CsvInitialize(TaskQueue**, task_q); + // auto taskqueues = (TaskQueue** )malloc(Cmi_mynodesize * sizeof(TaskQueue* )); + // for (int i = 0; i < Cmi_mynodesize; ++i) + // taskqueues[i] = TaskQueueCreate(); - CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE, (CcdCondFn) 
TaskStealBeginIdle, NULL); - } - } \ No newline at end of file + // CsvAccess(task_q) = taskqueues; + // for (int i = 0; i < Cmi_mynodesize; i++) { + // printf("task queue created at pointer: %p\n", (void*)CsvAccess(task_q)[i]); + // } + + // if(CmiMyNodeSize() > 1) { + // CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); + // CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); + // } + // } \ No newline at end of file diff --git a/src/converse_internal.h b/src/converse_internal.h index ddc37e1..57ab183 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -172,11 +172,7 @@ CmiBroadcastSource CmiGetBcastSource(void *msg); // TASK QUEUE RELATED FUNCTIONS/DEFINITIONS #define TASKQUEUE_SIZE 1024 -#if CMK_SMP - #define CmiMemoryWriteFence() __sync_synchronize() // This is a memory fence to ensure that writes are visible to other threads/cores -#else - #define CmiMemoryWriteFence() // No-op if not in SMP mode -#endif +#define CmiMemoryWriteFence() // No-op if not in SMP mode typedef int taskq_idx; @@ -186,17 +182,20 @@ typedef struct TaskQueueStruct { void *data[TASKQUEUE_SIZE]; } TaskQueue; -CpvStaticDeclare(TaskQueue*, task_q); +//CsvStaticDeclare(TaskQueue**, task_q); TaskQueue* TaskQueueCreate(); void TaskQueuePush(TaskQueue* queue, void* data); void* TaskQueuePop(TaskQueue* queue); void* TaskQueueSteal(TaskQueue* queue); void TaskQueueDestroy(TaskQueue* queue); +void TaskStealBeginIdle(void); void StealTask(void); -void CmiTaskQueueInit(void); +// void CmiTaskQueueInit(void); +extern ConverseQueue **Cmi_queues; // array of queue pointers +extern TaskQueue** Cmi_taskqueues; #endif diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 1852299..24e6a47 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -9,15 +9,16 @@ * The main scheduler loop for the Charm++ runtime. 
*/ void CsdScheduler() { + //printf("my rank: %d\n", CmiMyRank()); // get pthread level queue - printf("scheduler %d: taskq pointer %p", __LINE__, CpvAccess(task_q)); ConverseQueue *queue = CmiGetQueue(CmiMyRank()); // get node level queue ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); //get task level queue - TaskQueue* taskQueue = (TaskQueue*)CpvAccess(task_q); + TaskQueue* taskQueue = (TaskQueue*)(Cmi_taskqueues[CmiMyRank()]); + //printf("scheduler %d: taskq pointer %p", __LINE__, taskQueue); void* msg = NULL; From 9c9f4c07c687bf92c4cfa9a11ae8b1181ef3a3c0 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Mon, 5 May 2025 18:50:18 -0500 Subject: [PATCH 06/21] general layout --- examples/CMakeLists.txt | 1 + examples/taskqueue/CMakeLists.txt | 2 ++ examples/taskqueue/Makefile | 23 ++++++++++++ examples/taskqueue/taskqueue.cpp | 60 +++++++++++++++++++++++++++++++ 4 files changed, 86 insertions(+) create mode 100644 examples/taskqueue/CMakeLists.txt create mode 100644 examples/taskqueue/Makefile create mode 100644 examples/taskqueue/taskqueue.cpp diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 82eb011..4a817f1 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -15,3 +15,4 @@ add_subdirectory(reduction_node) add_subdirectory(cth) add_subdirectory(self_send) add_subdirectory(ctv) +add_subdirectory(taskqueue) diff --git a/examples/taskqueue/CMakeLists.txt b/examples/taskqueue/CMakeLists.txt new file mode 100644 index 0000000..db59981 --- /dev/null +++ b/examples/taskqueue/CMakeLists.txt @@ -0,0 +1,2 @@ +add_reconverse_executable(taskqueue taskqueue.cpp) +add_test(NAME task_queue COMMAND task_queue +pe 7) \ No newline at end of file diff --git a/examples/taskqueue/Makefile b/examples/taskqueue/Makefile new file mode 100644 index 0000000..0fbfcf2 --- /dev/null +++ b/examples/taskqueue/Makefile @@ -0,0 +1,23 @@ +# Compiler and flags +CXX := g++ +CXXFLAGS := -std=c++11 -pthread -I ../.. 
+ +# Source files +SRCS := taskqueue.cpp ../../convcore.cpp ../../scheduler.cpp ../../queue.cpp ../../conv-conds.cpp +HDRS := ../../converse_internal.h ../../scheduler.h ../../queue.h ../../barrier.h + +# Output executable +TARGET := test + +# Default target +all: $(TARGET) + +# Link the object files to create the executable +$(TARGET): $(SRCS) + $(CXX) -o $@ $^ $(CXXFLAGS) + +# Clean up build files +clean: + rm -f $(TARGET) + +.PHONY: all clean diff --git a/examples/taskqueue/taskqueue.cpp b/examples/taskqueue/taskqueue.cpp new file mode 100644 index 0000000..bed706e --- /dev/null +++ b/examples/taskqueue/taskqueue.cpp @@ -0,0 +1,60 @@ +#include "converse.h" +#include +#include + +CpvDeclare(int, test); + +int ping_handlerID; +int payloadSize = 1 * sizeof(int); + +struct Message { + CmiMessageHeader header; + int data[1]; +}; + +void ping_handler(void *vmsg) { + Message *msg = (Message *)vmsg; + printf("PE %d pinged in ring with index %d.\n", CmiMyRank(), msg->data[0]); + + // test assert statements are working + CmiAssert(CmiMyRank() == msg->header.destPE); + + if (CmiMyRank() != CmiMyNodeSize() - 1) { + Message *newmsg = new Message; + newmsg->header.handlerId = ping_handlerID; + newmsg->header.messageSize = sizeof(Message); + + newmsg->data[0] = msg->data[0] + 1; + + CmiSyncSendAndFree(CmiMyRank() + 1, newmsg->header.messageSize, newmsg); + } else { + CmiExit(0); + } +} + +CmiStartFn mymain(int argc, char **argv) { + CpvInitialize(int, test); + CpvAccess(test) = 42; + + ping_handlerID = CmiRegisterHandler(ping_handler); + + if (CmiMyRank() == 0) { + // create a message + Message *msg = new Message; + msg->header.handlerId = ping_handlerID; + msg->header.messageSize = sizeof(Message); + msg->data[0] = 0; + + int sendToPE = 0; + + // Send from my pe-i on node-0 to q+i on node-1 + CmiSyncSendAndFree(sendToPE, msg->header.messageSize, msg); + } + + return 0; +} + +int main(int argc, char **argv) { + ConverseInit(argc, argv, (CmiStartFn)mymain); + return 0; +} From 
30a5714c1c1ce7558c7d86cb30129ef9df556555 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Tue, 6 May 2025 16:08:47 -0500 Subject: [PATCH 07/21] working example but not if you define k really big --- examples/taskqueue/taskqueue.cpp | 84 +++++++++++++++++++------------- include/converse.h | 2 + src/convcore.cpp | 39 ++++++++++++--- src/converse_internal.h | 2 +- src/scheduler.cpp | 1 + 5 files changed, 86 insertions(+), 42 deletions(-) diff --git a/examples/taskqueue/taskqueue.cpp b/examples/taskqueue/taskqueue.cpp index bed706e..df316a8 100644 --- a/examples/taskqueue/taskqueue.cpp +++ b/examples/taskqueue/taskqueue.cpp @@ -2,56 +2,70 @@ #include #include -CpvDeclare(int, test); +#define K 1600 +#define X 10000 -int ping_handlerID; -int payloadSize = 1 * sizeof(int); +CsvDeclare(int, globalCounter); +CpvDeclare(int, tasksExecuted); + +int handlerID; +int print_handlerID; struct Message { CmiMessageHeader header; int data[1]; }; -void ping_handler(void *vmsg) { - Message *msg = (Message *)vmsg; - printf("PE %d pinged in ring with index %d.\n", CmiMyRank(), msg->data[0]); - - // test assert statements are working - CmiAssert(CmiMyRank() == msg->header.destPE); - - if (CmiMyRank() != CmiMyNodeSize() - 1) { - Message *newmsg = new Message; - newmsg->header.handlerId = ping_handlerID; - newmsg->header.messageSize = sizeof(Message); +void print_results_handler_func(void* vmsg) { + printf("PE %d executed %d tasks.\n", CmiMyRank(), CpvAccess(tasksExecuted)); + CmiNodeBarrier(); + if (CmiMyRank() == 0) { + CmiExit(0); + } +} - newmsg->data[0] = msg->data[0] + 1; +void handler_func(void *vmsg) { + Message* incoming_msg = (Message*)vmsg; + //printf("PE %d pinged this function with data index: %d.\n", CmiMyRank(), incoming_msg->data[0]); + + // do dummy work + for (int i = 0; i < X; i++) { + CmiWallTimer(); + } - CmiSyncSendAndFree(CmiMyRank() + 1, newmsg->header.messageSize, newmsg); - } else { - CmiExit(0); + CpvAccess(tasksExecuted)++; + CsvAccess(globalCounter) = 
CsvAccess(globalCounter) - 1; + if (CsvAccess(globalCounter) == 0) { + Message* msg = new Message; + msg->header.handlerId = print_handlerID; + msg->header.messageSize = sizeof(Message); + printf("All tasks have been executed.\n"); + CmiSyncBroadcastAllAndFree(sizeof(Message), msg); } } CmiStartFn mymain(int argc, char **argv) { - CpvInitialize(int, test); - CpvAccess(test) = 42; + CpvInitialize(int, tasksExecuted); + CpvAccess(tasksExecuted) = 0; - ping_handlerID = CmiRegisterHandler(ping_handler); + handlerID = CmiRegisterHandler(handler_func); + print_handlerID = CmiRegisterHandler(print_results_handler_func); - if (CmiMyRank() == 0) { - // create a message - Message *msg = new Message; - msg->header.handlerId = ping_handlerID; - msg->header.messageSize = sizeof(Message); - msg->data[0] = 0; - - int sendToPE = 0; - - // Send from my pe-i on node-0 to q+i on node-1 - CmiSyncSendAndFree(sendToPE, msg->header.messageSize, msg); - } - - return 0; + if (CmiMyPe() == 0) { + CsvInitialize(int, globalCounter); + CsvAccess(globalCounter) = K; + for (int i = 0; i < K; i++) { + Message* newmsg = new Message; + newmsg->data[0] = i; + newmsg->header.messageSize = sizeof(Message); + newmsg->header.handlerId = handlerID; + //fprintf(stderr, "before: PE %d: sending message with data: %p\n", CmiMyRank(), newmsg); + CmiTaskQueueSyncSend(CmiMyRank(), sizeof(Message), newmsg); + //CmiSyncSendAndFree(CmiMyPe(), sizeof(Message), newmsg); + //fprintf(stderr, "after: PE %d: sending message with data: %p\n", CmiMyRank(), newmsg); + } + } + return 0; } int main(int argc, char **argv) { diff --git a/include/converse.h b/include/converse.h index 6de8492..d6513fb 100644 --- a/include/converse.h +++ b/include/converse.h @@ -306,6 +306,8 @@ void CmiFreeSendFn(int destPE, int messageSize, char *msg); void CmiSyncListSendFn(int npes, const int *pes, int len, char *msg); void CmiFreeListSendFn(int npes, const int *pes, int len, char *msg); +void CmiTaskQueueSyncSend(int destPE, int messageSize, 
void *msg); + // broadcasts void CmiSyncBroadcast(int size, void *msg); void CmiSyncBroadcastAndFree(int size, void *msg); diff --git a/src/convcore.cpp b/src/convcore.cpp index 2b851b0..0275980 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -72,8 +72,7 @@ void converseRunPe(int rank) { // init state CmiInitState(rank); - // init things like cld module, ccs, etc - CldModuleInit(Cmi_argv); + #ifdef SET_CPU_AFFINITY CmiSetCPUAffinity(rank); #endif @@ -83,6 +82,11 @@ void converseRunPe(int rank) { //initalize collective operations/arrays/handlers/etc collectiveInit(); + // init things like cld module, ccs, etc + // moved this after Cmi_Exithandler so exithandler will theoretically be + //registered at index 0, so if user forgets to specify handlerid, it will exit + CldModuleInit(Cmi_argv); + // barrier to ensure all global structs are initialized CmiNodeBarrier(); @@ -214,6 +218,7 @@ void CmiInitState(int rank) { Cmi_taskqueues[Cmi_myrank] = TaskQueueCreate(); //printf("task queue created at pointer: %p\n", Cmi_taskqueues[Cmi_myrank]); if(CmiMyNodeSize() > 1) { + //fprintf(stderr, "%d: called by pe %d\n", __LINE__, CmiMyPe()); CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); } @@ -266,6 +271,8 @@ void CmiSetInfo(void *msg, int infofn) { void CmiPushPE(int destPE, int messageSize, void *msg) { int rank = CmiRankOf(destPE); + // printf("in cmipushpe: myPe: %d, destPe: %d, nodeSize: %d\n", CmiMyPe(), destPE, + // Cmi_mynodesize); CmiAssertMsg( rank >= 0 && rank < Cmi_mynodesize, "CmiPushPE(myPe: %d, destPe: %d, nodeSize: %d): rank out of range", @@ -309,6 +316,7 @@ void CmiSyncSendAndFree(int destPE, int messageSize, void *msg) { } if (CmiMyNode() == destNode) { + CmiPushPE(destPE, messageSize, msg); } else { comm_backend::sendAm(destNode, msg, messageSize, comm_backend::MR_NULL, @@ -907,6 +915,25 @@ int CmiTryLock(CmiNodeLock lock) { 
return pthread_mutex_trylock(lock); } //Task Queue Functions/Definitions +void CmiTaskQueueSyncSend(int destPE, int messageSize, void *msg) { + // fprintf(stderr, "task queue sync send called by pe %d and the destPe is %d\n", CmiMyPe(), destPE); + char *copymsg = (char *)CmiAlloc(messageSize); + std::memcpy(copymsg, msg, + messageSize); // optionally avoid memcpy and block instead + + int destindex = CmiRankOf(destPE); + // fprintf(stderr, "destindex: %d\n", destindex); + TaskQueue* dest_taskq = (TaskQueue*)(Cmi_taskqueues[destindex]); + if (dest_taskq == NULL) { + CmiFree(copymsg); + return; + } + + //fprintf(stderr, "copymsg: %p\n", copymsg); + TaskQueuePush(dest_taskq, copymsg); +} + + // Function to create a new TaskQueue and initialize its members TaskQueue* TaskQueueCreate() { TaskQueue* taskqueue = (TaskQueue*)malloc(sizeof(TaskQueue)); @@ -923,9 +950,6 @@ void TaskQueuePush(TaskQueue* queue, void* data) { queue->data[queue->tail % TASKQUEUE_SIZE] = data; CmiMemoryWriteFence(); //makes sure the data is fully written before updating the tail pointer queue->tail++; - if (queue->tail >= TASKQUEUE_SIZE) { - fprintf(stderr, "TaskQueue overflow: possible corruption/overwrite possibility of data\n"); - } } // Function to pop a task from the TaskQueue. Victims pop from the tail @@ -935,7 +959,8 @@ void* TaskQueuePop(TaskQueue* queue) { taskq_idx head = queue->head; taskq_idx tail = queue->tail; if (tail > head) { // there are more than two tasks in the queue, so it is safe to pop a task from the queue. - return queue->data[tail % TASKQUEUE_SIZE]; + //fprintf(stderr, "%d: taskpop called by pe %d\n", __LINE__, CmiMyPe()); + return queue->data[tail % TASKQUEUE_SIZE]; } if (tail < head) { // The taskqueue is empty and the last task has been stolen by a thief. @@ -948,6 +973,7 @@ void* TaskQueuePop(TaskQueue* queue) { if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the last task has already stolen. 
return NULL; } + //fprintf(stderr, "%d: taskpop called by pe %d\n", __LINE__, CmiMyPe()); return queue->data[tail % TASKQUEUE_SIZE]; } @@ -1000,6 +1026,7 @@ void StealTask() { void TaskStealBeginIdle() { // can discuss whether we need to add the isHelper csv variable that is in old converse. // not going to add it for now, because it's turned/left on by default in old converse + //fprintf(stderr, "%d: task steal queue idle called: %d\n", __LINE__, CmiMyPe()); if (CmiMyNodeSize() > 1) { StealTask(); } diff --git a/src/converse_internal.h b/src/converse_internal.h index 57ab183..4fdb66a 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -171,7 +171,7 @@ CmiBroadcastSource CmiGetBcastSource(void *msg); // TASK QUEUE RELATED FUNCTIONS/DEFINITIONS -#define TASKQUEUE_SIZE 1024 +#define TASKQUEUE_SIZE 2000 #define CmiMemoryWriteFence() // No-op if not in SMP mode typedef int taskq_idx; diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 24e6a47..4a02a97 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -42,6 +42,7 @@ void CsdScheduler() { } } else if (taskQueue && (msg = TaskQueuePop(taskQueue))) { //taskqueue pop handles all possible queue cases arleady so we only need to check if it exists or not assert(msg != NULL); + //fprintf(stderr, "%d: taskqueue popped message: %p\n", CmiMyPe(), msg); //process event CmiHandleMessage(msg); From fd2b68fb05832b4cbea4f1d4d4b7d35db6705f32 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Tue, 6 May 2025 16:26:47 -0500 Subject: [PATCH 08/21] undefined handler functionality + clean up comments --- examples/taskqueue/taskqueue.cpp | 3 --- src/convcore.cpp | 19 ++++++++++--------- src/converse_internal.h | 6 ++++++ src/scheduler.cpp | 1 - 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/examples/taskqueue/taskqueue.cpp b/examples/taskqueue/taskqueue.cpp index df316a8..9916ce2 100644 --- a/examples/taskqueue/taskqueue.cpp +++ b/examples/taskqueue/taskqueue.cpp @@ -59,10 +59,7 @@ CmiStartFn 
mymain(int argc, char **argv) { newmsg->data[0] = i; newmsg->header.messageSize = sizeof(Message); newmsg->header.handlerId = handlerID; - //fprintf(stderr, "before: PE %d: sending message with data: %p\n", CmiMyRank(), newmsg); CmiTaskQueueSyncSend(CmiMyRank(), sizeof(Message), newmsg); - //CmiSyncSendAndFree(CmiMyPe(), sizeof(Message), newmsg); - //fprintf(stderr, "after: PE %d: sending message with data: %p\n", CmiMyRank(), newmsg); } } return 0; diff --git a/src/convcore.cpp b/src/convcore.cpp index 0275980..f649e5e 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -45,6 +45,7 @@ thread_local double idle_time; // Special operation handlers (TODO: should these be special values instead like // the exit handler) +int Cmi_undefinedHandler; int Cmi_exitHandler; // TODO: padding for all these thread_locals and cmistates? @@ -76,7 +77,7 @@ void converseRunPe(int rank) { #ifdef SET_CPU_AFFINITY CmiSetCPUAffinity(rank); #endif - + Cmi_undefinedHandler = CmiRegisterHandler(CmiUndefinedHandler); //should be at index 0 which is what gets called if user doesnt define a handlerid in their message Cmi_exitHandler = CmiRegisterHandler(CmiExitHandler); //initalize collective operations/arrays/handlers/etc @@ -216,9 +217,7 @@ void CmiInitState(int rank) { //task queue stuff Cmi_taskqueues[Cmi_myrank] = TaskQueueCreate(); - //printf("task queue created at pointer: %p\n", Cmi_taskqueues[Cmi_myrank]); if(CmiMyNodeSize() > 1) { - //fprintf(stderr, "%d: called by pe %d\n", __LINE__, CmiMyPe()); CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL); } @@ -379,6 +378,14 @@ void CmiExitHandler(void *msg) { CsdExitScheduler(); } +// this will only stop the program on pe1, so it will exit the program if its +pe 1 and hang if it is +pe > 1 +void CmiUndefinedHandler(void* msg) { + (void)msg; + + CmiPrintf("Possible undefined handler called. 
Please check to see if you've populated the handlerId field of your message.\n"); + CsdExitScheduler(); +} + ConverseNodeQueue *CmiGetNodeQueue() { return CmiNodeQueue; } void CmiSyncNodeSendAndFree(unsigned int destNode, unsigned int size, @@ -916,20 +923,17 @@ int CmiTryLock(CmiNodeLock lock) { return pthread_mutex_trylock(lock); } //Task Queue Functions/Definitions void CmiTaskQueueSyncSend(int destPE, int messageSize, void *msg) { - // fprintf(stderr, "task queue sync send called by pe %d and the destPe is %d\n", CmiMyPe(), destPE); char *copymsg = (char *)CmiAlloc(messageSize); std::memcpy(copymsg, msg, messageSize); // optionally avoid memcpy and block instead int destindex = CmiRankOf(destPE); - // fprintf(stderr, "destindex: %d\n", destindex); TaskQueue* dest_taskq = (TaskQueue*)(Cmi_taskqueues[destindex]); if (dest_taskq == NULL) { CmiFree(copymsg); return; } - //fprintf(stderr, "copymsg: %p\n", copymsg); TaskQueuePush(dest_taskq, copymsg); } @@ -959,7 +963,6 @@ void* TaskQueuePop(TaskQueue* queue) { taskq_idx head = queue->head; taskq_idx tail = queue->tail; if (tail > head) { // there are more than two tasks in the queue, so it is safe to pop a task from the queue. - //fprintf(stderr, "%d: taskpop called by pe %d\n", __LINE__, CmiMyPe()); return queue->data[tail % TASKQUEUE_SIZE]; } @@ -973,7 +976,6 @@ void* TaskQueuePop(TaskQueue* queue) { if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the last task has already stolen. return NULL; } - //fprintf(stderr, "%d: taskpop called by pe %d\n", __LINE__, CmiMyPe()); return queue->data[tail % TASKQUEUE_SIZE]; } @@ -1026,7 +1028,6 @@ void StealTask() { void TaskStealBeginIdle() { // can discuss whether we need to add the isHelper csv variable that is in old converse. 
// not going to add it for now, because it's turned/left on by default in old converse - //fprintf(stderr, "%d: task steal queue idle called: %d\n", __LINE__, CmiMyPe()); if (CmiMyNodeSize() > 1) { StealTask(); } diff --git a/src/converse_internal.h b/src/converse_internal.h index 4fdb66a..2314da8 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -35,6 +35,7 @@ void CmiNodeBcastHandler(void *msg); void CmiExitHandler(void *msg); void CmiGroupHandler(void *msg); void CmiReduceHandler(void *msg); +void CmiUndefinedHandler(void *msg); typedef struct HandlerInfo { CmiHandler hdlr; @@ -172,7 +173,12 @@ CmiBroadcastSource CmiGetBcastSource(void *msg); // TASK QUEUE RELATED FUNCTIONS/DEFINITIONS #define TASKQUEUE_SIZE 2000 + +#ifdef CMK_SMP +#define CmiMemoryWriteFence() __sync_synchronize() // Memory fence for SMP mode +#else #define CmiMemoryWriteFence() // No-op if not in SMP mode +#endif typedef int taskq_idx; diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 4a02a97..24e6a47 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -42,7 +42,6 @@ void CsdScheduler() { } } else if (taskQueue && (msg = TaskQueuePop(taskQueue))) { //taskqueue pop handles all possible queue cases arleady so we only need to check if it exists or not assert(msg != NULL); - //fprintf(stderr, "%d: taskqueue popped message: %p\n", CmiMyPe(), msg); //process event CmiHandleMessage(msg); From ca8e293ae5566e6d93798186cf4e62e454f57899 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Tue, 6 May 2025 16:46:35 -0500 Subject: [PATCH 09/21] reordering --- examples/taskqueue/taskqueue.cpp | 2 +- src/convcore.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/taskqueue/taskqueue.cpp b/examples/taskqueue/taskqueue.cpp index 9916ce2..800986e 100644 --- a/examples/taskqueue/taskqueue.cpp +++ b/examples/taskqueue/taskqueue.cpp @@ -2,7 +2,7 @@ #include #include -#define K 1600 +#define K 1000 #define X 10000 CsvDeclare(int, globalCounter); diff --git 
a/src/convcore.cpp b/src/convcore.cpp index 8603851..390c028 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -112,10 +112,10 @@ void CmiStartThreads() { Cmi_queues = new ConverseQueue *[Cmi_mynodesize]; CmiHandlerTable = new std::vector *[Cmi_mynodesize]; CmiNodeQueue = new ConverseNodeQueue(); + Cmi_taskqueues = new TaskQueue*[Cmi_mynodesize]; _smp_mutex = CmiCreateLock(); - - Cmi_taskqueues = new TaskQueue*[Cmi_mynodesize]; + // make sure the queues are allocated before PEs start sending messages around comm_backend::barrier(); From 1b94c90a8ee9eefa64944ac5d600d2895613e0e1 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Tue, 6 May 2025 16:53:15 -0500 Subject: [PATCH 10/21] try again --- examples/taskqueue/CMakeLists.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/taskqueue/CMakeLists.txt b/examples/taskqueue/CMakeLists.txt index db59981..681f270 100644 --- a/examples/taskqueue/CMakeLists.txt +++ b/examples/taskqueue/CMakeLists.txt @@ -1,2 +1 @@ -add_reconverse_executable(taskqueue taskqueue.cpp) -add_test(NAME task_queue COMMAND task_queue +pe 7) \ No newline at end of file +add_reconverse_executable(taskqueue taskqueue.cpp) \ No newline at end of file From feafc33b51aacbb419a32c3db352311cbafd8ac7 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Thu, 8 May 2025 11:54:15 -0500 Subject: [PATCH 11/21] testing to see if build test passes now --- src/convcore.cpp | 2 +- src/converse_internal.h | 27 +++++++++++++++++++++++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/convcore.cpp b/src/convcore.cpp index 390c028..fc6a784 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -79,7 +79,7 @@ void converseRunPe(int rank) { #ifdef SET_CPU_AFFINITY CmiSetCPUAffinity(rank); #endif - Cmi_undefinedHandler = CmiRegisterHandler(CmiUndefinedHandler); //should be at index 0 which is what gets called if user doesnt define a handlerid in their message + //Cmi_undefinedHandler = 
CmiRegisterHandler(CmiUndefinedHandler); //should be at index 0 which is what gets called if user doesnt define a handlerid in their message Cmi_exitHandler = CmiRegisterHandler(CmiExitHandler); //initalize collective operations/arrays/handlers/etc diff --git a/src/converse_internal.h b/src/converse_internal.h index e8b0ede..4e4244a 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -88,9 +88,6 @@ extern void CmiInitHwlocTopology(void); extern int CmiSetCPUAffinity(int); // REDUCTION RELATED FUNCTIONS/DEFINITIONS - -#define CMI_REDUCTION_ID_MULTIPLIER 4 - using CmiReductionID = decltype(CmiMessageHeader::collectiveMetaInfo); // needs to match header using CmiBroadcastSource = @@ -175,6 +172,28 @@ CmiBroadcastSource CmiGetBcastSource(void *msg); // TASK QUEUE RELATED FUNCTIONS/DEFINITIONS +/** + * I think we should just abort if we see estimated queue size is more than (say) half of the max size. + * I say half because thats a power of 2. But may be a small number will do. + * Like estimatedSize > Maxsize - 2*numPEs. (here estimated size is something like tail-head (or vice versa?). + * And note not to use atomic etc. for accessing tail and head for this estimated size + */ + +/** + * I suggest we increase the taskq size, document it (and maybe provide a build option or just instructions + * for where to change it in the doc), and see if a warning (if not abort) is possible. + * Note that the head and tail roll over (as 32 bit ints.. It probably should be unsigned int) + * (and the actual index is calculated as (head % qsize). So the difference will be negative briefly + * when the tail rolls over after going above MAXINT but head hasn’t yet.. but thats ok. + * The purpose of the warning (or abort) is still reasonably served + */ + +/** + * For later, we should consider unbounded queue. But it won’t be as efficient as this. 
+ * (one change I suggest in the current code is that queuesize be specified as a power of two + * (by specifying the exponent), so there is no chance of someone changing it to a non power of 2, + * and then changing the modulo operator to a bitmas + */ #define TASKQUEUE_SIZE 2000 #ifdef CMK_SMP @@ -207,4 +226,4 @@ void StealTask(void); extern ConverseQueue **Cmi_queues; // array of queue pointers extern TaskQueue** Cmi_taskqueues; -#endif +#endif \ No newline at end of file From 8fee676c06ff8969a7c030a21290b8212808d32a Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Thu, 8 May 2025 11:59:28 -0500 Subject: [PATCH 12/21] try again --- examples/reduction_dual/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/reduction_dual/CMakeLists.txt b/examples/reduction_dual/CMakeLists.txt index a2d0950..bc99d40 100644 --- a/examples/reduction_dual/CMakeLists.txt +++ b/examples/reduction_dual/CMakeLists.txt @@ -1,3 +1,2 @@ add_reconverse_executable(reduction_dual reduction_dual.cpp) add_test(NAME reduction_dual COMMAND reduction_dual +pe 7) -add_test(NAME reduction_dual-multinode COMMAND ${RECONVERSE_TEST_LAUNCHER} -n 2 $ +pe 4) From 4cd5fd7fb5e78c88ca3737966793323dc51d13e9 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Thu, 8 May 2025 13:46:14 -0500 Subject: [PATCH 13/21] remove lci1 test --- .github/workflows/ci-lci1.yml | 52 ----------------------------------- 1 file changed, 52 deletions(-) delete mode 100644 .github/workflows/ci-lci1.yml diff --git a/.github/workflows/ci-lci1.yml b/.github/workflows/ci-lci1.yml deleted file mode 100644 index b2e0702..0000000 --- a/.github/workflows/ci-lci1.yml +++ /dev/null @@ -1,52 +0,0 @@ -name: LCI1 - -on: - push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] - -jobs: - build-and-test: - runs-on: ubuntu-latest - name: Build and Test - - steps: - - uses: actions/checkout@v4 - - - name: Install Dependencies - run: | - sudo apt-get update - sudo apt-get install -y cmake ninja-build libfabric-bin 
libfabric-dev openmpi-bin openmpi-common openmpi-doc libopenmpi-dev - - - name: Verify Installation - run: | - cmake --version - ninja --version - mpicc --version - mpirun --version - fi_info --version - - - name: Configure - shell: bash - run: | - cmake \ - -Bbuild \ - -GNinja \ - -DCMAKE_BUILD_TYPE=Debug \ - -DRECONVERSE_TRY_ENABLE_COMM_LCI1=ON \ - -DRECONVERSE_AUTOFETCH_LCI1=ON \ - -DLCI_SERVER=ofi \ - -DLCT_PMI_BACKEND_ENABLE_MPI=ON \ - . - - - name: Build - shell: bash - run: | - cmake --build build --target all - - - name: Test - shell: bash - run: | - cd build - ctest --extra-verbose --timeout 60 From ad5c110d9251b6686eb7879c21519f2692f45d88 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Thu, 8 May 2025 15:47:25 -0500 Subject: [PATCH 14/21] example bug where stuff is hanging fixed --- examples/taskqueue/taskqueue.cpp | 24 +++++++++++------------- src/convcore.cpp | 10 ++++++++++ src/converse_internal.h | 2 +- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/examples/taskqueue/taskqueue.cpp b/examples/taskqueue/taskqueue.cpp index 800986e..a100959 100644 --- a/examples/taskqueue/taskqueue.cpp +++ b/examples/taskqueue/taskqueue.cpp @@ -1,12 +1,13 @@ #include "converse.h" #include #include +#include -#define K 1000 +#define K 500 #define X 10000 -CsvDeclare(int, globalCounter); -CpvDeclare(int, tasksExecuted); +thread_local int tasksExecuted = 0; +static std::atomic globalCounter{K}; int handlerID; int print_handlerID; @@ -17,7 +18,7 @@ struct Message { }; void print_results_handler_func(void* vmsg) { - printf("PE %d executed %d tasks.\n", CmiMyRank(), CpvAccess(tasksExecuted)); + printf("PE %d executed %d tasks.\n", CmiMyRank(), tasksExecuted); CmiNodeBarrier(); if (CmiMyRank() == 0) { CmiExit(0); @@ -33,9 +34,11 @@ void handler_func(void *vmsg) { CmiWallTimer(); } - CpvAccess(tasksExecuted)++; - CsvAccess(globalCounter) = CsvAccess(globalCounter) - 1; - if (CsvAccess(globalCounter) == 0) { + tasksExecuted++; + + int prev = 
globalCounter.fetch_sub(1, std::memory_order_acq_rel); + CmiPrintf("Current globalCounter: %d\n", prev - 1); + if (prev == 1) { Message* msg = new Message; msg->header.handlerId = print_handlerID; msg->header.messageSize = sizeof(Message); @@ -44,16 +47,11 @@ void handler_func(void *vmsg) { } } -CmiStartFn mymain(int argc, char **argv) { - CpvInitialize(int, tasksExecuted); - CpvAccess(tasksExecuted) = 0; - +CmiStartFn mymain(int argc, char **argv) { handlerID = CmiRegisterHandler(handler_func); print_handlerID = CmiRegisterHandler(print_results_handler_func); if (CmiMyPe() == 0) { - CsvInitialize(int, globalCounter); - CsvAccess(globalCounter) = K; for (int i = 0; i < K; i++) { Message* newmsg = new Message; newmsg->data[0] = i; diff --git a/src/convcore.cpp b/src/convcore.cpp index fc6a784..73356cd 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -1017,6 +1017,16 @@ TaskQueue* TaskQueueCreate() { // Function to push a task onto the TaskQueue void TaskQueuePush(TaskQueue* queue, void* data) { + // compute estimated size + //probably not thread safe, so that's why its estimated size? + taskq_idx head = queue->head; + taskq_idx tail = queue->tail; + size_t estimated_size = std::abs(tail - head); + if (estimated_size > TASKQUEUE_SIZE - (2 * CmiMyNodeSize())) { + CmiPrintf("tail: %lu, head: %lu\n", tail, head); + CmiAbort("TaskQueuePush: TaskQueue is approaching full. 
Estimated size: %lu, threshold: %lu \n", estimated_size, TASKQUEUE_SIZE - (2 * CmiMyNodeSize())); + } + queue->data[queue->tail % TASKQUEUE_SIZE] = data; CmiMemoryWriteFence(); //makes sure the data is fully written before updating the tail pointer queue->tail++; diff --git a/src/converse_internal.h b/src/converse_internal.h index 4e4244a..8fac02a 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -194,7 +194,7 @@ CmiBroadcastSource CmiGetBcastSource(void *msg); * (by specifying the exponent), so there is no chance of someone changing it to a non power of 2, * and then changing the modulo operator to a bitmas */ -#define TASKQUEUE_SIZE 2000 +#define TASKQUEUE_SIZE 2048 // in old converse it is 1024 #ifdef CMK_SMP #define CmiMemoryWriteFence() __sync_synchronize() // Memory fence for SMP mode From 51c6264a307237c1ab05288a1332134ac6dec414 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Thu, 8 May 2025 16:03:49 -0500 Subject: [PATCH 15/21] null check + start of CmiSynctaskQSendndFree --- examples/taskqueue/taskqueue.cpp | 2 +- include/converse.h | 3 ++- src/convcore.cpp | 17 +++++++++++------ 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/examples/taskqueue/taskqueue.cpp b/examples/taskqueue/taskqueue.cpp index a100959..e4fd2fd 100644 --- a/examples/taskqueue/taskqueue.cpp +++ b/examples/taskqueue/taskqueue.cpp @@ -57,7 +57,7 @@ CmiStartFn mymain(int argc, char **argv) { newmsg->data[0] = i; newmsg->header.messageSize = sizeof(Message); newmsg->header.handlerId = handlerID; - CmiTaskQueueSyncSend(CmiMyRank(), sizeof(Message), newmsg); + CmiSyncTaskQSend(CmiMyRank(), sizeof(Message), newmsg); } } return 0; diff --git a/include/converse.h b/include/converse.h index 330f450..b40051e 100644 --- a/include/converse.h +++ b/include/converse.h @@ -337,7 +337,8 @@ void CmiFreeSendFn(int destPE, int messageSize, char *msg); void CmiSyncListSendFn(int npes, const int *pes, int len, char *msg); void CmiFreeListSendFn(int npes, const int *pes, 
int len, char *msg); -void CmiTaskQueueSyncSend(int destPE, int messageSize, void *msg); +void CmiSyncTaskQSend(int destPE, int messageSize, void *msg); +void CmiSyncTaskQSendAndFree(int destPE, int messageSize, void *msg); // broadcasts void CmiSyncBroadcast(int size, void *msg); diff --git a/src/convcore.cpp b/src/convcore.cpp index 73356cd..f58062a 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -79,7 +79,7 @@ void converseRunPe(int rank) { #ifdef SET_CPU_AFFINITY CmiSetCPUAffinity(rank); #endif - //Cmi_undefinedHandler = CmiRegisterHandler(CmiUndefinedHandler); //should be at index 0 which is what gets called if user doesnt define a handlerid in their message + Cmi_undefinedHandler = CmiRegisterHandler(CmiUndefinedHandler); //should be at index 0 which is what gets called if user doesnt define a handlerid in their message Cmi_exitHandler = CmiRegisterHandler(CmiExitHandler); //initalize collective operations/arrays/handlers/etc @@ -988,7 +988,7 @@ int CmiTryLock(CmiNodeLock lock) { return pthread_mutex_trylock(lock); } //Task Queue Functions/Definitions -void CmiTaskQueueSyncSend(int destPE, int messageSize, void *msg) { +void CmiSyncTaskQSend(int destPE, int messageSize, void *msg) { char *copymsg = (char *)CmiAlloc(messageSize); std::memcpy(copymsg, msg, messageSize); // optionally avoid memcpy and block instead @@ -1003,6 +1003,10 @@ void CmiTaskQueueSyncSend(int destPE, int messageSize, void *msg) { TaskQueuePush(dest_taskq, copymsg); } +void CmiSyncTaskQSendAndFree(int destPE, int messageSize, void *msg) { + return; +} + // Function to create a new TaskQueue and initialize its members TaskQueue* TaskQueueCreate() { @@ -1093,10 +1097,11 @@ void StealTask() { // and we are the last PE on our node } } - - void* msg = TaskQueueSteal((TaskQueue*)(Cmi_taskqueues[random_rank])); - if (msg != NULL) { - TaskQueuePush((TaskQueue*)(Cmi_taskqueues[Cmi_myrank]), msg); + if (Cmi_taskqueues[random_rank] != NULL) { + void* msg = 
TaskQueueSteal((TaskQueue*)(Cmi_taskqueues[random_rank])); + if (msg != NULL && Cmi_taskqueues[Cmi_myrank] != NULL) { + TaskQueuePush((TaskQueue*)(Cmi_taskqueues[Cmi_myrank]), msg); + } } } From 34456fe704a9f10f2815ac06e6970f0129e6aac2 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Thu, 8 May 2025 16:12:12 -0500 Subject: [PATCH 16/21] remove print statement + change to sendandfree --- examples/taskqueue/taskqueue.cpp | 4 ++-- src/convcore.cpp | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/taskqueue/taskqueue.cpp b/examples/taskqueue/taskqueue.cpp index e4fd2fd..6603553 100644 --- a/examples/taskqueue/taskqueue.cpp +++ b/examples/taskqueue/taskqueue.cpp @@ -37,7 +37,7 @@ void handler_func(void *vmsg) { tasksExecuted++; int prev = globalCounter.fetch_sub(1, std::memory_order_acq_rel); - CmiPrintf("Current globalCounter: %d\n", prev - 1); + //CmiPrintf("Current globalCounter: %d\n", prev - 1); if (prev == 1) { Message* msg = new Message; msg->header.handlerId = print_handlerID; @@ -57,7 +57,7 @@ CmiStartFn mymain(int argc, char **argv) { newmsg->data[0] = i; newmsg->header.messageSize = sizeof(Message); newmsg->header.handlerId = handlerID; - CmiSyncTaskQSend(CmiMyRank(), sizeof(Message), newmsg); + CmiSyncTaskQSendAndFree(CmiMyRank(), sizeof(Message), newmsg); } } return 0; diff --git a/src/convcore.cpp b/src/convcore.cpp index f58062a..789223d 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -993,21 +993,21 @@ void CmiSyncTaskQSend(int destPE, int messageSize, void *msg) { std::memcpy(copymsg, msg, messageSize); // optionally avoid memcpy and block instead + CmiSyncTaskQSendAndFree(destPE, messageSize, copymsg); +} + +void CmiSyncTaskQSendAndFree(int destPE, int messageSize, void *msg) { int destindex = CmiRankOf(destPE); TaskQueue* dest_taskq = (TaskQueue*)(Cmi_taskqueues[destindex]); if (dest_taskq == NULL) { - CmiFree(copymsg); + CmiFree(msg); return; } - TaskQueuePush(dest_taskq, copymsg); -} - -void 
CmiSyncTaskQSendAndFree(int destPE, int messageSize, void *msg) { + TaskQueuePush(dest_taskq, msg); return; } - // Function to create a new TaskQueue and initialize its members TaskQueue* TaskQueueCreate() { TaskQueue* taskqueue = (TaskQueue*)malloc(sizeof(TaskQueue)); From ef9109df668ef6ae8b4dd8c6a1e8982b99b26ea7 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Thu, 8 May 2025 16:14:17 -0500 Subject: [PATCH 17/21] task queue steal change --- src/convcore.cpp | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/src/convcore.cpp b/src/convcore.cpp index 789223d..7f6c629 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -1061,29 +1061,34 @@ void* TaskQueuePop(TaskQueue* queue) { // Function to steal a task from another TaskQueue. Other PEs/Threads steal from the head void* TaskQueueSteal(TaskQueue* queue) { - taskq_idx head, tail; - while (1) { - head = queue->head; - tail = queue->tail; - if (head >= tail) { - // The queue is empty - // or the last element has been stolen by other thieves - // or popped by the victim. - return NULL; - } - - if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the task this thief is trying to steal is still in the queue and not stolen by the other thieves. - continue; - } - return queue->data[head % TASKQUEUE_SIZE]; - } + if (queue == NULL) { + return NULL; + } + + taskq_idx head, tail; + while (1) { + head = queue->head; + tail = queue->tail; + if (head >= tail) { + // The queue is empty + // or the last element has been stolen by other thieves + // or popped by the victim. + return NULL; + } + + if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the task this thief is trying to steal is still in the queue and not stolen by the other thieves.
+ continue; + } + return queue->data[head % TASKQUEUE_SIZE]; + } } // Function to destroy the TaskQueue and free its memory void TaskQueueDestroy(TaskQueue* queue) { - if (queue != NULL) { - free(queue); - } + if (queue != NULL) { + free(queue); + queue = NULL; + } } void StealTask() { From 2a3329103e356a153bcb317fb60566733bc86b37 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Thu, 8 May 2025 16:19:53 -0500 Subject: [PATCH 18/21] change value of k --- examples/taskqueue/taskqueue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/taskqueue/taskqueue.cpp b/examples/taskqueue/taskqueue.cpp index 6603553..e1a83fc 100644 --- a/examples/taskqueue/taskqueue.cpp +++ b/examples/taskqueue/taskqueue.cpp @@ -3,7 +3,7 @@ #include #include -#define K 500 +#define K 2000 // will tend to abort if your k value is too high (higher than the max task queue size) #define X 10000 thread_local int tasksExecuted = 0; From 3f4032b8e3f84e5b320d55d7a5e4c513664d8d2d Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Thu, 8 May 2025 16:28:35 -0500 Subject: [PATCH 19/21] add head check --- src/convcore.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/convcore.cpp b/src/convcore.cpp index 7f6c629..c74cfca 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -1079,6 +1079,10 @@ void* TaskQueueSteal(TaskQueue* queue) { if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the task this thief is trying to steal is still in the queue and not stolen by the other thieves. 
continue; } + if (head < 0) { + CmiPrintf("corruption detected possibly?"); + return NULL; + } return queue->data[head % TASKQUEUE_SIZE]; } } From e80670158c600b665272c6df4dea059b9b18402e Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Thu, 8 May 2025 16:50:13 -0500 Subject: [PATCH 20/21] add task queue logic to CsdSchedulePoll --- src/scheduler.cpp | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 24e6a47..cc4d7ef 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -18,7 +18,6 @@ void CsdScheduler() { //get task level queue TaskQueue* taskQueue = (TaskQueue*)(Cmi_taskqueues[CmiMyRank()]); - //printf("scheduler %d: taskq pointer %p", __LINE__, taskQueue); void* msg = NULL; @@ -104,6 +103,10 @@ void CsdSchedulePoll() { // get node level queue ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); + TaskQueue* taskQueue = (TaskQueue*)(Cmi_taskqueues[CmiMyRank()]); + + void* msg = NULL; + while(1){ CcdCallBacks(); @@ -114,7 +117,7 @@ void CsdSchedulePoll() { if (!nodeQueue->empty()) { auto result = nodeQueue->pop(); if (result) { - void *msg = result.value(); + msg = result.value(); // process event CmiHandleMessage(msg); @@ -126,10 +129,23 @@ void CsdSchedulePoll() { } } + else if (taskQueue && (msg = TaskQueuePop(taskQueue))) { //taskqueue pop handles all possible queue cases already so we only need to check if msg exists or not + assert(msg != NULL); + //process event + CmiHandleMessage(msg); + + // release idle if necessary + if (CmiGetIdle()) { + CmiSetIdle(false); + CcdRaiseCondition(CcdPROCESSOR_END_IDLE); + } + + } + // poll thread queue else if (!queue->empty()) { // get next event (guaranteed to be there because only single consumer) - void *msg = queue->pop().value(); + msg = queue->pop().value(); // process event CmiHandleMessage(msg); From e510f1db37721a9236169720d37acab60fc8eb22 Mon Sep 17 00:00:00 2001 From: Steven Qie Date: Thu, 8 May 2025 16:54:14 -0500 Subject: [PATCH
21/21] print statement remove --- src/convcore.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/convcore.cpp b/src/convcore.cpp index c74cfca..3a14369 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -1080,7 +1080,6 @@ void* TaskQueueSteal(TaskQueue* queue) { continue; } if (head < 0) { - CmiPrintf("corruption detected possibly?"); return NULL; } return queue->data[head % TASKQUEUE_SIZE];