diff --git a/core-client/Cargo.toml b/core-client/Cargo.toml index e4340540..22920e86 100644 --- a/core-client/Cargo.toml +++ b/core-client/Cargo.toml @@ -24,6 +24,7 @@ http = ["jsonrpc-client-transports/http"] ws = ["jsonrpc-client-transports/ws"] ipc = ["jsonrpc-client-transports/ipc"] arbitrary_precision = ["jsonrpc-client-transports/arbitrary_precision"] +wasmhttp = ["jsonrpc-client-transports/wasmhttp"] [dependencies] jsonrpc-client-transports = { version = "18.0.0", path = "./transports", default-features = false } diff --git a/core-client/transports/Cargo.toml b/core-client/transports/Cargo.toml index 66ef04a6..8bd53c4c 100644 --- a/core-client/transports/Cargo.toml +++ b/core-client/transports/Cargo.toml @@ -32,6 +32,7 @@ ipc = [ "jsonrpc-server-utils", "tokio", ] +wasmhttp = ["gloo-net"] arbitrary_precision = ["serde_json/arbitrary_precision", "jsonrpc-core/arbitrary_precision"] [dependencies] @@ -49,6 +50,7 @@ jsonrpc-server-utils = { version = "18.0.0", path = "../../server-utils", option parity-tokio-ipc = { version = "0.9", optional = true } tokio = { version = "1", optional = true } websocket = { version = "0.26", optional = true } +gloo-net = { version = "0.1.0", optional = true, features=["http"] } [dev-dependencies] assert_matches = "1.1" diff --git a/core-client/transports/src/transports/mod.rs b/core-client/transports/src/transports/mod.rs index 9df534a0..b9b2e825 100644 --- a/core-client/transports/src/transports/mod.rs +++ b/core-client/transports/src/transports/mod.rs @@ -15,6 +15,8 @@ pub mod ipc; pub mod local; #[cfg(feature = "ws")] pub mod ws; +#[cfg(feature = "wasmhttp")] +pub mod wasmhttp; pub use duplex::duplex; diff --git a/core-client/transports/src/transports/wasmhttp.rs b/core-client/transports/src/transports/wasmhttp.rs new file mode 100644 index 00000000..e9e9598a --- /dev/null +++ b/core-client/transports/src/transports/wasmhttp.rs @@ -0,0 +1,80 @@ +//! HTTP client for wasm via gloo_net +use super::RequestBuilder; +use gloo_net::http; +use futures::{future, Future, FutureExt, StreamExt, TryFutureExt}; +use crate::{RpcChannel, RpcError, RpcMessage, RpcResult}; + + +/// Create a HTTP Client via gloo_net for wasm +pub async fn connect(url: &str) -> RpcResult<(TClient, impl Future + '_)> +where TClient: From +{ + let max_parallel = 8; + // Keep track of internal request IDs when building subsequent requests + let mut request_builder = RequestBuilder::new(); + + let (sender, receiver) = futures::channel::mpsc::unbounded(); + + let fut = receiver + .filter_map(move |msg: RpcMessage| { + future::ready(match msg { + RpcMessage::Call(call) => { + let (_, request) = request_builder.call_request(&call); + Some((request, Some(call.sender))) + } + RpcMessage::Notify(notify) => Some((request_builder.notification(¬ify), None)), + RpcMessage::Subscribe(_) => { + log::warn!("Unsupported `RpcMessage` type `Subscribe`."); + None + } + }) + }) + .map(move |(request, sender)| { + log::info!("got request {:?}", request); + let request = http::Request::post(&url) + .header( + "Content-Type", + "application/json", + ) + .header( + "Accept", + "application/json", + ) + .body(request); + + request.send() + .then(|response| async move { (response, sender) }) + }) + .buffer_unordered(max_parallel) + .for_each(|(response, sender)| async { + let result = match response { + Ok(ref res) if !res.ok() => { + log::trace!("http result status {}", res.status()); + Err(RpcError::Client(format!( + "Unexpected response status code: {}", + res.status() + ))) + } + Err(err) => Err(RpcError::Other(Box::new(err))), + Ok(res) => { + res.binary() + .map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e))) + .await + } + }; + + if let Some(sender) = sender { + let response = result + .and_then(|response| { + let response_str = String::from_utf8_lossy(response.as_ref()).into_owned(); + super::parse_response(&response_str) + }) + .and_then(|r| r.1); + if let Err(err) = sender.send(response) { + log::warn!("Error resuming asynchronous request: {:?}", err); + } + } + }); + + Ok((TClient::from(sender.into()), fut)) +}