Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 55c9d14

Browse files
authored
Prepare for asynchronous transaction validation in tx pool (#3650)
* async txpool API * Update core/rpc/src/author/mod.rs Co-Authored-By: Tomasz Drwięga <[email protected]> * Update core/transaction-pool/graph/src/pool.rs Co-Authored-By: Tomasz Drwięga <[email protected]> * Pool -> Pool + ValidatedPool * removed lost block_on when importing xt from network * fix grumbles * alias for future::Executor in rpc * removed executor from Author RPCs * Pool + SharedValidatedPool -> Pool * fix compilation after merge * another fix * another fix
1 parent aae1a9e commit 55c9d14

File tree

29 files changed

+912
-497
lines changed

29 files changed

+912
-497
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/basic-authorship/src/basic_authorship.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,12 @@ mod tests {
247247
fn should_cease_building_block_when_deadline_is_reached() {
248248
// given
249249
let client = Arc::new(test_client::new());
250-
let chain_api = transaction_pool::ChainApi::new(client.clone());
250+
let chain_api = transaction_pool::FullChainApi::new(client.clone());
251251
let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));
252252

253-
txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap();
253+
futures::executor::block_on(
254+
txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false)
255+
).unwrap();
254256

255257
let mut proposer_factory = ProposerFactory {
256258
client: client.clone(),

core/basic-authorship/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
//! # use test_client::{self, runtime::{Extrinsic, Transfer}, AccountKeyring};
2727
//! # use transaction_pool::txpool::{self, Pool as TransactionPool};
2828
//! # let client = Arc::new(test_client::new());
29-
//! # let chain_api = transaction_pool::ChainApi::new(client.clone());
29+
//! # let chain_api = transaction_pool::FullChainApi::new(client.clone());
3030
//! # let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));
3131
//! // The first step is to create a `ProposerFactory`.
3232
//! let mut proposer_factory = ProposerFactory {

core/network/src/protocol.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -963,6 +963,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
963963
who: PeerId,
964964
extrinsics: message::Transactions<B::Extrinsic>
965965
) {
966+
// sending extrinsic to light node is considered a bad behavior
967+
if !self.config.roles.is_full() {
968+
trace!(target: "sync", "Peer {} is trying to send extrinsic to the light node", who);
969+
self.behaviour.disconnect_peer(&who);
970+
self.peerset_handle.report_peer(who, i32::min_value());
971+
return;
972+
}
973+
966974
// Accept extrinsics only when fully synced
967975
if self.sync.status().state != SyncState::Idle {
968976
trace!(target: "sync", "{} Ignoring extrinsics while syncing", who);
@@ -971,12 +979,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
971979
trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who);
972980
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
973981
for t in extrinsics {
974-
if let Some(hash) = self.transaction_pool.import(&t) {
975-
self.peerset_handle.report_peer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE);
976-
peer.known_extrinsics.insert(hash);
977-
} else {
978-
trace!(target: "sync", "Extrinsic rejected");
979-
}
982+
let hash = self.transaction_pool.hash_of(&t);
983+
peer.known_extrinsics.insert(hash);
984+
985+
self.transaction_pool.import(
986+
self.peerset_handle.clone().into(),
987+
who.clone(),
988+
NEW_EXTRINSIC_REPUTATION_CHANGE,
989+
t,
990+
);
980991
}
981992
}
982993
}
@@ -995,6 +1006,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
9951006
let extrinsics = self.transaction_pool.transactions();
9961007
let mut propagated_to = HashMap::new();
9971008
for (who, peer) in self.context_data.peers.iter_mut() {
1009+
// never send extrinsics to the light node
1010+
if !peer.info.roles.is_full() {
1011+
continue;
1012+
}
1013+
9981014
let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
9991015
.iter()
10001016
.filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))

core/network/src/service.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,18 @@ impl<T> ExHashT for T where
6565
pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
6666
/// Get transactions from the pool that are ready to be propagated.
6767
fn transactions(&self) -> Vec<(H, B::Extrinsic)>;
68+
/// Get hash of transaction.
69+
fn hash_of(&self, transaction: &B::Extrinsic) -> H;
6870
/// Import a transaction into the pool.
69-
fn import(&self, transaction: &B::Extrinsic) -> Option<H>;
71+
///
72+
/// Peer reputation is changed by reputation_change if transaction is accepted by the pool.
73+
fn import(
74+
&self,
75+
report_handle: ReportHandle,
76+
who: PeerId,
77+
reputation_change: i32,
78+
transaction: B::Extrinsic,
79+
);
7080
/// Notify the pool about transactions broadcast.
7181
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>);
7282
}
@@ -77,6 +87,12 @@ pub struct ReportHandle {
7787
inner: PeersetHandle, // wraps it so we don't have to worry about breaking API.
7888
}
7989

90+
impl From<PeersetHandle> for ReportHandle {
91+
fn from(peerset_handle: PeersetHandle) -> Self {
92+
ReportHandle { inner: peerset_handle }
93+
}
94+
}
95+
8096
impl ReportHandle {
8197
/// Report a given peer as either beneficial (+) or costly (-) according to the
8298
/// given scalar.

core/network/src/test/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use consensus::Error as ConsensusError;
4747
use consensus::{BlockOrigin, ForkChoiceStrategy, BlockImportParams, BlockCheckParams, JustificationImport};
4848
use futures::prelude::*;
4949
use futures03::{StreamExt as _, TryStreamExt as _};
50-
use crate::{NetworkWorker, NetworkService, config::ProtocolId};
50+
use crate::{NetworkWorker, NetworkService, ReportHandle, config::ProtocolId};
5151
use crate::config::{NetworkConfiguration, TransportConfig, BoxFinalityProofRequestBuilder};
5252
use libp2p::PeerId;
5353
use parking_lot::Mutex;
@@ -400,10 +400,12 @@ impl TransactionPool<Hash, Block> for EmptyTransactionPool {
400400
Vec::new()
401401
}
402402

403-
fn import(&self, _transaction: &Extrinsic) -> Option<Hash> {
404-
None
403+
fn hash_of(&self, _transaction: &Extrinsic) -> Hash {
404+
Hash::default()
405405
}
406406

407+
fn import(&self, _report_handle: ReportHandle, _who: PeerId, _rep_change: i32, _transaction: Extrinsic) {}
408+
407409
fn on_broadcasted(&self, _: HashMap<Hash, Vec<String>>) {}
408410
}
409411

core/offchain/src/api.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -302,29 +302,28 @@ impl<A: ChainApi> AsyncApi<A> {
302302
match msg {
303303
ExtMessage::SubmitExtrinsic(ext) => self.submit_extrinsic(ext),
304304
}
305-
future::ready(())
306305
});
307306

308307
future::join(extrinsics, http)
309308
.map(|((), ())| ())
310309
}
311310

312-
fn submit_extrinsic(&mut self, ext: Vec<u8>) {
311+
fn submit_extrinsic(&mut self, ext: Vec<u8>) -> impl Future<Output = ()> {
313312
let xt = match <A::Block as traits::Block>::Extrinsic::decode(&mut &*ext) {
314313
Ok(xt) => xt,
315314
Err(e) => {
316315
warn!("Unable to decode extrinsic: {:?}: {}", ext, e.what());
317-
return
316+
return future::Either::Left(future::ready(()))
318317
},
319318
};
320319

321320
info!("Submitting to the pool: {:?} (isSigned: {:?})", xt, xt.is_signed());
322-
match self.transaction_pool.submit_one(&self.at, xt.clone()) {
323-
Ok(hash) => debug!("[{:?}] Offchain transaction added to the pool.", hash),
324-
Err(e) => {
325-
debug!("Couldn't submit transaction: {:?}", e);
326-
},
327-
}
321+
future::Either::Right(self.transaction_pool
322+
.submit_one(&self.at, xt.clone())
323+
.map(|result| match result {
324+
Ok(hash) => { debug!("[{:?}] Offchain transaction added to the pool.", hash); },
325+
Err(e) => { debug!("Couldn't submit transaction: {:?}", e); },
326+
}))
328327
}
329328
}
330329

@@ -354,7 +353,7 @@ mod tests {
354353
let db = LocalStorage::new_test();
355354
let client = Arc::new(test_client::new());
356355
let pool = Arc::new(
357-
Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone()))
356+
Pool::new(Default::default(), transaction_pool::FullChainApi::new(client.clone()))
358357
);
359358

360359
let mock = Arc::new(MockNetworkStateInfo());

core/offchain/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ mod tests {
171171
// given
172172
let _ = env_logger::try_init();
173173
let client = Arc::new(test_client::new());
174-
let pool = Arc::new(Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone())));
174+
let pool = Arc::new(Pool::new(Default::default(), transaction_pool::FullChainApi::new(client.clone())));
175175
let db = client_db::offchain::LocalStorage::new_test();
176176
let network_state = Arc::new(MockNetworkStateInfo());
177177

core/rpc/api/src/author/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ use jsonrpc_core as rpc;
2222
/// Author RPC Result type.
2323
pub type Result<T> = std::result::Result<T, Error>;
2424

25+
/// Author RPC future Result type.
26+
pub type FutureResult<T> = Box<dyn rpc::futures::Future<Item = T, Error = Error> + Send>;
27+
2528
/// Author RPC errors.
2629
#[derive(Debug, derive_more::Display, derive_more::From)]
2730
pub enum Error {

core/rpc/api/src/author/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
2424
use primitives::{
2525
Bytes
2626
};
27-
use self::error::Result;
27+
use self::error::{FutureResult, Result};
2828
use txpool::watcher::Status;
2929

3030
pub use self::gen_client::Client as AuthorClient;
@@ -37,7 +37,7 @@ pub trait AuthorApi<Hash, BlockHash> {
3737

3838
/// Submit hex-encoded extrinsic for inclusion in block.
3939
#[rpc(name = "author_submitExtrinsic")]
40-
fn submit_extrinsic(&self, extrinsic: Bytes) -> Result<Hash>;
40+
fn submit_extrinsic(&self, extrinsic: Bytes) -> FutureResult<Hash>;
4141

4242
/// Insert a key into the keystore.
4343
#[rpc(name = "author_insertKey")]

core/rpc/api/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ mod helpers;
2525
mod subscriptions;
2626

2727
pub use jsonrpc_core::IoHandlerExtension as RpcExtension;
28-
pub use subscriptions::Subscriptions;
28+
pub use subscriptions::{Subscriptions, TaskExecutor};
2929
pub use helpers::Receiver;
3030

3131
pub mod author;

core/rpc/api/src/subscriptions.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ use jsonrpc_core::futures::{Future, future};
2525

2626
type Id = u64;
2727

28+
/// Alias for a an implementation of `futures::future::Executor`.
29+
pub type TaskExecutor = Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;
30+
2831
/// Generate unique ids for subscriptions.
2932
#[derive(Clone, Debug)]
3033
pub struct IdProvider {
@@ -53,12 +56,12 @@ impl IdProvider {
5356
pub struct Subscriptions {
5457
next_id: IdProvider,
5558
active_subscriptions: Arc<Mutex<HashMap<Id, oneshot::Sender<()>>>>,
56-
executor: Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>,
59+
executor: TaskExecutor,
5760
}
5861

5962
impl Subscriptions {
6063
/// Creates new `Subscriptions` object.
61-
pub fn new(executor: Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>) -> Self {
64+
pub fn new(executor: TaskExecutor) -> Self {
6265
Subscriptions {
6366
next_id: Default::default(),
6467
active_subscriptions: Default::default(),

core/rpc/src/author/mod.rs

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,18 @@
2020
mod tests;
2121

2222
use std::{sync::Arc, convert::TryInto};
23+
use futures03::future::{FutureExt, TryFutureExt};
24+
use log::warn;
2325

2426
use client::{self, Client};
25-
use rpc::futures::{Sink, Future};
27+
use rpc::futures::{
28+
Sink, Future,
29+
stream::Stream as _,
30+
future::result,
31+
};
2632
use futures03::{StreamExt as _, compat::Compat};
2733
use api::Subscriptions;
2834
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
29-
use log::warn;
3035
use codec::{Encode, Decode};
3136
use primitives::{Bytes, Blake2Hasher, H256, traits::BareCryptoStorePtr};
3237
use sr_primitives::{generic, traits::{self, ProvideRuntimeApi}};
@@ -44,7 +49,7 @@ use session::SessionKeys;
4449

4550
/// Re-export the API for backward compatibility.
4651
pub use api::author::*;
47-
use self::error::{Error, Result};
52+
use self::error::{Error, FutureResult, Result};
4853

4954
/// Authoring API
5055
pub struct Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static {
@@ -108,15 +113,19 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> whe
108113
).map(Into::into).map_err(|e| Error::Client(Box::new(e)))
109114
}
110115

111-
fn submit_extrinsic(&self, ext: Bytes) -> Result<ExHash<P>> {
112-
let xt = Decode::decode(&mut &ext[..])?;
116+
fn submit_extrinsic(&self, ext: Bytes) -> FutureResult<ExHash<P>> {
117+
let xt = match Decode::decode(&mut &ext[..]) {
118+
Ok(xt) => xt,
119+
Err(err) => return Box::new(result(Err(err.into()))),
120+
};
113121
let best_block_hash = self.client.info().chain.best_hash;
114-
self.pool
122+
Box::new(self.pool
115123
.submit_one(&generic::BlockId::hash(best_block_hash), xt)
124+
.compat()
116125
.map_err(|e| e.into_pool_error()
117126
.map(Into::into)
118-
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
119-
)
127+
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into()))
128+
)
120129
}
121130

122131
fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
@@ -151,30 +160,44 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> whe
151160
) {
152161
let submit = || -> Result<_> {
153162
let best_block_hash = self.client.info().chain.best_hash;
154-
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])?;
155-
self.pool
163+
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])
164+
.map_err(error::Error::from)?;
165+
Ok(self.pool
156166
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
167+
.boxed()
168+
.compat()
157169
.map_err(|e| e.into_pool_error()
158-
.map(Into::into)
170+
.map(error::Error::from)
159171
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
160-
)
172+
))
161173
};
162174

163-
let watcher = match submit() {
164-
Ok(watcher) => watcher,
175+
let future_watcher = match submit() {
176+
Ok(future_watcher) => future_watcher,
165177
Err(err) => {
166178
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
167179
let _ = subscriber.reject(err.into());
168180
return;
169181
},
170182
};
171183

184+
// make 'future' watcher be a future with output = stream of watcher events
185+
let future_watcher = future_watcher
186+
.map_err(|err| { warn!("Failed to submit extrinsic: {}", err); })
187+
.map(|watcher| Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))));
188+
189+
// convert a 'future' watcher into the stream with single element = stream of watcher events
190+
let watcher_stream = future_watcher.into_stream();
191+
192+
// and now flatten the 'watcher_stream' so that we'll have the stream with watcher events
193+
let watcher_stream = watcher_stream.flatten();
194+
172195
self.subscriptions.add(subscriber, move |sink| {
173196
sink
174197
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
175-
.send_all(Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))))
198+
.send_all(watcher_stream)
176199
.map(|_| ())
177-
})
200+
});
178201
}
179202

180203
fn unwatch_extrinsic(&self, _metadata: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {

0 commit comments

Comments
 (0)