diff --git a/Cargo.lock b/Cargo.lock index baad23faa..7fd4ee44c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4060,6 +4060,7 @@ dependencies = [ "thiserror", "tokio", "toml", + "wiremock", ] [[package]] diff --git a/primitives/src/spender.rs b/primitives/src/spender.rs index 7aba539e3..474b50837 100644 --- a/primitives/src/spender.rs +++ b/primitives/src/spender.rs @@ -1,14 +1,14 @@ use crate::{Address, Channel, Deposit, UnifiedNum}; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] #[serde(rename_all = "camelCase")] pub struct SpenderLeaf { pub total_spent: UnifiedNum, // merkle_proof: [u8; 32], // TODO } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct Spender { pub total_deposited: UnifiedNum, pub spender_leaf: Option, diff --git a/sentry/src/db.rs b/sentry/src/db.rs index 9b64fcac3..7126d81a4 100644 --- a/sentry/src/db.rs +++ b/sentry/src/db.rs @@ -334,13 +334,19 @@ pub mod tests_postgres { // set the database in the configuration of the inside Pool (used for tests) config.dbname(&database.name); - let manager = - deadpool_postgres::Manager::from_config(config, NoTls, self.manager_config.clone()); - + let manager = deadpool_postgres::Manager::from_config( + config, + NoTls, + self.manager_config.clone(), + ); + deadpool_postgres::Pool::new(manager, 15) }; - let result = database.pool.get().await? + let result = database + .pool + .get() + .await? .simple_query(queries) .await .map_err(PoolError::Backend) diff --git a/test_harness/src/lib.rs b/test_harness/src/lib.rs index af12e22ec..cc1290332 100644 --- a/test_harness/src/lib.rs +++ b/test_harness/src/lib.rs @@ -730,7 +730,9 @@ pub mod run { let postgres = postgres_connection(42, postgres_config).await; let mut redis = redis_connection(app_config.redis_url).await?; - Manager::flush_db(&mut redis).await.expect("Should flush redis database"); + Manager::flush_db(&mut redis) + .await + .expect("Should flush redis database"); let campaign_remaining = CampaignRemaining::new(redis.clone()); diff --git a/validator_worker/Cargo.toml b/validator_worker/Cargo.toml index e8400fe62..f846d05ef 100644 --- a/validator_worker/Cargo.toml +++ b/validator_worker/Cargo.toml @@ -39,3 +39,6 @@ serde_urlencoded = "0.7" toml = "0.5" # CLI clap = "^2.33" + +[dev-dependencies] +wiremock = "0.5" diff --git a/validator_worker/src/sentry_interface.rs b/validator_worker/src/sentry_interface.rs index db3f75f57..6f551a310 100644 --- a/validator_worker/src/sentry_interface.rs +++ b/validator_worker/src/sentry_interface.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, time::Duration}; use chrono::{DateTime, Utc}; -use futures::future::{join_all, TryFutureExt}; +use futures::future::{join_all, try_join_all, TryFutureExt}; use reqwest::{Client, Method}; use slog::Logger; @@ -9,8 +9,8 @@ use primitives::{ adapter::Adapter, balances::{CheckedState, UncheckedState}, sentry::{ - AccountingResponse, EventAggregateResponse, LastApprovedResponse, SuccessResponse, - ValidatorMessageResponse, + AccountingResponse, AllSpendersResponse, EventAggregateResponse, LastApprovedResponse, + SuccessResponse, ValidatorMessageResponse, }, spender::Spender, util::ApiUrl, @@ -161,15 +161,16 @@ impl SentryApi { .map_err(Error::Request) } - // TODO: Pagination & use of `AllSpendersResponse` - pub async fn get_all_spenders( + /// page always starts from 0 + pub async fn get_spenders_page( &self, - channel: ChannelId, - ) -> Result, Error> { + channel: &ChannelId, + page: u64, + ) -> Result { let url = self .whoami .url - .join(&format!("v5/channel/{}/spender/all", channel)) + .join(&format!("v5/channel/{}/spender/all?page={}", channel, page)) .expect("Should not error when creating endpoint"); self.client @@ -177,12 +178,34 @@ impl SentryApi { .bearer_auth(&self.whoami.token) .send() .await? - // TODO: Should be `AllSpendersResponse` and should have pagination! .json() .map_err(Error::Request) .await } + pub async fn get_all_spenders( + &self, + channel: ChannelId, + ) -> Result, Error> { + let first_page = self.get_spenders_page(&channel, 0).await?; + + if first_page.pagination.total_pages < 2 { + Ok(first_page.spenders) + } else { + let all: Vec = try_join_all( + (1..first_page.pagination.total_pages).map(|i| self.get_spenders_page(&channel, i)), + ) + .await?; + + let result_all: HashMap = std::iter::once(first_page) + .chain(all.into_iter()) + .flat_map(|p| p.spenders) + .collect(); + + Ok(result_all) + } + } + /// Get the accounting from Sentry /// `Balances` should always be in `CheckedState` pub async fn get_accounting( @@ -378,3 +401,179 @@ pub mod campaigns { client.get(endpoint).send().await?.json().await } } + +#[cfg(test)] +mod test { + use super::*; + use adapter::DummyAdapter; + use primitives::{ + adapter::DummyAdapterOptions, + config::{configuration, Environment}, + sentry::Pagination, + util::tests::{ + discard_logger, + prep_db::{ + ADDRESSES, DUMMY_CAMPAIGN, DUMMY_VALIDATOR_LEADER, IDS, + }, + }, + UnifiedNum, + }; + use std::str::FromStr; + use wiremock::{ + matchers::{method, path, query_param}, + Mock, MockServer, ResponseTemplate, + }; + + #[tokio::test] + async fn test_get_all_spenders() { + let server = MockServer::start().await; + let test_spender = Spender { + total_deposited: UnifiedNum::from(100_000_000), + spender_leaf: None, + }; + let mut all_spenders = HashMap::new(); + all_spenders.insert(ADDRESSES["user"], test_spender.clone()); + all_spenders.insert(ADDRESSES["publisher"], test_spender.clone()); + all_spenders.insert(ADDRESSES["publisher2"], test_spender.clone()); + all_spenders.insert(ADDRESSES["creator"], test_spender.clone()); + all_spenders.insert(ADDRESSES["tester"], test_spender.clone()); + + let first_page_response = AllSpendersResponse { + spenders: vec![ + ( + ADDRESSES["user"], + all_spenders.get(&ADDRESSES["user"]).unwrap().to_owned(), + ), + ( + ADDRESSES["publisher"], + all_spenders + .get(&ADDRESSES["publisher"]) + .unwrap() + .to_owned(), + ), + ] + .into_iter() + .collect(), + pagination: Pagination { + page: 0, + total_pages: 3, + }, + }; + + let second_page_response = AllSpendersResponse { + spenders: vec![ + ( + ADDRESSES["publisher2"], + all_spenders + .get(&ADDRESSES["publisher2"]) + .unwrap() + .to_owned(), + ), + ( + ADDRESSES["creator"], + all_spenders.get(&ADDRESSES["creator"]).unwrap().to_owned(), + ), + ] + .into_iter() + .collect(), + pagination: Pagination { + page: 1, + total_pages: 3, + }, + }; + + let third_page_response = AllSpendersResponse { + spenders: vec![( + ADDRESSES["tester"], + all_spenders.get(&ADDRESSES["tester"]).unwrap().to_owned(), + )] + .into_iter() + .collect(), + pagination: Pagination { + page: 2, + total_pages: 3, + }, + }; + + Mock::given(method("GET")) + .and(path(format!( + "/v5/channel/{}/spender/all", + DUMMY_CAMPAIGN.channel.id() + ))) + .and(query_param("page", "0")) + .respond_with(ResponseTemplate::new(200).set_body_json(&first_page_response)) + .mount(&server) + .await; + + Mock::given(method("GET")) + .and(path(format!( + "/v5/channel/{}/spender/all", + DUMMY_CAMPAIGN.channel.id() + ))) + .and(query_param("page", "1")) + .respond_with(ResponseTemplate::new(200).set_body_json(&second_page_response)) + .mount(&server) + .await; + + Mock::given(method("GET")) + .and(path(format!( + "/v5/channel/{}/spender/all", + DUMMY_CAMPAIGN.channel.id() + ))) + .and(query_param("page", "2")) + .respond_with(ResponseTemplate::new(200).set_body_json(&third_page_response)) + .mount(&server) + .await; + + let mut validators = Validators::new(); + validators.insert( + DUMMY_VALIDATOR_LEADER.id, + Validator { + url: ApiUrl::from_str(&server.uri()).expect("Should parse"), + token: AuthToken::default(), + }, + ); + let mut config = configuration(Environment::Development, None).expect("Should get Config"); + config.spendable_find_limit = 2; + + let adapter = DummyAdapter::init( + DummyAdapterOptions { + dummy_identity: IDS["leader"], + dummy_auth: Default::default(), + dummy_auth_tokens: Default::default(), + }, + &config, + ); + let logger = discard_logger(); + + let sentry = + SentryApi::init(adapter, logger, config, validators).expect("Should build sentry"); + + let mut res = sentry + .get_all_spenders(DUMMY_CAMPAIGN.channel.id()) + .await + .expect("should get response"); + + // Checks for page 1 + let res_user = res.remove(&ADDRESSES["user"]); + let res_publisher = res.remove(&ADDRESSES["publisher"]); + assert!(res_user.is_some() && res_publisher.is_some()); + assert_eq!(res_user.unwrap(), test_spender); + assert_eq!(res_publisher.unwrap(), test_spender); + + // Checks for page 2 + let res_publisher2 = res.remove(&ADDRESSES["publisher2"]); + let res_creator = res.remove(&ADDRESSES["creator"]); + assert!(res_publisher2.is_some() && res_creator.is_some()); + assert_eq!(res_publisher2.unwrap(), test_spender); + assert_eq!(res_creator.unwrap(), test_spender); + + // Checks for page 3 + let res_tester = res.remove(&ADDRESSES["tester"]); + assert!(res_tester.is_some()); + assert_eq!(res_tester.unwrap(), test_spender); + + // There should be no remaining elements + assert_eq!(res.len(), 0) + } +}