diff --git a/primitives/src/sentry/accounting.rs b/primitives/src/balances.rs similarity index 59% rename from primitives/src/sentry/accounting.rs rename to primitives/src/balances.rs index 53a4efb44..cab002c8a 100644 --- a/primitives/src/sentry/accounting.rs +++ b/primitives/src/balances.rs @@ -1,26 +1,14 @@ use std::{convert::TryFrom, marker::PhantomData}; -use crate::{balances_map::UnifiedMap, channel_v5::Channel, Address, UnifiedNum}; -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Deserializer, Serialize}; +use crate::{Address, UnifiedMap, UnifiedNum}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use thiserror::Error; -#[derive(Serialize, Debug, Clone, PartialEq, Eq)] -#[serde(rename_all = "camelCase")] -pub struct Accounting { - pub channel: Channel, - #[serde(flatten)] - pub balances: Balances, - pub updated: Option>, - pub created: DateTime, -} - #[derive(Serialize, Debug, Clone, PartialEq, Eq, Default)] #[serde(rename_all = "camelCase")] -pub struct Balances { +pub struct Balances { pub earners: UnifiedMap, pub spenders: UnifiedMap, - #[serde(skip_serializing, skip_deserializing)] state: PhantomData, } @@ -82,6 +70,14 @@ impl Balances { .entry(earner) .or_insert_with(UnifiedNum::default); } + + pub fn into_unchecked(self) -> Balances { + Balances { + earners: self.earners, + spenders: self.spenders, + state: PhantomData::default(), + } + } } #[derive(Debug, Error)] @@ -103,15 +99,25 @@ pub enum Error { }, } -pub trait BalancesState {} +pub trait BalancesState: std::fmt::Debug + Eq + Clone + Serialize + DeserializeOwned { + fn from_unchecked(balances: Balances) -> Result, Error>; +} -#[derive(Debug, Clone, PartialEq, Eq, Default)] +#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)] pub struct CheckedState; -impl BalancesState for CheckedState {} +impl BalancesState for CheckedState { + fn from_unchecked(balances: Balances) -> Result, Error> { + balances.check() + } +} -#[derive(Debug, Clone, PartialEq, Eq, Default)] +#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)] pub struct UncheckedState; -impl BalancesState for UncheckedState {} +impl BalancesState for UncheckedState { + fn from_unchecked(balances: Balances) -> Result, Error> { + Ok(balances) + } +} impl TryFrom> for Balances { type Error = Error; @@ -123,52 +129,9 @@ impl TryFrom> for Balances { /// This modules implements the needed non-generic structs that help with Deserialization of the `Balances` mod de { - use super::*; - - #[derive(Deserialize)] - struct DeserializeAccounting { - pub channel: Channel, - #[serde(flatten)] - pub balances: DeserializeBalances, - pub created: DateTime, - pub updated: Option>, - } + use serde::Deserializer; - impl<'de> Deserialize<'de> for Accounting { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let de_acc = DeserializeAccounting::deserialize(deserializer)?; - - Ok(Self { - channel: de_acc.channel, - balances: Balances::::try_from(de_acc.balances) - .map_err(serde::de::Error::custom)?, - created: de_acc.created, - updated: de_acc.updated, - }) - } - } - - impl<'de> Deserialize<'de> for Accounting { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let unchecked_acc = Accounting::::deserialize(deserializer)?; - - Ok(Self { - channel: unchecked_acc.channel, - balances: unchecked_acc - .balances - .check() - .map_err(serde::de::Error::custom)?, - created: unchecked_acc.created, - updated: unchecked_acc.updated, - }) - } - } + use super::*; #[derive(Deserialize, Debug, Clone, PartialEq, Eq)] struct DeserializeBalances { @@ -176,29 +139,20 @@ mod de { pub spenders: UnifiedMap, } - impl<'de> Deserialize<'de> for Balances { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let unchecked_balances = Balances::::deserialize(deserializer)?; - - unchecked_balances.check().map_err(serde::de::Error::custom) - } - } - - impl<'de> Deserialize<'de> for Balances { + impl<'de, S: BalancesState> Deserialize<'de> for Balances { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de>, { let deser_balances = DeserializeBalances::deserialize(deserializer)?; - Ok(Balances { + let unchecked_balances = Balances { earners: deser_balances.earners, spenders: deser_balances.spenders, state: PhantomData::::default(), - }) + }; + + S::from_unchecked(unchecked_balances).map_err(serde::de::Error::custom) } } diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 6e1412f46..80de5db6d 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -6,6 +6,7 @@ pub use self::{ ad_slot::AdSlot, ad_unit::AdUnit, address::Address, + balances::Balances, balances_map::{BalancesMap, UnifiedMap}, big_num::BigNum, campaign::{Campaign, CampaignId}, @@ -22,6 +23,7 @@ mod ad_unit; pub mod adapter; pub mod address; pub mod analytics; +pub mod balances; pub mod balances_map; pub mod big_num; pub mod campaign; diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs index cda0813b9..882ec2bc4 100644 --- a/primitives/src/sentry.rs +++ b/primitives/src/sentry.rs @@ -1,18 +1,29 @@ use crate::{ + balances::BalancesState, + channel_v5::Channel as ChannelV5, validator::{ApproveState, Heartbeat, MessageTypes, NewState, Type as MessageType}, - Address, BigNum, Channel, ChannelId, ValidatorId, IPFS, + Address, Balances, BigNum, Channel, ChannelId, ValidatorId, IPFS, }; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, fmt, hash::Hash}; -pub mod accounting; +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct Accounting { + pub channel: ChannelV5, + #[serde(flatten, bound = "S: BalancesState")] + pub balances: Balances, + pub updated: Option>, + pub created: DateTime, +} #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] -pub struct LastApproved { +pub struct LastApproved { /// NewState can be None if the channel is brand new - pub new_state: Option>, + #[serde(bound = "S: BalancesState")] + pub new_state: Option>>, /// ApproveState can be None if the channel is brand new pub approve_state: Option>, } @@ -53,7 +64,7 @@ pub mod message { } impl TryFrom for Message { - type Error = MessageTypeError; + type Error = MessageError; fn try_from(value: MessageTypes) -> Result { >::try_from(value).map(Self) @@ -81,8 +92,7 @@ pub mod message { "type":"ApproveState", "stateRoot":"4739522efc1e81499541621759dadb331eaf08829d6a3851b4b654dfaddc9935", "signature":"0x00128a39b715e87475666c3220fc0400bf34a84d24f77571d2b4e1e88b141d52305438156e526ff4fe96b7a13e707ab2f6f3ca00bd928dabc7f516b56cfe6fd61c", - "isHealthy":true, - "exhausted":false + "isHealthy":true }, "received":"2021-01-05T14:00:48.549Z" }); @@ -96,7 +106,6 @@ pub mod message { state_root: "4739522efc1e81499541621759dadb331eaf08829d6a3851b4b654dfaddc9935".to_string(), signature: "0x00128a39b715e87475666c3220fc0400bf34a84d24f77571d2b4e1e88b141d52305438156e526ff4fe96b7a13e707ab2f6f3ca00bd928dabc7f516b56cfe6fd61c".to_string(), is_healthy: true, - exhausted: false, }), }; @@ -184,8 +193,9 @@ pub struct ChannelListResponse { #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] -pub struct LastApprovedResponse { - pub last_approved: Option, +pub struct LastApprovedResponse { + #[serde(bound = "S: BalancesState")] + pub last_approved: Option>, /// None -> withHeartbeat=true wasn't passed /// Some(vec![]) (empty vec) or Some(heartbeats) - withHeartbeat=true was passed #[serde(default, skip_serializing_if = "Option::is_none")] diff --git a/primitives/src/validator.rs b/primitives/src/validator.rs index e0743a036..1b8f29592 100644 --- a/primitives/src/validator.rs +++ b/primitives/src/validator.rs @@ -136,76 +136,49 @@ pub mod messages { use std::{any::type_name, convert::TryFrom, fmt, marker::PhantomData}; use thiserror::Error; - use crate::sentry::accounting::{Balances, CheckedState}; + use crate::balances::{Balances, BalancesState, CheckedState, UncheckedState}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; #[derive(Error, Debug)] - pub struct MessageTypeError { - expected: PhantomData, - actual: String, + pub enum MessageError { + #[error(transparent)] + Balances(#[from] crate::balances::Error), + #[error("Expected {} message type but the actual is {actual}", type_name::(), )] + Type { + expected: PhantomData, + actual: String, + }, } - impl MessageTypeError { + impl MessageError { pub fn for_actual(_actual: &A) -> Self { - Self { + Self::Type { expected: PhantomData::default(), actual: type_name::().to_string(), } } } - impl fmt::Display for MessageTypeError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "Expected {} message type but the actual is {}", - type_name::(), - self.actual - ) - } - } - pub trait Type: fmt::Debug + Into - + TryFrom> + + TryFrom> + Clone + PartialEq + Eq { } - impl Type for Accounting {} - impl TryFrom for Accounting { - type Error = MessageTypeError; - - fn try_from(value: MessageTypes) -> Result { - match value { - MessageTypes::ApproveState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::NewState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::RejectState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::Heartbeat(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::Accounting(accounting) => Ok(accounting), - } - } - } - impl From for MessageTypes { - fn from(accounting: Accounting) -> Self { - MessageTypes::Accounting(accounting) - } - } - impl Type for ApproveState {} impl TryFrom for ApproveState { - type Error = MessageTypeError; + type Error = MessageError; fn try_from(value: MessageTypes) -> Result { match value { - MessageTypes::NewState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::RejectState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::Heartbeat(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::Accounting(msg) => Err(MessageTypeError::for_actual(&msg)), + MessageTypes::NewState(msg) => Err(MessageError::for_actual(&msg)), + MessageTypes::RejectState(msg) => Err(MessageError::for_actual(&msg)), + MessageTypes::Heartbeat(msg) => Err(MessageError::for_actual(&msg)), MessageTypes::ApproveState(approve_state) => Ok(approve_state), } } @@ -216,58 +189,85 @@ pub mod messages { } } - impl Type for NewState {} - impl TryFrom for NewState { - type Error = MessageTypeError; + impl Type for NewState {} + impl TryFrom for NewState { + type Error = MessageError; fn try_from(value: MessageTypes) -> Result { match value { - MessageTypes::ApproveState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::RejectState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::Heartbeat(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::Accounting(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::NewState(new_state) => Ok(new_state), + MessageTypes::ApproveState(msg) => Err(MessageError::for_actual(&msg)), + MessageTypes::RejectState(msg) => Err(MessageError::for_actual(&msg)), + MessageTypes::Heartbeat(msg) => Err(MessageError::for_actual(&msg)), + MessageTypes::NewState(new_state) => { + let balances = S::from_unchecked(new_state.balances)?; + + Ok(Self { + state_root: new_state.state_root, + signature: new_state.signature, + balances, + }) + } } } } - impl From for MessageTypes { - fn from(new_state: NewState) -> Self { - MessageTypes::NewState(new_state) + impl From> for MessageTypes { + fn from(new_state: NewState) -> Self { + MessageTypes::NewState(NewState { + state_root: new_state.state_root, + signature: new_state.signature, + balances: new_state.balances.into_unchecked(), + }) } } - impl Type for RejectState {} - impl TryFrom for RejectState { - type Error = MessageTypeError; + impl Type for RejectState {} + impl TryFrom for RejectState { + type Error = MessageError; fn try_from(value: MessageTypes) -> Result { match value { - MessageTypes::ApproveState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::NewState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::Heartbeat(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::Accounting(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::RejectState(reject_state) => Ok(reject_state), + MessageTypes::ApproveState(msg) => Err(MessageError::for_actual(&msg)), + MessageTypes::NewState(msg) => Err(MessageError::for_actual(&msg)), + MessageTypes::Heartbeat(msg) => Err(MessageError::for_actual(&msg)), + MessageTypes::RejectState(reject_state) => { + let balances = reject_state.balances.map(S::from_unchecked).transpose()?; + + Ok(Self { + reason: reject_state.reason, + state_root: reject_state.state_root, + signature: reject_state.signature, + balances, + timestamp: reject_state.timestamp, + }) + } } } } - impl From for MessageTypes { - fn from(reject_state: RejectState) -> Self { - MessageTypes::RejectState(reject_state) + impl From> for MessageTypes { + fn from(reject_state: RejectState) -> Self { + MessageTypes::RejectState(RejectState { + reason: reject_state.reason, + state_root: reject_state.state_root, + signature: reject_state.signature, + balances: reject_state + .balances + .map(|balances| balances.into_unchecked()), + timestamp: reject_state.timestamp, + }) } } impl Type for Heartbeat {} impl TryFrom for Heartbeat { - type Error = MessageTypeError; + type Error = MessageError; fn try_from(value: MessageTypes) -> Result { match value { - MessageTypes::ApproveState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::NewState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::RejectState(msg) => Err(MessageTypeError::for_actual(&msg)), - MessageTypes::Accounting(msg) => Err(MessageTypeError::for_actual(&msg)), + MessageTypes::ApproveState(msg) => Err(MessageError::for_actual(&msg)), + MessageTypes::NewState(msg) => Err(MessageError::for_actual(&msg)), + MessageTypes::RejectState(msg) => Err(MessageError::for_actual(&msg)), MessageTypes::Heartbeat(heartbeat) => Ok(heartbeat), } } @@ -279,46 +279,41 @@ pub mod messages { } } - #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] - #[serde(rename_all = "camelCase")] - pub struct Accounting { - pub balances: Balances, - pub last_aggregate: DateTime, - } - #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct ApproveState { pub state_root: String, pub signature: String, pub is_healthy: bool, - // - // TODO: AIP#61 Remove exhausted property - // - #[serde(default)] - pub exhausted: bool, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] - pub struct NewState { + pub struct NewState { pub state_root: String, pub signature: String, - pub balances: Balances, - // - // TODO: AIP#61 Remove exhausted property - // - #[serde(default)] - pub exhausted: bool, + #[serde(flatten, bound = "S: BalancesState")] + pub balances: Balances, + } + + impl NewState { + pub fn try_checked(self) -> Result, crate::balances::Error> { + Ok(NewState { + state_root: self.state_root, + signature: self.signature, + balances: self.balances.check()?, + }) + } } #[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] - pub struct RejectState { + pub struct RejectState { pub reason: String, pub state_root: String, pub signature: String, - pub balances: Option>, + #[serde(flatten, bound = "S: BalancesState")] + pub balances: Option>, pub timestamp: Option>, } @@ -344,10 +339,9 @@ pub mod messages { #[serde(tag = "type")] pub enum MessageTypes { ApproveState(ApproveState), - NewState(NewState), - RejectState(RejectState), + NewState(NewState), + RejectState(RejectState), Heartbeat(Heartbeat), - Accounting(Accounting), } } #[cfg(feature = "postgres")] diff --git a/sentry/src/db/accounting.rs b/sentry/src/db/accounting.rs index f09741a4b..13d004890 100644 --- a/sentry/src/db/accounting.rs +++ b/sentry/src/db/accounting.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; use primitives::{ - sentry::accounting::{Balances, CheckedState}, + balances::{Balances, CheckedState}, Address, ChannelId, UnifiedNum, }; use tokio_postgres::{ @@ -16,7 +16,7 @@ static UPDATE_ACCOUNTING_STATEMENT: &str = "INSERT INTO accounting(channel_id, s #[derive(Debug, Error)] pub enum Error { #[error("Accounting Balances error: {0}")] - Balances(#[from] primitives::sentry::accounting::Error), + Balances(#[from] primitives::balances::Error), #[error("Fetching Accounting from postgres error: {0}")] Postgres(#[from] PoolError), } diff --git a/sentry/src/db/channel.rs b/sentry/src/db/channel.rs index 528cc2dde..6aaadfc36 100644 --- a/sentry/src/db/channel.rs +++ b/sentry/src/db/channel.rs @@ -101,6 +101,7 @@ pub async fn insert_validator_messages( Ok(inserted) } +#[deprecated = "No longer needed for V5"] pub async fn update_exhausted_channel( pool: &DbPool, channel: &Channel, diff --git a/sentry/src/db/event_aggregate.rs b/sentry/src/db/event_aggregate.rs index 73b73d138..0c1a09bfc 100644 --- a/sentry/src/db/event_aggregate.rs +++ b/sentry/src/db/event_aggregate.rs @@ -1,6 +1,7 @@ use chrono::{DateTime, Utc}; use futures::pin_mut; use primitives::{ + balances::UncheckedState, sentry::{EventAggregate, MessageResponse}, validator::{ApproveState, Heartbeat, NewState}, Address, BigNum, Channel, ChannelId, ValidatorId, @@ -37,7 +38,7 @@ pub async fn latest_new_state( pool: &DbPool, channel: &Channel, state_root: &str, -) -> Result>, PoolError> { +) -> Result>>, PoolError> { let client = pool.get().await?; let select = client.prepare("SELECT \"from\", msg, received FROM validator_messages WHERE channel_id = $1 AND \"from\" = $2 AND msg ->> 'type' = 'NewState' AND msg->> 'stateRoot' = $3 ORDER BY received DESC LIMIT 1").await?; @@ -53,7 +54,7 @@ pub async fn latest_new_state( .await?; rows.get(0) - .map(MessageResponse::::try_from) + .map(MessageResponse::>::try_from) .transpose() .map_err(PoolError::Backend) } diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs index 4e4b3f653..236a13807 100644 --- a/sentry/src/routes/campaign.rs +++ b/sentry/src/routes/campaign.rs @@ -378,10 +378,8 @@ pub mod insert_events { use hyper::{Body, Request, Response}; use primitives::{ adapter::Adapter, - sentry::{ - accounting::{Balances, CheckedState, OverflowError}, - Event, SuccessResponse, - }, + balances::{Balances, CheckedState, OverflowError}, + sentry::{Event, SuccessResponse}, Address, Campaign, CampaignId, DomainError, UnifiedNum, ValidatorDesc, }; use thiserror::Error; diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index f15c717b7..63d7cf577 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -1,7 +1,6 @@ use crate::db::{ event_aggregate::{latest_approve_state, latest_heartbeats, latest_new_state}, - get_channel_by_id, insert_channel, insert_validator_messages, list_channels, - update_exhausted_channel, PoolError, + get_channel_by_id, insert_channel, insert_validator_messages, list_channels, PoolError, }; use crate::{success_response, Application, Auth, ResponseError, RouteParams}; use futures::future::try_join_all; @@ -9,6 +8,7 @@ use hex::FromHex; use hyper::{Body, Request, Response}; use primitives::{ adapter::Adapter, + balances::UncheckedState, sentry::{ channel_list::{ChannelListQuery, LastApprovedQuery}, LastApproved, LastApprovedResponse, SuccessResponse, @@ -128,7 +128,7 @@ pub async fn last_approved( let default_response = Response::builder() .header("Content-type", "application/json") .body( - serde_json::to_string(&LastApprovedResponse { + serde_json::to_string(&LastApprovedResponse:: { last_approved: None, heartbeats: None, })? @@ -202,12 +202,6 @@ pub async fn create_validator_messages( .get("messages") .ok_or_else(|| ResponseError::BadRequest("missing messages body".to_string()))?; - let channel_is_exhausted = messages.iter().any(|message| match message { - MessageTypes::ApproveState(approve) => approve.exhausted, - MessageTypes::NewState(new_state) => new_state.exhausted, - _ => false, - }); - match channel.spec.validators.find(&session.uid) { None => Err(ResponseError::Unauthorized), _ => { @@ -216,12 +210,6 @@ pub async fn create_validator_messages( })) .await?; - if channel_is_exhausted { - if let Some(validator_index) = channel.spec.validators.find_index(&session.uid) { - update_exhausted_channel(&app.pool, &channel, validator_index).await?; - } - } - Ok(success_response(serde_json::to_string(&SuccessResponse { success: true, })?)) diff --git a/sentry/src/routes/event_aggregate.rs b/sentry/src/routes/event_aggregate.rs index 9e31eea2c..987653fcb 100644 --- a/sentry/src/routes/event_aggregate.rs +++ b/sentry/src/routes/event_aggregate.rs @@ -4,17 +4,18 @@ use serde::Deserialize; use primitives::{adapter::Adapter, sentry::EventAggregateResponse, Channel}; -use crate::{db::list_event_aggregates, success_response, Application, Auth, ResponseError}; +use crate::{success_response, Application, Auth, ResponseError}; #[derive(Deserialize)] pub struct EventAggregatesQuery { #[serde(default, with = "ts_milliseconds_option")] + #[allow(dead_code)] after: Option>, } pub async fn list_channel_event_aggregates( req: Request, - app: &Application, + _app: &Application, ) -> Result, ResponseError> { let channel = req .extensions() @@ -26,23 +27,24 @@ pub async fn list_channel_event_aggregates( .get::() .ok_or(ResponseError::Unauthorized)?; - let query = + let _query = serde_urlencoded::from_str::(req.uri().query().unwrap_or(""))?; - let from = if channel.spec.validators.find(&auth.uid).is_some() { + let _from = if channel.spec.validators.find(&auth.uid).is_some() { None } else { Some(auth.uid) }; - let event_aggregates = list_event_aggregates( - &app.pool, - &channel.id, - app.config.events_find_limit, - &from, - &query.after, - ) - .await?; + let event_aggregates = vec![]; + // let event_aggregates = list_event_aggregates( + // &app.pool, + // &channel.id, + // app.config.events_find_limit, + // &from, + // &query.after, + // ) + // .await?; let response = EventAggregateResponse { channel: channel.clone(), diff --git a/validator_worker/src/core/events.rs b/validator_worker/src/core/events.rs deleted file mode 100644 index 4321e83a9..000000000 --- a/validator_worker/src/core/events.rs +++ /dev/null @@ -1,186 +0,0 @@ -use num_traits::CheckedSub; - -use primitives::sentry::{accounting::{Balances, CheckedState}, AggregateEvents, EventAggregate}; -use primitives::validator::Accounting; -use primitives::{BalancesMap, BigNum, Channel, DomainError}; - -// -// TODO #381: AIP#61 Use the new Spender Aggregate and Sum all balances for the new Accounting -// & Temporary allow unnecessary_wraps -#[allow(clippy::unnecessary_wraps)] -pub(crate) fn merge_aggrs( - accounting: &Accounting, - aggregates: &[EventAggregate], - // - // TODO: AIP#61 Use Campaign and if we should check the total sum of the Balances < campaign.budget - // - _channel: &Channel, -) -> Result { - let last_aggregate = [accounting.last_aggregate] - .iter() - .chain(aggregates.iter().map(|aggr| &aggr.created)) - .max() - .unwrap_or(&accounting.last_aggregate) - .to_owned(); - - // Build an intermediary balances representation - // - // TODO: AIP#61 Sum all Spender Aggregates and use that for the new Accounting - // - let balances = Balances::::default(); - - let new_accounting = Accounting { - balances, - last_aggregate, - }; - - Ok(new_accounting) -} - -// -// TODO: AIP#61 Check how this should apply for the new Campaigns -// -fn _merge_payouts_into_balances<'a, T: Iterator>( - balances: &BalancesMap, - events: T, - deposit: &BigNum, -) -> Result { - let mut new_balances = balances.clone(); - - let total = balances.values().sum(); - let mut remaining = deposit.checked_sub(&total).ok_or_else(|| { - DomainError::RuleViolation("remaining starts negative: total>depositAmount".to_string()) - })?; - - let all_payouts = events.map(|aggr_ev| aggr_ev.event_payouts.iter()).flatten(); - - for (acc, payout) in all_payouts { - let to_add = payout.min(&remaining); - - let new_balance = new_balances.entry(*acc).or_insert_with(|| 0.into()); - - *new_balance += to_add; - - remaining = remaining.checked_sub(to_add).ok_or_else(|| { - DomainError::RuleViolation("remaining must never be negative".to_string()) - })?; - } - - Ok(new_balances) -} - -#[cfg(test)] -mod test { - use chrono::Utc; - - use primitives::{ - util::tests::prep_db::{ - ADDRESSES, DUMMY_CHANNEL, DUMMY_VALIDATOR_FOLLOWER, DUMMY_VALIDATOR_LEADER, - }, - Address, Channel, ChannelSpec, ValidatorDesc, - }; - - use super::*; - - #[test] - #[ignore] - fn should_merge_event_aggrs_and_apply_fees() { - // fees: 100 - // deposit: 10 000 - let leader = ValidatorDesc { - fee: 50.into(), - ..DUMMY_VALIDATOR_LEADER.clone() - }; - let follower = ValidatorDesc { - fee: 50.into(), - ..DUMMY_VALIDATOR_FOLLOWER.clone() - }; - - let mut channel = Channel { - deposit_amount: 10_000.into(), - ..DUMMY_CHANNEL.clone() - }; - channel.spec.validators = (leader, follower).into(); - - let acc = Accounting { - balances: BalancesMap::default(), - last_aggregate: Utc::now(), - }; - - let new_accounting = - merge_aggrs(&acc, &[gen_ev_aggr(5, &ADDRESSES["publisher"])], &channel) - .expect("Something went wrong"); - - assert_eq!( - new_accounting.balances[&ADDRESSES["publisher"]], - 148.into(), - "balances is ok" - ); - } - - #[test] - #[ignore] - fn should_never_allow_exceeding_the_deposit() { - let leader = ValidatorDesc { - fee: 50.into(), - ..DUMMY_VALIDATOR_LEADER.clone() - }; - let follower = ValidatorDesc { - fee: 50.into(), - ..DUMMY_VALIDATOR_FOLLOWER.clone() - }; - - let spec = ChannelSpec { - validators: (leader, follower).into(), - ..DUMMY_CHANNEL.spec.clone() - }; - let channel = Channel { - deposit_amount: 10_000.into(), - spec, - ..DUMMY_CHANNEL.clone() - }; - - let acc = Accounting { - last_aggregate: Utc::now(), - balances: BalancesMap::default(), - }; - - let new_accounting = merge_aggrs( - &acc, - &[gen_ev_aggr(1_001, &ADDRESSES["publisher"])], - &channel, - ) - .expect("Something went wrong"); - - assert_eq!( - new_accounting.balances[&ADDRESSES["publisher"]], - 9_702.into(), - "balances is ok" - ); - assert_eq!( - &new_accounting.balances.values().sum::(), - &channel.deposit_amount, - "sum(balances) == depositAmount" - ); - } - - // - // TODO: AIP#61 Use new Spender Aggregate - // - fn gen_ev_aggr(count: u64, recipient: &Address) -> EventAggregate { - let aggregate_events = AggregateEvents { - event_counts: Some(vec![(*recipient, count.into())].into_iter().collect()), - event_payouts: vec![(*recipient, (count * 10).into())] - .into_iter() - .collect(), - }; - - EventAggregate { - channel_id: DUMMY_CHANNEL.id.to_owned(), - created: Utc::now(), - events: vec![("IMPRESSION".to_string(), aggregate_events)] - .into_iter() - .collect(), - } - } -} diff --git a/validator_worker/src/follower.rs b/validator_worker/src/follower.rs index 140ebefb0..e1531729b 100644 --- a/validator_worker/src/follower.rs +++ b/validator_worker/src/follower.rs @@ -4,14 +4,14 @@ use std::fmt; use primitives::adapter::{Adapter, AdapterErrorKind}; use primitives::validator::{ApproveState, MessageTypes, NewState, RejectState}; use primitives::{ - sentry::accounting::{Balances, CheckedState}, - BalancesMap, BigNum, + balances::{Balances, UncheckedState}, + BalancesMap, }; use crate::core::follower_rules::{get_health, is_valid_transition}; -use crate::heartbeat::{heartbeat, HeartbeatStatus}; +use crate::heartbeat::HeartbeatStatus; use crate::sentry_interface::{PropagationResult, SentryApi}; -use crate::{get_state_root_hash, producer}; +use crate::{get_state_root_hash, heartbeat::heartbeat}; use chrono::Utc; #[derive(Debug)] @@ -52,7 +52,6 @@ pub enum ApproveStateResult { pub struct TickStatus { pub heartbeat: HeartbeatStatus, pub approve_state: ApproveStateResult, - pub producer_tick: producer::TickStatus, } pub async fn tick( @@ -80,13 +79,8 @@ pub async fn tick( _ => false, }; - let producer_tick = producer::tick(iface).await?; - let empty_balances = Balances::::default(); - let _balances = match &producer_tick { - producer::TickStatus::Sent { new_accounting, .. } => &new_accounting.balances, - producer::TickStatus::NoNewEventAggr(balances) => balances, - producer::TickStatus::EmptyBalances => &empty_balances, - }; + let _balances = Balances::::default(); + let approve_state_result = if let (Some(new_state), false) = (new_msg, latest_is_responded_to) { on_new_state(iface, &BalancesMap::default(), &new_state).await? } else { @@ -94,17 +88,15 @@ pub async fn tick( }; Ok(TickStatus { - heartbeat: Default::default(), - // heartbeat: heartbeat(iface, balances).await?, + heartbeat: heartbeat(iface).await?, approve_state: approve_state_result, - producer_tick, }) } async fn on_new_state<'a, A: Adapter + 'static>( iface: &'a SentryApi, balances: &'a BalancesMap, - new_state: &'a NewState, + new_state: &'a NewState, ) -> Result, Box> { let proposed_balances = BalancesMap::default(); // let proposed_balances = new_state.balances.clone(); @@ -146,14 +138,12 @@ async fn on_new_state<'a, A: Adapter + 'static>( let signature = iface.adapter.sign(&new_state.state_root)?; let health_threshold = u64::from(iface.config.health_threshold_promilles); let is_healthy = health >= health_threshold; - let exhausted = proposed_balances.values().sum::() == iface.channel.deposit_amount; let propagation_result = iface .propagate(&[&MessageTypes::ApproveState(ApproveState { state_root: proposed_state_root, signature, is_healthy, - exhausted, })]) .await; @@ -162,7 +152,7 @@ async fn on_new_state<'a, A: Adapter + 'static>( async fn on_error<'a, A: Adapter + 'static>( iface: &'a SentryApi, - new_state: &'a NewState, + new_state: &'a NewState, status: InvalidNewState, ) -> ApproveStateResult { let propagation = iface diff --git a/validator_worker/src/heartbeat.rs b/validator_worker/src/heartbeat.rs index 4e2016558..fb777a811 100644 --- a/validator_worker/src/heartbeat.rs +++ b/validator_worker/src/heartbeat.rs @@ -8,7 +8,6 @@ use byteorder::{BigEndian, ByteOrder}; use primitives::adapter::Adapter; use primitives::merkle_tree::MerkleTree; use primitives::validator::{Heartbeat, MessageTypes}; -use primitives::{BalancesMap, BigNum, Channel}; use crate::sentry_interface::{PropagationResult, SentryApi}; @@ -40,7 +39,6 @@ async fn send_heartbeat( pub async fn heartbeat( iface: &SentryApi, - balances: &BalancesMap, ) -> Result, Box> { let validator_message_response = iface.get_our_latest_msg(&["Heartbeat"]).await?; let heartbeat_msg = match validator_message_response { @@ -51,7 +49,6 @@ pub async fn heartbeat( let should_send = heartbeat_msg.map_or(true, |heartbeat| { let duration = Utc::now() - heartbeat.timestamp; duration > Duration::milliseconds(iface.config.heartbeat_time.into()) - && !is_channel_exhausted(&iface.channel, balances) }); if should_send { @@ -60,7 +57,3 @@ pub async fn heartbeat( Ok(None) } } - -fn is_channel_exhausted(channel: &Channel, balances: &BalancesMap) -> bool { - balances.values().sum::() == channel.deposit_amount -} diff --git a/validator_worker/src/leader.rs b/validator_worker/src/leader.rs index 22acdab21..03683a119 100644 --- a/validator_worker/src/leader.rs +++ b/validator_worker/src/leader.rs @@ -1,63 +1,49 @@ use std::error::Error; use primitives::adapter::{Adapter, AdapterErrorKind}; +use primitives::balances::UncheckedState; use primitives::{ - sentry::accounting::{Balances, CheckedState}, - validator::{Accounting, MessageTypes, NewState}, - BalancesMap, BigNum, + sentry::Accounting, + validator::{MessageTypes, NewState}, + BalancesMap, }; +use crate::get_state_root_hash; use crate::heartbeat::{heartbeat, HeartbeatStatus}; use crate::sentry_interface::{PropagationResult, SentryApi}; -use crate::{get_state_root_hash, producer}; #[derive(Debug)] pub struct TickStatus { pub heartbeat: HeartbeatStatus, /// If None, then the conditions for handling a new state haven't been met pub new_state: Option>>, - pub producer_tick: producer::TickStatus, } pub async fn tick( iface: &SentryApi, ) -> Result, Box> { - let producer_tick = producer::tick(iface).await?; - let empty_balances = Balances::::default(); - let (_balances, new_state) = match &producer_tick { - producer::TickStatus::Sent { new_accounting, .. } => { - let new_state = on_new_accounting(iface, new_accounting).await?; - (&new_accounting.balances, Some(new_state)) - } - producer::TickStatus::NoNewEventAggr(balances) => (balances, None), - producer::TickStatus::EmptyBalances => (&empty_balances, None), - }; + let new_state = None; Ok(TickStatus { - heartbeat: heartbeat(iface, &BalancesMap::default()).await?, + heartbeat: heartbeat(iface).await?, new_state, - producer_tick, }) } -async fn on_new_accounting( +async fn _on_new_accounting( iface: &SentryApi, - new_accounting: &Accounting, + new_accounting: &Accounting, ) -> Result>, Box> { let state_root_raw = get_state_root_hash(iface, &BalancesMap::default())?; let state_root = hex::encode(state_root_raw); let signature = iface.adapter.sign(&state_root)?; - let exhausted = - new_accounting.balances.earners.values().sum::() == iface.channel.deposit_amount; - let propagation_results = iface .propagate(&[&MessageTypes::NewState(NewState { state_root, signature, balances: new_accounting.balances.clone(), - exhausted, })]) .await; diff --git a/validator_worker/src/lib.rs b/validator_worker/src/lib.rs index 0096be1d2..e19b0ca14 100644 --- a/validator_worker/src/lib.rs +++ b/validator_worker/src/lib.rs @@ -14,11 +14,9 @@ pub mod error; pub mod follower; pub mod heartbeat; pub mod leader; -pub mod producer; pub mod sentry_interface; pub mod core { - pub mod events; pub mod follower_rules; } diff --git a/validator_worker/src/producer.rs b/validator_worker/src/producer.rs deleted file mode 100644 index c42e58179..000000000 --- a/validator_worker/src/producer.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::error::Error; - -use chrono::{TimeZone, Utc}; - -use primitives::adapter::{Adapter, AdapterErrorKind}; -use primitives::validator::{Accounting, MessageTypes}; -use primitives::{ - sentry::accounting::{Balances, CheckedState}, - ChannelId, -}; - -use crate::core::events::merge_aggrs; -use crate::sentry_interface::{PropagationResult, SentryApi}; -use slog::info; - -#[derive(Debug)] -pub enum TickStatus { - Sent { - channel: ChannelId, - new_accounting: Accounting, - accounting_propagation: Vec>, - event_counts: usize, - }, - NoNewEventAggr(Balances), - EmptyBalances, -} - -pub async fn tick( - iface: &SentryApi, -) -> Result, Box> { - let validator_msg_resp = iface.get_our_latest_msg(&["Accounting"]).await?; - - let accounting = match validator_msg_resp { - Some(MessageTypes::Accounting(accounting)) => accounting, - _ => Accounting { - last_aggregate: Utc.timestamp(0, 0), - balances: Default::default(), - }, - }; - - // - // TODO #381: AIP#61 Merge all Spender Aggregates and create a new Accounting - // - - let aggrs = iface - .get_event_aggregates(accounting.last_aggregate) - .await?; - - if aggrs.events.is_empty() { - return Ok(TickStatus::NoNewEventAggr(accounting.balances)); - } - - // - // TODO: AIP#61 Merge all Spender Aggregates when it's implemented - // - let new_accounting = merge_aggrs(&accounting, &aggrs.events, &iface.channel)?; - - if new_accounting.balances.earners.is_empty() || new_accounting.balances.spenders.is_empty() { - info!( - iface.logger, - "channel {}: empty Accounting balances, skipping propagation", iface.channel.id - ); - - Ok(TickStatus::EmptyBalances) - } else { - info!( - iface.logger, - "channel {}: processed {} event aggregates", - iface.channel.id, - aggrs.events.len() - ); - - let message_types = MessageTypes::Accounting(new_accounting.clone()); - - Ok(TickStatus::Sent { - channel: iface.channel.id, - accounting_propagation: iface.propagate(&[&message_types]).await, - new_accounting, - event_counts: aggrs.events.len(), - }) - } -} diff --git a/validator_worker/src/sentry_interface.rs b/validator_worker/src/sentry_interface.rs index ca8e8b61f..218c97c97 100644 --- a/validator_worker/src/sentry_interface.rs +++ b/validator_worker/src/sentry_interface.rs @@ -4,6 +4,7 @@ use std::time::Duration; use chrono::{DateTime, Utc}; use futures::future::{join_all, try_join_all, TryFutureExt}; +use primitives::balances::UncheckedState; use reqwest::{Client, Response}; use slog::Logger; @@ -169,23 +170,27 @@ impl SentryApi { .await } - pub async fn get_last_approved(&self) -> Result> { + pub async fn get_last_approved( + &self, + ) -> Result, Error> { self.client .get(&format!("{}/last-approved", self.validator_url)) .send() - .and_then(|res: Response| res.json::()) + .and_then(|res: Response| res.json::>()) .map_err(Error::Request) .await } - pub async fn get_last_msgs(&self) -> Result> { + pub async fn get_last_msgs( + &self, + ) -> Result, Error> { self.client .get(&format!( "{}/last-approved?withHeartbeat=true", self.validator_url )) .send() - .and_then(|res: Response| res.json::()) + .and_then(|res: Response| res.json::>()) .map_err(Error::Request) .await }