Skip to content

Commit 35434aa

Browse files
authored
Merge pull request #53 from lf-lang/shutdown
Fix timeout so shutdown is invoked only once
2 parents 521f1d8 + 2e7b8a7 commit 35434aa

File tree

6 files changed

+46
-18
lines changed

6 files changed

+46
-18
lines changed

include/reactor-cpp/environment.hh

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,15 @@ private:
5555

5656
Scheduler scheduler_;
5757
Phase phase_{Phase::Construction};
58-
Tag start_tag_{};
5958

59+
/// Timeout as given in the constructor
6060
const Duration timeout_{};
6161

62+
/// The start tag as determined during startup()
63+
Tag start_tag_{};
64+
/// The timeout tag as determined during startup()
65+
Tag timeout_tag_{};
66+
6267
Graph<BasePort*, ConnectionProperties> graph_{};
6368
Graph<BasePort*, ConnectionProperties> optimized_graph_{};
6469

@@ -118,6 +123,7 @@ public:
118123
[[nodiscard]] auto logical_time() const noexcept -> const LogicalTime& { return scheduler_.logical_time(); }
119124
[[nodiscard]] auto start_tag() const noexcept -> const Tag& { return start_tag_; }
120125
[[nodiscard]] auto timeout() const noexcept -> const Duration& { return timeout_; }
126+
[[nodiscard]] auto timeout_tag() const noexcept -> const Tag& { return timeout_tag_; }
121127

122128
static auto physical_time() noexcept -> TimePoint { return get_physical_time(); }
123129

include/reactor-cpp/logical_time.hh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public:
4343
[[nodiscard]] auto delay(Duration offset = Duration::zero()) const noexcept -> Tag;
4444
[[nodiscard]] auto subtract(Duration offset = Duration::zero()) const noexcept -> Tag;
4545
[[nodiscard]] auto decrement() const noexcept -> Tag;
46+
47+
[[nodiscard]] static auto max() noexcept -> Tag { return {TimePoint::max(), std::numeric_limits<mstep_t>::max()}; }
4648
};
4749

4850
// define all the comparison operators

include/reactor-cpp/scheduler.hh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ private:
161161
std::vector<ReleaseTagCallback> release_tag_callbacks_{};
162162
void release_current_tag();
163163

164+
void cleanup_after_tag();
165+
164166
void schedule() noexcept;
165167
auto schedule_ready_reactions() -> bool;
166168
void next();

lib/action.cc

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,11 @@ void Timer::cleanup() noexcept {
7171
ShutdownTrigger::ShutdownTrigger(const std::string& name, Reactor* container)
7272
: Timer(name, container, Duration::zero(), container->environment()->timeout()) {}
7373

74-
void ShutdownTrigger::setup() noexcept {
75-
BaseAction::setup();
76-
environment()->sync_shutdown();
77-
}
74+
void ShutdownTrigger::setup() noexcept { BaseAction::setup(); }
7875

7976
void ShutdownTrigger::shutdown() {
80-
if (!is_present()) {
81-
Tag tag = Tag::from_logical_time(environment()->logical_time()).delay();
82-
environment()->scheduler()->schedule_sync(this, tag);
83-
}
77+
Tag tag = Tag::from_logical_time(environment()->logical_time()).delay();
78+
environment()->scheduler()->schedule_sync(this, tag);
8479
}
8580

8681
auto Action<void>::schedule_at(const Tag& tag) -> bool {

lib/environment.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,12 @@ auto Environment::startup(const TimePoint& start_time) -> std::thread {
330330
phase_ = Phase::Startup;
331331

332332
this->start_tag_ = Tag::from_physical_time(start_time);
333+
if (this->timeout_ == Duration::max()) {
334+
this->timeout_tag_ = Tag::max();
335+
} else {
336+
this->timeout_tag_ = this->start_tag_.delay(this->timeout_);
337+
}
338+
333339
// start up initialize all reactors
334340
for (auto* reactor : top_level_reactors_) {
335341
reactor->startup();

lib/scheduler.cc

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,17 +91,18 @@ void Scheduler::schedule() noexcept {
9191
bool found_ready_reactions = schedule_ready_reactions();
9292

9393
while (!found_ready_reactions) {
94+
if (!continue_execution_ && !found_ready_reactions) {
95+
// Cleanup and let all workers know that they should terminate.
96+
cleanup_after_tag();
97+
terminate_all_workers();
98+
break;
99+
}
100+
94101
log_.debug() << "call next()";
95102
next();
96103
reaction_queue_pos_ = 0;
97104

98105
found_ready_reactions = schedule_ready_reactions();
99-
100-
if (!continue_execution_ && !found_ready_reactions) {
101-
// let all workers know that they should terminate
102-
terminate_all_workers();
103-
break;
104-
}
105106
}
106107
}
107108

@@ -306,7 +307,7 @@ void Scheduler::advance_logical_time_to(const Tag& tag) {
306307
Statistics::increment_processed_events();
307308
}
308309

309-
void Scheduler::next() { // NOLINT
310+
void Scheduler::cleanup_after_tag() {
310311
// Notify other environments and let them know that we finished processing the
311312
// current tag
312313
release_current_tag();
@@ -328,6 +329,11 @@ void Scheduler::next() { // NOLINT
328329
}
329330
vec_ports.clear();
330331
}
332+
}
333+
334+
void Scheduler::next() { // NOLINT
335+
// First, clean up after the last tag.
336+
cleanup_after_tag();
331337

332338
{
333339
std::unique_lock<std::mutex> lock{scheduling_mutex_};
@@ -354,8 +360,8 @@ void Scheduler::next() { // NOLINT
354360
log_.debug() << "Shutting down the scheduler";
355361
Tag t_next = Tag::from_logical_time(logical_time_).delay();
356362
if (!event_queue_.empty() && t_next == event_queue_.next_tag()) {
357-
log_.debug() << "Schedule the last round of reactions including all "
358-
"termination reactions";
363+
log_.debug() << "Trigger the last round of reactions including all "
364+
"shutdown reactions";
359365
triggered_actions_ = event_queue_.extract_next_event();
360366
advance_logical_time_to(t_next);
361367
} else {
@@ -395,6 +401,17 @@ void Scheduler::next() { // NOLINT
395401
continue;
396402
}
397403

404+
// Stop execution in case we reach the timeout tag. This checks needs to
405+
// be done here, after acquiring the check, as only then we are fully
406+
// commited to executing the tag t_next. Otherwise, we could still get
407+
// earlier events (e.g., from a physical action).
408+
if (t_next == environment_->timeout_tag()) {
409+
continue_execution_ = false;
410+
log_.debug() << "Shutting down the scheduler due to timeout";
411+
log_.debug() << "Trigger the last round of reactions including all "
412+
"shutdwon reactions";
413+
}
414+
398415
// Retrieve all triggered actions at the next tag.
399416
// We do not need to lock mutex_event_queue_ here, as the lock on
400417
// scheduling_mutex_ already ensures that no one can write to the event

0 commit comments

Comments
 (0)