diff --git a/taskiq/cli/watcher.py b/taskiq/cli/watcher.py index 9ed541d..8c6cb29 100644 --- a/taskiq/cli/watcher.py +++ b/taskiq/cli/watcher.py @@ -14,12 +14,13 @@ class FileWatcher: # pragma: no cover def __init__( self, callback: Callable[..., None], + path: Path, use_gitignore: bool = True, **callback_kwargs: Any, ) -> None: self.callback = callback self.gitignore = None - gpath = Path("./.gitignore") + gpath = path / ".gitignore" if use_gitignore and gpath.exists(): self.gitignore = parse_gitignore(gpath) self.callback_kwargs = callback_kwargs diff --git a/taskiq/cli/worker/args.py b/taskiq/cli/worker/args.py index 06b7fa2..ef187ab 100644 --- a/taskiq/cli/worker/args.py +++ b/taskiq/cli/worker/args.py @@ -39,6 +39,7 @@ class WorkerArgs: no_parse: bool = False shutdown_timeout: float = 5 reload: bool = False + reload_dirs: List[str] = field(default_factory=list) no_gitignore: bool = False max_async_tasks: int = 100 receiver: str = "taskiq.receiver:Receiver" @@ -172,6 +173,16 @@ def from_cli( help="Reload workers if file is changed. " "`reload` extra is required for this option.", ) + parser.add_argument( + "--reload-dir", + action="append", + dest="reload_dirs", + default=[], + help=( + "Specify a directory to watch for changes. Can be specified " + "multiple times. Defaults to the current working directory." + ), + ) parser.add_argument( "--do-not-use-gitignore", action="store_true", diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index c6633b1..0fd6b4a 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -6,6 +6,7 @@ from dataclasses import dataclass from multiprocessing import Event, Process, Queue, current_process from multiprocessing.synchronize import Event as EventType +from pathlib import Path from time import sleep from typing import Any, Callable, List, Optional @@ -163,15 +164,19 @@ def __init__( self.action_queue: "Queue[ProcessActionBase]" = Queue(-1) self.args = args if args.reload and observer is not None: - observer.schedule( - FileWatcher( - callback=schedule_workers_reload, - use_gitignore=not args.no_gitignore, - action_queue=self.action_queue, - ), - path=".", - recursive=True, - ) + watch_paths = args.reload_dirs if args.reload_dirs else ["."] + for path_to_watch in watch_paths: + logger.debug(f"Watching directory: {path_to_watch}") + observer.schedule( + FileWatcher( + callback=schedule_workers_reload, + path=Path(path_to_watch), + use_gitignore=not args.no_gitignore, + action_queue=self.action_queue, + ), + path=path_to_watch, + recursive=True, + ) shutdown_handler = get_signal_handler(self.action_queue, ShutdownAction()) signal.signal(signal.SIGINT, shutdown_handler)