diff --git a/docs/config/dev.toml b/docs/config/dev.toml index 1fb582a64..97d1a9a97 100644 --- a/docs/config/dev.toml +++ b/docs/config/dev.toml @@ -19,7 +19,8 @@ health_unsignable_promilles = 750 propagation_timeout = 1000 fetch_timeout = 5000 -validator_tick_timeout = 5000 +all_campaigns_timeout = 5000 +channel_tick_timeout = 5000 ip_rate_limit = { type = 'ip', timeframe = 20000 } sid_rate_limit = { type = 'sid', timeframe = 20000 } diff --git a/docs/config/prod.toml b/docs/config/prod.toml index b41656965..87c8b3f56 100644 --- a/docs/config/prod.toml +++ b/docs/config/prod.toml @@ -19,7 +19,8 @@ health_unsignable_promilles = 770 propagation_timeout = 3000 fetch_timeout = 10000 -validator_tick_timeout = 10000 +all_campaigns_timeout = 10000 +channel_tick_timeout = 10000 ip_rate_limit = { type = 'ip', timeframe = 1200000 } sid_rate_limit = { type = 'sid', timeframe = 0 } diff --git a/primitives/src/config.rs b/primitives/src/config.rs index df7bb8b21..184dccdea 100644 --- a/primitives/src/config.rs +++ b/primitives/src/config.rs @@ -32,18 +32,24 @@ pub struct Config { pub events_find_limit: u32, pub msgs_find_limit: u32, pub analytics_find_limit_v5: u32, - // in milliseconds + /// In milliseconds pub analytics_maxtime_v5: u32, - // in milliseconds + /// In milliseconds pub heartbeat_time: u32, pub health_threshold_promilles: u32, pub health_unsignable_promilles: u32, + /// Sets the timeout for propagating a Validator message to a validator + /// In Milliseconds + pub propagation_timeout: u32, /// in milliseconds - /// set's the Client timeout for [`SentryApi`] - /// This includes requests made for propagating new messages + /// Set's the Client timeout for [`SentryApi`] + /// This includes all requests made to sentry except propagating messages. + /// When propagating messages we make requests to foreign Sentry instances as well. pub fetch_timeout: u32, - /// in milliseconds - pub validator_tick_timeout: u32, + /// In Milliseconds + pub all_campaigns_timeout: u32, + /// In Milliseconds + pub channel_tick_timeout: u32, pub ip_rate_limit: RateLimit, // HashMap?? pub sid_rate_limit: RateLimit, // HashMap ?? #[serde(with = "SerHex::")] diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index 5defdfbb8..35c46a994 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -172,7 +172,12 @@ async fn campaigns_router( ) -> Result, ResponseError> { let (path, method) = (req.uri().path(), req.method()); - if let (Some(_caps), &Method::POST) = (CAMPAIGN_UPDATE_BY_ID.captures(path), method) { + // For creating campaigns + if (path, method) == ("/v5/campaign", &Method::POST) { + let req = AuthRequired.call(req, app).await?; + + create_campaign(req, app).await + } else if let (Some(_caps), &Method::POST) = (CAMPAIGN_UPDATE_BY_ID.captures(path), method) { let req = CampaignLoad.call(req, app).await?; update_campaign::handle_route(req, app).await diff --git a/validator_worker/src/channel.rs b/validator_worker/src/channel.rs index 256c1a14c..f62cf2323 100644 --- a/validator_worker/src/channel.rs +++ b/validator_worker/src/channel.rs @@ -5,15 +5,20 @@ use crate::{ SentryApi, }; use primitives::{adapter::Adapter, config::Config, util::ApiUrl, Channel, ChannelId}; -use slog::Logger; -use std::collections::{hash_map::Entry, HashSet}; +use slog::{info, Logger}; +use std::{ + collections::{hash_map::Entry, HashSet}, + time::Duration, +}; +use tokio::time::timeout; pub async fn channel_tick( sentry: &SentryApi, config: &Config, channel: Channel, - // validators: &Validators, -) -> Result { +) -> Result<(ChannelId, Box), Error> { + let logger = sentry.logger.clone(); + let adapter = &sentry.adapter; let tick = channel .find_validator(adapter.whoami()) @@ -46,24 +51,47 @@ pub async fn channel_tick( .get(&channel.token) .ok_or(Error::ChannelTokenNotWhitelisted)?; - // TODO: Add timeout + let duration = Duration::from_millis(config.channel_tick_timeout as u64); + match tick { - primitives::Validator::Leader(_v) => { - let _leader_tick_status = leader::tick(sentry, channel, accounting.balances, token) - .await - .map_err(|err| Error::LeaderTick(channel.id(), TickError::Tick(Box::new(err))))?; - } + primitives::Validator::Leader(_v) => match timeout( + duration, + leader::tick(sentry, channel, accounting.balances, token), + ) + .await + { + Err(timeout_e) => Err(Error::LeaderTick( + channel.id(), + TickError::TimedOut(timeout_e), + )), + Ok(Err(tick_e)) => Err(Error::LeaderTick( + channel.id(), + TickError::Tick(Box::new(tick_e)), + )), + Ok(Ok(tick_status)) => { + info!(&logger, "Leader tick"; "status" => ?tick_status); + Ok((channel.id(), Box::new(tick_status))) + } + }, primitives::Validator::Follower(_v) => { - let _follower_tick_status = - follower::tick(sentry, channel, all_spenders, accounting.balances, token) - .await - .map_err(|err| { - Error::FollowerTick(channel.id(), TickError::Tick(Box::new(err))) - })?; + let follower_fut = + follower::tick(sentry, channel, all_spenders, accounting.balances, token); + match timeout(duration, follower_fut).await { + Err(timeout_e) => Err(Error::FollowerTick( + channel.id(), + TickError::TimedOut(timeout_e), + )), + Ok(Err(tick_e)) => Err(Error::FollowerTick( + channel.id(), + TickError::Tick(Box::new(tick_e)), + )), + Ok(Ok(tick_status)) => { + info!(&logger, "Follower tick"; "status" => ?tick_status); + Ok((channel.id(), Box::new(tick_status))) + } + } } - }; - - Ok(channel.id()) + } } /// Fetches all `Campaign`s from Sentry and builds the `Channel`s to be processed @@ -71,13 +99,15 @@ pub async fn channel_tick( pub async fn collect_channels( adapter: &A, sentry_url: &ApiUrl, - _config: &Config, + config: &Config, _logger: &Logger, ) -> Result<(HashSet, Validators), reqwest::Error> { let whoami = adapter.whoami(); - // TODO: Move client creation - let client = reqwest::Client::new(); + let all_campaigns_timeout = Duration::from_millis(config.all_campaigns_timeout as u64); + let client = reqwest::Client::builder() + .timeout(all_campaigns_timeout) + .build()?; let campaigns = all_campaigns(client, sentry_url, whoami).await?; let channels = campaigns .iter() diff --git a/validator_worker/src/core/follower_rules.rs b/validator_worker/src/core/follower_rules.rs index a5a339e19..6473b4a32 100644 --- a/validator_worker/src/core/follower_rules.rs +++ b/validator_worker/src/core/follower_rules.rs @@ -166,7 +166,7 @@ mod test { #[test] fn get_health_the_approved_balance_tree_gte_our_accounting_is_healthy() { let all_spenders_sum = UnifiedNum::from(50); - let our = vec![(ADDRESSES["publisher"].clone(), 50.into())] + let our = vec![(ADDRESSES["publisher"], 50.into())] .into_iter() .collect(); assert!( @@ -178,7 +178,7 @@ mod test { get_health( all_spenders_sum, &our, - &vec![(ADDRESSES["publisher"].clone(), 60.into())] + &vec![(ADDRESSES["publisher"], 60.into())] .into_iter() .collect() ) @@ -189,7 +189,7 @@ mod test { #[test] fn get_health_the_approved_balance_tree_is_positive_our_accounting_is_0_and_it_is_healthy() { - let approved = vec![(ADDRESSES["publisher"].clone(), 50.into())] + let approved = vec![(ADDRESSES["publisher"], 50.into())] .into_iter() .collect(); @@ -207,10 +207,10 @@ mod test { assert!( get_health( all_spenders_sum, - &vec![(ADDRESSES["publisher"].clone(), 80.into())] + &vec![(ADDRESSES["publisher"], 80.into())] .into_iter() .collect(), - &vec![(ADDRESSES["publisher"].clone(), 79.into())] + &vec![(ADDRESSES["publisher"], 79.into())] .into_iter() .collect() ) @@ -221,10 +221,10 @@ mod test { assert!( get_health( all_spenders_sum, - &vec![(ADDRESSES["publisher"].clone(), 2.into())] + &vec![(ADDRESSES["publisher"], 2.into())] .into_iter() .collect(), - &vec![(ADDRESSES["publisher"].clone(), 1.into())] + &vec![(ADDRESSES["publisher"], 1.into())] .into_iter() .collect() ) @@ -238,10 +238,10 @@ mod test { assert!( get_health( UnifiedNum::from(80), - &vec![(ADDRESSES["publisher"].clone(), 80.into())] + &vec![(ADDRESSES["publisher"], 80.into())] .into_iter() .collect(), - &vec![(ADDRESSES["publisher"].clone(), 70.into())] + &vec![(ADDRESSES["publisher"], 70.into())] .into_iter() .collect() ) @@ -257,10 +257,10 @@ mod test { assert!( get_health( all_spenders_sum, - &vec![(ADDRESSES["publisher"].clone(), 80.into())] + &vec![(ADDRESSES["publisher"], 80.into())] .into_iter() .collect(), - &vec![(ADDRESSES["publisher2"].clone(), 80.into())] + &vec![(ADDRESSES["publisher2"], 80.into())] .into_iter() .collect() ) @@ -271,12 +271,12 @@ mod test { assert!( get_health( all_spenders_sum, - &vec![(ADDRESSES["publisher"].clone(), 80.into())] + &vec![(ADDRESSES["publisher"], 80.into())] .into_iter() .collect(), &vec![ - (ADDRESSES["publisher2"].clone(), 40.into()), - (ADDRESSES["publisher"].clone(), 40.into()) + (ADDRESSES["publisher2"], 40.into()), + (ADDRESSES["publisher"], 40.into()) ] .into_iter() .collect() @@ -288,12 +288,12 @@ mod test { assert!( get_health( all_spenders_sum, - &vec![(ADDRESSES["publisher"].clone(), 80.into())] + &vec![(ADDRESSES["publisher"], 80.into())] .into_iter() .collect(), &vec![ - (ADDRESSES["publisher2"].clone(), 20.into()), - (ADDRESSES["publisher"].clone(), 60.into()) + (ADDRESSES["publisher2"], 20.into()), + (ADDRESSES["publisher"], 60.into()) ] .into_iter() .collect() @@ -305,12 +305,12 @@ mod test { assert!( get_health( all_spenders_sum, - &vec![(ADDRESSES["publisher"].clone(), 80.into())] + &vec![(ADDRESSES["publisher"], 80.into())] .into_iter() .collect(), &vec![ - (ADDRESSES["publisher2"].clone(), 2.into()), - (ADDRESSES["publisher"].clone(), 78.into()) + (ADDRESSES["publisher2"], 2.into()), + (ADDRESSES["publisher"], 78.into()) ] .into_iter() .collect() @@ -323,12 +323,12 @@ mod test { get_health( all_spenders_sum, &vec![ - (ADDRESSES["publisher"].clone(), 100.into()), - (ADDRESSES["publisher2"].clone(), 1.into()) + (ADDRESSES["publisher"], 100.into()), + (ADDRESSES["publisher2"], 1.into()) ] .into_iter() .collect(), - &vec![(ADDRESSES["publisher"].clone(), 100.into())] + &vec![(ADDRESSES["publisher"], 100.into())] .into_iter() .collect() ) diff --git a/validator_worker/src/follower.rs b/validator_worker/src/follower.rs index 872195a0a..0e8e3bcf9 100644 --- a/validator_worker/src/follower.rs +++ b/validator_worker/src/follower.rs @@ -10,13 +10,11 @@ use primitives::{ Address, Channel, ChannelId, UnifiedNum, }; -use crate::core::follower_rules::{get_health, is_valid_transition}; -use crate::StateRootHashError; -// use crate::core::follower_rules::{get_health, is_valid_transition}; use crate::{ - get_state_root_hash, + core::follower_rules::{get_health, is_valid_transition}, heartbeat::{heartbeat, HeartbeatStatus}, sentry_interface::{Error as SentryApiError, PropagationResult, SentryApi}, + GetStateRoot, GetStateRootError, }; use chrono::Utc; use thiserror::Error; @@ -28,7 +26,7 @@ pub enum Error { #[error("The Channel's Token is not whitelisted")] TokenNotWhitelisted, #[error("Couldn't get state root hash of the proposed balances")] - StateRootHash(#[from] StateRootHashError), + StateRootHash(#[from] GetStateRootError), #[error("Adapter error: {0}")] Adapter(#[from] AdapterError), #[error("Sentry API: {0}")] @@ -70,7 +68,7 @@ impl fmt::Display for InvalidNewState { #[derive(Debug)] pub enum ApproveStateResult { - /// When `None` the conditions for approving the `NewState` (`ApproveState`) haven't been met + /// When `None` the conditions for approving the `NewState` (and generating `ApproveState`) haven't been met Sent(Option>), RejectedState { reason: InvalidNewState, @@ -146,7 +144,7 @@ pub async fn tick( } async fn on_new_state<'a, A: Adapter + 'static>( - iface: &'a SentryApi, + sentry: &'a SentryApi, channel: Channel, accounting_balances: Balances, new_state: NewState, @@ -157,7 +155,7 @@ async fn on_new_state<'a, A: Adapter + 'static>( Ok(balances) => balances, // TODO: Should we show the Payout Mismatch between Spent & Earned? Err(balances::Error::PayoutMismatch { .. }) => { - return Ok(on_error(iface, channel.id(), new_state, InvalidNewState::Transition).await) + return Ok(on_error(sentry, channel.id(), new_state, InvalidNewState::Transition).await) } // TODO: Add context for `proposed_balances.check()` overflow error Err(_) => return Err(Error::Overflow), @@ -165,24 +163,18 @@ async fn on_new_state<'a, A: Adapter + 'static>( let proposed_state_root = new_state.state_root.clone(); - if proposed_state_root - != hex::encode(get_state_root_hash( - channel.id(), - &proposed_balances, - token_info.precision.get(), - )?) - { - return Ok(on_error(iface, channel.id(), new_state, InvalidNewState::RootHash).await); + if proposed_state_root != proposed_balances.encode(channel.id(), token_info.precision.get())? { + return Ok(on_error(sentry, channel.id(), new_state, InvalidNewState::RootHash).await); } - if !iface + if !sentry .adapter .verify(channel.leader, &proposed_state_root, &new_state.signature)? { - return Ok(on_error(iface, channel.id(), new_state, InvalidNewState::Signature).await); + return Ok(on_error(sentry, channel.id(), new_state, InvalidNewState::Signature).await); } - let last_approve_response = iface.get_last_approved(channel.id()).await?; + let last_approve_response = sentry.get_last_approved(channel.id()).await?; let prev_balances = match last_approve_response .last_approved .and_then(|last_approved| last_approved.new_state) @@ -193,7 +185,7 @@ async fn on_new_state<'a, A: Adapter + 'static>( Ok(None) => Default::default(), // TODO: Add Context for Transition error Err(_err) => { - return Ok(on_error(iface, channel.id(), new_state, InvalidNewState::Transition).await) + return Ok(on_error(sentry, channel.id(), new_state, InvalidNewState::Transition).await) } }; @@ -210,7 +202,7 @@ async fn on_new_state<'a, A: Adapter + 'static>( .ok_or(Error::Overflow)? { // TODO: Add context for error in Spenders transition - return Ok(on_error(iface, channel.id(), new_state, InvalidNewState::Transition).await); + return Ok(on_error(sentry, channel.id(), new_state, InvalidNewState::Transition).await); } // 2. Check the transition of previous and proposed Earners maps @@ -226,7 +218,7 @@ async fn on_new_state<'a, A: Adapter + 'static>( .ok_or(Error::Overflow)? { // TODO: Add context for error in Earners transition - return Ok(on_error(iface, channel.id(), new_state, InvalidNewState::Transition).await); + return Ok(on_error(sentry, channel.id(), new_state, InvalidNewState::Transition).await); } let health_earners = get_health( @@ -235,9 +227,9 @@ async fn on_new_state<'a, A: Adapter + 'static>( &proposed_balances.earners, ) .ok_or(Error::Overflow)?; - if health_earners < u64::from(iface.config.health_unsignable_promilles) { + if health_earners < u64::from(sentry.config.health_unsignable_promilles) { return Ok(on_error( - iface, + sentry, channel.id(), new_state, InvalidNewState::Health(Health::Earners(health_earners)), @@ -251,9 +243,9 @@ async fn on_new_state<'a, A: Adapter + 'static>( &proposed_balances.spenders, ) .ok_or(Error::Overflow)?; - if health_spenders < u64::from(iface.config.health_unsignable_promilles) { + if health_spenders < u64::from(sentry.config.health_unsignable_promilles) { return Ok(on_error( - iface, + sentry, channel.id(), new_state, InvalidNewState::Health(Health::Spenders(health_spenders)), @@ -261,11 +253,11 @@ async fn on_new_state<'a, A: Adapter + 'static>( .await); } - let signature = iface.adapter.sign(&new_state.state_root)?; - let health_threshold = u64::from(iface.config.health_threshold_promilles); + let signature = sentry.adapter.sign(&new_state.state_root)?; + let health_threshold = u64::from(sentry.config.health_threshold_promilles); let is_healthy = health_earners >= health_threshold && health_spenders >= health_threshold; - let propagation_result = iface + let propagation_result = sentry .propagate( channel.id(), &[&MessageTypes::ApproveState(ApproveState { @@ -280,12 +272,12 @@ async fn on_new_state<'a, A: Adapter + 'static>( } async fn on_error<'a, A: Adapter + 'static>( - iface: &'a SentryApi, + sentry: &'a SentryApi, channel: ChannelId, new_state: NewState, status: InvalidNewState, ) -> ApproveStateResult { - let propagation = iface + let propagation = sentry .propagate( channel, &[&MessageTypes::RejectState(RejectState { diff --git a/validator_worker/src/leader.rs b/validator_worker/src/leader.rs index a65b664a4..f2f1cfcd3 100644 --- a/validator_worker/src/leader.rs +++ b/validator_worker/src/leader.rs @@ -10,10 +10,9 @@ use primitives::{ }; use crate::{ - get_state_root_hash, heartbeat::{heartbeat, Error as HeartbeatError, HeartbeatStatus}, sentry_interface::{Error as SentryApiError, PropagationResult, SentryApi}, - StateRootHashError, + GetStateRoot, GetStateRootError, }; #[derive(Debug)] @@ -28,7 +27,7 @@ pub enum Error { #[error("SentryApi: {0}")] SentryApi(#[from] SentryApiError), #[error("StateRootHash: {0}")] - StateRootHash(#[from] StateRootHashError), + StateRootHash(#[from] GetStateRootError), #[error("Adapter: {0}")] Adapter(#[from] AdapterError), #[error("Heartbeat: {0}")] @@ -107,8 +106,7 @@ async fn on_new_accounting( accounting_balances: Balances, token: &TokenInfo, ) -> Result, Error> { - let state_root_raw = get_state_root_hash(channel, &accounting_balances, token.precision.get())?; - let state_root = hex::encode(state_root_raw); + let state_root = accounting_balances.encode(channel, token.precision.get())?; let signature = sentry.adapter.sign(&state_root)?; diff --git a/validator_worker/src/lib.rs b/validator_worker/src/lib.rs index 24baaca18..e5cb4682f 100644 --- a/validator_worker/src/lib.rs +++ b/validator_worker/src/lib.rs @@ -1,5 +1,6 @@ #![deny(rust_2018_idioms)] #![deny(clippy::all)] +#![allow(deprecated)] use adapter::{get_balance_leaf, get_signable_state_root, BalanceLeafError}; use primitives::{ @@ -23,18 +24,34 @@ pub mod core { } #[derive(Debug, Error)] -pub enum StateRootHashError { +pub enum GetStateRootError { #[error("Failed to get balance leaf")] BalanceLeaf(#[from] BalanceLeafError), #[error(transparent)] MerkleTree(#[from] MerkleTreeError), } -pub(crate) fn get_state_root_hash( +pub trait GetStateRoot { + /// Hashes the struct to produce a StateRoot `[u8; 32]` + fn hash(&self, channel: ChannelId, token_precision: u8) -> Result<[u8; 32], GetStateRootError>; + + /// Calls `hash()` and then `hex::encode`s the result ready to be used in a Validator Message + fn encode(&self, channel: ChannelId, token_precision: u8) -> Result { + self.hash(channel, token_precision).map(hex::encode) + } +} + +impl GetStateRoot for Balances { + fn hash(&self, channel: ChannelId, token_precision: u8) -> Result<[u8; 32], GetStateRootError> { + get_state_root_hash(channel, self, token_precision) + } +} + +fn get_state_root_hash( channel: ChannelId, balances: &Balances, token_precision: u8, -) -> Result<[u8; 32], StateRootHashError> { +) -> Result<[u8; 32], GetStateRootError> { let spenders = balances.spenders.iter().map(|(address, amount)| { get_balance_leaf(true, address, &amount.to_precision(token_precision)) }); diff --git a/validator_worker/src/main.rs b/validator_worker/src/main.rs index 00a748e87..ca29f5596 100644 --- a/validator_worker/src/main.rs +++ b/validator_worker/src/main.rs @@ -4,7 +4,10 @@ use std::{convert::TryFrom, error::Error, time::Duration}; use clap::{crate_version, App, Arg}; -use futures::future::{join, join_all}; +use futures::{ + future::{join, join_all}, + TryFutureExt, +}; use tokio::{runtime::Runtime, time::sleep}; use adapter::{AdapterTypes, DummyAdapter, EthereumAdapter}; @@ -164,7 +167,6 @@ async fn infinite(args: Args, logger: &Logger) { let arg = args.clone(); let wait_time_future = sleep(Duration::from_millis(arg.config.wait_time as u64)); - // TODO: channels tick! let _result = join(all_channels_tick(arg, logger), wait_time_future).await; } } @@ -195,78 +197,27 @@ async fn all_channels_tick(args: Args, logger: &Logger) ) { Ok(sentry) => sentry, Err(err) => { - error!(logger, "Failed to initialize SentryApi for all channels"; "SentryApi::init()" => ?err); + error!(logger, "Failed to initialize SentryApi for all channels"; "SentryApi::init()" => ?err, "main" => "all_channels_tick"); return; } }; - let tick_results = join_all( - channels - .into_iter() - .map(|channel| channel_tick(&sentry, &args.config, channel)), - ) + let tick_results = join_all(channels.into_iter().map(|channel| { + channel_tick(&sentry, &args.config, channel).map_err(move |err| (channel, err)) + })) .await; - for channel_err in tick_results.into_iter().filter_map(Result::err) { - error!(logger, "Error processing channel"; "channel_error" => ?channel_err, "main" => "iterate_channels"); + for (channel, channel_err) in tick_results.into_iter().filter_map(Result::err) { + error!(logger, "Error processing Channel"; "channel" => ?channel, "error" => ?channel_err, "main" => "all_channels_tick"); } info!(logger, "Processed {} channels", channels_size); if channels_size >= args.config.max_channels as usize { - error!(logger, "WARNING: channel limit cfg.MAX_CHANNELS={} reached", &args.config.max_channels; "main" => "iterate_channels"); + error!(logger, "WARNING: channel limit cfg.MAX_CHANNELS={} reached", &args.config.max_channels; "main" => "all_channels_tick"); } } -// async fn validator_tick( -// adapter: A, -// channel: Channel, -// config: &Config, -// logger: &Logger, -// ) -> Result<(ChannelId, Box), ValidatorWorkerError> { -// let whoami = adapter.whoami(); - -// // Cloning the `Logger` is cheap, see documentation for more info -// let sentry = SentryApi::init(adapter, channel.clone(), config, logger.clone()) -// .map_err(ValidatorWorkerError::SentryApi)?; -// let duration = Duration::from_millis(config.validator_tick_timeout as u64); - -// match channel.spec.validators.find(&whoami) { -// Some(SpecValidator::Leader(_)) => match timeout(duration, leader::tick(&sentry)).await { -// Err(timeout_e) => Err(ValidatorWorkerError::LeaderTick( -// channel.id, -// TickError::TimedOut(timeout_e), -// )), -// Ok(Err(tick_e)) => Err(ValidatorWorkerError::LeaderTick( -// channel.id, -// TickError::Tick(tick_e), -// )), -// Ok(Ok(tick_status)) => { -// info!(&logger, "Leader tick"; "status" => ?tick_status); -// Ok((channel.id, Box::new(tick_status))) -// } -// }, -// Some(SpecValidator::Follower(_)) => { -// match timeout(duration, follower::tick(&sentry)).await { -// Err(timeout_e) => Err(ValidatorWorkerError::FollowerTick( -// channel.id, -// TickError::TimedOut(timeout_e), -// )), -// Ok(Err(tick_e)) => Err(ValidatorWorkerError::FollowerTick( -// channel.id, -// TickError::Tick(tick_e), -// )), -// Ok(Ok(tick_status)) => { -// info!(&logger, "Follower tick"; "status" => ?tick_status); -// Ok((channel.id, Box::new(tick_status))) -// } -// } -// } -// // @TODO: Can we make this so that we don't have this check at all? maybe something with the SentryApi struct? -// None => unreachable!("SentryApi makes a check if validator is in Channel spec on `init()`"), -// } -// } - fn logger() -> Logger { use primitives::util::logging::{Async, PrefixedCompactFormat, TermDecorator}; use slog::{o, Drain}; diff --git a/validator_worker/src/sentry_interface.rs b/validator_worker/src/sentry_interface.rs index 649244bd2..f1e064441 100644 --- a/validator_worker/src/sentry_interface.rs +++ b/validator_worker/src/sentry_interface.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, time::Duration}; use chrono::{DateTime, Utc}; use futures::future::{join_all, TryFutureExt}; -use reqwest::{Client, Response}; +use reqwest::{Client, Method}; use slog::Logger; use primitives::{ @@ -15,7 +15,7 @@ use primitives::{ spender::Spender, util::ApiUrl, validator::MessageTypes, - Address, Campaign, Channel, {ChannelId, Config, ValidatorId}, + Address, ChannelId, Config, ValidatorId, }; use thiserror::Error; @@ -91,7 +91,13 @@ impl SentryApi { messages: &[&MessageTypes], ) -> Vec { join_all(self.propagate_to.iter().map(|(validator_id, validator)| { - propagate_to::(&self.client, channel, (*validator_id, validator), messages) + propagate_to::( + &self.client, + self.config.propagation_timeout, + channel, + (*validator_id, validator), + messages, + ) })) .await } @@ -133,6 +139,7 @@ impl SentryApi { .await } + /// Get's the last approved state and requesting a [`Heartbeat`], see [`LastApprovedResponse`] pub async fn get_last_approved( &self, channel: ChannelId, @@ -141,7 +148,10 @@ impl SentryApi { .get( self.whoami .url - .join(&format!("v5/channel/{}/last-approved", channel)) + .join(&format!( + "v5/channel/{}/last-approved?withHeartbeat=true", + channel + )) .expect("Should not error while creating endpoint"), ) .send() @@ -151,20 +161,6 @@ impl SentryApi { .map_err(Error::Request) } - pub async fn get_last_msgs(&self) -> Result, Error> { - self.client - .get( - self.whoami - .url - .join("last-approved?withHeartbeat=true") - .expect("Should not error while creating endpoint"), - ) - .send() - .and_then(|res: Response| res.json::>()) - .map_err(Error::Request) - .await - } - // TODO: Pagination & use of `AllSpendersResponse` pub async fn get_all_spenders( &self, @@ -209,21 +205,6 @@ impl SentryApi { .await } - /// Fetches all `Campaign`s from `sentry` by going through all pages and collecting the `Campaign`s into a single `Vec` - pub async fn all_campaigns(&self) -> Result, Error> { - Ok( - campaigns::all_campaigns(self.client.clone(), &self.whoami.url, self.adapter.whoami()) - .await?, - ) - } - - pub async fn all_channels(&self) -> Result, Error> { - Ok( - channels::all_channels(self.client.clone(), &self.whoami.url, self.adapter.whoami()) - .await?, - ) - } - #[deprecated = "V5 no longer needs event aggregates"] pub async fn get_event_aggregates( &self, @@ -251,6 +232,7 @@ impl SentryApi { async fn propagate_to( client: &Client, + timeout: u32, channel_id: ChannelId, (validator_id, validator): (ValidatorId, &Validator), messages: &[&MessageTypes], @@ -264,7 +246,8 @@ async fn propagate_to( body.insert("messages", messages); let _response: SuccessResponse = client - .post(endpoint) + .request(Method::POST, endpoint) + .timeout(Duration::from_millis(timeout.into())) .bearer_auth(&validator.token) .json(&body) .send() @@ -277,7 +260,7 @@ async fn propagate_to( Ok(validator_id) } -mod channels { +pub mod channels { use futures::{future::try_join_all, TryFutureExt}; use primitives::{ sentry::channel_list::{ChannelListQuery, ChannelListResponse}, @@ -386,7 +369,7 @@ pub mod campaigns { let endpoint = sentry_url .join(&format!( - "campaign/list?{}", + "v5/campaign/list?{}", serde_urlencoded::to_string(query).expect("Should not fail to serialize") )) .expect("Should not fail to create endpoint URL");