Skip to content

Commit a2186e5

Browse files
committed
Added task-creators.
1 parent faddfc3 commit a2186e5

File tree

2 files changed

+173
-104
lines changed

2 files changed

+173
-104
lines changed

taskiq/abc/broker.py

+3-104
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
import os
2-
import sys
31
import warnings
42
from abc import ABC, abstractmethod
53
from collections import defaultdict
6-
from functools import wraps
74
from logging import getLogger
85
from typing import (
96
TYPE_CHECKING,
@@ -18,7 +15,6 @@
1815
Optional,
1916
TypeVar,
2017
Union,
21-
overload,
2218
)
2319
from uuid import uuid4
2420

@@ -35,7 +31,8 @@
3531
from taskiq.result_backends.dummy import DummyResultBackend
3632
from taskiq.serializers.json_serializer import JSONSerializer
3733
from taskiq.state import TaskiqState
38-
from taskiq.utils import maybe_awaitable, remove_suffix
34+
from taskiq.task_creator import BaseTaskCreator
35+
from taskiq.utils import maybe_awaitable
3936
from taskiq.warnings import TaskiqDeprecationWarning
4037

4138
if TYPE_CHECKING: # pragma: no cover
@@ -97,6 +94,7 @@ def __init__(
9794
TaskiqDeprecationWarning,
9895
stacklevel=2,
9996
)
97+
self.task: BaseTaskCreator = BaseTaskCreator(self)
10098
self.middlewares: "List[TaskiqMiddleware]" = []
10199
self.result_backend = result_backend
102100
self.decorator_class = AsyncTaskiqDecoratedTask
@@ -255,105 +253,6 @@ def listen(self) -> AsyncGenerator[Union[bytes, AckableMessage], None]:
255253
:return: nothing.
256254
"""
257255

258-
@overload
259-
def task(
260-
self,
261-
task_name: Callable[_FuncParams, _ReturnType],
262-
**labels: Any,
263-
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: # pragma: no cover
264-
...
265-
266-
@overload
267-
def task(
268-
self,
269-
task_name: Optional[str] = None,
270-
**labels: Any,
271-
) -> Callable[
272-
[Callable[_FuncParams, _ReturnType]],
273-
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
274-
]: # pragma: no cover
275-
...
276-
277-
def task( # type: ignore[misc]
278-
self,
279-
task_name: Optional[str] = None,
280-
**labels: Any,
281-
) -> Any:
282-
"""
283-
Decorator that turns function into a task.
284-
285-
This decorator converts function to
286-
a `TaskiqDecoratedTask` object.
287-
288-
This object can be called as a usual function,
289-
because it uses decorated function in it's __call__
290-
method.
291-
292-
!! You have to use it with parentheses in order to
293-
get autocompletion. Like this:
294-
295-
>>> @task()
296-
>>> def my_func():
297-
>>> ...
298-
299-
:param task_name: custom name of a task, defaults to decorated function's name.
300-
:param labels: some addition labels for task.
301-
302-
:returns: decorator function or AsyncTaskiqDecoratedTask.
303-
"""
304-
305-
def make_decorated_task(
306-
inner_labels: Dict[str, Union[str, int]],
307-
inner_task_name: Optional[str] = None,
308-
) -> Callable[
309-
[Callable[_FuncParams, _ReturnType]],
310-
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
311-
]:
312-
def inner(
313-
func: Callable[_FuncParams, _ReturnType],
314-
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
315-
nonlocal inner_task_name
316-
if inner_task_name is None:
317-
fmodule = func.__module__
318-
if fmodule == "__main__": # pragma: no cover
319-
fmodule = ".".join(
320-
remove_suffix(sys.argv[0], ".py").split(
321-
os.path.sep,
322-
),
323-
)
324-
fname = func.__name__
325-
if fname == "<lambda>":
326-
fname = f"lambda_{uuid4().hex}"
327-
inner_task_name = f"{fmodule}:{fname}"
328-
wrapper = wraps(func)
329-
330-
decorated_task = wrapper(
331-
self.decorator_class(
332-
broker=self,
333-
original_func=func,
334-
labels=inner_labels,
335-
task_name=inner_task_name,
336-
),
337-
)
338-
339-
self._register_task(decorated_task.task_name, decorated_task) # type: ignore
340-
341-
return decorated_task # type: ignore
342-
343-
return inner
344-
345-
if callable(task_name):
346-
# This is an edge case,
347-
# when decorator called without parameters.
348-
return make_decorated_task(
349-
inner_labels=labels or {},
350-
)(task_name)
351-
352-
return make_decorated_task(
353-
inner_task_name=task_name,
354-
inner_labels=labels or {},
355-
)
356-
357256
def register_task(
358257
self,
359258
func: Callable[_FuncParams, _ReturnType],

taskiq/task_creator.py

+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import os
2+
import sys
3+
import warnings
4+
from functools import wraps
5+
from typing import (
6+
TYPE_CHECKING,
7+
Any,
8+
Callable,
9+
Dict,
10+
Optional,
11+
TypeVar,
12+
overload,
13+
)
14+
from uuid import uuid4
15+
16+
from typing_extensions import ParamSpec, Self
17+
18+
from taskiq.decor import AsyncTaskiqDecoratedTask
19+
from taskiq.utils import remove_suffix
20+
21+
if TYPE_CHECKING:
22+
from taskiq.abc.broker import AsyncBroker
23+
24+
25+
_FuncParams = ParamSpec("_FuncParams")
26+
_ReturnType = TypeVar("_ReturnType")
27+
28+
29+
class BaseTaskCreator:
30+
"""
31+
Base class for task creator.
32+
33+
Instances of this class are used to make tasks out of the given functions.
34+
"""
35+
36+
def __init__(self, broker: "AsyncBroker") -> None:
37+
self._broker = broker
38+
self._task_name: Optional[str] = None
39+
self._labels: Dict[str, Any] = {}
40+
41+
def name(self, name: str) -> Self:
42+
"""Assign custom name to the task."""
43+
self._task_name = name
44+
return self
45+
46+
def labels(self, **labels: Any) -> Self:
47+
"""Assign custom labels to the task."""
48+
self._labels = labels
49+
return self
50+
51+
def make_task(
52+
self,
53+
task_name: str,
54+
func: Callable[_FuncParams, _ReturnType],
55+
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
56+
"""Make a task from the given function."""
57+
return AsyncTaskiqDecoratedTask(
58+
broker=self._broker,
59+
original_func=func,
60+
labels=self._labels,
61+
task_name=task_name,
62+
)
63+
64+
@overload
65+
def __call__(
66+
self,
67+
task_name: Callable[_FuncParams, _ReturnType],
68+
**labels: Any,
69+
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: # pragma: no cover
70+
...
71+
72+
@overload
73+
def __call__(
74+
self,
75+
task_name: Optional[str] = None,
76+
**labels: Any,
77+
) -> Callable[
78+
[Callable[_FuncParams, _ReturnType]],
79+
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
80+
]: # pragma: no cover
81+
...
82+
83+
def __call__( # type: ignore[misc]
84+
self,
85+
task_name: Optional[str] = None,
86+
**labels: Any,
87+
) -> Any:
88+
"""
89+
Decorator that turns function into a task.
90+
91+
This decorator converts function to
92+
a `TaskiqDecoratedTask` object.
93+
94+
This object can be called as a usual function,
95+
because it uses decorated function in it's __call__
96+
method.
97+
98+
!! You have to use it with parentheses in order to
99+
get autocompletion. Like this:
100+
101+
>>> @task()
102+
>>> def my_func():
103+
>>> ...
104+
105+
:param task_name: custom name of a task, defaults to decorated function's name.
106+
:param labels: some addition labels for task.
107+
108+
:returns: decorator function or AsyncTaskiqDecoratedTask.
109+
"""
110+
if task_name is not None and isinstance(task_name, str):
111+
warnings.warn(
112+
"Using task_name is deprecated, @broker.task.name('name') instead",
113+
DeprecationWarning,
114+
stacklevel=2,
115+
)
116+
self._task_name = task_name
117+
if labels:
118+
warnings.warn(
119+
"Using labels is deprecated, @broker.task.labels(**labels) instead",
120+
DeprecationWarning,
121+
stacklevel=2,
122+
)
123+
self._labels.update(labels)
124+
125+
def make_decorated_task() -> Callable[
126+
[Callable[_FuncParams, _ReturnType]],
127+
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
128+
]:
129+
def inner(
130+
func: Callable[_FuncParams, _ReturnType],
131+
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
132+
inner_task_name = self._task_name
133+
if inner_task_name is None:
134+
fmodule = func.__module__
135+
if fmodule == "__main__": # pragma: no cover
136+
fmodule = ".".join(
137+
remove_suffix(sys.argv[0], ".py").split(os.path.sep),
138+
)
139+
fname = func.__name__
140+
if fname == "<lambda>":
141+
fname = f"lambda_{uuid4().hex}"
142+
inner_task_name = f"{fmodule}:{fname}"
143+
144+
wrapper = wraps(func)
145+
decorated_task = wrapper(
146+
self.make_task(
147+
task_name=inner_task_name,
148+
func=func,
149+
),
150+
)
151+
152+
# We need these ignored lines because
153+
# `wrap` function copies __annotations__,
154+
# therefore mypy thinks that decorated_task
155+
# is still an instance of the original function.
156+
self._broker._register_task( # noqa: SLF001
157+
decorated_task.task_name, # type: ignore
158+
decorated_task, # type: ignore
159+
)
160+
161+
return decorated_task # type: ignore
162+
163+
return inner
164+
165+
if callable(task_name):
166+
# This is an edge case,
167+
# when decorator called without parameters.
168+
return make_decorated_task()(task_name)
169+
170+
return make_decorated_task()

0 commit comments

Comments
 (0)