From 89a594461e94e1063992c516950a82ea44b9ec63 Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Tue, 29 Jun 2021 23:30:10 +0200 Subject: [PATCH 1/8] WIP: Update to Tokio 1.0 --- Cargo.toml | 3 +++ core-client/transports/Cargo.toml | 6 +++--- http/Cargo.toml | 2 +- http/src/lib.rs | 2 +- ipc/Cargo.toml | 2 +- server-utils/Cargo.toml | 7 ++++--- server-utils/src/lib.rs | 1 + server-utils/src/reactor.rs | 5 ++--- server-utils/src/suspendable_stream.rs | 15 ++++++++------- stdio/Cargo.toml | 6 +++--- tcp/src/server.rs | 2 ++ tcp/src/tests.rs | 14 +++++++------- 12 files changed, 36 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b509827d5..58c501a43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,6 @@ members = [ "test", "ws", ] + +[patch.crates-io] +parity-tokio-ipc = { git = "https://github.com/Xanewok/parity-tokio-ipc", branch = "mio-07" } diff --git a/core-client/transports/Cargo.toml b/core-client/transports/Cargo.toml index af4b3a62a..285b8369d 100644 --- a/core-client/transports/Cargo.toml +++ b/core-client/transports/Cargo.toml @@ -44,11 +44,11 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" url = "1.7" -hyper = { version = "0.13", optional = true } -hyper-tls = { version = "0.4", optional = true } +hyper = { version = "0.14", features = ["client", "http1", "server"], optional = true } +hyper-tls = { version = "0.5", optional = true } jsonrpc-server-utils = { version = "17.1", path = "../../server-utils", optional = true } parity-tokio-ipc = { version = "0.8", optional = true } -tokio = { version = "0.2", optional = true } +tokio = { version = "1", optional = true } websocket = { version = "0.24", optional = true } [dev-dependencies] diff --git a/http/Cargo.toml b/http/Cargo.toml index 1e6172abc..eb0d6c59e 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -12,7 +12,7 @@ version = "17.1.0" [dependencies] futures = "0.3" -hyper = "0.13" +hyper = { version = "0.14", features = ["client", "server", "stream"] } jsonrpc-core = { version = "17.1", path = "../core" } jsonrpc-server-utils = { version = "17.1", path = "../server-utils" } log = "0.4" diff --git a/http/src/lib.rs b/http/src/lib.rs index a229ae509..4a1bdcff6 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -566,7 +566,7 @@ fn serve>( let raw_socket = (); let server_builder = - hyper::Server::from_tcp(listener).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + hyper::server::Server::from_tcp(listener).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; // Add current host to allowed headers. // NOTE: we need to use `l.local_addr()` instead of `addr` // it might be different! diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index 0eef09da3..226912e4e 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -23,7 +23,7 @@ env_logger = "0.7" lazy_static = "1.0" [target.'cfg(not(windows))'.dev-dependencies] -tokio = { version = "0.2", default-features = false, features = ["uds", "time", "rt-threaded", "io-driver"] } +tokio = { version = "1", default-features = false, features = ["net", "time", "rt-multi-thread"] } [badges] travis-ci = { repository = "paritytech/jsonrpc", branch = "master"} diff --git a/server-utils/Cargo.toml b/server-utils/Cargo.toml index aacbce4f0..5a6e27f18 100644 --- a/server-utils/Cargo.toml +++ b/server-utils/Cargo.toml @@ -11,14 +11,15 @@ repository = "https://github.com/paritytech/jsonrpc" version = "17.1.0" [dependencies] -bytes = "0.5" +bytes = "1.0" futures = "0.3" globset = "0.4" jsonrpc-core = { version = "17.1", path = "../core" } lazy_static = "1.1.0" log = "0.4" -tokio = { version = "0.2", features = ["rt-threaded", "io-driver", "io-util", "time", "tcp"] } -tokio-util = { version = "0.3", features = ["codec"] } +tokio = { version = "1", features = ["rt-multi-thread", "io-util", "time", "net"] } +tokio-util = { version = "0.6", features = ["codec"] } +tokio-stream = { version = "0.1", features = ["net"] } unicase = "2.0" diff --git a/server-utils/src/lib.rs b/server-utils/src/lib.rs index 5c9f52d1a..3d6cd5cc3 100644 --- a/server-utils/src/lib.rs +++ b/server-utils/src/lib.rs @@ -9,6 +9,7 @@ extern crate log; extern crate lazy_static; pub use tokio; +pub use tokio_stream; pub use tokio_util; pub mod cors; diff --git a/server-utils/src/reactor.rs b/server-utils/src/reactor.rs index df8afd408..041328528 100644 --- a/server-utils/src/reactor.rs +++ b/server-utils/src/reactor.rs @@ -96,9 +96,8 @@ impl RpcEventLoop { pub fn with_name(name: Option) -> io::Result { let (stop, stopped) = futures::channel::oneshot::channel(); - let mut tb = runtime::Builder::new(); - tb.core_threads(1); - tb.threaded_scheduler(); + let mut tb = runtime::Builder::new_multi_thread(); + tb.worker_threads(1); tb.enable_all(); if let Some(name) = name { diff --git a/server-utils/src/suspendable_stream.rs b/server-utils/src/suspendable_stream.rs index 8d3179da9..b23e51268 100644 --- a/server-utils/src/suspendable_stream.rs +++ b/server-utils/src/suspendable_stream.rs @@ -2,9 +2,7 @@ use std::future::Future; use std::io; use std::pin::Pin; use std::task::Poll; -use std::time::Duration; - -use tokio::time::Delay; +use std::time::{Duration, Instant}; /// `Incoming` is a stream of incoming sockets /// Polling the stream may return a temporary io::Error (for instance if we can't open the connection because of "too many open files" limit) @@ -19,7 +17,7 @@ pub struct SuspendableStream { next_delay: Duration, initial_delay: Duration, max_delay: Duration, - timeout: Option, + timeout: Option, } impl SuspendableStream { @@ -44,8 +42,11 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { loop { - if let Some(timeout) = self.timeout.as_mut() { - match Pin::new(timeout).poll(cx) { + if let Some(timeout) = &self.timeout { + let timeout = tokio::time::Instant::from_std(*timeout); + let sleep = tokio::time::sleep_until(timeout); + futures::pin_mut!(sleep); + match sleep.poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(()) => {} } @@ -78,7 +79,7 @@ where }; debug!("Error accepting connection: {}", err); debug!("The server will stop accepting connections for {:?}", self.next_delay); - self.timeout = Some(tokio::time::delay_for(self.next_delay)); + self.timeout = Some(Instant::now() + self.next_delay); } } } diff --git a/stdio/Cargo.toml b/stdio/Cargo.toml index 8c907431f..2e5c91ba9 100644 --- a/stdio/Cargo.toml +++ b/stdio/Cargo.toml @@ -13,11 +13,11 @@ version = "17.1.0" futures = "0.3" jsonrpc-core = { version = "17.1", path = "../core" } log = "0.4" -tokio = { version = "0.2", features = ["io-std", "io-driver", "io-util"] } -tokio-util = { version = "0.3", features = ["codec"] } +tokio = { version = "1", features = ["io-std", "io-util"] } +tokio-util = { version = "0.6", features = ["codec"] } [dev-dependencies] -tokio = { version = "0.2", features = ["rt-core", "macros"] } +tokio = { version = "1", features = ["rt", "macros"] } lazy_static = "1.0" env_logger = "0.7" diff --git a/tcp/src/server.rs b/tcp/src/server.rs index 55a2b8d4a..9e6aad222 100644 --- a/tcp/src/server.rs +++ b/tcp/src/server.rs @@ -7,6 +7,7 @@ use tower_service::Service as _; use crate::futures::{self, future}; use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware}; +use crate::server_utils::tokio_stream::wrappers::TcpListenerStream; use crate::server_utils::{codecs, reactor, tokio, tokio_util::codec::Framed, SuspendableStream}; use crate::dispatch::{Dispatcher, PeerMessageQueue, SenderChannels}; @@ -94,6 +95,7 @@ where executor.executor().spawn(async move { let start = async { let listener = tokio::net::TcpListener::bind(&address).await?; + let listener = TcpListenerStream::new(listener); let connections = SuspendableStream::new(listener); let server = connections.map(|socket| { diff --git a/tcp/src/tests.rs b/tcp/src/tests.rs index b95e1b288..36d7c45c8 100644 --- a/tcp/src/tests.rs +++ b/tcp/src/tests.rs @@ -1,4 +1,4 @@ -use std::net::{Shutdown, SocketAddr}; +use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -23,7 +23,7 @@ fn casual_server() -> ServerBuilder { } fn run_future(fut: impl std::future::Future + Send) -> O { - let mut rt = tokio::runtime::Runtime::new().unwrap(); + let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(fut) } @@ -60,9 +60,9 @@ fn disconnect() { let _server = server.start(&addr).expect("Server must run with no issues"); run_future(async move { - let stream = TcpStream::connect(&addr).await.unwrap(); + let mut stream = TcpStream::connect(&addr).await.unwrap(); assert_eq!(stream.peer_addr().unwrap(), addr); - stream.shutdown(::std::net::Shutdown::Both).unwrap(); + stream.shutdown().await.unwrap(); }); ::std::thread::sleep(::std::time::Duration::from_millis(50)); @@ -76,7 +76,7 @@ fn dummy_request(addr: &SocketAddr, data: Vec) -> Vec { let stream = async move { let mut stream = TcpStream::connect(addr).await?; stream.write_all(&data).await?; - stream.shutdown(Shutdown::Write)?; + stream.shutdown().await?; let mut read_buf = vec![]; let _ = stream.read_to_end(&mut read_buf).await; @@ -243,7 +243,7 @@ fn message() { let client = async move { let stream = TcpStream::connect(&addr); - let delay = tokio::time::delay_for(Duration::from_millis(500)); + let delay = tokio::time::sleep(Duration::from_millis(500)); let (stream, _) = futures::join!(stream, delay); let mut stream = stream?; @@ -272,7 +272,7 @@ fn message() { let data = b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n"; stream.write_all(&data[..]).await?; - stream.shutdown(Shutdown::Write).unwrap(); + stream.shutdown().await.unwrap(); let mut read_buf = vec![]; let _ = stream.read_to_end(&mut read_buf).await?; From 1a71efe21b7690610f761e26ce950f7dfbb544d9 Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Wed, 30 Jun 2021 13:18:17 +0200 Subject: [PATCH 2/8] ipc: Migrate remaining Unix test code to Tokio 1.0 --- ipc/src/server.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ipc/src/server.rs b/ipc/src/server.rs index 107150304..1b8fd0969 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -360,7 +360,7 @@ mod tests { reply.expect("there should be one reply") }; - let mut rt = tokio::runtime::Runtime::new().unwrap(); + let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(reply).expect("wait for reply") } @@ -609,9 +609,10 @@ mod tests { tx.send(true).expect("failed to report that the server has stopped"); }); - let mut rt = tokio::runtime::Runtime::new().unwrap(); + let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async move { - let timeout = tokio::time::delay_for(Duration::from_millis(500)); + let timeout = tokio::time::sleep(Duration::from_millis(500)); + futures::pin_mut!(timeout); match futures::future::select(rx, timeout).await { futures::future::Either::Left((result, _)) => { From b05019d5cb88cccdd5a93382aeb8e4f0b1c39f8f Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Wed, 30 Jun 2021 13:18:58 +0200 Subject: [PATCH 3/8] core-client: Don't depend on unused hyper/server feature --- core-client/transports/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core-client/transports/Cargo.toml b/core-client/transports/Cargo.toml index 285b8369d..b27fb8a96 100644 --- a/core-client/transports/Cargo.toml +++ b/core-client/transports/Cargo.toml @@ -44,7 +44,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" url = "1.7" -hyper = { version = "0.14", features = ["client", "http1", "server"], optional = true } +hyper = { version = "0.14", features = ["client", "http1"], optional = true } hyper-tls = { version = "0.5", optional = true } jsonrpc-server-utils = { version = "17.1", path = "../../server-utils", optional = true } parity-tokio-ipc = { version = "0.8", optional = true } From a26620ce9a1811599102c8d36da030a32956c1be Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Mon, 5 Jul 2021 00:35:17 +0200 Subject: [PATCH 4/8] http: Fix used feature set by hyper --- http/Cargo.toml | 2 +- http/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/http/Cargo.toml b/http/Cargo.toml index eb0d6c59e..8208268f2 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -12,7 +12,7 @@ version = "17.1.0" [dependencies] futures = "0.3" -hyper = { version = "0.14", features = ["client", "server", "stream"] } +hyper = { version = "0.14", features = ["http1", "tcp", "server", "stream"] } jsonrpc-core = { version = "17.1", path = "../core" } jsonrpc-server-utils = { version = "17.1", path = "../server-utils" } log = "0.4" diff --git a/http/src/lib.rs b/http/src/lib.rs index 4a1bdcff6..a229ae509 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -566,7 +566,7 @@ fn serve>( let raw_socket = (); let server_builder = - hyper::server::Server::from_tcp(listener).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + hyper::Server::from_tcp(listener).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; // Add current host to allowed headers. // NOTE: we need to use `l.local_addr()` instead of `addr` // it might be different! From 29ebb2cedaf47735b9b0b2e251e07a31867cabce Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Mon, 5 Jul 2021 00:58:54 +0200 Subject: [PATCH 5/8] WIP: Bump to the transferred parity-tokio-ipc repo --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 58c501a43..ddcc07a59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,4 @@ members = [ ] [patch.crates-io] -parity-tokio-ipc = { git = "https://github.com/Xanewok/parity-tokio-ipc", branch = "mio-07" } +parity-tokio-ipc = { git = "https://github.com/paritytech/parity-tokio-ipc", branch = "master" } From 6a450920cf0faf2d8ed999a7bdc65d30b7cd0255 Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Tue, 6 Jul 2021 15:28:50 +0200 Subject: [PATCH 6/8] Use newly released version of parity-tokio-ipc --- Cargo.toml | 2 -- core-client/transports/Cargo.toml | 2 +- ipc/Cargo.toml | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ddcc07a59..f25b341fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,5 +15,3 @@ members = [ "ws", ] -[patch.crates-io] -parity-tokio-ipc = { git = "https://github.com/paritytech/parity-tokio-ipc", branch = "master" } diff --git a/core-client/transports/Cargo.toml b/core-client/transports/Cargo.toml index b27fb8a96..f5617df2c 100644 --- a/core-client/transports/Cargo.toml +++ b/core-client/transports/Cargo.toml @@ -47,7 +47,7 @@ url = "1.7" hyper = { version = "0.14", features = ["client", "http1"], optional = true } hyper-tls = { version = "0.5", optional = true } jsonrpc-server-utils = { version = "17.1", path = "../../server-utils", optional = true } -parity-tokio-ipc = { version = "0.8", optional = true } +parity-tokio-ipc = { version = "0.9", optional = true } tokio = { version = "1", optional = true } websocket = { version = "0.24", optional = true } diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index 226912e4e..6f3d9adaf 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -15,7 +15,7 @@ log = "0.4" tower-service = "0.3" jsonrpc-core = { version = "17.1", path = "../core" } jsonrpc-server-utils = { version = "17.1", path = "../server-utils", default-features = false } -parity-tokio-ipc = "0.8" +parity-tokio-ipc = "0.9" parking_lot = "0.11.0" [dev-dependencies] From 75b28a1cb79cd84af76093b56fdeae2071dc0f95 Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Tue, 6 Jul 2021 15:30:13 +0200 Subject: [PATCH 7/8] Remove extra newline in Cargo.toml --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index f25b341fb..b509827d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,4 +14,3 @@ members = [ "test", "ws", ] - From e265114ad3bdb875ff7bcae3e3c259e81b3d78e8 Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Wed, 7 Jul 2021 15:12:51 +0200 Subject: [PATCH 8/8] Refactor suspension slightly in SuspendableStream --- server-utils/src/suspendable_stream.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/server-utils/src/suspendable_stream.rs b/server-utils/src/suspendable_stream.rs index b23e51268..96af9c91a 100644 --- a/server-utils/src/suspendable_stream.rs +++ b/server-utils/src/suspendable_stream.rs @@ -17,7 +17,7 @@ pub struct SuspendableStream { next_delay: Duration, initial_delay: Duration, max_delay: Duration, - timeout: Option, + suspended_until: Option, } impl SuspendableStream { @@ -29,7 +29,7 @@ impl SuspendableStream { next_delay: Duration::from_millis(20), initial_delay: Duration::from_millis(10), max_delay: Duration::from_secs(5), - timeout: None, + suspended_until: None, } } } @@ -42,13 +42,17 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { loop { - if let Some(timeout) = &self.timeout { - let timeout = tokio::time::Instant::from_std(*timeout); - let sleep = tokio::time::sleep_until(timeout); + // If we encountered a connection error before then we suspend + // polling from the underlying stream for a bit + if let Some(deadline) = &mut self.suspended_until { + let deadline = tokio::time::Instant::from_std(*deadline); + let sleep = tokio::time::sleep_until(deadline); futures::pin_mut!(sleep); match sleep.poll(cx) { Poll::Pending => return Poll::Pending, - Poll::Ready(()) => {} + Poll::Ready(()) => { + self.suspended_until = None; + } } } @@ -79,7 +83,7 @@ where }; debug!("Error accepting connection: {}", err); debug!("The server will stop accepting connections for {:?}", self.next_delay); - self.timeout = Some(Instant::now() + self.next_delay); + self.suspended_until = Some(Instant::now() + self.next_delay); } } }