Skip to content

Commit 3ed02e6

Browse files
authored
feat: add async source transformer (#230)
1 parent 0d87b8f commit 3ed02e6

File tree

10 files changed

+724
-0
lines changed

10 files changed

+724
-0
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
####################################################################################################
# builder: install needed dependencies
####################################################################################################

FROM python:3.10-slim-bullseye AS builder

# Python / pip / poetry runtime settings shared by every stage built from this one.
ENV PYTHONFAULTHANDLER=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONHASHSEED=random \
    PIP_NO_CACHE_DIR=on \
    PIP_DISABLE_PIP_VERSION_CHECK=on \
    PIP_DEFAULT_TIMEOUT=100 \
    POETRY_VERSION=1.2.2 \
    POETRY_HOME="/opt/poetry" \
    POETRY_VIRTUALENVS_IN_PROJECT=true \
    POETRY_NO_INTERACTION=1 \
    PYSETUP_PATH="/opt/pysetup"

# Location of this example inside the copied repository, and of its in-project virtualenv.
ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/sourcetransform/async_event_time_filter"
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"

RUN apt-get update \
    && apt-get install --no-install-recommends -y \
        curl \
        wget \
        # deps for building python deps
        build-essential \
    && apt-get install -y git \
    && apt-get clean && rm -rf /var/lib/apt/lists/* \
    \
    # install dumb-init; the release asset is picked with `uname -m` (x86_64 / aarch64)
    # so that multi-platform builds get a binary matching the image architecture
    && wget -O /dumb-init "https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_$(uname -m)" \
    && chmod +x /dumb-init \
    && curl -sSL https://install.python-poetry.org | python3 -

####################################################################################################
# udf: used for running the udf vertices
####################################################################################################
FROM builder AS udf

# Copy the whole build context (the Makefile builds from the repository root) so the
# example's path dependency on pynumaflow ("../../../") resolves.
WORKDIR $PYSETUP_PATH
COPY ./ ./

# Resolve and install the example's dependencies into $VENV_PATH.
WORKDIR $EXAMPLE_PATH
RUN poetry lock
RUN poetry install --no-cache --no-root && \
    rm -rf ~/.cache/pypoetry/

RUN chmod +x entry.sh

# dumb-init runs as the container's entrypoint and launches the example's entry script.
ENTRYPOINT ["/dumb-init", "--"]
CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]

EXPOSE 5000
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# TAG and PUSH can be overridden on the command line, e.g. `make image TAG=v1 PUSH=true`.
TAG ?= stable
PUSH ?= false
# Full image reference, already including the tag.
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/async-mapt-event-time-filter:${TAG}
# Dockerfile path relative to the repository root, which is used as the build context.
DOCKER_FILE_PATH = examples/sourcetransform/async_event_time_filter/Dockerfile

# Refresh the example's dependencies.
.PHONY: update
update:
	poetry update -vv

# Build for amd64 and arm64 from the repository root and push the result.
.PHONY: image-push
image-push: update
	cd ../../../ && docker buildx build \
	-f ${DOCKER_FILE_PATH} \
	-t ${IMAGE_REGISTRY} \
	--platform linux/amd64,linux/arm64 . --push

# Build a local image from the repository root; push it only when PUSH=true.
.PHONY: image
image: update
	cd ../../../ && docker build \
	-f ${DOCKER_FILE_PATH} \
	-t ${IMAGE_REGISTRY} .
	@# IMAGE_REGISTRY already carries ":${TAG}", so it must not be appended a second time.
	@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
# Container entry script for the example.
# -e: abort on the first failing command, -u: fail on unset variables, -x: trace commands.
set -eux

# exec replaces this shell with the Python process, so the server receives signals
# directly and no idle wrapper shell is left running.
exec python example.py
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import datetime
2+
import logging
3+
4+
from pynumaflow.sourcetransformer import Messages, Message, Datum
5+
from pynumaflow.sourcetransformer import SourceTransformAsyncServer
6+
7+
"""
8+
This is a simple User Defined Function example which receives a message, applies the following
9+
data transformation, and returns the message.
10+
If the message event time is before year 2022, drop the message with event time unchanged.
11+
If it's within year 2022, update the tag to "within_year_2022" and
12+
update the message event time to Jan 1st 2022.
13+
Otherwise, (exclusively after year 2022), update the tag to "after_year_2022" and update the
14+
message event time to Jan 1st 2023.
15+
"""
16+
17+
# Year boundaries used by my_handler, as naive datetimes for midnight on Jan 1st.
# They are constructed directly rather than via datetime.fromtimestamp(), which converts
# to the host's local timezone and would shift the boundaries on any non-UTC machine.
# NOTE(review): presumably datum.event_time is a naive UTC datetime (comparing it with
# these naive values would raise for an aware datetime) - confirm against pynumaflow.
january_first_2022 = datetime.datetime(2022, 1, 1)
january_first_2023 = datetime.datetime(2023, 1, 1)
19+
20+
21+
async def my_handler(keys: list[str], datum: Datum) -> Messages:
    """
    Route a message according to the year of its event time.

    Messages before 2022 are dropped with their event time unchanged; messages within 2022
    are tagged "within_year_2022" and re-stamped to january_first_2022; all later messages
    are tagged "after_year_2022" and re-stamped to january_first_2023.

    Args:
        keys: The keys of the incoming message (not used by this handler).
        datum: The incoming message; its value and event_time are read.

    Returns:
        A Messages collection holding exactly one Message.
    """
    payload = datum.value
    observed_at = datum.event_time
    result = Messages()

    if observed_at < january_first_2022:
        logging.info("Got event time:%s, it is before 2022, so dropping", observed_at)
        outgoing = Message.to_drop(observed_at)
    elif observed_at < january_first_2023:
        logging.info(
            "Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
            observed_at,
        )
        outgoing = Message(
            value=payload, event_time=january_first_2022, tags=["within_year_2022"]
        )
    else:
        logging.info(
            "Got event time:%s, it is after year 2022, so forwarding to after_year_2022",
            observed_at,
        )
        outgoing = Message(
            value=payload, event_time=january_first_2023, tags=["after_year_2022"]
        )

    # Every branch yields exactly one message, so a single append covers all cases.
    result.append(outgoing)
    return result
44+
45+
46+
if __name__ == "__main__":
    # Wrap the handler in the async source transformer server and start it.
    SourceTransformAsyncServer(my_handler).start()
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[tool.poetry]
2+
name = "async-mapt-event-time-filter"
3+
version = "0.2.4"
4+
description = ""
5+
authors = ["Numaflow developers"]
6+
readme = "README.md"
7+
packages = [{include = "mapt_event_time_filter"}]
8+
9+
[tool.poetry.dependencies]
10+
python = ">=3.9, <3.12"
11+
pynumaflow = { path = "../../../"}
12+
13+
[build-system]
14+
requires = ["poetry-core"]
15+
build-backend = "poetry.core.masonry.api"

pynumaflow/sourcetransformer/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
)
88
from pynumaflow.sourcetransformer.multiproc_server import SourceTransformMultiProcServer
99
from pynumaflow.sourcetransformer.server import SourceTransformServer
10+
from pynumaflow.sourcetransformer.async_server import SourceTransformAsyncServer
1011

1112
__all__ = [
1213
"Message",
@@ -16,4 +17,5 @@
1617
"SourceTransformServer",
1718
"SourceTransformer",
1819
"SourceTransformMultiProcServer",
20+
"SourceTransformAsyncServer",
1921
]

pynumaflow/sourcetransformer/_dtypes.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from dataclasses import dataclass
44
from datetime import datetime
55
from typing import TypeVar, Callable, Union, Optional
6+
from collections.abc import Awaitable
67
from warnings import warn
78

89
from pynumaflow._constants import DROP
@@ -210,3 +211,9 @@ def handler(self, keys: list[str], datum: Datum) -> Messages:
210211
# SourceTransformCallable is the type of the handler function for the
211212
# Source Transformer UDFunction.
212213
SourceTransformCallable = Union[SourceTransformHandler, SourceTransformer]
214+
215+
216+
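# SourceTransformHandlerAsyncHandlerCallable is the signature of an async handler function.
# SourceTransformAsyncCallable is either such a function or a SourceTransformer instance,
# and is the handler type accepted for the asynchronous Source Transformer UDF.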
218+
SourceTransformHandlerAsyncHandlerCallable = Callable[[list[str], Datum], Awaitable[Messages]]
219+
SourceTransformAsyncCallable = Union[SourceTransformer, SourceTransformHandlerAsyncHandlerCallable]
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
import aiorun
2+
import grpc
3+
4+
from pynumaflow._constants import (
5+
NUM_THREADS_DEFAULT,
6+
MAX_MESSAGE_SIZE,
7+
MAX_NUM_THREADS,
8+
SOURCE_TRANSFORMER_SOCK_PATH,
9+
SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
10+
)
11+
from pynumaflow.info.types import (
12+
ServerInfo,
13+
MINIMUM_NUMAFLOW_VERSION,
14+
ContainerType,
15+
)
16+
from pynumaflow.proto.sourcetransformer import transform_pb2_grpc
17+
from pynumaflow.shared.server import (
18+
NumaflowServer,
19+
start_async_server,
20+
)
21+
from pynumaflow.sourcetransformer._dtypes import SourceTransformAsyncCallable
22+
from pynumaflow.sourcetransformer.servicer._async_servicer import SourceTransformAsyncServicer
23+
24+
25+
class SourceTransformAsyncServer(NumaflowServer):
    """
    Asynchronous gRPC server that hosts a Source Transformer UDF.

    A SourceTransformAsyncServicer is created around the given handler when the
    server is constructed; calling start() serves it on a UNIX socket.

    Args:
        source_transform_instance: The async handler function or SourceTransformer
            instance to be used for the Source Transformer UDF
        sock_path: The UNIX socket path to be used for the server
        max_message_size: The max message size in bytes the server can receive and send
        max_threads: The max number of threads to be spawned;
            defaults to NUM_THREADS_DEFAULT and is capped at MAX_NUM_THREADS
        server_info_file: The path of the server info file handed to the server on start

    Example Invocation:

        import datetime
        import logging

        from pynumaflow.sourcetransformer import (
            Messages,
            Message,
            Datum,
            SourceTransformAsyncServer,
        )
        # This is a simple User Defined Function example which receives a message,
        # applies the following data transformation, and returns the message.
        # If the message event time is before year 2022, drop the message with
        # event time unchanged.
        # If it's within year 2022, update the tag to "within_year_2022" and
        # update the message event time to Jan 1st 2022.
        # Otherwise, (exclusively after year 2022), update the tag to
        # "after_year_2022" and update the message event time to Jan 1st 2023.

        january_first_2022 = datetime.datetime(2022, 1, 1)
        january_first_2023 = datetime.datetime(2023, 1, 1)


        async def my_handler(keys: list[str], datum: Datum) -> Messages:
            val = datum.value
            event_time = datum.event_time
            messages = Messages()

            if event_time < january_first_2022:
                logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
                messages.append(Message.to_drop(event_time))
            elif event_time < january_first_2023:
                logging.info(
                    "Got event time:%s, it is within year 2022, "
                    "so forwarding to within_year_2022",
                    event_time,
                )
                messages.append(
                    Message(value=val, event_time=january_first_2022,
                            tags=["within_year_2022"])
                )
            else:
                logging.info(
                    "Got event time:%s, it is after year 2022, "
                    "so forwarding to after_year_2022",
                    event_time,
                )
                messages.append(
                    Message(value=val, event_time=january_first_2023,
                            tags=["after_year_2022"])
                )

            return messages


        if __name__ == "__main__":
            grpc_server = SourceTransformAsyncServer(my_handler)
            grpc_server.start()
    """

    def __init__(
        self,
        source_transform_instance: SourceTransformAsyncCallable,
        sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
        max_message_size=MAX_MESSAGE_SIZE,
        max_threads=NUM_THREADS_DEFAULT,
        server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
    ):
        """
        Create a new grpc Asynchronous Source Transformer Server instance.
        A new servicer instance is created around the handler and stored on the server.

        Args:
            source_transform_instance: The async handler function or SourceTransformer
                instance to be used for the Source Transformer UDF
            sock_path: The UNIX socket path to be used for the server
            max_message_size: The max message size in bytes the server can receive and send
            max_threads: The max number of threads to be spawned;
                defaults to NUM_THREADS_DEFAULT and is capped at MAX_NUM_THREADS
            server_info_file: The path of the server info file handed to the server on start
        """
        # gRPC expects UNIX domain sockets to be addressed with the "unix://" scheme.
        self.sock_path = f"unix://{sock_path}"
        self.max_threads = min(max_threads, MAX_NUM_THREADS)
        self.max_message_size = max_message_size
        self.server_info_file = server_info_file

        self.source_transform_instance = source_transform_instance

        # The same size limit applies to both outgoing and incoming messages.
        self._server_options = [
            ("grpc.max_send_message_length", self.max_message_size),
            ("grpc.max_receive_message_length", self.max_message_size),
        ]
        self.servicer = SourceTransformAsyncServicer(handler=source_transform_instance)

    def start(self) -> None:
        """
        Starter function for the Async server class, need a separate caller
        so that all the async coroutines can be started from a single context
        """
        aiorun.run(self.aexec(), use_uvloop=True)

    async def aexec(self) -> None:
        """
        Starts the Async gRPC server on the given UNIX socket with
        given max threads.
        """

        # As the server is async, we need to create a new server instance in the
        # same thread as the event loop so that all the async calls are made in the
        # same context

        server_new = grpc.aio.server(options=self._server_options)
        server_new.add_insecure_port(self.sock_path)
        transform_pb2_grpc.add_SourceTransformServicer_to_server(self.servicer, server_new)

        # Advertise the minimum Numaflow version required for source transformer containers.
        serv_info = ServerInfo.get_default_server_info()
        serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[
            ContainerType.Sourcetransformer
        ]

        # Start the async server
        await start_async_server(
            server_async=server_new,
            sock_path=self.sock_path,
            max_threads=self.max_threads,
            cleanup_coroutines=[],
            server_info_file=self.server_info_file,
            server_info=serv_info,
        )

0 commit comments

Comments
 (0)