Skip to content

POST /v5/channel/:id/pay #477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions primitives/src/sentry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,11 @@ pub struct AllSpendersQuery {
pub page: u64,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ChannelPayBody {
pub payouts: HashMap<Address, UnifiedNum>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ValidatorMessage {
pub from: ValidatorId,
Expand Down
5 changes: 3 additions & 2 deletions sentry/src/db/accounting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio_postgres::{
};

use super::{DbPool, PoolError};
use serde::Serialize;
use thiserror::Error;

static UPDATE_ACCOUNTING_STATEMENT: &str = "INSERT INTO accounting(channel_id, side, address, amount, updated, created) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT accounting_pkey DO UPDATE SET amount = accounting.amount + $4, updated = $6 WHERE accounting.channel_id = $1 AND accounting.side = $2 AND accounting.address = $3 RETURNING channel_id, side, address, amount, updated, created";
Expand All @@ -27,7 +28,7 @@ impl From<tokio_postgres::Error> for Error {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct Accounting {
pub channel_id: ChannelId,
pub side: Side,
Expand All @@ -50,7 +51,7 @@ impl From<&Row> for Accounting {
}
}

#[derive(Debug, Clone, Copy, ToSql, FromSql, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, ToSql, FromSql, PartialEq, Eq, Serialize)]
#[postgres(name = "accountingside")]
pub enum Side {
Earner,
Expand Down
22 changes: 20 additions & 2 deletions sentry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use {
campaign,
campaign::{campaign_list, create_campaign, update_campaign},
channel::{
add_spender_leaf, channel_list, get_accounting_for_channel, get_all_spender_limits,
get_spender_limits, last_approved,
add_spender_leaf, channel_list, channel_payout, get_accounting_for_channel,
get_all_spender_limits, get_spender_limits, last_approved,
validator_message::{
create_validator_messages, extract_params, list_validator_messages,
},
Expand Down Expand Up @@ -80,6 +80,9 @@ static CHANNEL_ACCOUNTING: Lazy<Regex> = Lazy::new(|| {
Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/accounting/?$")
.expect("The regex should be valid")
});
static CHANNEL_PAY: Lazy<Regex> = Lazy::new(|| {
Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/pay/?$").expect("The regex should be valid")
});

/// Regex extracted parameters.
/// This struct is created manually on each of the matched routes.
Expand Down Expand Up @@ -394,6 +397,21 @@ async fn channels_router<C: Locked + 'static>(
.await?;

get_accounting_for_channel(req, app).await
}
// POST /v5/channel/:id/pay
else if let (Some(caps), &Method::POST) = (CHANNEL_PAY.captures(&path), method) {
let param = RouteParams(vec![caps
.get(1)
.map_or("".to_string(), |m| m.as_str().to_string())]);
req.extensions_mut().insert(param);

req = Chain::new()
.chain(AuthRequired)
.chain(ChannelLoad)
.apply(req, app)
.await?;

channel_payout(req, app).await
} else {
Err(ResponseError::NotFound)
}
Expand Down
91 changes: 87 additions & 4 deletions sentry/src/routes/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,27 @@
//!

use crate::db::{
accounting::{get_all_accountings_for_channel, update_accounting, Side},
accounting::{
get_accounting, get_all_accountings_for_channel, spend_amount, update_accounting, Side,
},
insert_channel, list_channels,
spendable::{fetch_spendable, get_all_spendables_for_channel, update_spendable},
validator_message::{latest_approve_state, latest_heartbeats, latest_new_state},
DbPool,
};
use crate::{success_response, Application, ResponseError, RouteParams};
use crate::{
campaign::fetch_campaign_ids_for_channel, success_response, Application, ResponseError,
RouteParams,
};
use adapter::{client::Locked, Adapter};
use futures::future::try_join_all;
use hyper::{Body, Request, Response};
use primitives::{
balances::{Balances, CheckedState, UncheckedState},
sentry::{
channel_list::ChannelListQuery, AccountingResponse, AllSpendersQuery, AllSpendersResponse,
LastApproved, LastApprovedQuery, LastApprovedResponse, SpenderResponse, SuccessResponse,
ChannelPayBody, LastApproved, LastApprovedQuery, LastApprovedResponse, SpenderResponse,
SuccessResponse,
},
spender::{Spendable, Spender},
validator::NewState,
Expand Down Expand Up @@ -392,6 +398,83 @@ pub async fn get_accounting_for_channel<C: Locked + 'static>(
Ok(success_response(serde_json::to_string(&res)?))
}

pub async fn channel_payout<C: Locked + 'static>(
req: Request<Body>,
app: &Application<C>,
) -> Result<Response<Body>, ResponseError> {
let channel = req
.extensions()
.get::<Channel>()
.expect("Request should have Channel")
.to_owned();

let body = hyper::body::to_bytes(req.into_body()).await?;

let body = serde_json::from_slice::<ChannelPayBody>(&body)
.map_err(|e| ResponseError::FailedValidation(e.to_string()))?;

let channel_campaigns =
fetch_campaign_ids_for_channel(&app.pool, &channel.id(), app.config.campaigns_find_limit)
.await?;

let campaigns_remaining_sum = app
.campaign_remaining
.get_multiple(channel_campaigns.as_slice())
.await?
.iter()
.sum::<Option<UnifiedNum>>()
.ok_or_else(|| ResponseError::BadRequest(
"Couldn't sum remaining amount for all campaigns".to_string(),
))?;

// A campaign is closed when its remaining == 0
// therefore for all campaigns for a channel to be closed their total remaining sum should be 0
if campaigns_remaining_sum.ge(&UnifiedNum::from_u64(0)) {
return Err(ResponseError::BadRequest(
"Not all campaigns are closed!".to_string(),
));
}

let mut unchecked_balances: Balances<UncheckedState> = Balances::default();
for (address, amount) in body.payouts {
let accounting =
get_accounting(app.pool.clone(), channel.id(), address, Side::Spender).await?;
match accounting {
Some(accounting) => {
// We need to check if the amount this spender has is available for withdrawal
if accounting.amount.lt(&amount) {
return Err(ResponseError::BadRequest(format!(
"Spender amount for {} is less than the requested withdrawal amount",
address
)));
}
// If so, we add it to the earner balances for the same address
unchecked_balances
.earners
.insert(accounting.address, accounting.amount);
}
None => {
return Err(ResponseError::BadRequest(
"No accounting earner entry for channel/address pair".to_string(),
));
}
}
}

let balances = match unchecked_balances.check() {
Ok(balances) => balances,
Err(error) => {
error!(&app.logger, "{}", &error; "module" => "channel_payout");
return Err(ResponseError::FailedValidation(
"Earners sum is not equal to spenders sum for channel".to_string(),
));
}
};

let res = spend_amount(app.pool.clone(), channel.id(), balances).await?;

Ok(success_response(serde_json::to_string(&res)?))
}
/// [`Channel`] [validator messages](primitives::validator::MessageTypes) routes
/// starting with `/v5/channel/0xXXX.../validator-messages`
///
Expand Down Expand Up @@ -536,7 +619,7 @@ pub mod validator_message {
#[cfg(test)]
mod test {
use super::*;
use crate::db::{accounting::spend_amount, insert_channel};
use crate::db::insert_channel;
use crate::test_util::setup_dummy_app;
use adapter::primitives::Deposit;
use hyper::StatusCode;
Expand Down