Skip to content

Commit a386b6c

Browse files
authored
Document in detail how events at duplicate timestamps are handled (#348)
Signed-off-by: Adam Glustein <[email protected]>
1 parent 4153d01 commit a386b6c

File tree

3 files changed

+70
-2
lines changed

3 files changed

+70
-2
lines changed

docs/wiki/api-references/Base-Nodes-API.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ csp.unroll(x: ts[['T']]) → ts['T']
174174
Given a timeseries of a *list* of values, unroll will "unroll" the values in the list into a timeseries of the elements.
175175
`unroll` will ensure to preserve the order across all list ticks.
176176
Ticks will be unrolled in subsequent engine cycles.
177+
For a detailed explanation of this behavior, see the documentation on [duplicate timestamps](Execution-Modes#handling-duplicate-timestamps).
177178

178179
## `csp.collect`
179180

docs/wiki/concepts/Common-Mistakes.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ from typing import List
7474
def next_movie_showing(show_times: ts[List[datetime]]) -> ts[datetime]:
7575
next_showing = None
7676
for time in show_times:
77-
if time >= csp.now(): # list may include some shows today that have already past, so let's filter those out
77+
if time >= datetime.now(): # list may include some shows today that have already past, so let's filter those out
7878
if next_showing is None or time < next_showing:
7979
next_showing = time
8080

docs/wiki/concepts/Execution-Modes.md

+68-1
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ All inputs in simulation are driven off the provided timestamped data of its inp
66
In realtime mode, the engine runs in wallclock time as of "now".
77
Realtime engines can get data from realtime adapters which source data on separate threads and pass them through to the engine (ie think of activeMQ events happening on an activeMQ thread and being passed along to the engine in "realtime").
88

9-
Since engines can run in both simulated and realtime mode, users should **always** use **`csp.now()`** to get the current time in `csp.node`s.
9+
Since engines can run in both simulated and realtime mode, users should **always** use **`csp.now()`** to get the current time in a `csp.node`.
1010

1111
## Table of Contents
1212

1313
- [Table of Contents](#table-of-contents)
1414
- [Simulation Mode](#simulation-mode)
1515
- [Realtime Mode](#realtime-mode)
1616
- [csp.PushMode](#csppushmode)
17+
- [Handling Duplicate Timestamps](#handling-duplicate-timestamps)
1718
- [Realtime Group Event Synchronization](#realtime-group-event-synchronization)
1819

1920
## Simulation Mode
@@ -50,6 +51,72 @@ When consuming data from input adapters there are three choices on how one can c
5051
| **BURST** | Simulation | all ticks from input source with duplicate timestamps (on the same timeseries) will tick once with a list of all values |
5152
| | Realtime | all ticks that occurred since previous engine cycle will tick once with a list of all the values |
5253

54+
## Handling duplicate timestamps
55+
56+
In `csp`, there can be multiple engine cycles that occur at the same engine time. This is often the case when using nodes with internal alarms (e.g. [`csp.unroll`](Base-Nodes-API#cspunroll)) or using feedback edges ([`csp.feedback`](Feedback-and-Delayed-Edge#cspfeedback)).
57+
If multiple events are scheduled at the same timestamp on a single time-series edge, they will be executed on separate cycles *in the order* they were scheduled. For example, consider the code snippet below:
58+
59+
```python
60+
import csp
61+
from csp import ts
62+
from datetime import datetime, timedelta
63+
64+
@csp.node
65+
def ticks_n_times(x: ts[int], n: int) -> ts[int]:
66+
# Ticks out a value n times, incrementing it each time
67+
with csp.alarms():
68+
alarm = csp.alarm(int)
69+
70+
if csp.ticked(x):
71+
for i in range(n):
72+
csp.schedule_alarm(alarm, timedelta(), x+i)
73+
74+
if csp.ticked(alarm):
75+
return alarm
76+
77+
@csp.graph
78+
def duplicate_timestamps():
79+
v = csp.const(1)
80+
csp.print('ticks_once', ticks_n_times(v, 1))
81+
csp.print('ticks_twice', ticks_n_times(v, 2))
82+
csp.print('ticks_thrice', ticks_n_times(v, 3))
83+
84+
csp.run(duplicate_timestamps, starttime=datetime(2020,1,1))
85+
```
86+
87+
When we run this graph, the output is:
88+
89+
```raw
90+
2020-01-01 00:00:00 ticks_once:1
91+
2020-01-01 00:00:00 ticks_twice:1
92+
2020-01-01 00:00:00 ticks_thrice:1
93+
2020-01-01 00:00:00 ticks_twice:2
94+
2020-01-01 00:00:00 ticks_thrice:2
95+
2020-01-01 00:00:00 ticks_thrice:3
96+
```
97+
98+
A real life example is when using `csp.unroll` to tick out a list of values on separate engine cycles. If we were to use `csp.sample` on the output, we would get the *first* value that is unrolled at each timestamp. Why?
99+
The event that is scheduled on the sampling timer is its first (and only) event at that time; thus, it is executed on the first engine cycle, and samples the first unrolled value.
100+
101+
```python
102+
def sampling_unroll():
103+
u = csp.unroll(csp.const.using(T=[int])([1, 2, 3]))
104+
s = csp.sample(csp.const(True), u)
105+
csp.print('unrolled', u)
106+
csp.print('sampled', s)
107+
108+
csp.run(sampling_unroll, starttime=datetime(2020,1,1))
109+
```
110+
111+
Output:
112+
113+
```raw
114+
2020-01-01 00:00:00 unrolled:1
115+
2020-01-01 00:00:00 sampled:1
116+
2020-01-01 00:00:00 unrolled:2
117+
2020-01-01 00:00:00 unrolled:3
118+
```
119+
53120
## Realtime Group Event Synchronization
54121

55122
The CSP framework supports properly synchronizing events across multiple timeseries that are sourced from the same realtime adapter.

0 commit comments

Comments
 (0)