Skip to content

Commit d3ba914

Browse files
committed
merge with master
2 parents e592f27 + 3b25ffe commit d3ba914

File tree

6 files changed

+57
-19
lines changed

6 files changed

+57
-19
lines changed

include/reactor-cpp/environment.hh

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,18 @@ private:
5555

5656
Scheduler scheduler_;
5757
Phase phase_{Phase::Construction};
58-
Tag start_tag_{};
5958

59+
/// Timeout as given in the constructor
6060
const Duration timeout_{};
6161

6262
Graph<BasePort> graph_{};
6363
Graph<BasePort> optimized_graph_{};
6464

65+
/// The start tag as determined during startup()
66+
Tag start_tag_{};
67+
/// The timeout tag as determined during startup()
68+
Tag timeout_tag_{};
69+
6570
void build_dependency_graph(Reactor* reactor);
6671
void calculate_indexes();
6772

@@ -83,7 +88,7 @@ public:
8388

8489
template <class T> void draw_connection(Port<T>* source, Port<T>* sink, ConnectionProperties properties) {
8590
if (top_environment_ == nullptr || top_environment_ == this) {
86-
log::Debug() << "drawing connection: " << source << " --> " << sink;
91+
log::Debug() << "drawing connection: " << source->fqn() << " --> " << sink->fqn();
8792
graph_.add_edge(source, sink, properties);
8893
} else {
8994
top_environment_->draw_connection(source, sink, properties);
@@ -117,6 +122,7 @@ public:
117122
[[nodiscard]] auto logical_time() const noexcept -> const LogicalTime& { return scheduler_.logical_time(); }
118123
[[nodiscard]] auto start_tag() const noexcept -> const Tag& { return start_tag_; }
119124
[[nodiscard]] auto timeout() const noexcept -> const Duration& { return timeout_; }
125+
[[nodiscard]] auto timeout_tag() const noexcept -> const Tag& { return timeout_tag_; }
120126

121127
static auto physical_time() noexcept -> TimePoint { return get_physical_time(); }
122128

include/reactor-cpp/logical_time.hh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public:
4343
[[nodiscard]] auto delay(Duration offset = Duration::zero()) const noexcept -> Tag;
4444
[[nodiscard]] auto subtract(Duration offset = Duration::zero()) const noexcept -> Tag;
4545
[[nodiscard]] auto decrement() const noexcept -> Tag;
46+
47+
[[nodiscard]] static auto max() noexcept -> Tag { return {TimePoint::max(), std::numeric_limits<mstep_t>::max()}; }
4648
};
4749

4850
// define all the comparison operators

include/reactor-cpp/scheduler.hh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ private:
161161
std::vector<ReleaseTagCallback> release_tag_callbacks_{};
162162
void release_current_tag();
163163

164+
void cleanup_after_tag();
165+
164166
void schedule() noexcept;
165167
auto schedule_ready_reactions() -> bool;
166168
void next();

lib/action.cc

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,11 @@ void Timer::cleanup() noexcept {
7171
ShutdownTrigger::ShutdownTrigger(const std::string& name, Reactor* container)
7272
: Timer(name, container, Duration::zero(), container->environment()->timeout()) {}
7373

74-
void ShutdownTrigger::setup() noexcept {
75-
BaseAction::setup();
76-
environment()->sync_shutdown();
77-
}
74+
void ShutdownTrigger::setup() noexcept { BaseAction::setup(); }
7875

7976
void ShutdownTrigger::shutdown() {
80-
if (!is_present()) {
81-
Tag tag = Tag::from_logical_time(environment()->logical_time()).delay();
82-
environment()->scheduler()->schedule_sync(this, tag);
83-
}
77+
Tag tag = Tag::from_logical_time(environment()->logical_time()).delay();
78+
environment()->scheduler()->schedule_sync(this, tag);
8479
}
8580

8681
auto Action<void>::schedule_at(const Tag& tag) -> bool {

lib/environment.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ void Environment::assemble() { // NOLINT
102102
env->assemble();
103103
}
104104

105+
// If this is the top level environment, then instantiate all connections.
105106
if (top_environment_ == nullptr || top_environment_ == this) {
106107
log::Debug() << "start optimization on port graph";
107108
this->optimize();
@@ -339,10 +340,25 @@ auto Environment::startup() -> std::thread {
339340
auto Environment::startup(const TimePoint& start_time) -> std::thread {
340341
validate(this->phase() == Phase::Assembly, "startup() may only be called during assembly phase!");
341342

343+
log::Debug() << "Building the Dependency-Graph";
344+
for (auto* reactor : top_level_reactors_) {
345+
build_dependency_graph(reactor);
346+
}
347+
348+
calculate_indexes();
349+
342350
log_.debug() << "Starting the execution";
343351
phase_ = Phase::Startup;
344352

345353
this->start_tag_ = Tag::from_physical_time(start_time);
354+
if (this->timeout_ == Duration::max()) {
355+
this->timeout_tag_ = Tag::max();
356+
} else if (this->timeout_ == Duration::zero()) {
357+
this->timeout_tag_ = this->start_tag_;
358+
} else {
359+
this->timeout_tag_ = this->start_tag_.delay(this->timeout_);
360+
}
361+
346362
// start up initialize all reactors
347363
for (auto* reactor : top_level_reactors_) {
348364
reactor->startup();

lib/scheduler.cc

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,17 +91,18 @@ void Scheduler::schedule() noexcept {
9191
bool found_ready_reactions = schedule_ready_reactions();
9292

9393
while (!found_ready_reactions) {
94+
if (!continue_execution_ && !found_ready_reactions) {
95+
// Cleanup and let all workers know that they should terminate.
96+
cleanup_after_tag();
97+
terminate_all_workers();
98+
break;
99+
}
100+
94101
log_.debug() << "call next()";
95102
next();
96103
reaction_queue_pos_ = 0;
97104

98105
found_ready_reactions = schedule_ready_reactions();
99-
100-
if (!continue_execution_ && !found_ready_reactions) {
101-
// let all workers know that they should terminate
102-
terminate_all_workers();
103-
break;
104-
}
105106
}
106107
}
107108

@@ -306,7 +307,7 @@ void Scheduler::advance_logical_time_to(const Tag& tag) {
306307
Statistics::increment_processed_events();
307308
}
308309

309-
void Scheduler::next() { // NOLINT
310+
void Scheduler::cleanup_after_tag() {
310311
// Notify other environments and let them know that we finished processing the
311312
// current tag
312313
release_current_tag();
@@ -328,6 +329,11 @@ void Scheduler::next() { // NOLINT
328329
}
329330
vec_ports.clear();
330331
}
332+
}
333+
334+
void Scheduler::next() { // NOLINT
335+
// First, clean up after the last tag.
336+
cleanup_after_tag();
331337

332338
{
333339
std::unique_lock<std::mutex> lock{scheduling_mutex_};
@@ -354,8 +360,8 @@ void Scheduler::next() { // NOLINT
354360
log_.debug() << "Shutting down the scheduler";
355361
Tag t_next = Tag::from_logical_time(logical_time_).delay();
356362
if (!event_queue_.empty() && t_next == event_queue_.next_tag()) {
357-
log_.debug() << "Schedule the last round of reactions including all "
358-
"termination reactions";
363+
log_.debug() << "Trigger the last round of reactions including all "
364+
"shutdown reactions";
359365
triggered_actions_ = event_queue_.extract_next_event();
360366
advance_logical_time_to(t_next);
361367
} else {
@@ -395,6 +401,17 @@ void Scheduler::next() { // NOLINT
395401
continue;
396402
}
397403

404+
// Stop execution in case we reach the timeout tag. This checks needs to
405+
// be done here, after acquiring the check, as only then we are fully
406+
// commited to executing the tag t_next. Otherwise, we could still get
407+
// earlier events (e.g., from a physical action).
408+
if (t_next == environment_->timeout_tag()) {
409+
continue_execution_ = false;
410+
log_.debug() << "Shutting down the scheduler due to timeout";
411+
log_.debug() << "Trigger the last round of reactions including all "
412+
"shutdwon reactions";
413+
}
414+
398415
// Retrieve all triggered actions at the next tag.
399416
// We do not need to lock mutex_event_queue_ here, as the lock on
400417
// scheduling_mutex_ already ensures that no one can write to the event

0 commit comments

Comments
 (0)