Skip to content

Commit a9a8f6d

Browse files
authored
middleware: RpcServiceT distinct return types for notif, batch, call (#1564)
* middleware: fix option layer Close #1560 * middleware: RpcServiceT unique ret tys for futures * remove ToJson trait bound from RpcServiceT * fix nit in error docs * fix tests * move ToJson to core::traits * remove future.boxed() in RpcService impl * remove needless clone * simplify code in example
1 parent 1b0799c commit a9a8f6d

26 files changed

+488
-478
lines changed

client/http-client/src/client.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ use crate::{HttpRequest, HttpResponse};
3535
use hyper::body::Bytes;
3636
use hyper::http::{Extensions, HeaderMap};
3737
use jsonrpsee_core::client::{
38-
BatchResponse, ClientT, Error, IdKind, MethodResponse, RequestIdManager, Subscription, SubscriptionClientT,
39-
generate_batch_id_range,
38+
BatchResponse, ClientT, Error, IdKind, MiddlewareBatchResponse, MiddlewareMethodResponse, MiddlewareNotifResponse,
39+
RequestIdManager, Subscription, SubscriptionClientT, generate_batch_id_range,
4040
};
4141
use jsonrpsee_core::middleware::layer::{RpcLogger, RpcLoggerLayer};
4242
use jsonrpsee_core::middleware::{Batch, RpcServiceBuilder, RpcServiceT};
@@ -351,7 +351,12 @@ impl HttpClient<HttpBackend> {
351351

352352
impl<S> ClientT for HttpClient<S>
353353
where
354-
S: RpcServiceT<Error = Error, Response = MethodResponse> + Send + Sync,
354+
S: RpcServiceT<
355+
MethodResponse = Result<MiddlewareMethodResponse, Error>,
356+
BatchResponse = Result<MiddlewareBatchResponse, Error>,
357+
NotificationResponse = Result<MiddlewareNotifResponse, Error>,
358+
> + Send
359+
+ Sync,
355360
{
356361
fn notification<Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<(), Error>> + Send
357362
where
@@ -363,13 +368,9 @@ where
363368
None => None,
364369
};
365370
let params = params.to_rpc_params()?.map(StdCow::Owned);
371+
let fut = self.service.notification(Notification::new(method.into(), params));
366372

367-
run_future_until_timeout(
368-
self.service.notification(Notification::new(method.into(), params)),
369-
self.request_timeout,
370-
)
371-
.await
372-
.map_err(|e| Error::Transport(e.into()))?;
373+
run_future_until_timeout(fut, self.request_timeout).await.map_err(|e| Error::Transport(e.into()))?;
373374
Ok(())
374375
}
375376
}
@@ -392,8 +393,7 @@ where
392393
self.request_timeout,
393394
)
394395
.await?
395-
.into_method_call()
396-
.expect("Method call must return a method call reponse; qed");
396+
.into_response();
397397

398398
let rp = ResponseSuccess::try_from(method_response.into_inner())?;
399399

@@ -405,7 +405,7 @@ where
405405
fn batch_request<'a, R>(
406406
&self,
407407
batch: BatchRequestBuilder<'a>,
408-
) -> impl Future<Output = Result<BatchResponse<'a, R>, Error>> + Send
408+
) -> impl Future<Output = Result<jsonrpsee_core::client::BatchResponse<'a, R>, Error>> + Send
409409
where
410410
R: DeserializeOwned + fmt::Debug + 'a,
411411
{
@@ -431,19 +431,18 @@ where
431431
batch_request.push(req);
432432
}
433433

434-
let rp = run_future_until_timeout(self.service.batch(batch_request), self.request_timeout).await?;
435-
let json_rps = rp.into_batch().expect("Batch must return a batch reponse; qed");
434+
let rps = run_future_until_timeout(self.service.batch(batch_request), self.request_timeout).await?;
436435

437436
let mut batch_response = Vec::new();
438437
let mut success = 0;
439438
let mut failed = 0;
440439

441440
// Fill the batch response with placeholder values.
442-
for _ in 0..json_rps.len() {
441+
for _ in 0..rps.len() {
443442
batch_response.push(Err(ErrorObject::borrowed(0, "", None)));
444443
}
445444

446-
for rp in json_rps.into_iter() {
445+
for rp in rps.into_iter() {
447446
let id = rp.id().try_parse_inner_as_number()?;
448447

449448
let res = match ResponseSuccess::try_from(rp.into_inner()) {
@@ -477,7 +476,12 @@ where
477476

478477
impl<S> SubscriptionClientT for HttpClient<S>
479478
where
480-
S: RpcServiceT<Error = Error, Response = MethodResponse> + Send + Sync,
479+
S: RpcServiceT<
480+
MethodResponse = Result<MiddlewareMethodResponse, Error>,
481+
BatchResponse = Result<MiddlewareBatchResponse, Error>,
482+
NotificationResponse = Result<MiddlewareNotifResponse, Error>,
483+
> + Send
484+
+ Sync,
481485
{
482486
/// Send a subscription request to the server. Not implemented for HTTP; will always return
483487
/// [`Error::HttpNotImplemented`].
@@ -503,9 +507,9 @@ where
503507
}
504508
}
505509

506-
async fn run_future_until_timeout<F>(fut: F, timeout: Duration) -> Result<MethodResponse, Error>
510+
async fn run_future_until_timeout<F, T>(fut: F, timeout: Duration) -> Result<T, Error>
507511
where
508-
F: std::future::Future<Output = Result<MethodResponse, Error>>,
512+
F: std::future::Future<Output = Result<T, Error>>,
509513
{
510514
match tokio::time::timeout(timeout, fut).await {
511515
Ok(Ok(r)) => Ok(r),

client/http-client/src/rpc_service.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use hyper::body::Bytes;
44
use jsonrpsee_core::{
55
BoxError, JsonRawValue,
6-
client::{Error, MethodResponse},
6+
client::{Error, MiddlewareBatchResponse, MiddlewareMethodResponse, MiddlewareNotifResponse},
77
middleware::{Batch, Notification, Request, RpcServiceT},
88
};
99
use jsonrpsee_types::Response;
@@ -34,21 +34,24 @@ where
3434
B::Data: Send,
3535
B::Error: Into<BoxError>,
3636
{
37-
type Error = Error;
38-
type Response = MethodResponse;
37+
type BatchResponse = Result<MiddlewareBatchResponse, Error>;
38+
type MethodResponse = Result<MiddlewareMethodResponse, Error>;
39+
type NotificationResponse = Result<MiddlewareNotifResponse, Error>;
3940

40-
fn call<'a>(&self, request: Request<'a>) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send + 'a {
41+
fn call<'a>(&self, request: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
4142
let service = self.service.clone();
4243

4344
async move {
4445
let raw = serde_json::to_string(&request)?;
4546
let bytes = service.send_and_read_body(raw).await.map_err(|e| Error::Transport(e.into()))?;
46-
let json_rp: Response<Box<JsonRawValue>> = serde_json::from_slice(&bytes)?;
47-
Ok(MethodResponse::method_call(json_rp.into_owned().into(), request.extensions))
47+
let mut rp: Response<Box<JsonRawValue>> = serde_json::from_slice(&bytes)?;
48+
rp.extensions = request.extensions;
49+
50+
Ok(MiddlewareMethodResponse::response(rp.into_owned().into()))
4851
}
4952
}
5053

51-
fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send + 'a {
54+
fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
5255
let service = self.service.clone();
5356

5457
async move {
@@ -59,20 +62,20 @@ where
5962
.map(|r| r.into_owned().into())
6063
.collect();
6164

62-
Ok(MethodResponse::batch(rp, batch.into_extensions()))
65+
Ok(rp)
6366
}
6467
}
6568

6669
fn notification<'a>(
6770
&self,
6871
notif: Notification<'a>,
69-
) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send + 'a {
72+
) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
7073
let service = self.service.clone();
7174

7275
async move {
7376
let raw = serde_json::to_string(&notif)?;
7477
service.send(raw).await.map_err(|e| Error::Transport(e.into()))?;
75-
Ok(MethodResponse::notification(notif.extensions))
78+
Ok(notif.extensions.into())
7679
}
7780
}
7881
}

core/src/client/async_client/mod.rs

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ mod manager;
3131
mod rpc_service;
3232
mod utils;
3333

34-
pub use rpc_service::{Error as RpcServiceError, RpcService};
34+
pub use rpc_service::RpcService;
3535

3636
use std::borrow::Cow as StdCow;
3737
use std::time::Duration;
@@ -66,7 +66,10 @@ use tokio::sync::{mpsc, oneshot};
6666
use tower::layer::util::Identity;
6767

6868
use self::utils::{InactivityCheck, IntervalStream};
69-
use super::{FrontToBack, IdKind, MethodResponse, RequestIdManager, generate_batch_id_range, subscription_channel};
69+
use super::{
70+
FrontToBack, IdKind, MiddlewareBatchResponse, MiddlewareMethodResponse, MiddlewareNotifResponse, RequestIdManager,
71+
generate_batch_id_range, subscription_channel,
72+
};
7073

7174
pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option<Box<JsonRawValue>>>;
7275

@@ -455,16 +458,13 @@ impl<L> Client<L> {
455458
!self.to_back.is_closed()
456459
}
457460

458-
async fn run_future_until_timeout(
459-
&self,
460-
fut: impl Future<Output = Result<MethodResponse, RpcServiceError>>,
461-
) -> Result<MethodResponse, Error> {
461+
async fn run_future_until_timeout<T>(&self, fut: impl Future<Output = Result<T, Error>>) -> Result<T, Error> {
462462
tokio::pin!(fut);
463463

464464
match futures_util::future::select(fut, futures_timer::Delay::new(self.request_timeout)).await {
465465
Either::Left((Ok(r), _)) => Ok(r),
466-
Either::Left((Err(RpcServiceError::Client(e)), _)) => Err(e),
467-
Either::Left((Err(RpcServiceError::FetchFromBackend), _)) => Err(self.on_disconnect().await),
466+
Either::Left((Err(Error::ServiceDisconnect), _)) => Err(self.on_disconnect().await),
467+
Either::Left((Err(e), _)) => Err(e),
468468
Either::Right(_) => Err(Error::RequestTimeout),
469469
}
470470
}
@@ -495,7 +495,12 @@ impl<L> Drop for Client<L> {
495495

496496
impl<L> ClientT for Client<L>
497497
where
498-
L: RpcServiceT<Error = RpcServiceError, Response = MethodResponse> + Send + Sync,
498+
L: RpcServiceT<
499+
MethodResponse = Result<MiddlewareMethodResponse, Error>,
500+
BatchResponse = Result<MiddlewareBatchResponse, Error>,
501+
NotificationResponse = Result<MiddlewareNotifResponse, Error>,
502+
> + Send
503+
+ Sync,
499504
{
500505
fn notification<Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<(), Error>> + Send
501506
where
@@ -520,8 +525,8 @@ where
520525
let id = self.id_manager.next_request_id();
521526
let params = params.to_rpc_params()?;
522527
let fut = self.service.call(Request::borrowed(method, params.as_deref(), id.clone()));
523-
let rp = self.run_future_until_timeout(fut).await?.into_method_call().expect("Method call response");
524-
let success = ResponseSuccess::try_from(rp.into_inner())?;
528+
let rp = self.run_future_until_timeout(fut).await?;
529+
let success = ResponseSuccess::try_from(rp.into_response().into_inner())?;
525530

526531
serde_json::from_str(success.result.get()).map_err(Into::into)
527532
}
@@ -554,7 +559,7 @@ where
554559
b.extensions_mut().insert(IsBatch { id_range });
555560

556561
let fut = self.service.batch(b);
557-
let json_values = self.run_future_until_timeout(fut).await?.into_batch().expect("Batch response");
562+
let json_values = self.run_future_until_timeout(fut).await?;
558563

559564
let mut responses = Vec::with_capacity(json_values.len());
560565
let mut successful_calls = 0;
@@ -580,7 +585,12 @@ where
580585

581586
impl<L> SubscriptionClientT for Client<L>
582587
where
583-
L: RpcServiceT<Error = RpcServiceError, Response = MethodResponse> + Send + Sync,
588+
L: RpcServiceT<
589+
MethodResponse = Result<MiddlewareMethodResponse, Error>,
590+
BatchResponse = Result<MiddlewareBatchResponse, Error>,
591+
NotificationResponse = Result<MiddlewareNotifResponse, Error>,
592+
> + Send
593+
+ Sync,
584594
{
585595
/// Send a subscription request to the server.
586596
///
@@ -617,10 +627,12 @@ where
617627
};
618628

619629
let fut = self.service.call(req);
620-
let (sub_id, notifs_rx) =
621-
self.run_future_until_timeout(fut).await?.into_subscription().expect("Subscription response");
622-
623-
Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
630+
let sub = self
631+
.run_future_until_timeout(fut)
632+
.await?
633+
.into_subscription()
634+
.expect("Extensions set to subscription, must return subscription; qed");
635+
Ok(Subscription::new(self.to_back.clone(), sub.stream, SubscriptionKind::Subscription(sub.sub_id)))
624636
}
625637
}
626638

0 commit comments

Comments
 (0)