Skip to content

Commit 521f1d8

Browse files
authored
Merge pull request #51 from lf-lang/port-graph
Port graph
2 parents 4c25fb4 + d72ad9d commit 521f1d8

22 files changed

+631
-216
lines changed

examples/ports/main.cc

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#include <iostream>
22

3-
#include "reactor-cpp/reactor-cpp.hh"
3+
#include <reactor-cpp/reactor-cpp.hh>
44

55
using namespace reactor;
66
using namespace std::chrono_literals;
@@ -38,6 +38,7 @@ class Counter : public Reactor {
3838
Output<int> count{"count", this}; // NOLINT
3939

4040
void assemble() override {
41+
std::cout << "assemble Counter" << std::endl << std::flush;
4142
r_trigger.declare_trigger(&trigger);
4243
r_trigger.declare_antidependency(&count);
4344
}
@@ -53,12 +54,15 @@ class Printer : public Reactor {
5354
Reaction r_value{"r_value", 1, this, [this]() { on_value(); }};
5455

5556
public:
56-
Input<int> value{"value", this}; // NOLINT
57-
57+
Input<int> value{"value", this}; // NOLINT
58+
Input<int> forward{"forward", this}; // NOLINT
5859
Printer(const std::string& name, Environment* env)
5960
: Reactor(name, env) {}
6061

61-
void assemble() override { r_value.declare_trigger(&value); }
62+
void assemble() override {
63+
std::cout << "assemble Printer" << std::endl << std::flush;
64+
r_value.declare_trigger(&value);
65+
}
6266

6367
void on_value() { std::cout << this->name() << ": " << *value.get() << std::endl; }
6468
};
@@ -84,6 +88,7 @@ class Adder : public Reactor {
8488
void add() {
8589
if (i1.is_present() && i2.is_present()) {
8690
sum.set(*i1.get() + *i2.get());
91+
std::cout << "setting sum" << std::endl;
8792
}
8893
}
8994
};
@@ -94,23 +99,31 @@ auto main() -> int {
9499
Trigger trigger1{"t1", &env, 1s};
95100
Counter counter1{"c1", &env};
96101
Printer printer1{"p1", &env};
97-
trigger1.trigger.bind_to(&counter1.trigger);
98-
counter1.count.bind_to(&printer1.value);
102+
103+
env.draw_connection(trigger1.trigger, counter1.trigger, ConnectionProperties{});
104+
env.draw_connection(counter1.count, printer1.value, ConnectionProperties{});
99105

100106
Trigger trigger2{"t2", &env, 2s};
101107
Counter counter2{"c2", &env};
102108
Printer printer2{"p2", &env};
103-
trigger2.trigger.bind_to(&counter2.trigger);
104-
counter2.count.bind_to(&printer2.value);
109+
110+
// trigger2.trigger.set_inward_binding(&counter2.trigger);
111+
// counter2.count.set_inward_binding(&printer2.value);
112+
env.draw_connection(trigger2.trigger, counter2.trigger, ConnectionProperties{});
113+
env.draw_connection(counter2.count, printer2.value, ConnectionProperties{});
105114

106115
Adder add{"add", &env};
107116
Printer p_add{"p_add", &env};
108-
counter1.count.bind_to(&add.i1);
109-
counter2.count.bind_to(&add.i2);
110-
add.sum.bind_to(&p_add.value);
111117

118+
env.draw_connection(counter1.count, add.i1, ConnectionProperties{});
119+
env.draw_connection(counter2.count, add.i2, ConnectionProperties{});
120+
env.draw_connection(add.sum, p_add.forward, ConnectionProperties{ConnectionType::Delayed, 10s, nullptr});
121+
env.draw_connection(p_add.forward, p_add.value, ConnectionProperties{ConnectionType::Delayed, 5s, nullptr});
122+
123+
std::cout << "assemble" << std::endl << std::flush;
112124
env.assemble();
113125

126+
std::cout << "optimize" << std::endl << std::flush;
114127
auto thread = env.startup();
115128
thread.join();
116129

examples/power_train/main.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,14 @@ auto main() -> int {
164164
Brake brakes{&env};
165165
Engine engine{&env};
166166

167+
env.draw_connection(left_pedal.angle, brake_control.angle, ConnectionProperties{});
168+
env.draw_connection(left_pedal.angle, engine_control.on_off, ConnectionProperties{});
169+
env.draw_connection(brake_control.force, brakes.force, ConnectionProperties{});
170+
env.draw_connection(right_pedal.angle, engine_control.angle, ConnectionProperties{});
171+
env.draw_connection(engine_control.check, right_pedal.check, ConnectionProperties{});
172+
env.draw_connection(engine_control.torque, engine.torque, ConnectionProperties{});
173+
167174
env.assemble();
168-
left_pedal.angle.bind_to(&brake_control.angle);
169-
left_pedal.on_off.bind_to(&engine_control.on_off);
170-
brake_control.force.bind_to(&brakes.force);
171-
right_pedal.angle.bind_to(&engine_control.angle);
172-
engine_control.check.bind_to(&right_pedal.check);
173-
engine_control.torque.bind_to(&engine.torque);
174175

175176
env.export_dependency_graph("graph.dot");
176177

include/reactor-cpp/action.hh

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "fwd.hh"
1515
#include "logical_time.hh"
1616
#include "reactor.hh"
17+
#include "reactor_element.hh"
1718
#include "time_barrier.hh"
1819
#include "value_ptr.hh"
1920

@@ -27,7 +28,7 @@ class BaseAction : public ReactorElement {
2728
private:
2829
std::set<Reaction*> triggers_{};
2930
std::set<Reaction*> schedulers_{};
30-
const Duration min_delay_{};
31+
const Duration min_delay_{0};
3132
const bool logical_{true};
3233
bool present_{false};
3334

@@ -48,22 +49,13 @@ protected:
4849
* indicates that the tag is safe to process.
4950
*/
5051
virtual auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
51-
const std::function<bool(void)>& abort_waiting) -> bool {
52-
reactor_assert(!logical_);
53-
reactor_assert(lock.owns_lock());
54-
return PhysicalTimeBarrier::acquire_tag(tag, lock, environment()->scheduler(), abort_waiting);
55-
}
52+
const std::function<bool(void)>& abort_waiting) -> bool;
5653

5754
BaseAction(const std::string& name, Reactor* container, bool logical, Duration min_delay)
5855
: ReactorElement(name, ReactorElement::Type::Action, container)
5956
, min_delay_(min_delay)
6057
, logical_(logical) {}
61-
BaseAction(const std::string& name, Environment* environment, bool logical, Duration min_delay)
62-
: ReactorElement(name, ReactorElement::Type::Action, environment)
63-
, min_delay_(min_delay)
64-
, logical_(logical) {
65-
environment->register_input_action(this);
66-
}
58+
BaseAction(const std::string& name, Environment* environment, bool logical, Duration min_delay);
6759

6860
public:
6961
[[nodiscard]] auto inline triggers() const noexcept -> const auto& { return triggers_; }
@@ -81,8 +73,8 @@ template <class T> class Action : public BaseAction {
8173
private:
8274
ImmutableValuePtr<T> value_ptr_{nullptr};
8375

84-
std::map<Tag, ImmutableValuePtr<T>> events_;
85-
std::mutex mutex_events_;
76+
std::map<Tag, ImmutableValuePtr<T>> events_{};
77+
std::mutex mutex_events_{};
8678

8779
protected:
8880
void setup() noexcept override;
@@ -150,12 +142,7 @@ public:
150142

151143
template <class T> class PhysicalAction : public Action<T> {
152144
public:
153-
PhysicalAction(const std::string& name, Reactor* container)
154-
: Action<T>(name, container, false, Duration::zero()) {
155-
// all physical actions act as input actions to the program as they can be
156-
// scheduled from external threads
157-
container->environment()->register_input_action(this);
158-
}
145+
PhysicalAction(const std::string& name, Reactor* container);
159146
};
160147

161148
template <class T> class LogicalAction : public Action<T> {
@@ -166,8 +153,8 @@ public:
166153

167154
class Timer : public BaseAction {
168155
private:
169-
const Duration offset_{};
170-
const Duration period_{};
156+
const Duration offset_{0};
157+
const Duration period_{0};
171158

172159
void cleanup() noexcept final;
173160

include/reactor-cpp/connection.hh

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,19 @@ protected:
3939
[[nodiscard]] auto upstream_port() -> auto* { return upstream_port_; }
4040
[[nodiscard]] auto upstream_port() const -> const auto* { return upstream_port_; }
4141

42-
virtual auto upstream_set_callback() noexcept -> PortCallback = 0;
43-
4442
public:
43+
virtual auto upstream_set_callback() noexcept -> PortCallback = 0;
4544
virtual void bind_upstream_port(Port<T>* port) {
4645
reactor_assert(upstream_port_ == nullptr);
4746
upstream_port_ = port;
48-
port->register_set_callback(upstream_set_callback());
4947
}
5048

49+
virtual void bind_downstream_ports(const std::vector<BasePort*>& ports) {
50+
// with C++23 we can use insert_rage here
51+
for ([[maybe_unused]] auto* port : ports) { // NOLINT
52+
this->downstream_ports_.insert(static_cast<Port<T>*>(port));
53+
}
54+
}
5155
virtual void bind_downstream_port(Port<T>* port) {
5256
[[maybe_unused]] bool result = this->downstream_ports_.insert(port).second;
5357
reactor_assert(result);
@@ -110,14 +114,14 @@ protected:
110114

111115
EnclaveConnection(const std::string& name, Environment* enclave, const Duration& delay)
112116
: BaseDelayedConnection<T>(name, enclave, false, delay)
113-
, log_{this->fqn()}
114-
, logical_time_barrier_(enclave->scheduler()) {}
117+
, logical_time_barrier_(enclave->scheduler())
118+
, log_{this->fqn()} {}
115119

116120
public:
117121
EnclaveConnection(const std::string& name, Environment* enclave)
118122
: BaseDelayedConnection<T>(name, enclave, false, Duration::zero())
119-
, log_{this->fqn()}
120-
, logical_time_barrier_(enclave->scheduler()) {}
123+
, logical_time_barrier_(enclave->scheduler())
124+
, log_{this->fqn()} {};
121125

122126
inline auto upstream_set_callback() noexcept -> PortCallback override {
123127
return [this](const BasePort& port) {
@@ -128,7 +132,7 @@ public:
128132
// of the upstream port. Hence, we can retrieve the current tag directly
129133
// without locking.
130134
auto tag = Tag::from_logical_time(scheduler->logical_time());
131-
bool result{false};
135+
[[maybe_unused]] bool result{false}; // NOLINT value set is not used
132136
if constexpr (std::is_same<T, void>::value) {
133137
result = this->schedule_at(tag);
134138
} else {
@@ -186,21 +190,21 @@ public:
186190
: EnclaveConnection<T>(name, enclave, delay) {}
187191

188192
inline auto upstream_set_callback() noexcept -> PortCallback override {
189-
return [this](const BasePort& port) {
190-
// We know that port must be of type Port<T>
191-
auto& typed_port = reinterpret_cast<const Port<T>&>(port); // NOLINT
192-
const auto* scheduler = port.environment()->scheduler();
193+
return [&](const BasePort& port) { // NOLINT unused this
193194
// This callback will be called from a reaction executing in the context
194195
// of the upstream port. Hence, we can retrieve the current tag directly
195196
// without locking.
196-
auto tag = Tag::from_logical_time(scheduler->logical_time()).delay(this->min_delay());
197-
bool result{false};
197+
198198
if constexpr (std::is_same<T, void>::value) {
199-
result = this->schedule_at(tag);
199+
this->schedule_at(
200+
Tag::from_logical_time(port.environment()->scheduler()->logical_time()).delay(this->min_delay()));
200201
} else {
201-
result = this->schedule_at(std::move(typed_port.get()), tag);
202+
// We know that port must be of type Port<T>
203+
auto& typed_port = reinterpret_cast<const Port<T>&>(port); // NOLINT
204+
this->schedule_at(
205+
std::move(typed_port.get()),
206+
Tag::from_logical_time(port.environment()->scheduler()->logical_time()).delay(this->min_delay()));
202207
}
203-
reactor_assert(result);
204208
};
205209
}
206210

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (C) 2023 TU Dresden
3+
* All rights reserved.
4+
*
5+
* Authors:
6+
* Tassilo Tanneberger
7+
*/
8+
9+
#ifndef REACTOR_CPP_CONNECTION_PROPERTIES_HH
10+
#define REACTOR_CPP_CONNECTION_PROPERTIES_HH
11+
12+
#include "fwd.hh"
13+
#include "logical_time.hh"
14+
15+
namespace reactor {
16+
17+
enum ConnectionType { Normal, Delayed, Enclaved, Physical, DelayedEnclaved, PhysicalEnclaved, Plugin, Invalid };
18+
struct ConnectionProperties {
19+
ConnectionType type_ = ConnectionType::Normal;
20+
Duration delay_{0};
21+
Environment* enclave_{nullptr};
22+
23+
auto operator<(const ConnectionProperties& elem2) const noexcept -> bool {
24+
return (this->type_ < elem2.type_) || (this->type_ == elem2.type_ && this->delay_ < elem2.delay_);
25+
}
26+
27+
auto operator==(const ConnectionProperties& elem2) const noexcept -> bool {
28+
return this->type_ == elem2.type_ && this->delay_ == elem2.delay_;
29+
}
30+
};
31+
32+
} // namespace reactor
33+
34+
#endif // REACTOR_CPP_CONNECTION_PROPERTIES_HH

include/reactor-cpp/environment.hh

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313
#include <string>
1414
#include <vector>
1515

16+
#include "connection_properties.hh"
1617
#include "fwd.hh"
18+
#include "graph.hh"
1719
#include "reactor-cpp/logging.hh"
1820
#include "reactor-cpp/time.hh"
19-
#include "reactor.hh"
2021
#include "scheduler.hh"
2122

2223
namespace reactor {
@@ -58,6 +59,9 @@ private:
5859

5960
const Duration timeout_{};
6061

62+
Graph<BasePort*, ConnectionProperties> graph_{};
63+
Graph<BasePort*, ConnectionProperties> optimized_graph_{};
64+
6165
void build_dependency_graph(Reactor* reactor);
6266
void calculate_indexes();
6367

@@ -72,7 +76,24 @@ public:
7276

7377
auto name() -> const std::string& { return name_; }
7478

79+
// this method draw a connection between two graph elements with some properties
80+
template <class T> void draw_connection(Port<T>& source, Port<T>& sink, ConnectionProperties properties) {
81+
this->draw_connection(&source, &sink, properties);
82+
}
83+
84+
template <class T> void draw_connection(Port<T>* source, Port<T>* sink, ConnectionProperties properties) {
85+
if (top_environment_ == nullptr || top_environment_ == this) {
86+
log::Debug() << "drawing connection: " << source << " --> " << sink;
87+
graph_.add_edge(source, sink, properties);
88+
} else {
89+
top_environment_->draw_connection(source, sink, properties);
90+
}
91+
}
92+
93+
void optimize();
94+
7595
void register_reactor(Reactor* reactor);
96+
void register_port(BasePort* port) noexcept;
7697
void register_input_action(BaseAction* action);
7798
void assemble();
7899
auto startup() -> std::thread;

0 commit comments

Comments
 (0)