diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 0372514dba7d..83ab5fd3335a 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -64,6 +64,10 @@ tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-ut [target.'cfg(target_family="unix")'.dev-dependencies] nix = { version = "0.29.0", features = ["fs"] } +[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies] +web-time = { version = "1.1.0" } +wasm-bindgen-futures = "0.4.18" + [features] default = ["fs"] cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded"] diff --git a/object_store/src/client/body.rs b/object_store/src/client/body.rs index 8f62afa4ff2e..94634b0ab256 100644 --- a/object_store/src/client/body.rs +++ b/object_store/src/client/body.rs @@ -48,6 +48,13 @@ impl HttpRequestBody { )), } } + #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] + pub(crate) fn into_reqwest(self) -> reqwest::Body { + match self.0 { + Inner::Bytes(b) => b.into(), + Inner::PutPayload(_, payload) => Into::::into(payload).into(), + } + } /// Returns true if this body is empty pub fn is_empty(&self) -> bool { diff --git a/object_store/src/client/connection.rs b/object_store/src/client/connection.rs index 7e2daf4cdb81..d2229d094586 100644 --- a/object_store/src/client/connection.rs +++ b/object_store/src/client/connection.rs @@ -224,6 +224,57 @@ impl HttpService for reqwest::Client { } } +#[async_trait] +#[cfg(all(target_arch = "wasm32", target_os = "unknown"))] +impl HttpService for reqwest::Client { + async fn call(&self, req: HttpRequest) -> Result { + let (parts, body) = req.into_parts(); + let url = parts.uri.to_string().parse().unwrap(); + let mut req = reqwest::Request::new(parts.method, url); + *req.headers_mut() = parts.headers; + *req.body_mut() = Some(body.into_reqwest()); + + use futures::{ + channel::{mpsc, oneshot}, + SinkExt, StreamExt, TryStreamExt, + }; + use http_body_util::{Empty, StreamBody}; + use wasm_bindgen_futures::spawn_local; + + let (mut tx, rx) = mpsc::channel(1); + let (tx_parts, rx_parts) = oneshot::channel(); + let res_fut = self.execute(req); + + spawn_local(async move { + match res_fut.await.map_err(HttpError::reqwest) { + Err(err) => { + let _ = tx_parts.send(Err(err)); + drop(tx); + } + Ok(res) => { + let (mut parts, _) = http::Response::new(Empty::<()>::new()).into_parts(); + parts.headers = res.headers().clone(); + parts.status = res.status(); + let _ = tx_parts.send(Ok(parts)); + let mut stream = res.bytes_stream().map_err(HttpError::reqwest); + while let Some(chunk) = stream.next().await { + tx.send(chunk).await.unwrap(); + } + } + } + }); + + let parts = rx_parts.await.unwrap()?; + let safe_stream = rx.map(|chunk| { + let frame = hyper::body::Frame::data(chunk?); + Ok(frame) + }); + let body = HttpResponseBody::new(StreamBody::new(safe_stream)); + + Ok(HttpResponse::from_parts(parts, body)) + } +} + /// A factory for [`HttpClient`] pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static { /// Create a new [`HttpClient`] with the provided [`ClientOptions`] @@ -233,10 +284,10 @@ pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static { /// [`HttpConnector`] using [`reqwest::Client`] #[derive(Debug, Default)] #[allow(missing_copy_implementations)] -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))] pub struct ReqwestConnector {} -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))] impl HttpConnector for ReqwestConnector { fn connect(&self, options: &ClientOptions) -> crate::Result { let client = options.client()?; @@ -244,21 +295,21 @@ impl HttpConnector for ReqwestConnector { } } -#[cfg(target_arch = "wasm32")] +#[cfg(all(target_arch = "wasm32", target_os = "wasi"))] pub(crate) fn http_connector( custom: Option>, ) -> crate::Result> { match custom { Some(x) => Ok(x), None => Err(crate::Error::NotSupported { - source: "WASM32 architectures must provide an HTTPConnector" + source: "WASI architectures must provide an HTTPConnector" .to_string() .into(), }), } } -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))] pub(crate) fn http_connector( custom: Option>, ) -> crate::Result> { diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index bd0347b4311c..f6623a43130e 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -50,7 +50,7 @@ pub(crate) mod builder; mod connection; pub(crate) use connection::http_connector; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))] pub use connection::ReqwestConnector; pub use connection::{HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService}; @@ -718,6 +718,22 @@ impl ClientOptions { .build() .map_err(map_client_error) } + + #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] + pub(crate) fn client(&self) -> Result { + let mut builder = reqwest::ClientBuilder::new(); + + match &self.user_agent { + Some(user_agent) => builder = builder.user_agent(user_agent.get()?), + None => builder = builder.user_agent(DEFAULT_USER_AGENT), + } + + if let Some(headers) = &self.default_headers { + builder = builder.default_headers(headers.clone()) + } + + builder.build().map_err(map_client_error) + } } pub(crate) trait GetOptionsExt { diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs index 96244aac9b0f..2bbf7c25e59c 100644 --- a/object_store/src/client/retry.rs +++ b/object_store/src/client/retry.rs @@ -26,7 +26,11 @@ use futures::future::BoxFuture; use http::{Method, Uri}; use reqwest::header::LOCATION; use reqwest::StatusCode; +#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] use std::time::{Duration, Instant}; +#[cfg(all(target_arch = "wasm32", target_os = "unknown"))] +use web_time::{Duration, Instant}; + use tracing::info; /// Retry request error