Skip to content

Commit 59030b8

Browse files
Soubhik-10mattsse
andauthored
feat: added TransactionStream to subscribe to pending transactions (#10482)
* added new method to subscribe to pending tx * wip * clippy -.- * clippy -.- * unbounded channel * wip * wip test * touchup * check --------- Co-authored-by: Matthias Seitz <[email protected]>
1 parent 48533bb commit 59030b8

File tree

4 files changed

+92
-15
lines changed

4 files changed

+92
-15
lines changed

crates/anvil/src/eth/api.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,14 @@ use foundry_evm::{
8989
primitives::BlockEnv,
9090
},
9191
};
92-
use futures::channel::{mpsc::Receiver, oneshot};
92+
use futures::{
93+
channel::{mpsc::Receiver, oneshot},
94+
StreamExt,
95+
};
9396
use parking_lot::RwLock;
9497
use revm::primitives::Bytecode;
9598
use std::{future::Future, sync::Arc, time::Duration};
99+
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
96100

97101
/// The client version: `anvil/v{major}.{minor}.{patch}`
98102
pub const CLIENT_VERSION: &str = concat!("anvil/v", env!("CARGO_PKG_VERSION"));
@@ -2863,6 +2867,26 @@ impl EthApi {
28632867
self.pool.add_ready_listener()
28642868
}
28652869

2870+
/// Returns a listener for pending transactions, yielding full transactions
2871+
pub fn full_pending_transactions(&self) -> UnboundedReceiver<AnyRpcTransaction> {
2872+
let (tx, rx) = unbounded_channel();
2873+
let mut hashes = self.new_ready_transactions();
2874+
2875+
let this = self.clone();
2876+
2877+
tokio::spawn(async move {
2878+
while let Some(hash) = hashes.next().await {
2879+
if let Ok(Some(txn)) = this.transaction_by_hash(hash).await {
2880+
if tx.send(txn).is_err() {
2881+
break;
2882+
}
2883+
}
2884+
}
2885+
});
2886+
2887+
rx
2888+
}
2889+
28662890
/// Returns a new accessor for certain storage elements
28672891
pub fn storage_info(&self) -> StorageInfo {
28682892
StorageInfo::new(Arc::clone(&self.backend))

crates/anvil/src/pubsub.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::{
22
eth::{backend::notifications::NewBlockNotifications, error::to_rpc_result},
33
StorageInfo,
44
};
5+
use alloy_network::AnyRpcTransaction;
56
use alloy_primitives::{TxHash, B256};
67
use alloy_rpc_types::{pubsub::SubscriptionResult, FilteredParams, Log, Transaction};
78
use anvil_core::eth::{block::Block, subscription::SubscriptionId, transaction::TypedReceipt};
@@ -13,6 +14,7 @@ use std::{
1314
pin::Pin,
1415
task::{Context, Poll},
1516
};
17+
use tokio::sync::mpsc::UnboundedReceiver;
1618

1719
/// Listens for new blocks and matching logs emitted in that block
1820
#[derive(Debug)]
@@ -86,6 +88,7 @@ pub enum EthSubscription {
8688
Logs(Box<LogsSubscription>),
8789
Header(NewBlockNotifications, StorageInfo, SubscriptionId),
8890
PendingTransactions(Receiver<TxHash>, SubscriptionId),
91+
FullPendingTransactions(UnboundedReceiver<AnyRpcTransaction>, SubscriptionId),
8992
}
9093

9194
impl EthSubscription {
@@ -120,6 +123,13 @@ impl EthSubscription {
120123
});
121124
Poll::Ready(res)
122125
}
126+
Self::FullPendingTransactions(tx, id) => {
127+
let res = ready!(tx.poll_recv(cx)).map(to_rpc_result).map(|result| {
128+
let params = EthSubscriptionParams { subscription: id.clone(), result };
129+
EthSubscriptionResponse::new(params)
130+
});
131+
Poll::Ready(res)
132+
}
123133
}
124134
}
125135
}

crates/anvil/src/server/handler.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,20 +58,22 @@ impl PubSubEthRpcHandler {
5858
let canceled = cx.remove_subscription(&id).is_some();
5959
ResponseResult::Success(canceled.into())
6060
}
61-
EthPubSub::EthSubscribe(kind, params) => {
62-
let filter = match *params {
61+
EthPubSub::EthSubscribe(kind, raw_params) => {
62+
let filter = match &*raw_params {
6363
Params::None => None,
64-
Params::Logs(filter) => Some(*filter),
65-
Params::Bool(_) => {
66-
return ResponseResult::Error(RpcError::invalid_params(
67-
"Expected params for logs subscription",
68-
))
69-
}
64+
Params::Logs(filter) => Some(filter.clone()),
65+
Params::Bool(_) => None,
7066
};
71-
let params = FilteredParams::new(filter);
67+
let params = FilteredParams::new(filter.map(|b| *b));
7268

7369
let subscription = match kind {
7470
SubscriptionKind::Logs => {
71+
if raw_params.is_bool() {
72+
return ResponseResult::Error(RpcError::invalid_params(
73+
"Expected params for logs subscription",
74+
))
75+
}
76+
7577
trace!(target: "rpc::ws", "received logs subscription {:?}", params);
7678
let blocks = self.api.new_block_notifications();
7779
let storage = self.api.storage_info();
@@ -91,10 +93,23 @@ impl PubSubEthRpcHandler {
9193
}
9294
SubscriptionKind::NewPendingTransactions => {
9395
trace!(target: "rpc::ws", "received pending transactions subscription");
94-
EthSubscription::PendingTransactions(
95-
self.api.new_ready_transactions(),
96-
id.clone(),
97-
)
96+
match *raw_params {
97+
Params::Bool(true) => EthSubscription::FullPendingTransactions(
98+
self.api.full_pending_transactions(),
99+
id.clone(),
100+
),
101+
Params::Bool(false) | Params::None => {
102+
EthSubscription::PendingTransactions(
103+
self.api.new_ready_transactions(),
104+
id.clone(),
105+
)
106+
}
107+
_ => {
108+
return ResponseResult::Error(RpcError::invalid_params(
109+
"Expected boolean parameter for newPendingTransactions",
110+
))
111+
}
112+
}
98113
}
99114
SubscriptionKind::Syncing => {
100115
return RpcError::internal_error_with("Not implemented").into()

crates/anvil/tests/it/transaction.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
};
55
use alloy_network::{EthereumWallet, TransactionBuilder, TransactionResponse};
66
use alloy_primitives::{address, hex, map::B256HashSet, Address, Bytes, FixedBytes, U256};
7-
use alloy_provider::Provider;
7+
use alloy_provider::{Provider, WsConnect};
88
use alloy_rpc_types::{
99
state::{AccountOverride, StateOverride},
1010
AccessList, AccessListItem, BlockId, BlockNumberOrTag, BlockTransactions, TransactionRequest,
@@ -715,6 +715,34 @@ async fn can_get_pending_transaction() {
715715
assert_eq!(mined.tx_hash(), pending.unwrap().unwrap().tx_hash());
716716
}
717717

718+
#[tokio::test(flavor = "multi_thread")]
719+
async fn can_listen_full_pending_transaction() {
720+
let (api, handle) = spawn(NodeConfig::test()).await;
721+
722+
// Disable auto-mining so transactions remain pending
723+
api.anvil_set_auto_mine(false).await.unwrap();
724+
725+
let provider = alloy_provider::ProviderBuilder::new()
726+
.on_ws(WsConnect::new(handle.ws_endpoint()))
727+
.await
728+
.unwrap();
729+
730+
// Subscribe to full pending transactions
731+
let sub = provider.subscribe_full_pending_transactions().await;
732+
tokio::time::sleep(Duration::from_millis(1000)).await;
733+
let mut stream = sub.expect("Failed to subscribe to pending tx").into_stream().take(5);
734+
735+
let from = handle.dev_wallets().next().unwrap().address();
736+
let tx = TransactionRequest::default().from(from).value(U256::from(1337)).to(Address::random());
737+
738+
let tx = provider.send_transaction(tx).await.unwrap();
739+
740+
// Wait for the subscription to yield a transaction
741+
let received = stream.next().await.expect("Failed to receive pending tx");
742+
743+
assert_eq!(received.tx_hash(), *tx.tx_hash());
744+
}
745+
718746
#[tokio::test(flavor = "multi_thread")]
719747
async fn can_get_raw_transaction() {
720748
let (api, handle) = spawn(NodeConfig::test()).await;

0 commit comments

Comments
 (0)