@@ -16,6 +16,7 @@ use crate::service::Routes;
16
16
pub use conn:: { Connected , TcpConnectInfo } ;
17
17
use hyper_util:: {
18
18
rt:: { TokioExecutor , TokioIo , TokioTimer } ,
19
+ server:: graceful:: GracefulShutdown ,
19
20
service:: TowerToHyperService ,
20
21
} ;
21
22
#[ cfg( feature = "tls" ) ]
@@ -561,10 +562,7 @@ impl<L> Server<L> {
561
562
builder
562
563
} ;
563
564
564
- let ( signal_tx, signal_rx) = tokio:: sync:: watch:: channel ( ( ) ) ;
565
- let signal_tx = Arc :: new ( signal_tx) ;
566
-
567
- let graceful = signal. is_some ( ) ;
565
+ let graceful = signal. is_some ( ) . then ( GracefulShutdown :: new) ;
568
566
let mut sig = pin ! ( Fuse { inner: signal } ) ;
569
567
let mut incoming = pin ! ( incoming) ;
570
568
@@ -600,21 +598,13 @@ impl<L> Server<L> {
600
598
let hyper_io = TokioIo :: new( io) ;
601
599
let hyper_svc = TowerToHyperService :: new( req_svc. map_request( |req: Request <Incoming >| req. map( boxed) ) ) ;
602
600
603
- serve_connection( hyper_io, hyper_svc, server. clone( ) , graceful. then ( || signal_rx . clone( ) ) ) ;
601
+ serve_connection( hyper_io, hyper_svc, server. clone( ) , graceful. clone( ) ) ;
604
602
}
605
603
}
606
604
}
607
605
608
- if graceful {
609
- let _ = signal_tx. send ( ( ) ) ;
610
- drop ( signal_rx) ;
611
- trace ! (
612
- "waiting for {} connections to close" ,
613
- signal_tx. receiver_count( )
614
- ) ;
615
-
616
- // Wait for all connections to close
617
- signal_tx. closed ( ) . await ;
606
+ if let Some ( graceful) = graceful {
607
+ graceful. shutdown ( ) . await ;
618
608
}
619
609
620
610
Ok ( ( ) )
@@ -627,7 +617,7 @@ fn serve_connection<IO, S>(
627
617
hyper_io : IO ,
628
618
hyper_svc : S ,
629
619
builder : ConnectionBuilder ,
630
- mut watcher : Option < tokio :: sync :: watch :: Receiver < ( ) > > ,
620
+ graceful : Option < GracefulShutdown > ,
631
621
) where
632
622
IO : hyper:: rt:: Read + hyper:: rt:: Write + Unpin + Send + ' static ,
633
623
S : HyperService < Request < Incoming > , Response = Response < BoxBody > > + Clone + Send + ' static ,
@@ -636,28 +626,20 @@ fn serve_connection<IO, S>(
636
626
{
637
627
tokio:: spawn ( async move {
638
628
{
639
- let mut sig = pin ! ( Fuse {
640
- inner: watcher. as_mut( ) . map( |w| w. changed( ) ) ,
641
- } ) ;
629
+ let conn = builder. serve_connection ( hyper_io, hyper_svc) ;
642
630
643
- let mut conn = pin ! ( builder. serve_connection( hyper_io, hyper_svc) ) ;
631
+ let result = if let Some ( graceful) = graceful {
632
+ let conn = graceful. watch ( conn) ;
633
+ conn. await
634
+ } else {
635
+ conn. await
636
+ } ;
644
637
645
- loop {
646
- tokio:: select! {
647
- rv = & mut conn => {
648
- if let Err ( err) = rv {
649
- debug!( "failed serving connection: {:#}" , err) ;
650
- }
651
- break ;
652
- } ,
653
- _ = & mut sig => {
654
- conn. as_mut( ) . graceful_shutdown( ) ;
655
- }
656
- }
638
+ if let Err ( err) = result {
639
+ debug ! ( "failed serving connection: {:#}" , err) ;
657
640
}
658
641
}
659
642
660
- drop ( watcher) ;
661
643
trace ! ( "connection closed" ) ;
662
644
} ) ;
663
645
}
0 commit comments