diff --git a/adview-manager/serve/src/main.rs b/adview-manager/serve/src/main.rs index 714503d14..900765e71 100644 --- a/adview-manager/serve/src/main.rs +++ b/adview-manager/serve/src/main.rs @@ -13,8 +13,7 @@ use wiremock::{ Mock, MockServer, ResponseTemplate, }; - -use tera::{Tera, Context}; +use tera::{Context, Tera}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -22,9 +21,11 @@ async fn main() -> Result<(), Box> { let serve_dir = match std::env::current_dir().unwrap() { serve_path if serve_path.ends_with("serve") => serve_path, - adview_manager_path if adview_manager_path.ends_with("adview-manager") => adview_manager_path.join("serve"), + adview_manager_path if adview_manager_path.ends_with("adview-manager") => { + adview_manager_path.join("serve") + } // running from the Validator stack workspace - workspace_path => workspace_path.join("adview-manager/serve") + workspace_path => workspace_path.join("adview-manager/serve"), }; let templates_glob = format!("{}/templates/**/*.html", serve_dir.display()); diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs index 71442a111..de74dc013 100644 --- a/primitives/src/sentry.rs +++ b/primitives/src/sentry.rs @@ -3,7 +3,7 @@ use crate::{ balances::BalancesState, spender::Spender, validator::{ApproveState, Heartbeat, MessageTypes, NewState, Type as MessageType}, - Address, Balances, CampaignId, UnifiedNum, ValidatorId, IPFS, + Address, Balances, CampaignId, UnifiedMap, UnifiedNum, ValidatorId, IPFS, }; use chrono::{ serde::ts_milliseconds, Date, DateTime, Datelike, Duration, NaiveDate, TimeZone, Timelike, Utc, @@ -590,6 +590,11 @@ pub struct AllSpendersQuery { pub page: u64, } +#[derive(Debug, Serialize, Deserialize)] +pub struct ChannelPayRequest { + pub payouts: UnifiedMap, +} + #[derive(Serialize, Deserialize, Debug)] pub struct ValidatorMessage { pub from: ValidatorId, diff --git a/sentry/src/db/channel.rs b/sentry/src/db/channel.rs index 9a9ec4330..46383a475 100644 --- a/sentry/src/db/channel.rs +++ b/sentry/src/db/channel.rs @@ -1,4 +1,4 @@ -use primitives::{Channel, ChannelId, ValidatorId}; +use primitives::{Channel, ChannelId}; pub use list_channels::list_channels; diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index 7c93ea243..06b780b48 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -26,8 +26,8 @@ use { campaign, campaign::{campaign_list, create_campaign, update_campaign}, channel::{ - add_spender_leaf, channel_list, get_accounting_for_channel, get_all_spender_limits, - get_spender_limits, last_approved, + add_spender_leaf, channel_list, channel_payout, get_accounting_for_channel, + get_all_spender_limits, get_spender_limits, last_approved, validator_message::{ create_validator_messages, extract_params, list_validator_messages, }, @@ -77,6 +77,9 @@ static CHANNEL_ACCOUNTING: Lazy = Lazy::new(|| { Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/accounting/?$") .expect("The regex should be valid") }); +static CHANNEL_PAY: Lazy = Lazy::new(|| { + Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/pay/?$").expect("The regex should be valid") +}); /// Regex extracted parameters. /// This struct is created manually on each of the matched routes. @@ -210,10 +213,6 @@ async fn campaigns_router( } // TODO AIP#61: Add routes for: -// - POST /channel/:id/pay -// #[serde(rename_all = "camelCase")] -// Pay { payout: BalancesMap }, -// // - GET /channel/:id/get-leaf async fn channels_router( mut req: Request, @@ -345,6 +344,21 @@ async fn channels_router( .await?; get_accounting_for_channel(req, app).await + } + // POST /v5/channel/:id/pay + else if let (Some(caps), &Method::POST) = (CHANNEL_PAY.captures(&path), method) { + let param = RouteParams(vec![caps + .get(1) + .map_or("".to_string(), |m| m.as_str().to_string())]); + req.extensions_mut().insert(param); + + req = Chain::new() + .chain(AuthRequired) + .chain(ChannelLoad) + .apply(req, app) + .await?; + + channel_payout(req, app).await } else { Err(ResponseError::NotFound) } diff --git a/sentry/src/routes.rs b/sentry/src/routes.rs index 116305058..0f0bc1999 100644 --- a/sentry/src/routes.rs +++ b/sentry/src/routes.rs @@ -76,13 +76,19 @@ //! //! Response: [`LastApprovedResponse`][primitives::sentry::LastApprovedResponse] //! -//! - `POST /v5/channel/:id/pay` (auth required) +//! - [`POST /v5/channel/:id/pay`](channel::channel_payout) (auth required) //! -//! TODO: implement and document as part of issue #382 +//! Channel Payout with authentication of the spender. +//! +//! This route handles withdrawals of advertiser funds for the authenticated spender. +//! It needs to ensure all campaigns are closed. It accepts a JSON body in the request which contains +//! all of the earners and updates their balances accordingly. Used when an advertiser/spender wants +//! to get their remaining funds back. +//! +//! Request JSON body: [`ChannelPayRequest`](primitives::sentry::ChannelPayRequest) //! -//! Channel Payout with authentication of the spender +//! Response: [`SuccessResponse`](primitives::sentry::SuccessResponse) //! -//! Withdrawals of advertiser funds - re-introduces the PAY event with a separate route. //! //! - `GET /v5/channel/:id/get-leaf` //! diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs index 1c70f0189..925a3b2c3 100644 --- a/sentry/src/routes/campaign.rs +++ b/sentry/src/routes/campaign.rs @@ -14,7 +14,7 @@ use crate::{ }; use adapter::{prelude::*, Adapter, Error as AdaptorError}; use deadpool_postgres::PoolError; -use futures::future::try_join_all; +use futures::{future::try_join_all, TryFutureExt}; use hyper::{Body, Request, Response}; use primitives::{ campaign_validator::Validator, @@ -115,17 +115,24 @@ pub async fn fetch_campaign_ids_for_channel( if total_pages < 2 { Ok(campaign_ids) } else { - let other_pages: Vec> = try_join_all((1..total_pages).map(|i| { - get_campaign_ids_by_channel( - pool, - &channel_id, - limit.into(), - i.checked_mul(limit.into()).expect("TODO"), - ) + let pages_skip: Vec = (1..total_pages) + .map(|i| { + i.checked_mul(limit.into()).ok_or_else(|| { + ResponseError::FailedValidation( + "Calculating skip while fetching campaign ids results in an overflow" + .to_string(), + ) + }) + }) + .collect::>()?; + + let other_pages = try_join_all(pages_skip.into_iter().map(|skip| { + get_campaign_ids_by_channel(pool, &channel_id, limit.into(), skip) + .map_err(|e| ResponseError::BadRequest(e.to_string())) })) .await?; - let all_campaigns: Vec = std::iter::once(campaign_ids) + let all_campaigns = std::iter::once(campaign_ids) .chain(other_pages.into_iter()) .flat_map(|campaign_ids| campaign_ids.into_iter()) .collect(); @@ -1279,13 +1286,19 @@ mod test { let new_budget = UnifiedNum::from_u64(300 * multiplier); let delta_budget = get_delta_budget(&campaign_remaining, &campaign, new_budget).await; - assert!(matches!(&delta_budget, Err(Error::NewBudget(_))), "Got result: {delta_budget:?}"); + assert!( + matches!(&delta_budget, Err(Error::NewBudget(_))), + "Got result: {delta_budget:?}" + ); // campaign_spent == new_budget let new_budget = UnifiedNum::from_u64(400 * multiplier); let delta_budget = get_delta_budget(&campaign_remaining, &campaign, new_budget).await; - assert!(matches!(&delta_budget, Err(Error::NewBudget(_))), "Got result: {delta_budget:?}"); + assert!( + matches!(&delta_budget, Err(Error::NewBudget(_))), + "Got result: {delta_budget:?}" + ); } // Increasing budget { diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index 8ab867791..f40d4c299 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -2,13 +2,18 @@ //! use crate::db::{ - accounting::{get_all_accountings_for_channel, update_accounting, Side}, + accounting::{ + get_accounting, get_all_accountings_for_channel, spend_amount, update_accounting, Side, + }, insert_channel, list_channels, spendable::{fetch_spendable, get_all_spendables_for_channel, update_spendable}, validator_message::{latest_approve_state, latest_heartbeats, latest_new_state}, DbPool, }; -use crate::{success_response, Application, ResponseError, RouteParams}; +use crate::{ + campaign::fetch_campaign_ids_for_channel, success_response, Application, Auth, ResponseError, + RouteParams, +}; use adapter::{client::Locked, Adapter}; use futures::future::try_join_all; use hyper::{Body, Request, Response}; @@ -16,7 +21,8 @@ use primitives::{ balances::{Balances, CheckedState, UncheckedState}, sentry::{ channel_list::ChannelListQuery, AccountingResponse, AllSpendersQuery, AllSpendersResponse, - LastApproved, LastApprovedQuery, LastApprovedResponse, SpenderResponse, SuccessResponse, + ChannelPayRequest, LastApproved, LastApprovedQuery, LastApprovedResponse, SpenderResponse, + SuccessResponse, }, spender::{Spendable, Spender}, validator::NewState, @@ -207,7 +213,7 @@ pub async fn get_spender_limits( } }; - let new_state = match get_corresponding_new_state(&app.pool, &app.logger, &channel).await? { + let new_state = match get_corresponding_new_state(&app.pool, &app.logger, channel).await? { Some(new_state) => new_state, None => return spender_response_without_leaf(latest_spendable.deposit.total), }; @@ -392,6 +398,129 @@ pub async fn get_accounting_for_channel( Ok(success_response(serde_json::to_string(&res)?)) } +/// `POST /v5/channel/0xXXX.../pay` request +/// +/// Body: [`ChannelPayRequest`] +/// +/// Response: [`SuccessResponse`] +pub async fn channel_payout( + req: Request, + app: &Application, +) -> Result, ResponseError> { + let channel_context = req + .extensions() + .get::>() + .expect("Request should have Channel & Chain/TokenInfo") + .to_owned(); + + let auth = req + .extensions() + .get::() + .ok_or(ResponseError::Unauthorized)? + .to_owned(); + + let spender = auth.uid.to_address(); + + let body = hyper::body::to_bytes(req.into_body()).await?; + let to_pay = serde_json::from_slice::(&body) + .map_err(|e| ResponseError::FailedValidation(e.to_string()))?; + + // Handling the case where a request with an empty body comes through + if to_pay.payouts.len() == 0 { + return Err(ResponseError::FailedValidation( + "Request has empty payouts".to_string(), + )); + } + + let channel_campaigns = fetch_campaign_ids_for_channel( + &app.pool, + channel_context.context.id(), + app.config.campaigns_find_limit, + ) + .await?; + + let campaigns_remaining_sum = app + .campaign_remaining + .get_multiple(&channel_campaigns) + .await? + .iter() + .sum::>() + .ok_or_else(|| { + ResponseError::BadRequest("Couldn't sum remaining amount for all campaigns".to_string()) + })?; + + // A campaign is closed when its remaining == 0 + // therefore for all campaigns for a channel to be closed their total remaining sum should be 0 + if campaigns_remaining_sum > UnifiedNum::from_u64(0) { + return Err(ResponseError::FailedValidation( + "All campaigns should be closed or have no budget left".to_string(), + )); + } + + let accounting_spent = get_accounting( + app.pool.clone(), + channel_context.context.id(), + spender, + Side::Spender, + ) + .await? + .map(|accounting_spent| accounting_spent.amount) + .unwrap_or_default(); + let accounting_earned = get_accounting( + app.pool.clone(), + channel_context.context.id(), + spender, + Side::Earner, + ) + .await? + .map(|accounting_spent| accounting_spent.amount) + .unwrap_or_default(); + let latest_spendable = + fetch_spendable(app.pool.clone(), &spender, &channel_context.context.id()) + .await + .map_err(|err| ResponseError::BadRequest(err.to_string()))? + .ok_or(ResponseError::BadRequest( + "There is no spendable amount for the spender in this Channel".to_string(), + ))?; + let total_deposited = latest_spendable.deposit.total; + + let available_for_payout = total_deposited + .checked_add(&accounting_earned) + .ok_or_else(|| { + ResponseError::FailedValidation( + "Overflow while calculating available for payout".to_string(), + ) + })? + .checked_sub(&accounting_spent) + .ok_or_else(|| { + ResponseError::FailedValidation( + "Underflow while calculating available for payout".to_string(), + ) + })?; + + let total_to_pay = to_pay + .payouts + .values() + .sum::>() + .ok_or_else(|| ResponseError::FailedValidation("Payouts amount overflow".to_string()))?; + + if total_to_pay > available_for_payout { + return Err(ResponseError::FailedValidation( + "The total requested payout amount exceeds the available payout".to_string(), + )); + } + + let mut balances: Balances = Balances::new(); + for (earner, amount) in to_pay.payouts.into_iter() { + balances.spend(spender, earner, amount)?; + } + + spend_amount(app.pool.clone(), channel_context.context.id(), balances).await?; + + Ok(success_response(serde_json::to_string(&SuccessResponse { + success: true, + })?)) +} /// [`Channel`] [validator messages](primitives::validator::MessageTypes) routes /// starting with `/v5/channel/0xXXX.../validator-messages` /// @@ -536,14 +665,14 @@ pub mod validator_message { #[cfg(test)] mod test { use super::*; - use crate::db::{accounting::spend_amount, insert_channel}; + use crate::db::{insert_campaign, insert_channel}; use crate::test_util::setup_dummy_app; - use adapter::primitives::Deposit; + use crate::CampaignRemaining; use hyper::StatusCode; use primitives::{ test_util::{ADVERTISER, CREATOR, GUARDIAN, PUBLISHER, PUBLISHER_2}, test_util::{DUMMY_CAMPAIGN, IDS}, - BigNum, + BigNum, Deposit, UnifiedMap, ValidatorId, }; #[tokio::test] @@ -846,4 +975,228 @@ mod test { balances.spenders.get(&CREATOR), ); } + + #[tokio::test] + async fn payouts_for_earners_test() { + let app = setup_dummy_app().await; + let channel_context = app + .config + .find_chain_of(DUMMY_CAMPAIGN.channel.token) + .expect("Dummy channel Token should be present in config!") + .with(DUMMY_CAMPAIGN.channel); + + insert_channel(&app.pool, channel_context.context) + .await + .expect("should insert channel"); + insert_campaign(&app.pool, &DUMMY_CAMPAIGN) + .await + .expect("should insert the campaign"); + + // Setting the initial remaining to 0 + let campaign_remaining = CampaignRemaining::new(app.redis.clone()); + campaign_remaining + .set_initial(DUMMY_CAMPAIGN.id, UnifiedNum::from_u64(0)) + .await + .expect("Should set value in redis"); + + let auth = Auth { + era: 0, + uid: ValidatorId::from(DUMMY_CAMPAIGN.creator), + chain: channel_context.chain.clone(), + }; + + let build_request = |channel_context: &ChainOf, payouts: UnifiedMap| { + let payouts = ChannelPayRequest { payouts }; + + let body = Body::from(serde_json::to_string(&payouts).expect("Should serialize")); + + Request::builder() + .extension(channel_context.clone()) + .extension(auth.clone()) + .body(body) + .expect("Should build Request") + }; + let mut payouts = UnifiedMap::default(); + payouts.insert(*PUBLISHER, UnifiedNum::from_u64(500)); + + // Testing before Accounting/Spendable are inserted + { + let err_response = + channel_payout(build_request(&channel_context, payouts.clone()), &app) + .await + .expect_err("Should return an error when there is no Accounting/Spendable"); + assert_eq!( + err_response, + ResponseError::BadRequest( + "There is no spendable amount for the spender in this Channel".to_string() + ), + "Failed validation because payouts are empty" + ); + } + + // Add accounting for spender = 500 + update_accounting( + app.pool.clone(), + channel_context.context.id(), + auth.uid.to_address(), + Side::Spender, + UnifiedNum::from_u64(500), + ) + .await + .expect("should update"); + + // Add spendable for the channel_context where total deposit = 1000 + let spendable = Spendable { + spender: auth.uid.to_address(), + channel: channel_context.context, + deposit: Deposit { + total: UnifiedNum::from_u64(1000), + still_on_create2: UnifiedNum::from_u64(0), + }, + }; + + // Add accounting for earner = 100 + // available for return will be = 600 + update_accounting( + app.pool.clone(), + channel_context.context.id(), + auth.uid.to_address(), + Side::Earner, + UnifiedNum::from_u64(100), + ) + .await + .expect("should update"); + + // Updating spendable so that we have a value for total_deposited + update_spendable(app.pool.clone(), &spendable) + .await + .expect("Should update spendable"); + + // Test with no body + { + let err_response = + channel_payout(build_request(&channel_context, UnifiedMap::default()), &app) + .await + .expect_err("Should return an error when payouts are empty"); + assert_eq!( + err_response, + ResponseError::FailedValidation("Request has empty payouts".to_string()), + "Failed validation because payouts are empty" + ); + } + + // make a normal request and get accounting to see if its as expected + { + let res = channel_payout(build_request(&channel_context, payouts.clone()), &app) + .await + .expect("This request shouldn't result in an error"); + + let json = hyper::body::to_bytes(res.into_body()) + .await + .expect("Should get json"); + + let success_response: SuccessResponse = + serde_json::from_slice(&json).expect("Should deserialize SuccessResponse"); + assert_eq!( + SuccessResponse { success: true }, + success_response, + "Request with JSON body with one address and no errors triggered on purpose" + ); + } + + // Checking if Earner/Spender balances have been updated by looking up the Accounting in the database + { + let earner_accounting = get_accounting( + app.pool.clone(), + channel_context.context.id(), + *PUBLISHER, + Side::Earner, + ) + .await + .expect("should get accounting") + .expect("Should have value, i.e. Some"); + assert_eq!( + earner_accounting.amount, + UnifiedNum::from_u64(500), + "Accounting is updated to reflect the publisher's earnings" + ); + + let spender_accounting = get_accounting( + app.pool.clone(), + channel_context.context.id(), + auth.uid.to_address(), + Side::Spender, + ) + .await + .expect("should get accounting") + .expect("Should have value, i.e. Some"); + assert_eq!( + spender_accounting.amount, + UnifiedNum::from_u64(1000), // 500 initial + 500 for earners + "Accounting is updated to reflect the amount deducted from the spender" + ); + } + + // make a request where "total_to_pay" will exceed available + { + payouts.insert(*CREATOR, UnifiedNum::from_u64(1000)); + + let response_error = + channel_payout(build_request(&channel_context, payouts.clone()), &app) + .await + .expect_err("Should return an error when total_to_pay > available_for_payout"); + assert_eq!( + ResponseError::FailedValidation( + "The total requested payout amount exceeds the available payout".to_string() + ), + response_error, + "Failed validation because total_to_pay > available_for_payout" + ); + } + + // make a request where total - spent + earned will be a negative balance resulting in an error + { + update_accounting( + app.pool.clone(), + channel_context.context.id(), + auth.uid.to_address(), + Side::Spender, + UnifiedNum::from_u64(1000), + ) + .await + .expect("should update"); // total spent: 500 + 1000 + + let response_error = + channel_payout(build_request(&channel_context, payouts.clone()), &app) + .await + .expect_err("Should return err when available_for_payout is negative"); + assert_eq!( + ResponseError::FailedValidation( + "Underflow while calculating available for payout".to_string() + ), + response_error, + "Failed validation because available_for_payout is negative" + ); + } + + // make a request where campaigns will have available remaining + { + campaign_remaining + .increase_by(DUMMY_CAMPAIGN.id, UnifiedNum::from_u64(1000)) + .await + .expect("Should set value in redis"); + + let response_error = + channel_payout(build_request(&channel_context, payouts.clone()), &app) + .await + .expect_err("Should return an error when a campaign has remaining funds"); + assert_eq!( + ResponseError::FailedValidation( + "All campaigns should be closed or have no budget left".to_string() + ), + response_error, + "Failed validation because the campaign has remaining funds" + ); + } + } }