diff --git a/lib/protocol-eth b/lib/protocol-eth index 77748887d..35ee39f42 160000 --- a/lib/protocol-eth +++ b/lib/protocol-eth @@ -1 +1 @@ -Subproject commit 77748887d74bf861faf58dacad6d90b0c7224b82 +Subproject commit 35ee39f4286b0a855d2368591b9dd8ecb4072c40 diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs index c53d1117a..ba07b4035 100644 --- a/primitives/src/sentry.rs +++ b/primitives/src/sentry.rs @@ -1,7 +1,6 @@ use crate::{ - targeting::Rules, validator::{ApproveState, Heartbeat, MessageTypes, NewState, Type as MessageType}, - Address, BalancesMap, BigNum, Channel, ChannelId, ValidatorId, IPFS, + Address, BigNum, Channel, ChannelId, ValidatorId, IPFS, }; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -361,6 +360,85 @@ pub mod campaign_create { } } } + + /// This implementation helps with test setup + /// **NOTE:** It erases the CampaignId, since the creation of the campaign gives it's CampaignId + impl From for CreateCampaign { + fn from(campaign: Campaign) -> Self { + Self { + channel: campaign.channel, + creator: campaign.creator, + budget: campaign.budget, + validators: campaign.validators, + title: campaign.title, + pricing_bounds: campaign.pricing_bounds, + event_submission: campaign.event_submission, + ad_units: campaign.ad_units, + targeting_rules: campaign.targeting_rules, + created: campaign.created, + active: campaign.active, + } + } + } + + // All editable fields stored in one place, used for checking when a budget is changed + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] + pub struct ModifyCampaign { + pub budget: Option, + pub validators: Option, + pub title: Option, + pub pricing_bounds: Option, + pub event_submission: Option, + pub ad_units: Option>, + pub targeting_rules: Option, + } + + impl ModifyCampaign { + pub fn from_campaign(campaign: Campaign) -> Self { + ModifyCampaign { + budget: Some(campaign.budget), + validators: Some(campaign.validators), + title: campaign.title, + pricing_bounds: campaign.pricing_bounds, + event_submission: campaign.event_submission, + ad_units: Some(campaign.ad_units), + targeting_rules: Some(campaign.targeting_rules), + } + } + + pub fn apply(self, mut campaign: Campaign) -> Campaign { + if let Some(new_budget) = self.budget { + campaign.budget = new_budget; + } + + if let Some(new_validators) = self.validators { + campaign.validators = new_validators; + } + + // check if it was passed otherwise not sending a Title will result in clearing of the current one + if let Some(new_title) = self.title { + campaign.title = Some(new_title); + } + + if let Some(new_pricing_bounds) = self.pricing_bounds { + campaign.pricing_bounds = Some(new_pricing_bounds); + } + + if let Some(new_event_submission) = self.event_submission { + campaign.event_submission = Some(new_event_submission); + } + + if let Some(new_ad_units) = self.ad_units { + campaign.ad_units = new_ad_units; + } + + if let Some(new_targeting_rules) = self.targeting_rules { + campaign.targeting_rules = new_targeting_rules; + } + + campaign + } + } } #[cfg(feature = "postgres")] diff --git a/primitives/src/sentry/accounting.rs b/primitives/src/sentry/accounting.rs index 4b9256cec..e04167eb9 100644 --- a/primitives/src/sentry/accounting.rs +++ b/primitives/src/sentry/accounting.rs @@ -68,6 +68,16 @@ impl Balances { Ok(()) } + + /// Adds the spender to the Balances with `UnifiedNum::from(0)` if he does not exist + pub fn add_spender(&mut self, spender: Address) { + self.spenders.entry(spender).or_insert(UnifiedNum::from(0)); + } + + /// Adds the earner to the Balances with `UnifiedNum::from(0)` if he does not exist + pub fn add_earner(&mut self, earner: Address) { + self.earners.entry(earner).or_insert(UnifiedNum::from(0)); + } } #[derive(Debug)] diff --git a/primitives/src/validator.rs b/primitives/src/validator.rs index 81452b5c9..72b89f7a1 100644 --- a/primitives/src/validator.rs +++ b/primitives/src/validator.rs @@ -39,6 +39,12 @@ impl From<&Address> for ValidatorId { } } +impl From
for ValidatorId { + fn from(address: Address) -> Self { + Self(address) + } +} + impl From<&[u8; 20]> for ValidatorId { fn from(bytes: &[u8; 20]) -> Self { Self(Address::from(bytes)) diff --git a/sentry/src/db.rs b/sentry/src/db.rs index 854fde42b..63578cac3 100644 --- a/sentry/src/db.rs +++ b/sentry/src/db.rs @@ -7,7 +7,7 @@ use lazy_static::lazy_static; pub mod accounting; pub mod analytics; -mod campaign; +pub mod campaign; mod channel; pub mod event_aggregate; pub mod spendable; diff --git a/sentry/src/db/accounting.rs b/sentry/src/db/accounting.rs index ffc8e3db4..94fcae16e 100644 --- a/sentry/src/db/accounting.rs +++ b/sentry/src/db/accounting.rs @@ -38,9 +38,7 @@ pub async fn get_accounting_spent( Ok(row.get("spent")) } -// TODO This is still WIP -#[allow(dead_code)] -async fn insert_accounting( +pub async fn insert_accounting( pool: DbPool, channel: Channel, balances: Balances, diff --git a/sentry/src/db/campaign.rs b/sentry/src/db/campaign.rs index 240684572..ea24ec6f7 100644 --- a/sentry/src/db/campaign.rs +++ b/sentry/src/db/campaign.rs @@ -1,14 +1,18 @@ use crate::db::{DbPool, PoolError}; -use primitives::{Campaign, CampaignId}; +use primitives::{Campaign, CampaignId, ChannelId}; use tokio_postgres::types::Json; -// TODO: Remove once we use this fn -#[allow(dead_code)] +pub use campaign_remaining::CampaignRemaining; + +/// ```text +/// INSERT INTO campaigns (id, channel_id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to) +/// VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) +/// ``` pub async fn insert_campaign(pool: &DbPool, campaign: &Campaign) -> Result { let client = pool.get().await?; let ad_units = Json(campaign.ad_units.clone()); - let stmt = client.prepare("INSERT INTO campaigns (id, channel_id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)").await?; - let row = client + let stmt = client.prepare("INSERT INTO campaigns (id, channel_id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)").await?; + let inserted = client .execute( &stmt, &[ @@ -30,7 +34,7 @@ pub async fn insert_campaign(pool: &DbPool, campaign: &Campaign) -> Result Result, PoolError> { + let client = pool.get().await?; + let statement = client.prepare("SELECT id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to FROM campaigns WHERE channel_id = $1").await?; + + let rows = client.query(&statement, &[&channel_id]).await?; + + let campaigns = rows.iter().map(Campaign::from).collect(); + + Ok(campaigns) +} + +/// ```text +/// UPDATE campaigns SET budget = $1, validators = $2, title = $3, pricing_bounds = $4, event_submission = $5, ad_units = $6, targeting_rules = $7 +/// WHERE id = $8 +/// RETURNING id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to +/// ``` +pub async fn update_campaign(pool: &DbPool, campaign: &Campaign) -> Result { + let client = pool.get().await?; + let statement = client + .prepare("UPDATE campaigns SET budget = $1, validators = $2, title = $3, pricing_bounds = $4, event_submission = $5, ad_units = $6, targeting_rules = $7 WHERE id = $8 RETURNING id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to") + .await?; + + let ad_units = Json(&campaign.ad_units); + + let updated_row = client + .query_one( + &statement, + &[ + &campaign.budget, + &campaign.validators, + &campaign.title, + &campaign.pricing_bounds, + &campaign.event_submission, + &ad_units, + &campaign.targeting_rules, + &campaign.id, + ], + ) + .await?; + + Ok(Campaign::from(&updated_row)) +} + +/// struct that handles redis calls for the Campaign Remaining Budget +mod campaign_remaining { + use crate::db::RedisError; + use primitives::{CampaignId, UnifiedNum}; + use redis::aio::MultiplexedConnection; + + #[derive(Clone)] + pub struct CampaignRemaining { + redis: MultiplexedConnection, + } + + impl CampaignRemaining { + pub const CAMPAIGN_REMAINING_KEY: &'static str = "campaignRemaining"; + + pub fn get_key(campaign: CampaignId) -> String { + format!("{}:{}", Self::CAMPAIGN_REMAINING_KEY, campaign) + } + + pub fn new(redis: MultiplexedConnection) -> Self { + Self { redis } + } + + pub async fn set_initial( + &self, + campaign: CampaignId, + amount: UnifiedNum, + ) -> Result { + redis::cmd("SETNX") + .arg(&Self::get_key(campaign)) + .arg(amount.to_u64()) + .query_async(&mut self.redis.clone()) + .await + } + + pub async fn get_remaining_opt( + &self, + campaign: CampaignId, + ) -> Result, RedisError> { + redis::cmd("GET") + .arg(&Self::get_key(campaign)) + .query_async::<_, Option>(&mut self.redis.clone()) + .await + } + + /// This method uses `max(0, value)` to clamp the value of a campaign, which can be negative and uses `i64`. + /// In addition, it defaults the campaign keys that were not found to `0`. + pub async fn get_multiple( + &self, + campaigns: &[CampaignId], + ) -> Result, RedisError> { + // `MGET` fails on empty keys + if campaigns.is_empty() { + return Ok(vec![]); + } + + let keys: Vec = campaigns + .iter() + .map(|campaign| Self::get_key(*campaign)) + .collect(); + + let campaigns_remaining = redis::cmd("MGET") + .arg(keys) + .query_async::<_, Vec>>(&mut self.redis.clone()) + .await? + .into_iter() + .map(|remaining| match remaining { + Some(remaining) => UnifiedNum::from_u64(remaining.max(0).unsigned_abs()), + None => UnifiedNum::from_u64(0), + }) + .collect(); + + Ok(campaigns_remaining) + } + + pub async fn increase_by( + &self, + campaign: CampaignId, + amount: UnifiedNum, + ) -> Result { + let key = Self::get_key(campaign); + redis::cmd("INCRBY") + .arg(&key) + .arg(amount.to_u64()) + .query_async(&mut self.redis.clone()) + .await + } + + pub async fn decrease_by( + &self, + campaign: CampaignId, + amount: UnifiedNum, + ) -> Result { + let key = Self::get_key(campaign); + redis::cmd("DECRBY") + .arg(&key) + .arg(amount.to_u64()) + .query_async(&mut self.redis.clone()) + .await + } + } + + #[cfg(test)] + mod test { + use primitives::util::tests::prep_db::DUMMY_CAMPAIGN; + + use crate::db::redis_pool::TESTS_POOL; + + use super::*; + + #[tokio::test] + async fn it_sets_initial_increases_and_decreases_remaining_for_campaign() { + let redis = TESTS_POOL.get().await.expect("Should return Object"); + + let campaign = DUMMY_CAMPAIGN.id; + let campaign_remaining = CampaignRemaining::new(redis.connection.clone()); + + // Get remaining on a key which was not set + { + let get_remaining = campaign_remaining + .get_remaining_opt(campaign) + .await + .expect("Should get None"); + + assert_eq!(None, get_remaining); + } + + // Set Initial amount on that key + { + let initial_amount = UnifiedNum::from(1_000_u64); + let set_initial = campaign_remaining + .set_initial(campaign, initial_amount) + .await + .expect("Should set value in redis"); + assert!(set_initial); + + // get the remaining of that key, should be the initial value + let get_remaining = campaign_remaining + .get_remaining_opt(campaign) + .await + .expect("Should get None"); + + assert_eq!( + Some(1_000_i64), + get_remaining, + "should return the initial value that was set" + ); + } + + // Set initial on already existing key, should return `false` + { + let set_failing_initial = campaign_remaining + .set_initial(campaign, UnifiedNum::from(999_u64)) + .await + .expect("Should set value in redis"); + assert!(!set_failing_initial); + } + + // Decrease by amount + { + let decrease_amount = UnifiedNum::from(64); + let decrease_by = campaign_remaining + .decrease_by(campaign, decrease_amount) + .await + .expect("Should decrease remaining amount"); + + assert_eq!(936_i64, decrease_by); + } + + // Increase by amount + { + let increase_amount = UnifiedNum::from(1064); + let increase_by = campaign_remaining + .increase_by(campaign, increase_amount) + .await + .expect("Should increase remaining amount"); + + assert_eq!(2_000_i64, increase_by); + } + + let get_remaining = campaign_remaining + .get_remaining_opt(campaign) + .await + .expect("Should get remaining"); + + assert_eq!(Some(2_000_i64), get_remaining); + + // Decrease by amount > than currently set + { + let decrease_amount = UnifiedNum::from(5_000); + let decrease_by = campaign_remaining + .decrease_by(campaign, decrease_amount) + .await + .expect("Should decrease remaining amount"); + + assert_eq!(-3_000_i64, decrease_by); + } + + // Increase the negative value without going > 0 + { + let increase_amount = UnifiedNum::from(1000); + let increase_by = campaign_remaining + .increase_by(campaign, increase_amount) + .await + .expect("Should increase remaining amount"); + + assert_eq!(-2_000_i64, increase_by); + } + } + + #[tokio::test] + async fn it_gets_multiple_campaigns_remaining() { + let redis = TESTS_POOL.get().await.expect("Should return Object"); + let campaign_remaining = CampaignRemaining::new(redis.connection.clone()); + + // get multiple with empty campaigns slice + // `MGET` throws error on an empty keys argument + assert!( + campaign_remaining + .get_multiple(&[]) + .await + .expect("Should get multiple") + .is_empty(), + "Should return an empty result" + ); + + let campaigns = (CampaignId::new(), CampaignId::new(), CampaignId::new()); + + // set initial amounts + { + assert!(campaign_remaining + .set_initial(campaigns.0, UnifiedNum::from(100)) + .await + .expect("Should set value in redis")); + + assert!(campaign_remaining + .set_initial(campaigns.1, UnifiedNum::from(200)) + .await + .expect("Should set value in redis")); + + assert!(campaign_remaining + .set_initial(campaigns.2, UnifiedNum::from(300)) + .await + .expect("Should set value in redis")); + } + + // set campaigns.1 to negative value, should return `0` because of `max(value, 0)` + assert_eq!( + -300_i64, + campaign_remaining + .decrease_by(campaigns.1, UnifiedNum::from(500)) + .await + .expect("Should decrease remaining") + ); + + let multiple = campaign_remaining + .get_multiple(&[campaigns.0, campaigns.1, campaigns.2]) + .await + .expect("Should get multiple"); + + assert_eq!( + vec![ + UnifiedNum::from(100), + UnifiedNum::from(0), + UnifiedNum::from(300) + ], + multiple + ); + } + } +} + #[cfg(test)] mod test { - use primitives::util::tests::prep_db::DUMMY_CAMPAIGN; + use primitives::{ + campaign, + event_submission::{RateLimit, Rule}, + sentry::campaign_create::ModifyCampaign, + targeting::Rules, + util::tests::prep_db::{DUMMY_AD_UNITS, DUMMY_CAMPAIGN}, + EventSubmission, UnifiedNum, + }; + use std::time::Duration; + use tokio_postgres::error::SqlState; use crate::db::tests_postgres::{setup_test_migrations, DATABASE_POOL}; use super::*; #[tokio::test] - async fn it_inserts_and_fetches_campaign() { + async fn it_inserts_fetches_and_updates_a_campaign() { let database = DATABASE_POOL.get().await.expect("Should get a DB pool"); setup_test_migrations(database.pool.clone()) .await .expect("Migrations should succeed"); - let campaign_for_testing = DUMMY_CAMPAIGN.clone(); + let campaign = DUMMY_CAMPAIGN.clone(); - let non_existent_campaign = fetch_campaign(database.pool.clone(), &campaign_for_testing.id) + let non_existent_campaign = fetch_campaign(database.pool.clone(), &campaign.id) .await .expect("Should fetch successfully"); assert_eq!(None, non_existent_campaign); - let is_inserted = insert_campaign(&database.pool, &campaign_for_testing) + let is_inserted = insert_campaign(&database.pool, &campaign) .await .expect("Should succeed"); assert!(is_inserted); - let fetched_campaign = fetch_campaign(database.pool.clone(), &campaign_for_testing.id) + let is_duplicate_inserted = insert_campaign(&database.pool, &campaign).await; + + assert!(matches!( + is_duplicate_inserted, + Err(PoolError::Backend(error)) if error.code() == Some(&SqlState::UNIQUE_VIOLATION) + )); + + let fetched_campaign = fetch_campaign(database.pool.clone(), &campaign.id) .await .expect("Should fetch successfully"); - assert_eq!(Some(campaign_for_testing), fetched_campaign); + assert_eq!(Some(campaign.clone()), fetched_campaign); + + // Update campaign + { + let rule = Rule { + uids: None, + rate_limit: Some(RateLimit { + limit_type: "sid".to_string(), + time_frame: Duration::from_millis(20_000), + }), + }; + let new_budget = campaign.budget + UnifiedNum::from_u64(1_000_000_000); + let modified_campaign = ModifyCampaign { + budget: Some(new_budget), + validators: None, + title: Some("Modified Campaign".to_string()), + pricing_bounds: Some(campaign::PricingBounds { + impression: Some(campaign::Pricing { + min: 1.into(), + max: 10.into(), + }), + click: Some(campaign::Pricing { + min: 0.into(), + max: 0.into(), + }), + }), + event_submission: Some(EventSubmission { allow: vec![rule] }), + ad_units: Some(DUMMY_AD_UNITS.to_vec()), + targeting_rules: Some(Rules::new()), + }; + + let applied_campaign = modified_campaign.apply(campaign.clone()); + + let updated_campaign = update_campaign(&database.pool, &applied_campaign) + .await + .expect("should update"); + + assert_eq!( + applied_campaign, updated_campaign, + "Postgres should update all modified fields" + ); + } } } diff --git a/sentry/src/db/spendable.rs b/sentry/src/db/spendable.rs index 67dc8fcb2..a6c340c2f 100644 --- a/sentry/src/db/spendable.rs +++ b/sentry/src/db/spendable.rs @@ -37,13 +37,13 @@ pub async fn fetch_spendable( pool: DbPool, spender: &Address, channel_id: &ChannelId, -) -> Result { +) -> Result, PoolError> { let client = pool.get().await?; let statement = client.prepare("SELECT spender, channel_id, channel, total, still_on_create2 FROM spendable WHERE spender = $1 AND channel_id = $2").await?; - let row = client.query_one(&statement, &[spender, channel_id]).await?; + let row = client.query_opt(&statement, &[spender, channel_id]).await?; - Ok(Spendable::try_from(row)?) + Ok(row.map(Spendable::try_from).transpose()?) } #[cfg(test)] @@ -88,6 +88,6 @@ mod test { .await .expect("Should fetch successfully"); - assert_eq!(spendable, fetched_spendable); + assert_eq!(Some(spendable), fetched_spendable); } } diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index cb789481c..0d2126a76 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -7,6 +7,7 @@ use crate::routes::channel::channel_status; use crate::routes::event_aggregate::list_channel_event_aggregates; use crate::routes::validator_message::{extract_params, list_validator_messages}; use chrono::Utc; +use db::CampaignRemaining; use hyper::{Body, Method, Request, Response, StatusCode}; use lazy_static::lazy_static; use middleware::{ @@ -22,6 +23,7 @@ use primitives::{Config, ValidatorId}; use redis::aio::MultiplexedConnection; use regex::Regex; use routes::analytics::{advanced_analytics, advertiser_analytics, analytics, publisher_analytics}; +use routes::campaign::{create_campaign, update_campaign}; use routes::cfg::config; use routes::channel::{ channel_list, channel_validate, create_channel, create_validator_messages, last_approved, @@ -67,7 +69,10 @@ static INSERT_EVENTS_BY_CAMPAIGN_ID: Lazy = Lazy::new(|| { Regex::new(r"^/campaign/0x([a-zA-Z0-9]{32})/events/?$").expect("The regex should be valid") }); static CLOSE_CAMPAIGN_BY_CAMPAIGN_ID: Lazy = Lazy::new(|| { - Regex::new(r"^/campaign/0x([a-zA-Z0-9]{32})/close/?$").expect("The regex should be valid") + Regex::new(r"^/v5/campaign/0x([a-zA-Z0-9]{32})/close/?$").expect("The regex should be valid") +}); +static CAMPAIGN_UPDATE_BY_ID: Lazy = Lazy::new(|| { + Regex::new(r"^/v5/campaign/0x([a-zA-Z0-9]{32})/?$").expect("The regex should be valid") }); #[derive(Debug, Clone)] @@ -86,10 +91,11 @@ impl RouteParams { #[derive(Clone)] pub struct Application { pub adapter: A, + pub config: Config, pub logger: Logger, pub redis: MultiplexedConnection, pub pool: DbPool, - pub config: Config, + pub campaign_remaining: CampaignRemaining, } impl Application { @@ -99,6 +105,7 @@ impl Application { logger: Logger, redis: MultiplexedConnection, pool: DbPool, + campaign_remaining: CampaignRemaining ) -> Self { Self { adapter, @@ -106,6 +113,7 @@ impl Application { logger, redis, pool, + campaign_remaining, } } @@ -158,6 +166,17 @@ impl Application { publisher_analytics(req, &self).await } + // For creating campaigns + ("/v5/campaign", &Method::POST) => { + let req = match AuthRequired.call(req, &self).await { + Ok(req) => req, + Err(error) => { + return map_response_error(error); + } + }; + + create_campaign(req, &self).await + } (route, _) if route.starts_with("/analytics") => analytics_router(req, &self).await, // This is important because it prevents us from doing // expensive regex matching for routes without /channel @@ -179,8 +198,13 @@ async fn campaigns_router( ) -> Result, ResponseError> { let (path, method) = (req.uri().path(), req.method()); - // create events - if let (Some(caps), &Method::POST) = (INSERT_EVENTS_BY_CAMPAIGN_ID.captures(&path), method) { + if let (Some(_caps), &Method::POST) = (CAMPAIGN_UPDATE_BY_ID.captures(&path), method) { + let req = CampaignLoad.call(req, app).await?; + + update_campaign::handle_route(req, app).await + } else if let (Some(caps), &Method::POST) = + (INSERT_EVENTS_BY_CAMPAIGN_ID.captures(&path), method) + { let param = RouteParams(vec![caps .get(1) .map_or("".to_string(), |m| m.as_str().to_string())]); @@ -217,8 +241,6 @@ async fn analytics_router( ) -> Result, ResponseError> { let (route, method) = (req.uri().path(), req.method()); - - // TODO AIP#61: Add routes for: // - POST /channel/:id/pay // #[serde(rename_all = "camelCase")] @@ -295,9 +317,8 @@ async fn channels_router( req.extensions_mut().insert(param); insert_events(req, app).await - } else */ - if let (Some(caps), &Method::GET) = (LAST_APPROVED_BY_CHANNEL_ID.captures(&path), method) - { + } else */ + if let (Some(caps), &Method::GET) = (LAST_APPROVED_BY_CHANNEL_ID.captures(&path), method) { let param = RouteParams(vec![caps .get(1) .map_or("".to_string(), |m| m.as_str().to_string())]); @@ -420,7 +441,7 @@ pub fn bad_response(response_body: String, status_code: StatusCode) -> Response< let mut error_response = HashMap::new(); error_response.insert("message", response_body); - let body = Body::from(serde_json::to_string(&error_response).expect("serialise err response")); + let body = Body::from(serde_json::to_string(&error_response).expect("serialize err response")); let mut response = Response::new(body); response @@ -483,3 +504,51 @@ pub struct Auth { pub era: i64, pub uid: ValidatorId, } + +#[cfg(test)] +pub mod test_util { + use adapter::DummyAdapter; + use primitives::{ + adapter::DummyAdapterOptions, + config::configuration, + util::tests::{ + discard_logger, + prep_db::{IDS}, + }, + }; + + use crate::{Application, db::{CampaignRemaining, redis_pool::TESTS_POOL, tests_postgres::{setup_test_migrations, DATABASE_POOL}}}; + + /// Uses production configuration to setup the correct Contract addresses for tokens. + pub async fn setup_dummy_app() -> Application { + let config = configuration("production", None).expect("Should get Config"); + let adapter = DummyAdapter::init( + DummyAdapterOptions { + dummy_identity: IDS["leader"], + dummy_auth: Default::default(), + dummy_auth_tokens: Default::default(), + }, + &config, + ); + + let redis = TESTS_POOL.get().await.expect("Should return Object"); + let database = DATABASE_POOL.get().await.expect("Should get a DB pool"); + + setup_test_migrations(database.pool.clone()) + .await + .expect("Migrations should succeed"); + + let campaign_remaining = CampaignRemaining::new(redis.connection.clone()); + + let app = Application::new( + adapter, + config, + discard_logger(), + redis.connection.clone(), + database.pool.clone(), + campaign_remaining + ); + + app + } +} diff --git a/sentry/src/main.rs b/sentry/src/main.rs index f8963f562..634831e5d 100644 --- a/sentry/src/main.rs +++ b/sentry/src/main.rs @@ -10,7 +10,7 @@ use primitives::adapter::{Adapter, DummyAdapterOptions, KeystoreOptions}; use primitives::config::configuration; use primitives::util::tests::prep_db::{AUTH, IDS}; use primitives::ValidatorId; -use sentry::db::{postgres_connection, redis_connection, setup_migrations}; +use sentry::db::{CampaignRemaining, postgres_connection, redis_connection, setup_migrations}; use sentry::Application; use slog::{error, info, Logger}; use std::{ @@ -115,18 +115,19 @@ async fn main() -> Result<(), Box> { // Check connection and setup migrations before setting up Postgres setup_migrations(&environment).await; let postgres = postgres_connection(42).await; + let campaign_remaining = CampaignRemaining::new(redis.clone()); match adapter { AdapterTypes::EthereumAdapter(adapter) => { run( - Application::new(*adapter, config, logger, redis, postgres), + Application::new(*adapter, config, logger, redis, postgres, campaign_remaining), socket_addr, ) .await } AdapterTypes::DummyAdapter(adapter) => { run( - Application::new(*adapter, config, logger, redis, postgres), + Application::new(*adapter, config, logger, redis, postgres, campaign_remaining), socket_addr, ) .await diff --git a/sentry/src/middleware/campaign.rs b/sentry/src/middleware/campaign.rs index 42d16b281..024dc1400 100644 --- a/sentry/src/middleware/campaign.rs +++ b/sentry/src/middleware/campaign.rs @@ -37,57 +37,18 @@ impl Middleware for CampaignLoad { #[cfg(test)] mod test { - use adapter::DummyAdapter; use primitives::{ - adapter::DummyAdapterOptions, - config::configuration, - util::tests::{ - discard_logger, - prep_db::{DUMMY_CAMPAIGN, IDS}, - }, + util::tests::prep_db::{DUMMY_CAMPAIGN, IDS}, Campaign, }; - use crate::db::{ - insert_campaign, - redis_pool::TESTS_POOL, - tests_postgres::{setup_test_migrations, DATABASE_POOL}, - }; + use crate::{db::insert_campaign, test_util::setup_dummy_app}; use super::*; - async fn setup_app() -> Application { - let config = configuration("development", None).expect("Should get Config"); - let adapter = DummyAdapter::init( - DummyAdapterOptions { - dummy_identity: IDS["leader"], - dummy_auth: Default::default(), - dummy_auth_tokens: Default::default(), - }, - &config, - ); - - let redis = TESTS_POOL.get().await.expect("Should return Object"); - let database = DATABASE_POOL.get().await.expect("Should get a DB pool"); - - setup_test_migrations(database.pool.clone()) - .await - .expect("Migrations should succeed"); - - let app = Application::new( - adapter, - config, - discard_logger(), - redis.connection.clone(), - database.pool.clone(), - ); - - app - } - #[tokio::test] async fn campaign_loading() { - let app = setup_app().await; + let app = setup_dummy_app().await; let build_request = |params: RouteParams| { Request::builder() diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs index 5936b8974..b3db3abe1 100644 --- a/sentry/src/routes/campaign.rs +++ b/sentry/src/routes/campaign.rs @@ -1,21 +1,63 @@ -use std::collections::HashMap; - use crate::{ access::{self, check_access}, + db::{ + accounting::get_accounting_spent, + campaign::{get_campaigns_by_channel, insert_campaign, update_campaign}, + spendable::fetch_spendable, + CampaignRemaining, DbPool, RedisError, + }, success_response, Application, Auth, ResponseError, Session, }; use chrono::Utc; +use deadpool_postgres::PoolError; use hyper::{Body, Request, Response}; use primitives::{ adapter::Adapter, - sentry::{campaign_create::CreateCampaign, Event, SuccessResponse}, - Campaign, + campaign_validator::Validator, + sentry::{ + campaign_create::{CreateCampaign, ModifyCampaign}, + Event, SuccessResponse, + }, + Address, Campaign, UnifiedNum, +}; +use slog::error; +use std::{ + cmp::{max, Ordering}, + collections::HashMap, }; +use thiserror::Error; +use tokio_postgres::error::SqlState; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Error while updating campaign: {0}")] + FailedUpdate(String), + #[error("Error while performing calculations")] + Calculation, + #[error("Error: Budget has been exceeded")] + BudgetExceeded, + #[error("Error with new budget: {0}")] + NewBudget(String), + #[error("Spendable amount for campaign creator {0} not found")] + SpenderNotFound(Address), + #[error("Campaign was not modified because of spending constraints")] + CampaignNotModified, + #[error("Redis error: {0}")] + Redis(#[from] RedisError), + #[error("DB Pool error: {0}")] + Pool(#[from] PoolError), +} pub async fn create_campaign( req: Request, app: &Application, ) -> Result, ResponseError> { + let auth = req + .extensions() + .get::() + .expect("request should have session") + .to_owned(); + let body = hyper::body::to_bytes(req.into_body()).await?; let campaign = serde_json::from_slice::(&body) @@ -23,34 +65,297 @@ pub async fn create_campaign( // create the actual `Campaign` with random `CampaignId` .into_campaign(); - // TODO AIP#61: Validate Campaign + campaign + .validate(&app.config, &app.adapter.whoami()) + .map_err(|err| ResponseError::FailedValidation(err.to_string()))?; + + if auth.uid.to_address() != campaign.creator { + return Err(ResponseError::Forbidden( + "Request not sent by campaign creator".to_string(), + )); + } - let error_response = ResponseError::BadRequest("err occurred; please try again later".into()); + let error_response = + ResponseError::BadRequest("err occurred; please try again later".to_string()); - // insert Campaign + let total_remaining = + { + let accounting_spent = + get_accounting_spent(app.pool.clone(), &campaign.creator, &campaign.channel.id()) + .await?; + + let latest_spendable = + fetch_spendable(app.pool.clone(), &campaign.creator, &campaign.channel.id()) + .await? + .ok_or(ResponseError::BadRequest( + "No spendable amount found for the Campaign creator".to_string(), + ))?; + // Gets the latest Spendable for this (spender, channelId) pair + let total_deposited = latest_spendable.deposit.total; + + total_deposited.checked_sub(&accounting_spent).ok_or( + ResponseError::FailedValidation("No more budget remaining".to_string()), + )? + }; + + let channel_campaigns = get_campaigns_by_channel(&app.pool, &campaign.channel.id()) + .await? + .iter() + .map(|c| c.id) + .collect::>(); + + let campaigns_remaining_sum = app + .campaign_remaining + .get_multiple(&channel_campaigns) + .await? + .iter() + .sum::>() + .ok_or(Error::Calculation)? + // DO NOT FORGET to add the Campaign being created right now! + .checked_add(&campaign.budget) + .ok_or(Error::Calculation)?; - // match insert_campaign(&app.pool, &campaign).await { - // Err(error) => { - // error!(&app.logger, "{}", &error; "module" => "create_channel"); - - // match error { - // PoolError::Backend(error) if error.code() == Some(&SqlState::UNIQUE_VIOLATION) => { - // Err(ResponseError::Conflict( - // "channel already exists".to_string(), - // )) - // } - // _ => Err(error_response), - // } - // } - // Ok(false) => Err(error_response), - // _ => Ok(()), - // }?; - - let create_response = SuccessResponse { success: true }; + if !(campaigns_remaining_sum <= total_remaining) || campaign.budget > total_remaining { + return Err(ResponseError::BadRequest( + "Not enough deposit left for the new campaign's budget".to_string(), + )); + } + + // If the campaign is being created, the amount spent is 0, therefore remaining = budget + let remaining_set = CampaignRemaining::new(app.redis.clone()) + .set_initial(campaign.id, campaign.budget) + .await + .map_err(|_| { + ResponseError::BadRequest("Couldn't set remaining while creating campaign".to_string()) + })?; + + // If for some reason the randomly generated `CampaignId` exists in Redis + // This should **NOT** happen! + if !remaining_set { + return Err(ResponseError::Conflict( + "The generated CampaignId already exists, please repeat the request".to_string(), + )); + } + + // insert Campaign + match insert_campaign(&app.pool, &campaign).await { + Err(error) => { + error!(&app.logger, "{}", &error; "module" => "create_campaign"); + match error { + PoolError::Backend(error) if error.code() == Some(&SqlState::UNIQUE_VIOLATION) => { + Err(ResponseError::Conflict( + "Campaign already exists".to_string(), + )) + } + _ => Err(error_response), + } + } + Ok(false) => Err(ResponseError::BadRequest( + "Encountered error while creating Campaign; please try again".to_string(), + )), + _ => Ok(()), + }?; Ok(success_response(serde_json::to_string(&campaign)?)) } +pub mod update_campaign { + use crate::db::CampaignRemaining; + + use super::*; + + pub async fn handle_route( + req: Request, + app: &Application, + ) -> Result, ResponseError> { + let campaign_being_mutated = req + .extensions() + .get::() + .expect("We must have a campaign in extensions") + .to_owned(); + + let body = hyper::body::to_bytes(req.into_body()).await?; + + let modify_campaign_fields = serde_json::from_slice::(&body) + .map_err(|e| ResponseError::FailedValidation(e.to_string()))?; + + // modify Campaign + let modified_campaign = modify_campaign( + &app.pool, + &app.campaign_remaining, + campaign_being_mutated, + modify_campaign_fields, + ) + .await + .map_err(|err| ResponseError::BadRequest(err.to_string()))?; + + Ok(success_response(serde_json::to_string(&modified_campaign)?)) + } + + pub async fn modify_campaign( + pool: &DbPool, + campaign_remaining: &CampaignRemaining, + campaign: Campaign, + modify_campaign: ModifyCampaign, + ) -> Result { + // *NOTE*: When updating campaigns make sure sum(campaigns.map(getRemaining)) <= totalDepoisted - totalspent + // !WARNING!: totalSpent != sum(campaign.map(c => c.spending)) therefore we must always calculate remaining funds based on total_deposit - lastApprovedNewState.spenders[user] + // *NOTE*: To close a campaign set campaignBudget to campaignSpent so that spendable == 0 + + let delta_budget = if let Some(new_budget) = modify_campaign.budget { + get_delta_budget(campaign_remaining, &campaign, new_budget).await? + } else { + None + }; + + // if we are going to update the budget + // validate the totalDeposit - totalSpent for all campaign + // sum(AllChannelCampaigns.map(getRemaining)) + DeltaBudgetForMutatedCampaign <= totalDeposited - totalSpent + // sum(AllChannelCampaigns.map(getRemaining)) - DeltaBudgetForMutatedCampaign <= totalDeposited - totalSpent + if let Some(delta_budget) = delta_budget { + let accounting_spent = + get_accounting_spent(pool.clone(), &campaign.creator, &campaign.channel.id()) + .await?; + + let latest_spendable = + fetch_spendable(pool.clone(), &campaign.creator, &campaign.channel.id()) + .await? + .ok_or(Error::SpenderNotFound(campaign.creator))?; + + // Gets the latest Spendable for this (spender, channelId) pair + let total_deposited = latest_spendable.deposit.total; + + let total_remaining = total_deposited + .checked_sub(&accounting_spent) + .ok_or(Error::Calculation)?; + let channel_campaigns = get_campaigns_by_channel(&pool, &campaign.channel.id()) + .await? + .iter() + .map(|c| c.id) + .collect::>(); + + // this will include the Campaign we are currently modifying + let campaigns_current_remaining_sum = campaign_remaining + .get_multiple(&channel_campaigns) + .await? + .iter() + .sum::>() + .ok_or(Error::Calculation)?; + + // apply the delta_budget to the sum + let new_campaigns_remaining = match delta_budget { + DeltaBudget::Increase(increase_by) => { + campaigns_current_remaining_sum.checked_add(&increase_by) + } + DeltaBudget::Decrease(decrease_by) => { + campaigns_current_remaining_sum.checked_sub(&decrease_by) + } + } + .ok_or(Error::Calculation)?; + + if !(new_campaigns_remaining <= total_remaining) { + return Err(Error::NewBudget( + "Not enough deposit left for the campaign's new budget".to_string(), + )); + } + + // there is a chance that the new remaining will be negative even when increasing the budget + // We don't currently use this value but can be used to perform additional checks or return messages accordingly + let _campaign_remaining = match delta_budget { + DeltaBudget::Increase(increase_by) => { + campaign_remaining + .increase_by(campaign.id, increase_by) + .await? + } + DeltaBudget::Decrease(decrease_by) => { + campaign_remaining + .decrease_by(campaign.id, decrease_by) + .await? + } + }; + } + + let modified_campaign = modify_campaign.apply(campaign); + update_campaign(&pool, &modified_campaign).await?; + + Ok(modified_campaign) + } + + /// Delta Budget describes the difference between the New and Old budget + /// It is used to decrease or increase the remaining budget instead of setting it up directly + /// This way if a new event alters the remaining budget in Redis while the modification of campaign hasn't finished + /// it will correctly update the remaining using an atomic redis operation with `INCRBY` or `DECRBY` instead of using `SET` + enum DeltaBudget { + Increase(T), + Decrease(T), + } + + async fn get_delta_budget( + campaign_remaining: &CampaignRemaining, + campaign: &Campaign, + new_budget: UnifiedNum, + ) -> Result>, Error> { + let current_budget = campaign.budget; + + let budget_action = match new_budget.cmp(¤t_budget) { + // if there is no difference in budgets - no action needed + Ordering::Equal => return Ok(None), + Ordering::Greater => DeltaBudget::Increase(()), + Ordering::Less => DeltaBudget::Decrease(()), + }; + + let old_remaining = campaign_remaining + .get_remaining_opt(campaign.id) + .await? + .map(|remaining| UnifiedNum::from(max(0, remaining).unsigned_abs())) + .ok_or(Error::FailedUpdate( + "No remaining entry for campaign".to_string(), + ))?; + + let campaign_spent = campaign + .budget + .checked_sub(&old_remaining) + .ok_or(Error::Calculation)?; + + if campaign_spent >= new_budget { + return Err(Error::NewBudget( + "New budget should be greater than the spent amount".to_string(), + )); + } + + let budget = match budget_action { + DeltaBudget::Increase(()) => { + // delta budget = New budget - Old budget ( the difference between the new and old when New > Old) + let new_remaining = new_budget + .checked_sub(¤t_budget) + .and_then(|delta_budget| old_remaining.checked_add(&delta_budget)) + .ok_or(Error::Calculation)?; + // new remaining > old remaining + let increase_by = new_remaining + .checked_sub(&old_remaining) + .ok_or(Error::Calculation)?; + + DeltaBudget::Increase(increase_by) + } + DeltaBudget::Decrease(()) => { + // delta budget = Old budget - New budget ( the difference between the new and old when New < Old) + let new_remaining = ¤t_budget + .checked_sub(&new_budget) + .and_then(|delta_budget| old_remaining.checked_sub(&delta_budget)) + .ok_or(Error::Calculation)?; + // old remaining > new remaining + let decrease_by = old_remaining + .checked_sub(&new_remaining) + .ok_or(Error::Calculation)?; + + DeltaBudget::Decrease(decrease_by) + } + }; + + Ok(Some(budget)) + } +} + pub async fn insert_events( req: Request, app: &Application, @@ -123,3 +428,229 @@ async fn process_events( Ok(true) } + +#[cfg(test)] +mod test { + use super::{update_campaign::modify_campaign, *}; + use crate::{ + db::{accounting::insert_accounting, spendable::insert_spendable}, + test_util::setup_dummy_app, + }; + use hyper::StatusCode; + use primitives::{ + sentry::accounting::{Balances, CheckedState}, + spender::{Deposit, Spendable}, + util::tests::prep_db::DUMMY_CAMPAIGN, + ValidatorId, + }; + + #[tokio::test] + /// Test single campaign creation and modification + // & + /// Test with multiple campaigns (because of Budget) a modification of campaign + async fn create_and_modify_with_multiple_campaigns() { + let app = setup_dummy_app().await; + + let build_request = |create_campaign: CreateCampaign| -> Request { + let auth = Auth { + era: 0, + uid: ValidatorId::from(create_campaign.creator), + }; + + let body = + Body::from(serde_json::to_string(&create_campaign).expect("Should serialize")); + + Request::builder() + .extension(auth) + .body(body) + .expect("Should build Request") + }; + + let campaign: Campaign = { + // erases the CampaignId for the CreateCampaign request + let mut create = CreateCampaign::from(DUMMY_CAMPAIGN.clone()); + // 500.00000000 + create.budget = UnifiedNum::from(50_000_000_000); + + let spendable = Spendable { + spender: create.creator, + channel: create.channel.clone(), + deposit: Deposit { + // a deposit equal to double the Campaign Budget + total: UnifiedNum::from(200_000_000_000), + still_on_create2: UnifiedNum::from(0), + }, + }; + assert!(insert_spendable(app.pool.clone(), &spendable) + .await + .expect("Should insert Spendable for Campaign creator")); + + let mut balances = Balances::::default(); + balances.add_spender(create.creator); + + // TODO: Replace this once https://github.com/AdExNetwork/adex-validator-stack-rust/pull/413 is merged + let _accounting = insert_accounting(app.pool.clone(), create.channel.clone(), balances) + .await + .expect("Should create Accounting"); + + let create_response = create_campaign(build_request(create), &app) + .await + .expect("Should create campaign"); + + assert_eq!(StatusCode::OK, create_response.status()); + let json = hyper::body::to_bytes(create_response.into_body()) + .await + .expect("Should get json"); + + let campaign: Campaign = + serde_json::from_slice(&json).expect("Should get new Campaign"); + + assert_ne!(DUMMY_CAMPAIGN.id, campaign.id); + + let campaign_remaining = CampaignRemaining::new(app.redis.clone()); + + let remaining = campaign_remaining + .get_remaining_opt(campaign.id) + .await + .expect("Should get remaining from redis") + .expect("There should be value for the Campaign"); + + assert_eq!( + UnifiedNum::from(50_000_000_000), + UnifiedNum::from(remaining.unsigned_abs()) + ); + campaign + }; + + // modify campaign + let modified = { + // 1000.00000000 + let new_budget = UnifiedNum::from(100_000_000_000); + let modify = ModifyCampaign { + budget: Some(new_budget.clone()), + validators: None, + title: Some("Updated title".to_string()), + pricing_bounds: None, + event_submission: None, + ad_units: None, + targeting_rules: None, + }; + + let modified_campaign = + modify_campaign(&app.pool, &app.campaign_remaining, campaign.clone(), modify) + .await + .expect("Should modify campaign"); + + assert_eq!(new_budget, modified_campaign.budget); + assert_eq!(Some("Updated title".to_string()), modified_campaign.title); + + modified_campaign + }; + + // we have 1000 left from our deposit, so we are using half of it + let _second_campaign = { + // erases the CampaignId for the CreateCampaign request + let mut create_second = CreateCampaign::from(DUMMY_CAMPAIGN.clone()); + // 500.00000000 + create_second.budget = UnifiedNum::from(50_000_000_000); + + let create_response = create_campaign(build_request(create_second), &app) + .await + .expect("Should create campaign"); + + assert_eq!(StatusCode::OK, create_response.status()); + let json = hyper::body::to_bytes(create_response.into_body()) + .await + .expect("Should get json"); + + let second_campaign: Campaign = + serde_json::from_slice(&json).expect("Should get new Campaign"); + + second_campaign + }; + + // No budget left for new campaigns + // remaining: 500 + // new campaign budget: 600 + { + // erases the CampaignId for the CreateCampaign request + let mut create = CreateCampaign::from(DUMMY_CAMPAIGN.clone()); + // 600.00000000 + create.budget = UnifiedNum::from(60_000_000_000); + + let create_err = create_campaign(build_request(create), &app) + .await + .expect_err("Should return Error response"); + + assert_eq!(ResponseError::BadRequest("Not enough deposit left for the new campaign's budget".to_string()), create_err); + } + + // modify first campaign, by lowering the budget from 1000 to 900 + let modified = { + let lower_budget = UnifiedNum::from(90_000_000_000); + let modify = ModifyCampaign { + budget: Some(lower_budget.clone()), + validators: None, + title: None, + pricing_bounds: None, + event_submission: None, + ad_units: None, + targeting_rules: None, + }; + + let modified_campaign = + modify_campaign(&app.pool, &app.campaign_remaining, modified, modify) + .await + .expect("Should modify campaign"); + + assert_eq!(lower_budget, modified_campaign.budget); + + modified_campaign + }; + + // Just enough budget to create this Campaign + // remaining: 600 + // new campaign budget: 600 + { + // erases the CampaignId for the CreateCampaign request + let mut create = CreateCampaign::from(DUMMY_CAMPAIGN.clone()); + // 600.00000000 + create.budget = UnifiedNum::from(60_000_000_000); + + let create_response = create_campaign(build_request(create), &app) + .await + .expect("Should return create campaign"); + + let json = hyper::body::to_bytes(create_response.into_body()) + .await + .expect("Should get json"); + + let _campaign: Campaign = + serde_json::from_slice(&json).expect("Should get new Campaign"); + } + + // Modify a campaign without enough budget + // remaining: 0 + // new campaign budget: 1100 + // current campaign budget: 900 + { + let new_budget = UnifiedNum::from(110_000_000_000); + let modify = ModifyCampaign { + budget: Some(new_budget), + validators: None, + title: None, + pricing_bounds: None, + event_submission: None, + ad_units: None, + targeting_rules: None, + }; + + let modify_err = + modify_campaign(&app.pool, &app.campaign_remaining, modified, modify) + .await + .expect_err("Should return Error response"); + + assert!(matches!(modify_err, Error::NewBudget(string) if string == "Not enough deposit left for the campaign's new budget")); + } + } +} diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index 304bd3432..ca337b6f7 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -3,7 +3,7 @@ use crate::db::{ get_channel_by_id, insert_channel, insert_validator_messages, list_channels, update_exhausted_channel, PoolError, }; -use crate::{success_response, Application, Auth, ResponseError, RouteParams, Session}; +use crate::{success_response, Application, Auth, ResponseError, RouteParams}; use futures::future::try_join_all; use hex::FromHex; use hyper::{Body, Request, Response}; @@ -11,7 +11,7 @@ use primitives::{ adapter::Adapter, sentry::{ channel_list::{ChannelListQuery, LastApprovedQuery}, - Event, LastApproved, LastApprovedResponse, SuccessResponse, + LastApproved, LastApprovedResponse, SuccessResponse, }, validator::MessageTypes, Channel, ChannelId,