From 75f0e512603923f1f71251c08be5dfec67b5de99 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 14 Apr 2025 14:18:20 -0400 Subject: [PATCH] feat(server): add graceful::Watcher type --- src/server/graceful.rs | 48 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/src/server/graceful.rs b/src/server/graceful.rs index c187942..9553a75 100644 --- a/src/server/graceful.rs +++ b/src/server/graceful.rs @@ -17,10 +17,20 @@ use pin_project_lite::pin_project; use tokio::sync::watch; /// A graceful shutdown utility +// Purposefully not `Clone`, see `watcher()` method for why. pub struct GracefulShutdown { tx: watch::Sender<()>, } +/// A watcher side of the graceful shutdown. +/// +/// This type can only watch a connection, it cannot trigger a shutdown. +/// +/// Call [`GracefulShutdown::watcher()`] to construct one of these. +pub struct Watcher { + rx: watch::Receiver<()>, +} + impl GracefulShutdown { /// Create a new graceful shutdown helper. pub fn new() -> Self { @@ -30,12 +40,20 @@ impl GracefulShutdown { /// Wrap a future for graceful shutdown watching. pub fn watch(&self, conn: C) -> impl Future { - let mut rx = self.tx.subscribe(); - GracefulConnectionFuture::new(conn, async move { - let _ = rx.changed().await; - // hold onto the rx until the watched future is completed - rx - }) + self.watcher().watch(conn) + } + + /// Create an owned type that can watch a connection. + /// + /// This method allows created an owned type that can be sent onto another + /// task before calling [`Watcher::watch()`]. + // Internal: this function exists because `Clone` allows footguns. + // If the `tx` were cloned (or the `rx`), race conditions can happens where + // one task starting a shutdown is scheduled and interwined with a task + // starting to watch a connection, and the "watch version" is one behind. + pub fn watcher(&self) -> Watcher { + let rx = self.tx.subscribe(); + Watcher { rx } } /// Signal shutdown for all watched connections. @@ -64,6 +82,24 @@ impl Default for GracefulShutdown { } } +impl Watcher { + /// Wrap a future for graceful shutdown watching. + pub fn watch(self, conn: C) -> impl Future { + let Watcher { mut rx } = self; + GracefulConnectionFuture::new(conn, async move { + let _ = rx.changed().await; + // hold onto the rx until the watched future is completed + rx + }) + } +} + +impl Debug for Watcher { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("GracefulWatcher").finish() + } +} + pin_project! { struct GracefulConnectionFuture { #[pin]