Skip to content

feat(torii-transport): custom transport with retries & user agent #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ torii-grpc-client = { path = "crates/grpc/client" }
torii-grpc-server = { path = "crates/grpc/server" }
torii-adigraphmap = { path = "crates/adigraphmap" }
torii-task-network = { path = "crates/task-network" }
torii-transport = { path = "crates/transport" }

# macros
merge-options = { git = "https://github.com/dojoengine/dojo", rev = "82fe9bd" }
Expand Down
1 change: 1 addition & 0 deletions crates/runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ torii-libp2p-relay = { workspace = true }
torii-server.workspace = true
torii-processors.workspace = true
tower.workspace = true
torii-transport.workspace = true

tempfile.workspace = true
tower-http.workspace = true
Expand Down
16 changes: 16 additions & 0 deletions crates/transport/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
edition.workspace = true
license.workspace = true
name = "torii-transport"
repository.workspace = true
version.workspace = true

[dependencies]
tracing.workspace = true
futures-util.workspace = true
tokio.workspace = true
reqwest.workspace = true
serde_json.workspace = true
starknet.workspace = true
serde.workspace = true
async-trait.workspace = true
15 changes: 15 additions & 0 deletions crates/transport/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/// Errors using [`HttpTransport`].
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum HttpTransportError {
/// HTTP-related errors.
Reqwest(reqwest::Error),
/// JSON serialization/deserialization errors.
Json(serde_json::Error),
/// Unexpected response ID.
#[error("unexpected response ID: {0}")]
UnexpectedResponseId(u64),
/// Retries exhausted.
#[error("retries exhausted after {max_retries} attempts: {last_error}")]
RetriesExhausted { max_retries: u32, last_error: Box<HttpTransportError> },
}
273 changes: 273 additions & 0 deletions crates/transport/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
use async_trait::async_trait;
use tracing::trace;
use reqwest::{Client, Url};
use serde::{de::DeserializeOwned, Serialize};
use tokio::time;

use starknet::providers::{
jsonrpc::{JsonRpcTransport, JsonRpcMethod, JsonRpcResponse},
ProviderRequestData,
};

pub mod error;

pub use error::HttpTransportError;

/// A [`JsonRpcTransport`] implementation that uses HTTP connections.
#[derive(Debug, Clone)]
pub struct HttpTransport {
client: Client,
url: Url,
headers: Vec<(String, String)>,
max_retries: u32,
retry_delay_ms: u64,
}

#[derive(Debug, Serialize)]
struct JsonRpcRequest<T> {
id: u64,
jsonrpc: &'static str,
method: JsonRpcMethod,
params: T,
}

impl HttpTransport {
/// Constructs [`HttpTransport`] from a JSON-RPC server URL, using default HTTP client settings.
/// Defaults to 3 retries with a 500ms base delay.
///
/// To use custom HTTP settings (e.g. proxy, timeout), use
/// [`new_with_client`](fn.new_with_client) instead.
pub fn new(url: impl Into<Url>) -> Self {
Self::new_with_client(url, Client::new())
}

/// Constructs [`HttpTransport`] from a JSON-RPC server URL and a custom `reqwest` client.
/// Defaults to 3 retries with a 500ms base delay.
pub fn new_with_client(url: impl Into<Url>, client: Client) -> Self {
Self {
client,
url: url.into(),
headers: vec![],
max_retries: 3, // Default max retries
retry_delay_ms: 500, // Default base delay in ms
}
}

/// Sets the maximum number of retries for requests.
pub fn with_max_retries(mut self, max_retries: u32) -> Self {
self.max_retries = max_retries;
self
}

/// Sets the base delay in milliseconds for exponential backoff.
pub fn with_retry_delay_ms(mut self, retry_delay_ms: u64) -> Self {
self.retry_delay_ms = retry_delay_ms;
self
}

/// Consumes the current [`HttpTransport`] instance and returns a new one with the header
/// appended. Same as calling [`add_header`](fn.add_header).
pub fn with_header(self, name: String, value: String) -> Self {
let mut headers = self.headers;
headers.push((name, value));

Self {
client: self.client,
url: self.url,
headers,
max_retries: self.max_retries,
retry_delay_ms: self.retry_delay_ms,
}
}

/// Adds a custom HTTP header to be sent for requests.
pub fn add_header(&mut self, name: String, value: String) {
self.headers.push((name, value))
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl JsonRpcTransport for HttpTransport {
type Error = HttpTransportError;

async fn send_request<P, R>(
&self,
method: JsonRpcMethod,
params: P,
) -> Result<JsonRpcResponse<R>, Self::Error>
where
P: Serialize + Send,
R: DeserializeOwned,
{
let mut attempts = 0;
let mut last_error: Option<HttpTransportError> = None;

while attempts <= self.max_retries {
let request_body_data = JsonRpcRequest {
id: 1,
jsonrpc: "2.0",
method,
params: &params,
};

let request_body_json = match serde_json::to_string(&request_body_data) {
Ok(json) => json,
Err(e) => return Err(HttpTransportError::Json(e)),
};

trace!("Sending request via JSON-RPC (attempt {}): {}", attempts + 1, request_body_json);

let mut request_builder = self
.client
.post(self.url.clone())
.body(request_body_json.clone())
.header("Content-Type", "application/json");

for (name, value) in &self.headers {
request_builder = request_builder.header(name, value);
}

match request_builder.send().await {
Ok(response) => {
let response_text = match response.text().await {
Ok(text) => text,
Err(e) => {
last_error = Some(HttpTransportError::Reqwest(e));
attempts += 1;
if attempts <= self.max_retries && self.retry_delay_ms > 0 {
let delay = self.retry_delay_ms * (2u64.pow(attempts -1));
time::sleep(time::Duration::from_millis(delay)).await;
}
continue;
}
};
trace!("Response from JSON-RPC: {}", response_text);
match serde_json::from_str(&response_text) {
Ok(parsed) => return Ok(parsed),
Err(e) => return Err(HttpTransportError::Json(e)),
}
}
Err(e) => {
last_error = Some(HttpTransportError::Reqwest(e));
attempts += 1;
if attempts <= self.max_retries && self.retry_delay_ms > 0 {
let delay = self.retry_delay_ms * (2u64.pow(attempts - 1));
time::sleep(time::Duration::from_millis(delay)).await;
}
}
}
}
Err(HttpTransportError::RetriesExhausted {
max_retries: self.max_retries,
last_error: Box::new(last_error.unwrap_or_else(|| HttpTransportError::Reqwest(reqwest::Error::from(std::io::Error::new(std::io::ErrorKind::Other, "Unknown error during request processing")))))
})
}

async fn send_requests<R>(
&self,
requests_data: R,
) -> Result<Vec<JsonRpcResponse<serde_json::Value>>, Self::Error>
where
R: AsRef<[ProviderRequestData]> + Send + Sync,
{
let mut attempts = 0;
let mut last_error: Option<HttpTransportError> = None;

let original_request_bodies: Vec<_> = requests_data
.as_ref()
.iter()
.enumerate()
.map(|(ind, request_item)| JsonRpcRequest {
id: ind as u64,
jsonrpc: "2.0",
method: request_item.jsonrpc_method(),
params: request_item,
})
.collect();

let request_count = original_request_bodies.len();

while attempts <= self.max_retries {
let request_body_json = match serde_json::to_string(&original_request_bodies) {
Ok(json) => json,
Err(e) => return Err(HttpTransportError::Json(e)),
};
trace!("Sending batch request via JSON-RPC (attempt {}): {}", attempts + 1, request_body_json);

let mut request_builder = self
.client
.post(self.url.clone())
.body(request_body_json)
.header("Content-Type", "application/json");

for (name, value) in &self.headers {
request_builder = request_builder.header(name, value);
}

match request_builder.send().await {
Ok(response) => {
let response_text = match response.text().await {
Ok(text) => text,
Err(e) => {
last_error = Some(HttpTransportError::Reqwest(e));
attempts += 1;
if attempts <= self.max_retries && self.retry_delay_ms > 0 {
let delay = self.retry_delay_ms * (2u64.pow(attempts -1));
time::sleep(time::Duration::from_millis(delay)).await;
}
continue;
}
};
trace!("Response from JSON-RPC: {}", response_text);

let parsed_response_batch: Vec<JsonRpcResponse<serde_json::Value>> =
match serde_json::from_str(&response_text) {
Ok(parsed) => parsed,
Err(e) => return Err(HttpTransportError::Json(e)),
};

let mut responses_ordered: Vec<Option<JsonRpcResponse<serde_json::Value>>> = vec![];
responses_ordered.resize(request_count, None);

for response_item in parsed_response_batch {
let id = match &response_item {
JsonRpcResponse::Success { id, .. } | JsonRpcResponse::Error { id, .. } => {
*id as usize
}
};

if id >= request_count {
return Err(HttpTransportError::UnexpectedResponseId(id as u64));
}
responses_ordered[id] = Some(response_item);
}

if responses_ordered.iter().any(Option::is_none) {
last_error = Some(HttpTransportError::UnexpectedResponseId(request_count as u64));
attempts += 1;
if attempts <= self.max_retries && self.retry_delay_ms > 0 {
let delay = self.retry_delay_ms * (2u64.pow(attempts -1));
time::sleep(time::Duration::from_millis(delay)).await;
}
continue;
}

return Ok(responses_ordered.into_iter().flatten().collect::<Vec<_>>());
}
Err(e) => {
last_error = Some(HttpTransportError::Reqwest(e));
attempts += 1;
if attempts <= self.max_retries && self.retry_delay_ms > 0 {
let delay = self.retry_delay_ms * (2u64.pow(attempts-1));
time::sleep(time::Duration::from_millis(delay)).await;
}
}
}
}
Err(HttpTransportError::RetriesExhausted {
max_retries: self.max_retries,
last_error: Box::new(last_error.unwrap_or_else(|| HttpTransportError::Reqwest(reqwest::Error::from(std::io::Error::new(std::io::ErrorKind::Other, "Unknown error during batch request processing")))))
})
}
}
Loading