From db0c47ea8e273afa8be13ed7dadd1287fa70e642 Mon Sep 17 00:00:00 2001 From: Gabe Montague Date: Fri, 2 May 2025 17:25:15 -0400 Subject: [PATCH 1/2] Add --reload-dir argument --- taskiq/cli/watcher.py | 3 ++- taskiq/cli/worker/args.py | 11 +++++++++++ taskiq/cli/worker/process_manager.py | 23 ++++++++++++++--------- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/taskiq/cli/watcher.py b/taskiq/cli/watcher.py index 9ed541d..8c6cb29 100644 --- a/taskiq/cli/watcher.py +++ b/taskiq/cli/watcher.py @@ -14,12 +14,13 @@ class FileWatcher: # pragma: no cover def __init__( self, callback: Callable[..., None], + path: Path, use_gitignore: bool = True, **callback_kwargs: Any, ) -> None: self.callback = callback self.gitignore = None - gpath = Path("./.gitignore") + gpath = path / ".gitignore" if use_gitignore and gpath.exists(): self.gitignore = parse_gitignore(gpath) self.callback_kwargs = callback_kwargs diff --git a/taskiq/cli/worker/args.py b/taskiq/cli/worker/args.py index 06b7fa2..ef187ab 100644 --- a/taskiq/cli/worker/args.py +++ b/taskiq/cli/worker/args.py @@ -39,6 +39,7 @@ class WorkerArgs: no_parse: bool = False shutdown_timeout: float = 5 reload: bool = False + reload_dirs: List[str] = field(default_factory=list) no_gitignore: bool = False max_async_tasks: int = 100 receiver: str = "taskiq.receiver:Receiver" @@ -172,6 +173,16 @@ def from_cli( help="Reload workers if file is changed. " "`reload` extra is required for this option.", ) + parser.add_argument( + "--reload-dir", + action="append", + dest="reload_dirs", + default=[], + help=( + "Specify a directory to watch for changes. Can be specified " + "multiple times. Defaults to the current working directory." + ), + ) parser.add_argument( "--do-not-use-gitignore", action="store_true", diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index c6633b1..918b57c 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -6,6 +6,7 @@ from dataclasses import dataclass from multiprocessing import Event, Process, Queue, current_process from multiprocessing.synchronize import Event as EventType +from pathlib import Path from time import sleep from typing import Any, Callable, List, Optional @@ -163,15 +164,19 @@ def __init__( self.action_queue: "Queue[ProcessActionBase]" = Queue(-1) self.args = args if args.reload and observer is not None: - observer.schedule( - FileWatcher( - callback=schedule_workers_reload, - use_gitignore=not args.no_gitignore, - action_queue=self.action_queue, - ), - path=".", - recursive=True, - ) + watch_paths = args.reload_dirs if args.reload_dirs else ["."] + for path_to_watch in watch_paths: + logger.info(f"Watching directory: {path_to_watch}") + observer.schedule( + FileWatcher( + callback=schedule_workers_reload, + path=Path(path_to_watch), + use_gitignore=not args.no_gitignore, + action_queue=self.action_queue, + ), + path=path_to_watch, + recursive=True, + ) shutdown_handler = get_signal_handler(self.action_queue, ShutdownAction()) signal.signal(signal.SIGINT, shutdown_handler) From f568da24acd8a6c8ad5ae93b945aff10a6d1a801 Mon Sep 17 00:00:00 2001 From: Gabe Montague Date: Fri, 2 May 2025 17:38:09 -0400 Subject: [PATCH 2/2] Change added logging from info to debug --- taskiq/cli/worker/process_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index 918b57c..0fd6b4a 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -166,7 +166,7 @@ def __init__( if args.reload and observer is not None: watch_paths = args.reload_dirs if args.reload_dirs else ["."] for path_to_watch in watch_paths: - logger.info(f"Watching directory: {path_to_watch}") + logger.debug(f"Watching directory: {path_to_watch}") observer.schedule( FileWatcher( callback=schedule_workers_reload,