From 39443d9db3c2a2eb19b286672410ef45ead9e7d4 Mon Sep 17 00:00:00 2001 From: simzzz Date: Tue, 27 Jul 2021 14:24:30 +0300 Subject: [PATCH 1/9] created new route for all spenders --- sentry/src/lib.rs | 15 +++++++++++++-- sentry/src/routes/channel.rs | 30 ++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index 502567a96..a8d33ce4c 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -63,6 +63,7 @@ lazy_static! { static ref PUBLISHER_ANALYTICS_BY_CHANNEL_ID: Regex = Regex::new(r"^/analytics/for-publisher/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid"); static ref CREATE_EVENTS_BY_CHANNEL_ID: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/events/?$").expect("The regex should be valid"); static ref CHANNEL_SPENDER_LEAF_AND_TOTAL_DEPOSITED: Regex = Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/spender/0x([a-zA-Z0-9]{40})/?$").expect("This regex should be valid"); + static ref CHANNEL_ALL_SPENDER_LIMITS: Regex = Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/spender/all/?$").expect("This regex should be valid"); } static INSERT_EVENTS_BY_CAMPAIGN_ID: Lazy = Lazy::new(|| { @@ -392,8 +393,18 @@ async fn channels_router( req = ChannelLoad.call(req, app).await?; get_total_deposited_and_spender_leaf(req, app).await - } - else { + } else if let(Some(caps), &Method::GET) = (CHANNEL_ALL_SPENDER_LIMITS.captures(&path), method) { + req = AuthRequired.call(req, app).await?; + + let param = let param = RouteParams(vec![caps + .get(1) + .map_or("".to_string(), |m| m.as_str().to_string())]); + req.extensions_mut().insert(param); + + req = ChannelLoad.call(req, app).await?; + + get_all_spender_limits(req, app).await + } else { Err(ResponseError::NotFound) } } diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index 8814c06e3..190973392 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -319,6 +319,36 @@ pub async fn get_total_deposited_and_spender_leaf( }; Ok(success_response(serde_json::to_string(&res)?)) } +pub async fn get_all_spender_limits( + req: Request, + app: &Application, +) -> Result, ResponseError> { + let route_params = req + .extensions() + .get::() + .expect("request should have route params"); + + let channel = req + .extensions() + .get::() + .expect("Request should have Channel") + .to_owned(); + + let spender = Address::from_str(&route_params.index(0))?; + + let default_response = Response::builder() + .header("Content-type", "application/json") + .body( + serde_json::to_string(&LastApprovedResponse { + last_approved: None, + heartbeats: None, + })? + .into(), + ) + .expect("should build response"); + + Ok(success_response(serde_json::to_string(&res)?)) +} #[cfg(test)] mod test { From 7a54cbbca40ea4dab5ebb140b1197e74cd5ca255 Mon Sep 17 00:00:00 2001 From: simzzz Date: Mon, 2 Aug 2021 18:14:13 +0300 Subject: [PATCH 2/9] finished with the route --- primitives/src/sentry.rs | 5 ++ sentry/src/db/accounting.rs | 14 ---- sentry/src/lib.rs | 8 ++- sentry/src/routes/channel.rs | 122 +++++++++++++++++++++-------------- 4 files changed, 84 insertions(+), 65 deletions(-) diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs index 1b5ac1487..bc93a6569 100644 --- a/primitives/src/sentry.rs +++ b/primitives/src/sentry.rs @@ -203,6 +203,11 @@ pub struct SpenderResponse { pub spender_leaf: Option, } +#[derive(Serialize, Deserialize, Debug)] +pub struct AllSpendersResponse { + pub spenders: Option>, +} + // TODO: Maybe there is a better place for this struct #[derive(Serialize, Deserialize, Debug)] pub struct SpenderLeaf { diff --git a/sentry/src/db/accounting.rs b/sentry/src/db/accounting.rs index d3ba6b815..ffc8e3db4 100644 --- a/sentry/src/db/accounting.rs +++ b/sentry/src/db/accounting.rs @@ -2,7 +2,6 @@ use std::convert::TryFrom; use chrono::{DateTime, Utc}; use primitives::{ - balances_map::UnifiedMap, channel_v5::Channel, sentry::accounting::{Accounting, Balances, CheckedState}, Address, ChannelId, UnifiedNum, @@ -65,19 +64,6 @@ async fn insert_accounting( Accounting::try_from(&row).map_err(Error::Balances) } -async fn get_spenders_for_channel(pool: DbPool, channel_id: &ChannelId) { - let client = pool.get().await?; - - let statement = client - .prepare("SELECT spenders FROM accounting WHERE channel_id = $1") - .await?; - - let rows = client.query(&statement, &[channel_id]).await?; - let spenders: UnifiedMap = serde_json::from_value(rows)?; - - Ok(spenders) -} - #[cfg(test)] mod test { use primitives::util::tests::prep_db::{ADDRESSES, DUMMY_CAMPAIGN}; diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index 004855b27..9d8be12b7 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -25,7 +25,8 @@ use routes::analytics::{advanced_analytics, advertiser_analytics, analytics, pub use routes::campaign::{create_campaign, update_campaign}; use routes::cfg::config; use routes::channel::{ - channel_list, channel_validate, create_channel, create_validator_messages, last_approved, get_spender_limits, get_all_spender_limits + channel_list, channel_validate, create_channel, create_validator_messages, + get_all_spender_limits, get_spender_limits, last_approved, }; use slog::Logger; use std::collections::HashMap; @@ -398,10 +399,11 @@ async fn channels_router( req = ChannelLoad.call(req, app).await?; get_spender_limits(req, app).await - } else if let(Some(caps), &Method::GET) = (CHANNEL_ALL_SPENDER_LIMITS.captures(&path), method) { + } else if let (Some(caps), &Method::GET) = (CHANNEL_ALL_SPENDER_LIMITS.captures(&path), method) + { req = AuthRequired.call(req, app).await?; - let param = let param = RouteParams(vec![caps + let param = RouteParams(vec![caps .get(1) .map_or("".to_string(), |m| m.as_str().to_string())]); req.extensions_mut().insert(param); diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index 7ca42c73d..f1a7bbe84 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -12,12 +12,12 @@ use primitives::{ adapter::Adapter, sentry::{ channel_list::{ChannelListQuery, LastApprovedQuery}, - LastApproved, LastApprovedResponse, SuccessResponse, SpenderLeaf, SpenderResponse + LastApproved, LastApprovedResponse, MessageResponse, SuccessResponse, SpenderLeaf, SpenderResponse, AllSpendersResponse }, spender::{Spendable, Deposit}, - validator::MessageTypes, + validator::{MessageTypes, NewState}, channel_v5::Channel as ChannelV5, - Address, Channel, ChannelId, UnifiedNum + Address, BigNum, Channel, ChannelId, UnifiedNum }; use slog::error; use std::{ @@ -273,25 +273,7 @@ pub async fn get_spender_limits( let channel_id = ChannelId::from_hex(route_params.index(0))?; let spender = Address::from_str(&route_params.index(1))?; - let default_response = Response::builder() - .header("Content-type", "application/json") - .body( - serde_json::to_string(&SpenderResponse { - total_deposited: None, - spender_leaf: None, - })? - .into(), - ) - .expect("should build response"); - - let approve_state = match latest_approve_state_v5(&app.pool, &channel).await? { - Some(approve_state) => approve_state, - None => return Ok(default_response), - }; - - let state_root = approve_state.msg.state_root.clone(); - - let new_state = latest_new_state_v5(&app.pool, &channel, &state_root).await?.ok_or(ResponseError::BadRequest("non-existing NewState".to_string()))?; + let new_state = get_corresponding_new_state(&app.pool, &channel).await.ok_or_else(|| ResponseError::NotFound)?; let latest_spendable = fetch_spendable(app.pool.clone(), &spender, &channel_id) .await?; @@ -306,54 +288,98 @@ pub async fn get_spender_limits( // Calculate total_spent let spender_balance = new_state.msg.balances.get(&spender).ok_or_else(|| ResponseError::BadRequest("No balance for this spender".to_string()))?; - let spender_balance = UnifiedNum::from_u64(spender_balance.to_u64().expect("Consider replacing this in a way that uses UnifiedMap")); - let total_spent = total_deposited.checked_sub(&spender_balance).ok_or_else(|| ResponseError::BadRequest("Total spent is too large".to_string()))?; - - let spender_leaf = SpenderLeaf { - total_spent, - // merkle_proof: [u8; 32], // TODO - }; + // TODO: Maybe throw error if None? + let spender_leaf = get_spender_leaf_for_spender(spender_balance, &total_deposited); // Return let res = SpenderResponse { total_deposited: Some(total_deposited), - spender_leaf: Some(spender_leaf), + spender_leaf, }; Ok(success_response(serde_json::to_string(&res)?)) } + pub async fn get_all_spender_limits( req: Request, app: &Application, ) -> Result, ResponseError> { - let route_params = req - .extensions() - .get::() - .expect("request should have route params"); - let channel = req .extensions() .get::() .expect("Request should have Channel") .to_owned(); - // Get all spenders for this channel + let new_state = get_corresponding_new_state(&app.pool, &channel).await.ok_or_else(|| ResponseError::NotFound)?; + + let mut all_spender_limits: HashMap = HashMap::new(); + + // Using for loop to avoid async closures + for (spender, balance) in new_state.msg.balances.iter() { + let latest_spendable = fetch_spendable(app.pool.clone(), &spender, &channel.id()).await.expect("todo: fix"); + + let latest_spendable = match latest_spendable { + Some(spendable) => spendable, + None => create_spendable_document(&app.adapter, app.pool.clone(), &channel, &spender).await.expect("todo"), + }; + + let total_deposited = latest_spendable.deposit.total.checked_add(&latest_spendable.deposit.still_on_create2).ok_or_else(|| ResponseError::BadRequest("Total Deposited is too large".to_string())).expect("todo"); + + // TODO: Maybe throw error if None? + let spender_leaf = get_spender_leaf_for_spender(balance, &total_deposited); + + let spender_response = SpenderResponse { + total_deposited: Some(total_deposited), + spender_leaf, + }; - // Calculate spender limits for every spender + all_spender_limits.insert(spender.clone(), spender_response); + } // Format and return output - let default_response = Response::builder() - .header("Content-type", "application/json") - .body( - serde_json::to_string(&LastApprovedResponse { - last_approved: None, - heartbeats: None, - })? - .into(), - ) - .expect("should build response"); + let res = AllSpendersResponse { + spenders: Some(all_spender_limits), + }; + + Ok(success_response(serde_json::to_string(&res)?)) +} + +async fn get_corresponding_new_state(pool: &DbPool, channel: &ChannelV5) -> Option> { + // Get all spenders for this channel + let approve_state = match latest_approve_state_v5(&pool, &channel).await { + Ok(Some(approve_state)) => approve_state, + _ => return None, + }; + + let state_root = approve_state.msg.state_root.clone(); + + let new_state = match latest_new_state_v5(&pool, &channel, &state_root).await { + Ok(Some(new_state)) => Some(new_state), + _ => None, + }; + + new_state +} + +// TODO pass UnifiedNum instead +fn get_spender_leaf_for_spender(spender_balance: &BigNum, total_deposited: &UnifiedNum) -> Option { + let spender_balance = match spender_balance.to_u64() { + Some(balance) => UnifiedNum::from_u64(balance), + None => return None, + }; + + let total_spent = match total_deposited.checked_sub(&spender_balance) { + Some(spent) => spent, + None => return None, + }; + + // Return + let leaf = SpenderLeaf { + total_spent, + // merkle_proof: [u8; 32], // TODO + }; - Ok(success_response(serde_json::to_string(&default_response)?)) + Some(leaf) } #[cfg(test)] From 10d8e41fc255eaefc54d30d072f4159b9ad10330 Mon Sep 17 00:00:00 2001 From: simzzz Date: Fri, 20 Aug 2021 17:05:45 +0300 Subject: [PATCH 3/9] merged branch for #391, fixed compiler errors and refactored --- primitives/src/sentry.rs | 6 ++ sentry/src/routes/channel.rs | 134 +++++++++++++++++------------------ 2 files changed, 73 insertions(+), 67 deletions(-) diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs index 4de6531fc..8e724c107 100644 --- a/primitives/src/sentry.rs +++ b/primitives/src/sentry.rs @@ -204,6 +204,12 @@ pub struct SpenderResponse { pub spender: Spender, } +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct AllSpendersResponse { + pub spenders: HashMap, +} + #[derive(Serialize, Deserialize, Debug)] pub struct ValidatorMessage { pub from: ValidatorId, diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index cedbfbd85..3bd01dc4e 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -14,13 +14,14 @@ use hyper::{Body, Request, Response}; use primitives::{ adapter::Adapter, channel_v5::Channel as ChannelV5, - config::TokenInfo, + config::{Config, TokenInfo}, sentry::{ channel_list::{ChannelListQuery, LastApprovedQuery}, - LastApproved, LastApprovedResponse, SpenderResponse, SuccessResponse, + AllSpendersResponse, LastApproved, LastApprovedResponse, MessageResponse, SpenderResponse, + SuccessResponse, }, spender::{Deposit, Spendable, Spender, SpenderLeaf}, - validator::MessageTypes, + validator::{MessageTypes, NewState}, Address, Channel, ChannelId, UnifiedNum, }; use slog::error; @@ -298,38 +299,18 @@ pub async fn get_spender_limits( .expect("Request should have Channel") .to_owned(); - let channel_id = channel.id(); let spender = Address::from_str(&route_params.index(1))?; - let latest_spendable = fetch_spendable(app.pool.clone(), &spender, &channel_id).await?; - let token_info = app - .config - .token_address_whitelist - .get(&channel.token) - .ok_or_else(|| ResponseError::FailedValidation("Unsupported Channel Token".to_string()))?; - - let latest_spendable = match latest_spendable { - Some(spendable) => spendable, - None => { - create_spendable_document( - &app.adapter, - token_info, - app.pool.clone(), - &channel, - spender, - ) - .await? - } - }; - - let approve_state = match latest_approve_state_v5(&app.pool, &channel).await? { - Some(approve_state) => approve_state, - None => return spender_response_without_leaf(latest_spendable.deposit.total), - }; - - let state_root = approve_state.msg.state_root.clone(); + let latest_spendable = get_latest_spendable( + app.pool.clone(), + &app.config, + &app.adapter, + &spender, + &channel, + ) + .await?; - let new_state = match latest_new_state_v5(&app.pool, &channel, &state_root).await? { + let new_state = match get_corresponding_new_state(&app.pool, &channel).await { Some(new_state) => new_state, None => return spender_response_without_leaf(latest_spendable.deposit.total), }; @@ -351,6 +332,29 @@ pub async fn get_spender_limits( Ok(success_response(serde_json::to_string(&res)?)) } +async fn get_latest_spendable( + pool: DbPool, + config: &Config, + adapter: &impl Adapter, + spender: &Address, + channel: &ChannelV5, +) -> Result { + let latest_spendable = fetch_spendable(pool.clone(), spender, &channel.id()).await?; + let token_info = config + .token_address_whitelist + .get(&channel.token) + .ok_or_else(|| ResponseError::FailedValidation("Unsupported Channel Token".to_string()))?; + + let latest_spendable = match latest_spendable { + Some(spendable) => spendable, + None => { + create_spendable_document(adapter, token_info, pool.clone(), channel, *spender).await? + } + }; + + Ok(latest_spendable) +} + pub async fn get_all_spender_limits( req: Request, app: &Application, @@ -361,66 +365,62 @@ pub async fn get_all_spender_limits( .expect("Request should have Channel") .to_owned(); - let new_state = get_corresponding_new_state(&app.pool, &channel).await.ok_or_else(|| ResponseError::NotFound)?; + let new_state = get_corresponding_new_state(&app.pool, &channel) + .await + .ok_or(ResponseError::NotFound)?; - let mut all_spender_limits: HashMap = HashMap::new(); + let mut all_spender_limits: HashMap = HashMap::new(); // Using for loop to avoid async closures - for (spender, balance) in new_state.msg.balances.iter() { - let latest_spendable = fetch_spendable(app.pool.clone(), &spender, &channel.id()).await.expect("todo: fix"); - - let latest_spendable = match latest_spendable { - Some(spendable) => spendable, - None => create_spendable_document(&app.adapter, app.pool.clone(), &channel, &spender).await.expect("todo"), - }; - - let total_deposited = latest_spendable.deposit.total.checked_add(&latest_spendable.deposit.still_on_create2).ok_or_else(|| ResponseError::BadRequest("Total Deposited is too large".to_string())).expect("todo"); + for (spender_addr, balance) in new_state.msg.balances.spenders.iter() { + let latest_spendable = get_latest_spendable( + app.pool.clone(), + &app.config, + &app.adapter, + spender_addr, + &channel, + ) + .await?; - // TODO: Maybe throw error if None? + let total_deposited = latest_spendable.deposit.total; let spender_leaf = get_spender_leaf_for_spender(balance, &total_deposited); - let spender_response = SpenderResponse { - total_deposited: Some(total_deposited), + let spender_info = Spender { + total_deposited, spender_leaf, }; - all_spender_limits.insert(spender.clone(), spender_response); + all_spender_limits.insert(*spender_addr, spender_info); } - // Format and return output - let res = AllSpendersResponse { - spenders: Some(all_spender_limits), + spenders: all_spender_limits, }; Ok(success_response(serde_json::to_string(&res)?)) } -async fn get_corresponding_new_state(pool: &DbPool, channel: &ChannelV5) -> Option> { - // Get all spenders for this channel - let approve_state = match latest_approve_state_v5(&pool, &channel).await { - Ok(Some(approve_state)) => approve_state, - _ => return None, +async fn get_corresponding_new_state( + pool: &DbPool, + channel: &ChannelV5, +) -> Option> { + let approve_state = match latest_approve_state_v5(pool, channel).await.ok()? { + Some(approve_state) => approve_state, + None => return None, }; let state_root = approve_state.msg.state_root.clone(); - let new_state = match latest_new_state_v5(&pool, &channel, &state_root).await { - Ok(Some(new_state)) => Some(new_state), - _ => None, - }; + let new_state = latest_new_state_v5(pool, channel, &state_root).await.ok()?; new_state } -// TODO pass UnifiedNum instead -fn get_spender_leaf_for_spender(spender_balance: &BigNum, total_deposited: &UnifiedNum) -> Option { - let spender_balance = match spender_balance.to_u64() { - Some(balance) => UnifiedNum::from_u64(balance), - None => return None, - }; - - let total_spent = match total_deposited.checked_sub(&spender_balance) { +fn get_spender_leaf_for_spender( + spender_balance: &UnifiedNum, + total_deposited: &UnifiedNum, +) -> Option { + let total_spent = match total_deposited.checked_sub(spender_balance) { Some(spent) => spent, None => return None, }; From a0b8488f999b93066d921463b190f5d5e1a05aef Mon Sep 17 00:00:00 2001 From: simzzz Date: Fri, 27 Aug 2021 19:00:50 +0300 Subject: [PATCH 4/9] merged adex v5 branch, changed code accordingly --- sentry/src/routes/channel.rs | 72 +++++++++++++----------------------- 1 file changed, 26 insertions(+), 46 deletions(-) diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index b1983e8a4..ea251b4cb 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -15,7 +15,7 @@ use primitives::{ adapter::Adapter, balances::UncheckedState, channel_v5::Channel as ChannelV5, - config::{Config, TokenInfo}, + config::TokenInfo, sentry::{ channel_list::{ChannelListQuery, LastApprovedQuery}, AllSpendersResponse, LastApproved, LastApprovedResponse, MessageResponse, SpenderResponse, @@ -290,21 +290,27 @@ pub async fn get_spender_limits( let spender = Address::from_str(&route_params.index(1))?; - let latest_spendable = get_latest_spendable( - app.pool.clone(), - &app.config, - &app.adapter, - &spender, - &channel, - ) - .await?; + let latest_spendable = fetch_spendable(app.pool.clone(), &spender, &channel.id()).await?; - let approve_state = match latest_approve_state_v5(&app.pool, &channel).await? { - Some(approve_state) => approve_state, - None => return spender_response_without_leaf(latest_spendable.deposit.total), - }; + let token_info = app + .config + .token_address_whitelist + .get(&channel.token) + .ok_or_else(|| ResponseError::FailedValidation("Unsupported Channel Token".to_string()))?; - let state_root = approve_state.msg.state_root.clone(); + let latest_spendable = match latest_spendable { + Some(spendable) => spendable, + None => { + create_or_update_spendable_document( + &app.adapter, + token_info, + app.pool.clone(), + &channel, + spender, + ) + .await? + } + }; let new_state = match get_corresponding_new_state(&app.pool, &channel).await { Some(new_state) => new_state, @@ -330,29 +336,6 @@ pub async fn get_spender_limits( Ok(success_response(serde_json::to_string(&res)?)) } -async fn get_latest_spendable( - pool: DbPool, - config: &Config, - adapter: &impl Adapter, - spender: &Address, - channel: &ChannelV5, -) -> Result { - let latest_spendable = fetch_spendable(pool.clone(), spender, &channel.id()).await?; - let token_info = config - .token_address_whitelist - .get(&channel.token) - .ok_or_else(|| ResponseError::FailedValidation("Unsupported Channel Token".to_string()))?; - - let latest_spendable = match latest_spendable { - Some(spendable) => spendable, - None => { - create_spendable_document(adapter, token_info, pool.clone(), channel, *spender).await? - } - }; - - Ok(latest_spendable) -} - pub async fn get_all_spender_limits( req: Request, app: &Application, @@ -371,14 +354,11 @@ pub async fn get_all_spender_limits( // Using for loop to avoid async closures for (spender_addr, balance) in new_state.msg.balances.spenders.iter() { - let latest_spendable = get_latest_spendable( - app.pool.clone(), - &app.config, - &app.adapter, - spender_addr, - &channel, - ) - .await?; + let latest_spendable = + match fetch_spendable(app.pool.clone(), spender_addr, &channel.id()).await? { + Some(spendable) => spendable, + None => return Err(ResponseError::NotFound), + }; let total_deposited = latest_spendable.deposit.total; let spender_leaf = get_spender_leaf_for_spender(balance, &total_deposited); @@ -401,7 +381,7 @@ pub async fn get_all_spender_limits( async fn get_corresponding_new_state( pool: &DbPool, channel: &ChannelV5, -) -> Option> { +) -> Option>> { let approve_state = match latest_approve_state_v5(pool, channel).await.ok()? { Some(approve_state) => approve_state, None => return None, From 1db26af872b79b8c6751c2a61e5ce5e103efd119 Mon Sep 17 00:00:00 2001 From: simzzz Date: Fri, 27 Aug 2021 19:26:52 +0300 Subject: [PATCH 5/9] modified route params --- sentry/src/lib.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index 7b66b23f4..3674168f3 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -65,7 +65,6 @@ lazy_static! { static ref PUBLISHER_ANALYTICS_BY_CHANNEL_ID: Regex = Regex::new(r"^/analytics/for-publisher/0x([a-zA-Z0-9]{64})/?$").expect("The regex should be valid"); static ref CREATE_EVENTS_BY_CHANNEL_ID: Regex = Regex::new(r"^/channel/0x([a-zA-Z0-9]{64})/events/?$").expect("The regex should be valid"); static ref CHANNEL_SPENDER_LEAF_AND_TOTAL_DEPOSITED: Regex = Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/spender/0x([a-zA-Z0-9]{40})/?$").expect("This regex should be valid"); - static ref CHANNEL_ALL_SPENDER_LIMITS: Regex = Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/spender/all/?$").expect("This regex should be valid"); } static INSERT_EVENTS_BY_CAMPAIGN_ID: Lazy = Lazy::new(|| { @@ -77,6 +76,9 @@ static CLOSE_CAMPAIGN_BY_CAMPAIGN_ID: Lazy = Lazy::new(|| { static CAMPAIGN_UPDATE_BY_ID: Lazy = Lazy::new(|| { Regex::new(r"^/v5/campaign/0x([a-zA-Z0-9]{32})/?$").expect("The regex should be valid") }); +static CHANNEL_ALL_SPENDER_LIMITS: Lazy = Lazy::new(|| { + Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/spender/all/?$").expect("The regex should be valid") +}); #[derive(Debug, Clone)] pub struct RouteParams(pub Vec); @@ -396,14 +398,17 @@ async fn channels_router( get_spender_limits(req, app).await } else if let (Some(caps), &Method::GET) = (CHANNEL_ALL_SPENDER_LIMITS.captures(&path), method) { - req = AuthRequired.call(req, app).await?; let param = RouteParams(vec![caps .get(1) .map_or("".to_string(), |m| m.as_str().to_string())]); req.extensions_mut().insert(param); - req = ChannelLoad.call(req, app).await?; + req = Chain::new() + .chain(AuthRequired) + .chain(ChannelLoad) + .apply(req, app) + .await?; get_all_spender_limits(req, app).await } else { From f049565050d957dfc89b69147faf9b6813aca248 Mon Sep 17 00:00:00 2001 From: simzzz Date: Tue, 7 Sep 2021 21:56:44 +0300 Subject: [PATCH 6/9] did requested changes from PR --- sentry/src/lib.rs | 4 +-- sentry/src/routes/channel.rs | 50 +++++++++++++++--------------------- 2 files changed, 22 insertions(+), 32 deletions(-) diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index 3674168f3..aba3fee95 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -77,7 +77,8 @@ static CAMPAIGN_UPDATE_BY_ID: Lazy = Lazy::new(|| { Regex::new(r"^/v5/campaign/0x([a-zA-Z0-9]{32})/?$").expect("The regex should be valid") }); static CHANNEL_ALL_SPENDER_LIMITS: Lazy = Lazy::new(|| { - Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/spender/all/?$").expect("The regex should be valid") + Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/spender/all/?$") + .expect("The regex should be valid") }); #[derive(Debug, Clone)] @@ -398,7 +399,6 @@ async fn channels_router( get_spender_limits(req, app).await } else if let (Some(caps), &Method::GET) = (CHANNEL_ALL_SPENDER_LIMITS.captures(&path), method) { - let param = RouteParams(vec![caps .get(1) .map_or("".to_string(), |m| m.as_str().to_string())]); diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index ea251b4cb..0485a266b 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -312,7 +312,7 @@ pub async fn get_spender_limits( } }; - let new_state = match get_corresponding_new_state(&app.pool, &channel).await { + let new_state = match get_corresponding_new_state(&app.pool, &channel).await? { Some(new_state) => new_state, None => return spender_response_without_leaf(latest_spendable.deposit.total), }; @@ -346,9 +346,10 @@ pub async fn get_all_spender_limits( .expect("Request should have Channel") .to_owned(); - let new_state = get_corresponding_new_state(&app.pool, &channel) - .await - .ok_or(ResponseError::NotFound)?; + let new_state = match get_corresponding_new_state(&app.pool, &channel).await? { + Some(new_state) => new_state, + None => return Err(ResponseError::NotFound), + }; let mut all_spender_limits: HashMap = HashMap::new(); @@ -357,15 +358,22 @@ pub async fn get_all_spender_limits( let latest_spendable = match fetch_spendable(app.pool.clone(), spender_addr, &channel.id()).await? { Some(spendable) => spendable, - None => return Err(ResponseError::NotFound), + None => continue, // skipping spender if not found }; let total_deposited = latest_spendable.deposit.total; - let spender_leaf = get_spender_leaf_for_spender(balance, &total_deposited); + let total_spent = total_deposited.checked_sub(balance).ok_or_else(|| { + ResponseError::FailedValidation("Couldn't calculate total_spent".to_string()) + })?; + + let spender_leaf = SpenderLeaf { + total_spent, + // merkle_proof: [u8; 32], // TODO + }; let spender_info = Spender { total_deposited, - spender_leaf, + spender_leaf: Some(spender_leaf), }; all_spender_limits.insert(*spender_addr, spender_info); @@ -381,35 +389,17 @@ pub async fn get_all_spender_limits( async fn get_corresponding_new_state( pool: &DbPool, channel: &ChannelV5, -) -> Option>> { - let approve_state = match latest_approve_state_v5(pool, channel).await.ok()? { +) -> Result>>, ResponseError> { + let approve_state = match latest_approve_state_v5(pool, channel).await? { Some(approve_state) => approve_state, - None => return None, + None => return Err(ResponseError::NotFound), }; let state_root = approve_state.msg.state_root.clone(); - let new_state = latest_new_state_v5(pool, channel, &state_root).await.ok()?; - - new_state -} - -fn get_spender_leaf_for_spender( - spender_balance: &UnifiedNum, - total_deposited: &UnifiedNum, -) -> Option { - let total_spent = match total_deposited.checked_sub(spender_balance) { - Some(spent) => spent, - None => return None, - }; - - // Return - let leaf = SpenderLeaf { - total_spent, - // merkle_proof: [u8; 32], // TODO - }; + let new_state = latest_new_state_v5(pool, channel, &state_root).await?; - Some(leaf) + Ok(new_state) } #[cfg(test)] From 3fc3b18838469e023332a3a6eda7dca0d141d3f6 Mon Sep 17 00:00:00 2001 From: simzzz Date: Thu, 9 Sep 2021 20:20:18 +0300 Subject: [PATCH 7/9] changes to all spenders for loop --- sentry/src/db/spendable.rs | 16 +++++++++++++++ sentry/src/routes/channel.rs | 40 +++++++++++++++++++----------------- 2 files changed, 37 insertions(+), 19 deletions(-) diff --git a/sentry/src/db/spendable.rs b/sentry/src/db/spendable.rs index 18c02c03e..f3f18fb05 100644 --- a/sentry/src/db/spendable.rs +++ b/sentry/src/db/spendable.rs @@ -44,6 +44,22 @@ pub async fn fetch_spendable( Ok(row.map(Spendable::from)) } +static GET_ALL_SPENDERS_STATEMENT: &str = "SELECT spender, channel_id, channel, total, still_on_create2 FROM spendable WHERE channel_id = $1"; + +// TODO: Include pagination +pub async fn get_all_spendables_for_channel( + pool: DbPool, + channel_id: &ChannelId, +) -> Result, PoolError> { + let client = pool.get().await?; + let statement = client.prepare(GET_ALL_SPENDERS_STATEMENT).await?; + + let rows = client.query(&statement, &[channel_id]).await?; + let spendables: Vec = rows.into_iter().map(Spendable::from).collect(); + + Ok(spendables) +} + static UPDATE_SPENDABLE_STATEMENT: &str = "INSERT INTO spendable(spender, channel_id, channel, total, still_on_create2) VALUES($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT spendable_pkey DO UPDATE SET total = $4, still_on_create2 = $5 WHERE spendable.spender = $1 AND spendable.channel_id = $2 RETURNING spender, channel_id, channel, total, still_on_create2"; // Updates spendable entry deposit or inserts a new spendable entry if it doesn't exist diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index 0485a266b..1f63dd143 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -4,7 +4,7 @@ use crate::db::{ latest_new_state_v5, }, get_channel_by_id, insert_channel, insert_validator_messages, list_channels, - spendable::{fetch_spendable, update_spendable}, + spendable::{fetch_spendable, get_all_spendables_for_channel, update_spendable}, DbPool, PoolError, }; use crate::{success_response, Application, Auth, ResponseError, RouteParams}; @@ -353,30 +353,32 @@ pub async fn get_all_spender_limits( let mut all_spender_limits: HashMap = HashMap::new(); + let all_spendables = get_all_spendables_for_channel(app.pool.clone(), &channel.id()).await?; + // Using for loop to avoid async closures - for (spender_addr, balance) in new_state.msg.balances.spenders.iter() { - let latest_spendable = - match fetch_spendable(app.pool.clone(), spender_addr, &channel.id()).await? { - Some(spendable) => spendable, - None => continue, // skipping spender if not found - }; - - let total_deposited = latest_spendable.deposit.total; - let total_spent = total_deposited.checked_sub(balance).ok_or_else(|| { - ResponseError::FailedValidation("Couldn't calculate total_spent".to_string()) - })?; - - let spender_leaf = SpenderLeaf { - total_spent, - // merkle_proof: [u8; 32], // TODO + for spendable in all_spendables { + let spender = spendable.spender; + let spender_leaf = if new_state.msg.balances.spenders.contains_key(&spender) { + let balance = new_state.msg.balances.spenders.get(&spender).unwrap(); + + Some(SpenderLeaf { + total_spent: spendable + .deposit + .total + .checked_sub(balance) + .unwrap_or_default(), + // merkle_proof: [u8; 32], // TODO + }) + } else { + None }; let spender_info = Spender { - total_deposited, - spender_leaf: Some(spender_leaf), + total_deposited: spendable.deposit.total, + spender_leaf, }; - all_spender_limits.insert(*spender_addr, spender_info); + all_spender_limits.insert(spender, spender_info); } let res = AllSpendersResponse { From e07992b90fa89f756f145be181f0d7e7493df1b9 Mon Sep 17 00:00:00 2001 From: simzzz Date: Fri, 10 Sep 2021 20:20:18 +0300 Subject: [PATCH 8/9] fixes to logic --- primitives/src/sentry.rs | 2 ++ sentry/src/routes/channel.rs | 61 +++++++++++++++++++++++------------- 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs index 3c5aab57a..800265007 100644 --- a/primitives/src/sentry.rs +++ b/primitives/src/sentry.rs @@ -225,6 +225,8 @@ pub struct SpenderResponse { #[serde(rename_all = "camelCase")] pub struct AllSpendersResponse { pub spenders: HashMap, + #[serde(flatten)] + pub pagination: Pagination, } #[derive(Serialize, Deserialize, Debug)] diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index 2ef4c9116..34a1c9a5e 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -18,8 +18,8 @@ use primitives::{ config::TokenInfo, sentry::{ channel_list::{ChannelListQuery, LastApprovedQuery}, - AllSpendersResponse, LastApproved, LastApprovedResponse, MessageResponse, SpenderResponse, - SuccessResponse, + AllSpendersResponse, LastApproved, LastApprovedResponse, MessageResponse, Pagination, + SpenderResponse, SuccessResponse, }, spender::{Deposit, Spendable, Spender, SpenderLeaf}, validator::{MessageTypes, NewState}, @@ -346,10 +346,7 @@ pub async fn get_all_spender_limits( .expect("Request should have Channel") .to_owned(); - let new_state = match get_corresponding_new_state(&app.pool, &channel).await? { - Some(new_state) => new_state, - None => return Err(ResponseError::NotFound), - }; + let new_state = get_corresponding_new_state(&app.pool, &channel).await?; let mut all_spender_limits: HashMap = HashMap::new(); @@ -358,19 +355,23 @@ pub async fn get_all_spender_limits( // Using for loop to avoid async closures for spendable in all_spendables { let spender = spendable.spender; - let spender_leaf = if new_state.msg.balances.spenders.contains_key(&spender) { - let balance = new_state.msg.balances.spenders.get(&spender).unwrap(); - - Some(SpenderLeaf { - total_spent: spendable - .deposit - .total - .checked_sub(balance) - .unwrap_or_default(), - // merkle_proof: [u8; 32], // TODO - }) - } else { - None + let spender_leaf = match new_state { + Some(ref new_state) => new_state + .msg + .balances + .spenders + .get(&spender) + .map(|balance| { + SpenderLeaf { + total_spent: spendable + .deposit + .total + .checked_sub(balance) + .unwrap_or_default(), + // merkle_proof: [u8; 32], // TODO + } + }), + None => None, }; let spender_info = Spender { @@ -383,6 +384,12 @@ pub async fn get_all_spender_limits( let res = AllSpendersResponse { spenders: all_spender_limits, + pagination: Pagination { + // TODO + page: 1, + total: 1, + total_pages: 1, + }, }; Ok(success_response(serde_json::to_string(&res)?)) @@ -394,14 +401,24 @@ async fn get_corresponding_new_state( ) -> Result>>, ResponseError> { let approve_state = match latest_approve_state_v5(pool, channel).await? { Some(approve_state) => approve_state, - None => return Err(ResponseError::NotFound), + None => return Ok(None), }; let state_root = approve_state.msg.state_root.clone(); - let new_state = latest_new_state_v5(pool, channel, &state_root).await?; + let new_state = match latest_new_state_v5(pool, channel, &state_root).await? { + // TODO: Since it's an approved NewState, then it's safe to make sure it's in `CheckedState`, or if it's not - log the error that the balances are not aligned with the fact it's an Approved NewState and return ResponseError::BadRequest + Some(new_state) => Ok(Some(new_state)), + + None => { + // TODO: Log the error since this should never happen and its crucial to the Channel + return Err(ResponseError::BadRequest( + "Fatal error! The NewState for the last ApproveState was not found".to_string(), + )); + } + }; - Ok(new_state) + new_state } #[cfg(test)] From d4ea9938b625b58944f02cb1c45a7e8cdb4bb4be Mon Sep 17 00:00:00 2001 From: simzzz Date: Mon, 13 Sep 2021 17:55:21 +0300 Subject: [PATCH 9/9] did todo's in the code --- primitives/src/sentry.rs | 2 +- sentry/src/routes/channel.rs | 56 +++++++++++++++++------------------- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs index 800265007..e5ae70135 100644 --- a/primitives/src/sentry.rs +++ b/primitives/src/sentry.rs @@ -42,7 +42,7 @@ pub mod message { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] #[serde(try_from = "MessageTypes", into = "MessageTypes")] - pub struct Message(T); + pub struct Message(pub T); impl Message { pub fn new(message: T) -> Self { diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index 34a1c9a5e..7a91d2aa7 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -13,19 +13,19 @@ use hex::FromHex; use hyper::{Body, Request, Response}; use primitives::{ adapter::Adapter, - balances::UncheckedState, + balances::{CheckedState, UncheckedState}, channel_v5::Channel as ChannelV5, config::TokenInfo, sentry::{ channel_list::{ChannelListQuery, LastApprovedQuery}, - AllSpendersResponse, LastApproved, LastApprovedResponse, MessageResponse, Pagination, - SpenderResponse, SuccessResponse, + AllSpendersResponse, LastApproved, LastApprovedResponse, Pagination, SpenderResponse, + SuccessResponse, }, spender::{Deposit, Spendable, Spender, SpenderLeaf}, validator::{MessageTypes, NewState}, Address, Channel, ChannelId, UnifiedNum, }; -use slog::error; +use slog::{error, Logger}; use std::{collections::HashMap, str::FromStr}; use tokio_postgres::error::SqlState; @@ -312,14 +312,12 @@ pub async fn get_spender_limits( } }; - let new_state = match get_corresponding_new_state(&app.pool, &channel).await? { + let new_state = match get_corresponding_new_state(&app.pool, &app.logger, &channel).await? { Some(new_state) => new_state, None => return spender_response_without_leaf(latest_spendable.deposit.total), }; - let new_state_checked = new_state.msg.into_inner().try_checked()?; - - let total_spent = new_state_checked.balances.spenders.get(&spender); + let total_spent = new_state.balances.spenders.get(&spender); let spender_leaf = total_spent.map(|total_spent| SpenderLeaf { total_spent: *total_spent, @@ -346,7 +344,7 @@ pub async fn get_all_spender_limits( .expect("Request should have Channel") .to_owned(); - let new_state = get_corresponding_new_state(&app.pool, &channel).await?; + let new_state = get_corresponding_new_state(&app.pool, &app.logger, &channel).await?; let mut all_spender_limits: HashMap = HashMap::new(); @@ -356,21 +354,16 @@ pub async fn get_all_spender_limits( for spendable in all_spendables { let spender = spendable.spender; let spender_leaf = match new_state { - Some(ref new_state) => new_state - .msg - .balances - .spenders - .get(&spender) - .map(|balance| { - SpenderLeaf { - total_spent: spendable - .deposit - .total - .checked_sub(balance) - .unwrap_or_default(), - // merkle_proof: [u8; 32], // TODO - } - }), + Some(ref new_state) => new_state.balances.spenders.get(&spender).map(|balance| { + SpenderLeaf { + total_spent: spendable + .deposit + .total + .checked_sub(balance) + .unwrap_or_default(), + // merkle_proof: [u8; 32], // TODO + } + }), None => None, }; @@ -397,8 +390,9 @@ pub async fn get_all_spender_limits( async fn get_corresponding_new_state( pool: &DbPool, + logger: &Logger, channel: &ChannelV5, -) -> Result>>, ResponseError> { +) -> Result>, ResponseError> { let approve_state = match latest_approve_state_v5(pool, channel).await? { Some(approve_state) => approve_state, None => return Ok(None), @@ -407,11 +401,15 @@ async fn get_corresponding_new_state( let state_root = approve_state.msg.state_root.clone(); let new_state = match latest_new_state_v5(pool, channel, &state_root).await? { - // TODO: Since it's an approved NewState, then it's safe to make sure it's in `CheckedState`, or if it's not - log the error that the balances are not aligned with the fact it's an Approved NewState and return ResponseError::BadRequest - Some(new_state) => Ok(Some(new_state)), - + Some(new_state) => { + let new_state = new_state.msg.into_inner().try_checked().map_err(|err| { + error!(&logger, "Balances are not aligned in an approved NewState: {}", &err; "module" => "get_spender_limits"); + ResponseError::BadRequest("Balances are not aligned in an approved NewState".to_string()) + })?; + Ok(Some(new_state)) + } None => { - // TODO: Log the error since this should never happen and its crucial to the Channel + error!(&logger, "{}", "Fatal error! The NewState for the last ApproveState was not found"; "module" => "get_spender_limits"); return Err(ResponseError::BadRequest( "Fatal error! The NewState for the last ApproveState was not found".to_string(), ));