Skip to content

Commit 83e13b3

Browse files
committed
WIP: quantity try 2
1 parent 8ad2249 commit 83e13b3

File tree

17 files changed

+847
-271
lines changed

17 files changed

+847
-271
lines changed

src/frequenz/sdk/actor/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -592,13 +592,12 @@ async def main() -> None: # (6)!
592592
[_run]: #the-_run-method
593593
"""
594594

595-
from ..timeseries._resampling import ResamplerConfig
596595
from ._actor import Actor
597596
from ._background_service import BackgroundService
598597
from ._channel_registry import ChannelRegistry
599598
from ._config_managing import ConfigManagingActor
600599
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
601-
from ._resampling import ComponentMetricsResamplingActor
600+
from ._resampling import ComponentMetricsResamplingActor, ResamplingActorConfig
602601
from ._run_utils import run
603602

604603
__all__ = [
@@ -609,6 +608,6 @@ async def main() -> None: # (6)!
609608
"ComponentMetricsResamplingActor",
610609
"ConfigManagingActor",
611610
"DataSourcingActor",
612-
"ResamplerConfig",
611+
"ResamplingActorConfig",
613612
"run",
614613
]

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
MeterData,
2121
)
2222
from ...timeseries import Sample
23-
from ...timeseries._quantities import Quantity
2423
from .._channel_registry import ChannelRegistry
2524
from ._component_metric_request import ComponentMetricRequest
2625

@@ -306,7 +305,7 @@ def _get_metric_senders(
306305
self,
307306
category: ComponentCategory,
308307
requests: dict[ComponentMetricId, list[ComponentMetricRequest]],
309-
) -> list[tuple[Callable[[Any], float], list[Sender[Sample[Quantity]]]]]:
308+
) -> list[tuple[Callable[[Any], float], list[Sender[Sample[float]]]]]:
310309
"""Get channel senders from the channel registry for each requested metric.
311310
312311
Args:
@@ -357,9 +356,7 @@ def process_msg(data: Any) -> None:
357356
tasks = []
358357
for extractor, senders in stream_senders:
359358
for sender in senders:
360-
tasks.append(
361-
sender.send(Sample(data.timestamp, Quantity(extractor(data))))
362-
)
359+
tasks.append(sender.send(Sample(data.timestamp, extractor(data))))
363360
asyncio.gather(*tasks)
364361
nonlocal pending_messages
365362
pending_messages -= 1

src/frequenz/sdk/actor/_resampling.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@
77
import asyncio
88
import dataclasses
99
import logging
10+
from typing import Callable
1011

1112
from frequenz.channels import Receiver, Sender
1213

1314
from .._internal._asyncio import cancel_and_await
14-
from ..timeseries import Sample
15-
from ..timeseries._quantities import Quantity
1615
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
1716
from ._actor import Actor
1817
from ._channel_registry import ChannelRegistry
@@ -21,6 +20,13 @@
2120
_logger = logging.getLogger(__name__)
2221

2322

23+
class ResamplingActorConfig(ResamplerConfig[float]):
24+
"""Configuration for the resampling actor."""
25+
26+
value_constructor: Callable[[float], float] = float
27+
"""The constructor to use to create new sample values."""
28+
29+
2430
class ComponentMetricsResamplingActor(Actor):
2531
"""An actor to resample microgrid component metrics."""
2632

@@ -30,7 +36,7 @@ def __init__( # pylint: disable=too-many-arguments
3036
channel_registry: ChannelRegistry,
3137
data_sourcing_request_sender: Sender[ComponentMetricRequest],
3238
resampling_request_receiver: Receiver[ComponentMetricRequest],
33-
config: ResamplerConfig,
39+
config: ResamplingActorConfig,
3440
name: str | None = None,
3541
) -> None:
3642
"""Initialize an instance.
@@ -55,7 +61,7 @@ def __init__( # pylint: disable=too-many-arguments
5561
self._resampling_request_receiver: Receiver[
5662
ComponentMetricRequest
5763
] = resampling_request_receiver
58-
self._resampler: Resampler = Resampler(config)
64+
self._resampler: Resampler[float] = Resampler(config)
5965
self._active_req_channels: set[str] = set()
6066

6167
async def _subscribe(self, request: ComponentMetricRequest) -> None:
@@ -83,10 +89,7 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None:
8389
# exceptions to report errors.
8490
sender = self._channel_registry.new_sender(request_channel_name)
8591

86-
async def sink_adapter(sample: Sample[Quantity]) -> None:
87-
await sender.send(sample)
88-
89-
self._resampler.add_timeseries(request_channel_name, receiver, sink_adapter)
92+
self._resampler.add_timeseries(request_channel_name, receiver, sender.send)
9093

9194
async def _process_resampling_requests(self) -> None:
9295
"""Process resampling data requests."""
@@ -109,8 +112,6 @@ async def _run(self) -> None:
109112
Raises:
110113
RuntimeError: If there is some unexpected error while resampling or
111114
handling requests.
112-
113-
[//]: # (# noqa: DAR401 error)
114115
"""
115116
tasks_to_cancel: set[asyncio.Task[None]] = set()
116117
try:

src/frequenz/sdk/microgrid/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@
122122
to limit the charge power of individual EV Chargers.
123123
""" # noqa: D205, D400
124124

125-
from ..actor import ResamplerConfig
125+
from ..actor import ResamplingActorConfig
126126
from . import _data_pipeline, client, component, connection_manager, metadata
127127
from ._data_pipeline import (
128128
battery_pool,
@@ -133,7 +133,9 @@
133133
)
134134

135135

136-
async def initialize(host: str, port: int, resampler_config: ResamplerConfig) -> None:
136+
async def initialize(
137+
host: str, port: int, resampler_config: ResamplingActorConfig
138+
) -> None:
137139
"""Initialize the microgrid connection manager and the data pipeline.
138140
139141
Args:

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class _DataPipeline: # pylint: disable=too-many-instance-attributes
7878

7979
def __init__(
8080
self,
81-
resampler_config: ResamplerConfig,
81+
resampler_config: ResamplerConfig[float],
8282
) -> None:
8383
"""Create a `DataPipeline` instance.
8484
@@ -384,7 +384,7 @@ async def _stop(self) -> None:
384384
_DATA_PIPELINE: _DataPipeline | None = None
385385

386386

387-
async def initialize(resampler_config: ResamplerConfig) -> None:
387+
async def initialize(resampler_config: ResamplerConfig[float]) -> None:
388388
"""Initialize a `DataPipeline` instance.
389389
390390
Args:

src/frequenz/sdk/timeseries/_base_types.py

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,37 @@
1010
from collections.abc import Callable, Iterator
1111
from dataclasses import dataclass
1212
from datetime import datetime, timezone
13-
from typing import Generic, Self, overload
13+
from typing import Generic, Protocol, Self, SupportsFloat, overload
1414

15-
from ._quantities import Power, QuantityT
15+
from ._quantities import Power
1616

1717
UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)
1818
"""The UNIX epoch (in UTC)."""
1919

2020

21+
class Comparable(Protocol):
22+
def __lt__(self, other: Self) -> bool:
23+
...
24+
25+
def __gt__(self, other: Self) -> bool:
26+
...
27+
28+
def __le__(self, other: Self) -> bool:
29+
...
30+
31+
def __ge__(self, other: Self) -> bool:
32+
...
33+
34+
35+
_T = typing.TypeVar("_T")
36+
SupportsFloatT = typing.TypeVar("SupportsFloatT", bound=SupportsFloat)
37+
"""Type variable for types that support conversion to float."""
38+
39+
ComparableT = typing.TypeVar("ComparableT", bound=Comparable)
40+
41+
2142
@dataclass(frozen=True, order=True)
22-
class Sample(Generic[QuantityT]):
43+
class Sample(Generic[_T]):
2344
"""A measurement taken at a particular point in time.
2445
2546
The `value` could be `None` if a component is malfunctioning or data is
@@ -30,12 +51,12 @@ class Sample(Generic[QuantityT]):
3051
timestamp: datetime
3152
"""The time when this sample was generated."""
3253

33-
value: QuantityT | None = None
54+
value: _T | None = None
3455
"""The value of this sample."""
3556

3657

3758
@dataclass(frozen=True)
38-
class Sample3Phase(Generic[QuantityT]):
59+
class Sample3Phase(Generic[ComparableT]):
3960
"""A 3-phase measurement made at a particular point in time.
4061
4162
Each of the `value` fields could be `None` if a component is malfunctioning
@@ -46,16 +67,16 @@ class Sample3Phase(Generic[QuantityT]):
4667

4768
timestamp: datetime
4869
"""The time when this sample was generated."""
49-
value_p1: QuantityT | None
70+
value_p1: ComparableT | None
5071
"""The value of the 1st phase in this sample."""
5172

52-
value_p2: QuantityT | None
73+
value_p2: ComparableT | None
5374
"""The value of the 2nd phase in this sample."""
5475

55-
value_p3: QuantityT | None
76+
value_p3: ComparableT | None
5677
"""The value of the 3rd phase in this sample."""
5778

58-
def __iter__(self) -> Iterator[QuantityT | None]:
79+
def __iter__(self) -> Iterator[ComparableT | None]:
5980
"""Return an iterator that yields values from each of the phases.
6081
6182
Yields:
@@ -66,14 +87,14 @@ def __iter__(self) -> Iterator[QuantityT | None]:
6687
yield self.value_p3
6788

6889
@overload
69-
def max(self, default: QuantityT) -> QuantityT:
90+
def max(self, default: ComparableT) -> ComparableT:
7091
...
7192

7293
@overload
73-
def max(self, default: None = None) -> QuantityT | None:
94+
def max(self, default: None = None) -> ComparableT | None:
7495
...
7596

76-
def max(self, default: QuantityT | None = None) -> QuantityT | None:
97+
def max(self, default: ComparableT | None = None) -> ComparableT | None:
7798
"""Return the max value among all phases, or default if they are all `None`.
7899
79100
Args:
@@ -84,21 +105,21 @@ def max(self, default: QuantityT | None = None) -> QuantityT | None:
84105
"""
85106
if not any(self):
86107
return default
87-
value: QuantityT = functools.reduce(
108+
value: ComparableT = functools.reduce(
88109
lambda x, y: x if x > y else y,
89110
filter(None, self),
90111
)
91112
return value
92113

93114
@overload
94-
def min(self, default: QuantityT) -> QuantityT:
115+
def min(self, default: ComparableT) -> ComparableT:
95116
...
96117

97118
@overload
98-
def min(self, default: None = None) -> QuantityT | None:
119+
def min(self, default: None = None) -> ComparableT | None:
99120
...
100121

101-
def min(self, default: QuantityT | None = None) -> QuantityT | None:
122+
def min(self, default: ComparableT | None = None) -> ComparableT | None:
102123
"""Return the min value among all phases, or default if they are all `None`.
103124
104125
Args:
@@ -109,16 +130,16 @@ def min(self, default: QuantityT | None = None) -> QuantityT | None:
109130
"""
110131
if not any(self):
111132
return default
112-
value: QuantityT = functools.reduce(
133+
value: ComparableT = functools.reduce(
113134
lambda x, y: x if x < y else y,
114135
filter(None, self),
115136
)
116137
return value
117138

118139
def map(
119140
self,
120-
function: Callable[[QuantityT], QuantityT],
121-
default: QuantityT | None = None,
141+
function: Callable[[ComparableT], ComparableT],
142+
default: ComparableT | None = None,
122143
) -> Self:
123144
"""Apply the given function on each of the phase values and return the result.
124145
@@ -140,9 +161,6 @@ def map(
140161
)
141162

142163

143-
_T = typing.TypeVar("_T")
144-
145-
146164
@dataclass(frozen=True)
147165
class Bounds(Generic[_T]):
148166
"""Lower and upper bound values."""

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,22 @@
99
import math
1010
from collections.abc import Sequence
1111
from datetime import datetime, timedelta
12-
from typing import SupportsIndex, overload
12+
from typing import Generic, SupportsIndex, overload
1313

1414
import numpy as np
1515
from frequenz.channels import Broadcast, Receiver, Sender
1616
from numpy.typing import ArrayLike
1717

1818
from ..actor._background_service import BackgroundService
1919
from ._base_types import UNIX_EPOCH, Sample
20-
from ._quantities import Quantity
20+
from ._quantities import QuantityT
2121
from ._resampling import Resampler, ResamplerConfig
2222
from ._ringbuffer import OrderedRingBuffer
2323

2424
_logger = logging.getLogger(__name__)
2525

2626

27-
class MovingWindow(BackgroundService):
27+
class MovingWindow(BackgroundService, Generic[QuantityT]):
2828
"""
2929
A data window that moves with the latest datapoints of a data stream.
3030
@@ -130,9 +130,9 @@ async def run() -> None:
130130
def __init__( # pylint: disable=too-many-arguments
131131
self,
132132
size: timedelta,
133-
resampled_data_recv: Receiver[Sample[Quantity]],
133+
resampled_data_recv: Receiver[Sample[QuantityT]],
134134
input_sampling_period: timedelta,
135-
resampler_config: ResamplerConfig | None = None,
135+
resampler_config: ResamplerConfig[QuantityT] | None = None,
136136
align_to: datetime = UNIX_EPOCH,
137137
*,
138138
name: str | None = None,
@@ -166,8 +166,8 @@ def __init__( # pylint: disable=too-many-arguments
166166

167167
self._sampling_period = input_sampling_period
168168

169-
self._resampler: Resampler | None = None
170-
self._resampler_sender: Sender[Sample[Quantity]] | None = None
169+
self._resampler: Resampler[QuantityT] | None = None
170+
self._resampler_sender: Sender[Sample[QuantityT]] | None = None
171171

172172
if resampler_config:
173173
assert (
@@ -182,7 +182,7 @@ def __init__( # pylint: disable=too-many-arguments
182182
size.total_seconds() / self._sampling_period.total_seconds()
183183
)
184184

185-
self._resampled_data_recv = resampled_data_recv
185+
self._resampled_data_recv: Receiver[Sample[QuantityT]] = resampled_data_recv
186186
self._buffer = OrderedRingBuffer(
187187
np.empty(shape=num_samples, dtype=float),
188188
sampling_period=self._sampling_period,
@@ -341,14 +341,16 @@ def _configure_resampler(self) -> None:
341341
"""Configure the components needed to run the resampler."""
342342
assert self._resampler is not None
343343

344-
async def sink_buffer(sample: Sample[Quantity]) -> None:
344+
async def sink_buffer(sample: Sample[QuantityT]) -> None:
345345
if sample.value is not None:
346346
self._buffer.update(sample)
347347

348-
resampler_channel = Broadcast[Sample[Quantity]]("average")
348+
resampler_channel = Broadcast[Sample[QuantityT]]("average")
349349
self._resampler_sender = resampler_channel.new_sender()
350350
self._resampler.add_timeseries(
351-
"avg", resampler_channel.new_receiver(), sink_buffer
351+
"avg",
352+
resampler_channel.new_receiver(),
353+
sink_buffer,
352354
)
353355
self._tasks.add(
354356
asyncio.create_task(self._resampler.resample(), name="resample")

0 commit comments

Comments
 (0)