From e892efa6860dfe3628c54ad1f197ee0f3e180c9a Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Fri, 9 Aug 2019 11:04:07 +0300 Subject: [PATCH 01/11] feature(engines): Add trait for saving leftover values Implement the trait for the eth engine --- .../interledger-settlement-engines/Cargo.toml | 1 + .../src/engines/ethereum_ledger/eth_engine.rs | 73 +++++++++++++++++-- .../engines/ethereum_ledger/test_helpers.rs | 16 ++++ .../src/main.rs | 2 + .../src/stores/redis_ethereum_ledger/store.rs | 17 +++++ 5 files changed, 104 insertions(+), 5 deletions(-) diff --git a/crates/interledger-settlement-engines/Cargo.toml b/crates/interledger-settlement-engines/Cargo.toml index 7ed62b447..30cae9d1f 100644 --- a/crates/interledger-settlement-engines/Cargo.toml +++ b/crates/interledger-settlement-engines/Cargo.toml @@ -35,6 +35,7 @@ clap = "2.32.0" clarity = { git = "https://github.com/gakonst/clarity" } sha3 = "0.8.2" num-bigint = "0.2.2" +num-traits = "0.2.8" [dev-dependencies] lazy_static = "1.3" diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs index 0932fecaa..3688aab7e 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs @@ -17,6 +17,7 @@ use hyper::StatusCode; use interledger_store_redis::RedisStoreBuilder; use log::info; use num_bigint::BigUint; +use num_traits::Zero; use redis::IntoConnectionInfo; use reqwest::r#async::{Client, Response as HttpResponse}; use ring::{digest, hmac}; @@ -489,7 +490,11 @@ where .push("settlements"); let client = Client::new(); debug!("Making POST to {:?} {:?} about {:?}", url, amount, tx_hash); + let self_clone = self.clone(); let action = move || { + // need to make 2 clones, one to own the variables in the function + // and one for the retry closure.. + let self_clone = self_clone.clone(); let account_id = account_id.clone(); client .post(url.clone()) @@ -504,11 +509,7 @@ where }) .and_then(move |response| { trace!("Accounting system responded with {:?}", response); - if response.status().is_success() { - Ok(()) - } else { - Err(()) - } + self_clone.process_connector_response(account_id_clone2, response, amount) }) }; Retry::spawn( @@ -520,6 +521,68 @@ where }) } + fn process_connector_response( + &self, + account_id: String, + response: HttpResponse, + amount: U256, + ) -> impl Future { + let engine_scale = self.asset_scale; + let store = self.store.clone(); + if !response.status().is_success() { + return Either::A(err(())); + } + Either::B( + response + .into_body() + .concat2() + .map_err(|err| { + let err = format!("Couldn't retrieve body {:?}", err); + error!("{}", err); + }) + .and_then(move |body| { + // Get the amount stored by the connector and + // check for any precision loss / overflow + serde_json::from_slice::(&body).map_err(|err| { + let err = format!("Couldn't parse body {:?} into Quantity {:?}", body, err); + error!("{}", err); + }) + }) + .and_then(move |quantity: Quantity| { + // Convert both to BigUInts so we can do arithmetic + join_all(vec![ + result(BigUint::from_str(&quantity.amount)), + result(BigUint::from_str(&amount.to_string())), + ]) + .map_err(|err| { + let error_msg = format!("Error converting to BigUints {:?}", err); + error!("{:?}", error_msg); + }) + .and_then(move |amounts: Vec| { + let connector_amount = amounts[0].clone(); + let engine_amount = amounts[1].clone(); + // Scale the amount settled by the + // connector back up to our scale + result(connector_amount.normalize_scale(ConvertDetails { + from: quantity.scale, + to: engine_scale, + })) + .and_then(move |scaled_connector_amount| { + let diff: BigUint = engine_amount - scaled_connector_amount; + if diff > Zero::zero() { + // connector settled less than we + // instructed it to, so we must save + // the difference for the leftovers + Either::A(store.save_leftovers(account_id, diff)) + } else { + Either::B(ok(())) + } + }) + }) + }), + ) + } + /// Helper function which submits an Ethereum ledger transaction to `to` for `amount`. /// If called with `token_address`, it makes an ERC20 transaction instead. /// Due to the lack of an API to create and sign the transaction diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs index 97f486a22..376affe2d 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs @@ -61,6 +61,22 @@ pub struct TestStore { pub last_observed_block: Arc>, pub saved_hashes: Arc>>, pub cache_hits: Arc>, + pub leftovers: Arc>>, +} + +use crate::stores::LeftoversStore; +use num_bigint::BigUint; + +impl LeftoversStore for TestStore { + fn save_leftovers( + &self, + account_id: String, + leftovers: BigUint, + ) -> Box + Send> { + let mut guard = self.leftovers.write(); + (*guard).insert(account_id, leftovers.to_string()); + Box::new(ok(())) + } } impl EthereumStore for TestStore { diff --git a/crates/interledger-settlement-engines/src/main.rs b/crates/interledger-settlement-engines/src/main.rs index a4f7601e8..58d777d92 100644 --- a/crates/interledger-settlement-engines/src/main.rs +++ b/crates/interledger-settlement-engines/src/main.rs @@ -1,3 +1,5 @@ +#![type_length_limit = "1739269"] + use clap::{value_t, App, Arg, SubCommand}; use hex; use std::str::FromStr; diff --git a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs index 46badb3e8..629b803ed 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs @@ -105,6 +105,23 @@ impl EthereumLedgerRedisStore { } } +impl LeftoversStore for EthereumLedgerRedisStore { + fn save_leftovers( + &self, + account_id: String, + leftovers: BigUint, + ) -> Box + Send> { + let mut pipe = redis::pipe(); + pipe.set(format!("leftovers:{}", account_id), leftovers.to_string()) + .ignore(); + Box::new( + pipe.query_async(self.connection.clone()) + .map_err(move |err| error!("Error saving leftovers {:?}: {:?}", leftovers, err)) + .and_then(move |(_conn, _ret): (_, Value)| Ok(())), + ) + } +} + impl EthereumStore for EthereumLedgerRedisStore { type Account = Account; From 055d36511c3cbaf2cfb8af1d10761c82ea23f159 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Fri, 9 Aug 2019 12:23:48 +0300 Subject: [PATCH 02/11] feat(engine): Add leftovers value when settling to connector --- .../src/engines/ethereum_ledger/eth_engine.rs | 73 +++++++++++-------- .../engines/ethereum_ledger/test_helpers.rs | 13 +++- .../src/main.rs | 2 +- .../src/stores/mod.rs | 15 ++++ .../src/stores/redis_ethereum_ledger/store.rs | 21 ++++++ 5 files changed, 92 insertions(+), 32 deletions(-) diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs index 3688aab7e..356e0ae1c 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs @@ -373,7 +373,7 @@ where store .load_account_id_from_address(addr) .and_then(move |id| { - self_clone.notify_connector(id.to_string(), amount, tx_hash) + self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash) }) .and_then(move |_| { // only save the transaction hash if the connector @@ -456,7 +456,7 @@ where Either::A( store.load_account_id_from_address(addr) .and_then(move |id| { - self_clone.notify_connector(id.to_string(), amount, tx_hash) + self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash) }) .and_then(move |_| { // only save the transaction hash if the connector @@ -477,7 +477,7 @@ where fn notify_connector( &self, account_id: String, - amount: U256, + amount: String, tx_hash: H256, ) -> impl Future { let mut url = self.connector_url.clone(); @@ -488,36 +488,57 @@ where .push("accounts") .push(&account_id.clone()) .push("settlements"); - let client = Client::new(); debug!("Making POST to {:?} {:?} about {:?}", url, amount, tx_hash); let self_clone = self.clone(); + let store = self.store.clone(); + let amount_clone = amount.clone(); let action = move || { // need to make 2 clones, one to own the variables in the function // and one for the retry closure.. let self_clone = self_clone.clone(); + let store = store.clone(); let account_id = account_id.clone(); - client - .post(url.clone()) - .header("Idempotency-Key", tx_hash.to_string()) - .json(&json!({ "amount": amount.to_string(), "scale" : engine_scale })) - .send() - .map_err(move |err| { - error!( - "Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}", - account_id, amount, err - ) - }) - .and_then(move |response| { - trace!("Accounting system responded with {:?}", response); - self_clone.process_connector_response(account_id_clone2, response, amount) + let account_id_clone2 = account_id.clone(); + let amount = amount.clone(); + let url = url.clone(); + + // settle for amount + leftovers + store.load_leftovers(account_id.clone()) + .and_then(move |leftovers| { + result(BigUint::from_str(&amount.clone()).map_err(move |err| { + let error_msg = format!("Error converting to BigUint {:?}", err); + error!("{:?}", error_msg); + })) + .and_then(move |amount| { + Ok(amount + leftovers) }) + }) + .and_then(move |full_amount| { + let client = Client::new(); + let full_amount_clone = full_amount.clone(); + client + .post(url.clone()) + .header("Idempotency-Key", tx_hash.to_string()) + .json(&json!({ "amount": full_amount.clone().to_string(), "scale" : engine_scale })) + .send() + .map_err(move |err| { + error!( + "Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}", + account_id, full_amount_clone, err + ) + }) + .and_then(move |response| { + trace!("Accounting system responded with {:?}", response); + self_clone.process_connector_response(account_id_clone2, response, full_amount.clone()) + }) + }) }; Retry::spawn( ExponentialBackoff::from_millis(10).take(MAX_RETRIES), action, ) .map_err(move |_| { - error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, amount, tx_hash) + error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, amount_clone, tx_hash) }) } @@ -525,7 +546,7 @@ where &self, account_id: String, response: HttpResponse, - amount: U256, + engine_amount: BigUint, ) -> impl Future { let engine_scale = self.asset_scale; let store = self.store.clone(); @@ -549,18 +570,12 @@ where }) }) .and_then(move |quantity: Quantity| { - // Convert both to BigUInts so we can do arithmetic - join_all(vec![ - result(BigUint::from_str(&quantity.amount)), - result(BigUint::from_str(&amount.to_string())), - ]) + result(BigUint::from_str(&quantity.amount)) .map_err(|err| { - let error_msg = format!("Error converting to BigUints {:?}", err); + let error_msg = format!("Error converting to BigUint {:?}", err); error!("{:?}", error_msg); }) - .and_then(move |amounts: Vec| { - let connector_amount = amounts[0].clone(); - let engine_amount = amounts[1].clone(); + .and_then(move |connector_amount: BigUint| { // Scale the amount settled by the // connector back up to our scale result(connector_amount.normalize_scale(ConvertDetails { diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs index 376affe2d..94e7ff54a 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs @@ -61,7 +61,7 @@ pub struct TestStore { pub last_observed_block: Arc>, pub saved_hashes: Arc>>, pub cache_hits: Arc>, - pub leftovers: Arc>>, + pub leftovers: Arc>>, } use crate::stores::LeftoversStore; @@ -74,9 +74,18 @@ impl LeftoversStore for TestStore { leftovers: BigUint, ) -> Box + Send> { let mut guard = self.leftovers.write(); - (*guard).insert(account_id, leftovers.to_string()); + (*guard).insert(account_id, leftovers); Box::new(ok(())) } + + fn load_leftovers(&self, account_id: String) -> Box + Send> { + let guard = self.leftovers.read(); + if let Some(l) = guard.get(&account_id) { + Box::new(ok(l.clone())) + } else { + Box::new(err(())) + } + } } impl EthereumStore for TestStore { diff --git a/crates/interledger-settlement-engines/src/main.rs b/crates/interledger-settlement-engines/src/main.rs index 58d777d92..8d1c69c9a 100644 --- a/crates/interledger-settlement-engines/src/main.rs +++ b/crates/interledger-settlement-engines/src/main.rs @@ -1,4 +1,4 @@ -#![type_length_limit = "1739269"] +#![type_length_limit = "3396805"] use clap::{value_t, App, Arg, SubCommand}; use hex; diff --git a/crates/interledger-settlement-engines/src/stores/mod.rs b/crates/interledger-settlement-engines/src/stores/mod.rs index 26f97f009..121e11e0f 100644 --- a/crates/interledger-settlement-engines/src/stores/mod.rs +++ b/crates/interledger-settlement-engines/src/stores/mod.rs @@ -28,3 +28,18 @@ pub trait IdempotentEngineStore { data: Bytes, ) -> Box + Send>; } + +pub trait LeftoversStore { + /// Saves the leftover data + fn save_leftovers( + &self, + account_id: String, + leftovers: BigUint, + ) -> Box + Send>; + + /// Saves the leftover data + fn load_leftovers( + &self, + account_id: String, + ) -> Box + Send>; +} diff --git a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs index 629b803ed..1085ec3b3 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs @@ -6,6 +6,7 @@ use futures::{ use ethereum_tx_sign::web3::types::{Address as EthAddress, H256, U256}; use interledger_service::Account as AccountTrait; use std::collections::HashMap; +use std::str::FromStr; use crate::engines::ethereum_ledger::{EthereumAccount, EthereumAddresses, EthereumStore}; use redis::{self, cmd, r#async::SharedConnection, ConnectionInfo, PipelineCommands, Value}; @@ -120,6 +121,26 @@ impl LeftoversStore for EthereumLedgerRedisStore { .and_then(move |(_conn, _ret): (_, Value)| Ok(())), ) } + + fn load_leftovers( + &self, + account_id: String, + ) -> Box + Send> { + let mut pipe = redis::pipe(); + pipe.get(format!("leftovers:{}", account_id)) + .ignore(); + Box::new( + pipe.query_async(self.connection.clone()) + .map_err(move |err| error!("Error loading leftovers {:?}: ", err)) + .and_then(move |(_conn, leftovers): (_, String)| { + if let Ok(leftovers) = BigUint::from_str(&leftovers) { + Box::new(ok(leftovers)) + } else { + Box::new(err(())) + } + }) + ) + } } impl EthereumStore for EthereumLedgerRedisStore { From 68da282da1b69f6e46ea8bda9ddaa2b4a42ee568 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Fri, 9 Aug 2019 21:25:45 +0300 Subject: [PATCH 03/11] fix(engines): Make leftovers be an associated type This allows the trait implementer to choose the data type of the leftovers instead of forcing them to use BigUint --- .../src/engines/ethereum_ledger/eth_engine.rs | 75 ++++++++++++------- .../engines/ethereum_ledger/test_helpers.rs | 18 ++++- .../src/stores/mod.rs | 6 +- .../src/stores/redis_ethereum_ledger/store.rs | 13 ++-- 4 files changed, 75 insertions(+), 37 deletions(-) diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs index 356e0ae1c..3e3dd8b59 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs @@ -35,7 +35,7 @@ use tokio_retry::{strategy::ExponentialBackoff, Retry}; use url::Url; use uuid::Uuid; -use crate::stores::redis_ethereum_ledger::*; +use crate::stores::{redis_ethereum_ledger::*, LeftoversStore}; use crate::{ApiResponse, CreateAccount, SettlementEngine, SettlementEngineApi}; use interledger_settlement::{Convert, ConvertDetails, Quantity}; @@ -101,7 +101,12 @@ pub struct EthereumLedgerSettlementEngineBuilder<'a, S, Si, A> { impl<'a, S, Si, A> EthereumLedgerSettlementEngineBuilder<'a, S, Si, A> where - S: EthereumStore + Clone + Send + Sync + 'static, + S: EthereumStore + + LeftoversStore + + Clone + + Send + + Sync + + 'static, Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, A: EthereumAccount + Send + Sync + 'static, { @@ -223,7 +228,12 @@ where impl EthereumLedgerSettlementEngine where - S: EthereumStore + Clone + Send + Sync + 'static, + S: EthereumStore + + LeftoversStore + + Clone + + Send + + Sync + + 'static, Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, A: EthereumAccount + Send + Sync + 'static, { @@ -373,7 +383,11 @@ where store .load_account_id_from_address(addr) .and_then(move |id| { - self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash) + self_clone.notify_connector( + id.to_string(), + amount.to_string(), + tx_hash, + ) }) .and_then(move |_| { // only save the transaction hash if the connector @@ -571,29 +585,31 @@ where }) .and_then(move |quantity: Quantity| { result(BigUint::from_str(&quantity.amount)) - .map_err(|err| { - let error_msg = format!("Error converting to BigUint {:?}", err); - error!("{:?}", error_msg); - }) - .and_then(move |connector_amount: BigUint| { - // Scale the amount settled by the - // connector back up to our scale - result(connector_amount.normalize_scale(ConvertDetails { - from: quantity.scale, - to: engine_scale, - })) - .and_then(move |scaled_connector_amount| { - let diff: BigUint = engine_amount - scaled_connector_amount; - if diff > Zero::zero() { - // connector settled less than we - // instructed it to, so we must save - // the difference for the leftovers - Either::A(store.save_leftovers(account_id, diff)) - } else { - Either::B(ok(())) - } + .map_err(|err| { + let error_msg = format!("Error converting to BigUint {:?}", err); + error!("{:?}", error_msg); + }) + .and_then(move |connector_amount: BigUint| { + // Scale the amount settled by the + // connector back up to our scale + result(connector_amount.normalize_scale(ConvertDetails { + from: quantity.scale, + to: engine_scale, + })) + .and_then( + move |scaled_connector_amount| { + let diff: BigUint = engine_amount - scaled_connector_amount; + if diff > Zero::zero() { + // connector settled less than we + // instructed it to, so we must save + // the difference for the leftovers + Either::A(store.save_leftovers(account_id, diff)) + } else { + Either::B(ok(())) + } + }, + ) }) - }) }), ) } @@ -689,7 +705,12 @@ where impl SettlementEngine for EthereumLedgerSettlementEngine where - S: EthereumStore + Clone + Send + Sync + 'static, + S: EthereumStore + + LeftoversStore + + Clone + + Send + + Sync + + 'static, Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, A: EthereumAccount + Send + Sync + 'static, { diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs index 94e7ff54a..ab4ed509c 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs @@ -68,17 +68,22 @@ use crate::stores::LeftoversStore; use num_bigint::BigUint; impl LeftoversStore for TestStore { + type AssetType = BigUint; + fn save_leftovers( &self, account_id: String, - leftovers: BigUint, + leftovers: Self::AssetType, ) -> Box + Send> { let mut guard = self.leftovers.write(); (*guard).insert(account_id, leftovers); Box::new(ok(())) } - fn load_leftovers(&self, account_id: String) -> Box + Send> { + fn load_leftovers( + &self, + account_id: String, + ) -> Box + Send> { let guard = self.leftovers.read(); if let Some(l) = guard.get(&account_id) { Box::new(ok(l.clone())) @@ -261,6 +266,7 @@ impl TestStore { cache_hits: Arc::new(RwLock::new(0)), last_observed_block: Arc::new(RwLock::new(U256::from(0))), saved_hashes: Arc::new(RwLock::new(HashMap::new())), + leftovers: Arc::new(RwLock::new(HashMap::new())), } } } @@ -288,7 +294,13 @@ pub fn test_engine( ) -> EthereumLedgerSettlementEngine where Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, - S: EthereumStore + IdempotentStore + Clone + Send + Sync + 'static, + S: EthereumStore + + LeftoversStore + + IdempotentStore + + Clone + + Send + + Sync + + 'static, A: EthereumAccount + Send + Sync + 'static, { EthereumLedgerSettlementEngineBuilder::new(store, key) diff --git a/crates/interledger-settlement-engines/src/stores/mod.rs b/crates/interledger-settlement-engines/src/stores/mod.rs index 121e11e0f..16c1c8ef3 100644 --- a/crates/interledger-settlement-engines/src/stores/mod.rs +++ b/crates/interledger-settlement-engines/src/stores/mod.rs @@ -30,16 +30,18 @@ pub trait IdempotentEngineStore { } pub trait LeftoversStore { + type AssetType; + /// Saves the leftover data fn save_leftovers( &self, account_id: String, - leftovers: BigUint, + leftovers: Self::AssetType, ) -> Box + Send>; /// Saves the leftover data fn load_leftovers( &self, account_id: String, - ) -> Box + Send>; + ) -> Box + Send>; } diff --git a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs index 1085ec3b3..fceac85ef 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs @@ -14,6 +14,8 @@ use redis::{self, cmd, r#async::SharedConnection, ConnectionInfo, PipelineComman use log::{error, trace}; use crate::stores::redis_store_common::{EngineRedisStore, EngineRedisStoreBuilder}; +use crate::stores::LeftoversStore; +use num_bigint::BigUint; // Key for the latest observed block and balance. The data is stored in order to // avoid double crediting transactions which have already been processed, and in @@ -107,10 +109,12 @@ impl EthereumLedgerRedisStore { } impl LeftoversStore for EthereumLedgerRedisStore { + type AssetType = BigUint; + fn save_leftovers( &self, account_id: String, - leftovers: BigUint, + leftovers: Self::AssetType, ) -> Box + Send> { let mut pipe = redis::pipe(); pipe.set(format!("leftovers:{}", account_id), leftovers.to_string()) @@ -125,10 +129,9 @@ impl LeftoversStore for EthereumLedgerRedisStore { fn load_leftovers( &self, account_id: String, - ) -> Box + Send> { + ) -> Box + Send> { let mut pipe = redis::pipe(); - pipe.get(format!("leftovers:{}", account_id)) - .ignore(); + pipe.get(format!("leftovers:{}", account_id)).ignore(); Box::new( pipe.query_async(self.connection.clone()) .map_err(move |err| error!("Error loading leftovers {:?}: ", err)) @@ -138,7 +141,7 @@ impl LeftoversStore for EthereumLedgerRedisStore { } else { Box::new(err(())) } - }) + }), ) } } From dce326dbc85f4e7ca2d618e14eef20dd92de5428 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Fri, 9 Aug 2019 22:09:36 +0300 Subject: [PATCH 04/11] fix(engine-store): load_leftovers -> pop_leftovers + test --- .../src/engines/ethereum_ledger/eth_engine.rs | 5 +-- .../engines/ethereum_ledger/test_helpers.rs | 7 ++-- .../src/stores/mod.rs | 4 +-- .../src/stores/redis_ethereum_ledger/store.rs | 32 ++++++++++++++++--- 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs index 3e3dd8b59..5ec8d44dc 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs @@ -517,7 +517,7 @@ where let url = url.clone(); // settle for amount + leftovers - store.load_leftovers(account_id.clone()) + store.pop_leftovers(account_id.clone()) .and_then(move |leftovers| { result(BigUint::from_str(&amount.clone()).map_err(move |err| { let error_msg = format!("Error converting to BigUint {:?}", err); @@ -543,7 +543,8 @@ where }) .and_then(move |response| { trace!("Accounting system responded with {:?}", response); - self_clone.process_connector_response(account_id_clone2, response, full_amount.clone()) + Ok(()) // This call causes the type_length_error + // self_clone.process_connector_response(account_id_clone2, response, full_amount.clone()) }) }) }; diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs index ab4ed509c..138227295 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs @@ -7,6 +7,7 @@ use std::collections::HashMap; use std::sync::Arc; use hyper::StatusCode; +use num_traits::Zero; use std::process::Command; use std::str::FromStr; use std::thread::sleep; @@ -80,12 +81,14 @@ impl LeftoversStore for TestStore { Box::new(ok(())) } - fn load_leftovers( + fn pop_leftovers( &self, account_id: String, ) -> Box + Send> { - let guard = self.leftovers.read(); + let mut guard = self.leftovers.write(); if let Some(l) = guard.get(&account_id) { + let l = l.clone(); + (*guard).insert(account_id, Zero::zero()); Box::new(ok(l.clone())) } else { Box::new(err(())) diff --git a/crates/interledger-settlement-engines/src/stores/mod.rs b/crates/interledger-settlement-engines/src/stores/mod.rs index 16c1c8ef3..cffacd623 100644 --- a/crates/interledger-settlement-engines/src/stores/mod.rs +++ b/crates/interledger-settlement-engines/src/stores/mod.rs @@ -39,8 +39,8 @@ pub trait LeftoversStore { leftovers: Self::AssetType, ) -> Box + Send>; - /// Saves the leftover data - fn load_leftovers( + /// Clears the leftover data in the database and returns the cleared value + fn pop_leftovers( &self, account_id: String, ) -> Box + Send>; diff --git a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs index fceac85ef..d168dada9 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs @@ -126,17 +126,19 @@ impl LeftoversStore for EthereumLedgerRedisStore { ) } - fn load_leftovers( + fn pop_leftovers( &self, account_id: String, ) -> Box + Send> { let mut pipe = redis::pipe(); - pipe.get(format!("leftovers:{}", account_id)).ignore(); + // Loads the value and resets it to 0 + pipe.getset(format!("leftovers:{}", account_id), 0); Box::new( pipe.query_async(self.connection.clone()) .map_err(move |err| error!("Error loading leftovers {:?}: ", err)) - .and_then(move |(_conn, leftovers): (_, String)| { - if let Ok(leftovers) = BigUint::from_str(&leftovers) { + .and_then(move |(_conn, leftovers): (_, Vec)| { + // redis.rs returns a bulk value for some reason, length is always 1 + if let Ok(leftovers) = BigUint::from_str(&leftovers[0]) { Box::new(ok(leftovers)) } else { Box::new(err(())) @@ -326,6 +328,28 @@ mod tests { use std::iter::FromIterator; use std::str::FromStr; + #[test] + fn saves_and_pops_leftovers_properly() { + let amount = BigUint::from(100u64); + let acc = "0".to_string(); + block_on(test_store().and_then(|(store, context)| { + store + .save_leftovers(acc.clone(), amount.clone()) + .map_err(|err| eprintln!("Redis error: {:?}", err)) + .and_then(move |_| { + store + .pop_leftovers(acc) + .map_err(|err| eprintln!("Redis error: {:?}", err)) + .and_then(move |ret| { + assert_eq!(amount, ret); + let _ = context; + Ok(()) + }) + }) + })) + .unwrap() + } + #[test] fn saves_and_loads_ethereum_addreses_properly() { block_on(test_store().and_then(|(store, context)| { From 11b99a8a97df6fef3890f055f956b4b540aa28c2 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 12 Aug 2019 13:38:02 +0300 Subject: [PATCH 05/11] fix(tests): return 0 if there were no leftovers found in the store --- .../src/engines/ethereum_ledger/eth_engine.rs | 83 +++++++++---------- .../engines/ethereum_ledger/test_helpers.rs | 2 +- .../src/stores/redis_ethereum_ledger/store.rs | 16 +++- 3 files changed, 54 insertions(+), 47 deletions(-) diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs index 5ec8d44dc..88cb9f69e 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs @@ -494,67 +494,66 @@ where amount: String, tx_hash: H256, ) -> impl Future { - let mut url = self.connector_url.clone(); - let account_id_clone = account_id.clone(); let engine_scale = self.asset_scale; + let mut url = self.connector_url.clone(); url.path_segments_mut() .expect("Invalid connector URL") .push("accounts") .push(&account_id.clone()) .push("settlements"); debug!("Making POST to {:?} {:?} about {:?}", url, amount, tx_hash); - let self_clone = self.clone(); - let store = self.store.clone(); - let amount_clone = amount.clone(); - let action = move || { - // need to make 2 clones, one to own the variables in the function - // and one for the retry closure.. - let self_clone = self_clone.clone(); - let store = store.clone(); - let account_id = account_id.clone(); - let account_id_clone2 = account_id.clone(); - let amount = amount.clone(); - let url = url.clone(); - - // settle for amount + leftovers - store.pop_leftovers(account_id.clone()) + + // settle for amount + leftovers + self.store + .pop_leftovers(account_id.clone()) .and_then(move |leftovers| { - result(BigUint::from_str(&amount.clone()).map_err(move |err| { + debug!("POPPED LEFTOVERS {:?}", leftovers); + result(BigUint::from_str(&amount).map_err(move |err| { let error_msg = format!("Error converting to BigUint {:?}", err); error!("{:?}", error_msg); })) .and_then(move |amount| { - Ok(amount + leftovers) - }) - }) - .and_then(move |full_amount| { - let client = Client::new(); - let full_amount_clone = full_amount.clone(); - client - .post(url.clone()) - .header("Idempotency-Key", tx_hash.to_string()) - .json(&json!({ "amount": full_amount.clone().to_string(), "scale" : engine_scale })) - .send() - .map_err(move |err| { - error!( - "Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}", - account_id, full_amount_clone, err - ) + debug!("Got uncredited amount {}", amount); + let full_amount = amount + leftovers; + let client = Client::new(); + debug!( + "Notifying accounting system about full amount: {}", + full_amount + ); + let url = url.clone(); + let account_id_clone = account_id.clone(); + let full_amount_clone = full_amount.clone(); + + let action = move || { + let account_id = account_id.clone(); + let full_amount = full_amount.clone(); + client + .post(url.clone()) + .header("Idempotency-Key", tx_hash.to_string()) + .json(&json!(Quantity::new(full_amount.clone(), engine_scale))) + .send() + .map_err(move |err| { + error!( + "Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}", + account_id.clone(), full_amount.clone(), err + ); + }) + }; + + Retry::spawn( + ExponentialBackoff::from_millis(10).take(MAX_RETRIES), + action, + ) + .map_err(move |_| { + error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, full_amount_clone, tx_hash) }) .and_then(move |response| { trace!("Accounting system responded with {:?}", response); Ok(()) // This call causes the type_length_error // self_clone.process_connector_response(account_id_clone2, response, full_amount.clone()) }) + }) }) - }; - Retry::spawn( - ExponentialBackoff::from_millis(10).take(MAX_RETRIES), - action, - ) - .map_err(move |_| { - error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, amount_clone, tx_hash) - }) } fn process_connector_response( diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs index 138227295..564a427ae 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs @@ -91,7 +91,7 @@ impl LeftoversStore for TestStore { (*guard).insert(account_id, Zero::zero()); Box::new(ok(l.clone())) } else { - Box::new(err(())) + Box::new(ok(Zero::zero())) } } } diff --git a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs index d168dada9..aa65777a3 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs @@ -9,6 +9,7 @@ use std::collections::HashMap; use std::str::FromStr; use crate::engines::ethereum_ledger::{EthereumAccount, EthereumAddresses, EthereumStore}; +use num_traits::Zero; use redis::{self, cmd, r#async::SharedConnection, ConnectionInfo, PipelineCommands, Value}; use log::{error, trace}; @@ -116,6 +117,7 @@ impl LeftoversStore for EthereumLedgerRedisStore { account_id: String, leftovers: Self::AssetType, ) -> Box + Send> { + trace!("Saving leftovers {:?} {:?}", account_id, leftovers); let mut pipe = redis::pipe(); pipe.set(format!("leftovers:{}", account_id), leftovers.to_string()) .ignore(); @@ -130,6 +132,7 @@ impl LeftoversStore for EthereumLedgerRedisStore { &self, account_id: String, ) -> Box + Send> { + trace!("Loading leftovers {:?}", account_id); let mut pipe = redis::pipe(); // Loads the value and resets it to 0 pipe.getset(format!("leftovers:{}", account_id), 0); @@ -137,11 +140,16 @@ impl LeftoversStore for EthereumLedgerRedisStore { pipe.query_async(self.connection.clone()) .map_err(move |err| error!("Error loading leftovers {:?}: ", err)) .and_then(move |(_conn, leftovers): (_, Vec)| { - // redis.rs returns a bulk value for some reason, length is always 1 - if let Ok(leftovers) = BigUint::from_str(&leftovers[0]) { - Box::new(ok(leftovers)) + // redis.rs returns a bulk value for some reason, length is + // always 1 + if leftovers.len() == 1 { + if let Ok(leftovers) = BigUint::from_str(&leftovers[0]) { + Box::new(ok(leftovers)) + } else { + Box::new(ok(Zero::zero())) + } } else { - Box::new(err(())) + Box::new(ok(Zero::zero())) } }), ) From 72751ba8cda1c8ee6972fc8fdd8c89f15657f362 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 12 Aug 2019 13:43:22 +0300 Subject: [PATCH 06/11] reenable the response processing of the connector --- .../src/engines/ethereum_ledger/eth_engine.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs index 88cb9f69e..f9f6c409a 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs @@ -504,6 +504,7 @@ where debug!("Making POST to {:?} {:?} about {:?}", url, amount, tx_hash); // settle for amount + leftovers + let self_clone = self.clone(); self.store .pop_leftovers(account_id.clone()) .and_then(move |leftovers| { @@ -523,6 +524,8 @@ where let url = url.clone(); let account_id_clone = account_id.clone(); let full_amount_clone = full_amount.clone(); + let account_id_clone2 = account_id.clone(); + let full_amount_clone2 = full_amount.clone(); let action = move || { let account_id = account_id.clone(); @@ -539,18 +542,17 @@ where ); }) }; - Retry::spawn( ExponentialBackoff::from_millis(10).take(MAX_RETRIES), action, ) .map_err(move |_| { - error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, full_amount_clone, tx_hash) + error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone2, full_amount_clone2, tx_hash) }) .and_then(move |response| { trace!("Accounting system responded with {:?}", response); - Ok(()) // This call causes the type_length_error - // self_clone.process_connector_response(account_id_clone2, response, full_amount.clone()) + // Ok(()) // This call causes the type_length_error + self_clone.process_connector_response(account_id_clone, response, full_amount_clone) }) }) }) From 88dc75d37b36db527e8975301c4125fcedec0880 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 12 Aug 2019 16:14:40 +0300 Subject: [PATCH 07/11] fix(eth-se): Box most futures to work around type_length_error Note: https://github.com/interledgerjs/settlement-xrp is not updated yet to return data for checking leftovers --- .../src/engines/ethereum_ledger/eth_engine.rs | 343 ++++++++++-------- .../src/main.rs | 2 - 2 files changed, 186 insertions(+), 159 deletions(-) diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs index f9f6c409a..e54d959d8 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs @@ -17,7 +17,6 @@ use hyper::StatusCode; use interledger_store_redis::RedisStoreBuilder; use log::info; use num_bigint::BigUint; -use num_traits::Zero; use redis::IntoConnectionInfo; use reqwest::r#async::{Client, Response as HttpResponse}; use ring::{digest, hmac}; @@ -299,64 +298,66 @@ where let token_address = self.address.token_address; // get the current block number - web3.eth() - .block_number() - .map_err(move |err| error!("Could not fetch current block number {:?}", err)) - .and_then(move |current_block| { - trace!("Current block {}", current_block); - // get the safe number of blocks to avoid reorgs - let fetch_until = current_block - confirmations; - // U256 does not implement IntoFuture so we must wrap it - Ok((Ok(fetch_until), store.load_recently_observed_block())) - }) - .flatten() - .and_then(move |(fetch_until, last_observed_block)| { - trace!( - "Will fetch txs from block {} until {}", - last_observed_block, - fetch_until - ); + Box::new( + web3.eth() + .block_number() + .map_err(move |err| error!("Could not fetch current block number {:?}", err)) + .and_then(move |current_block| { + trace!("Current block {}", current_block); + // get the safe number of blocks to avoid reorgs + let fetch_until = current_block - confirmations; + // U256 does not implement IntoFuture so we must wrap it + Ok((Ok(fetch_until), store.load_recently_observed_block())) + }) + .flatten() + .and_then(move |(fetch_until, last_observed_block)| { + trace!( + "Will fetch txs from block {} until {}", + last_observed_block, + fetch_until + ); - let notify_all_txs_fut = if let Some(token_address) = token_address { - trace!("Settling for ERC20 transactions"); - // get all erc20 transactions - let notify_all_erc20_txs_fut = filter_transfer_logs( - web3.clone(), - token_address, - None, - Some(our_address), - BlockNumber::Number(last_observed_block.low_u64()), - BlockNumber::Number(fetch_until.low_u64()), - ) - .and_then(move |transfers: Vec| { - // map each incoming erc20 transactions to an outgoing - // notification to the connector - join_all(transfers.into_iter().map(move |transfer| { - self_clone2.notify_erc20_transfer(transfer, token_address) - })) - }); + let notify_all_txs_fut = if let Some(token_address) = token_address { + trace!("Settling for ERC20 transactions"); + // get all erc20 transactions + let notify_all_erc20_txs_fut = filter_transfer_logs( + web3.clone(), + token_address, + None, + Some(our_address), + BlockNumber::Number(last_observed_block.low_u64()), + BlockNumber::Number(fetch_until.low_u64()), + ) + .and_then(move |transfers: Vec| { + // map each incoming erc20 transactions to an outgoing + // notification to the connector + join_all(transfers.into_iter().map(move |transfer| { + self_clone2.notify_erc20_transfer(transfer, token_address) + })) + }); - // combine all erc20 futures for that range of blocks - Either::A(notify_all_erc20_txs_fut) - } else { - trace!("Settling for ETH transactions"); - let checked_blocks = last_observed_block.low_u64()..=fetch_until.low_u64(); - // for each block create a future which will notify the - // connector about all the transactions in that block that are sent to our account - let notify_eth_txs_fut = checked_blocks - .map(move |block_num| self_clone.notify_eth_txs_in_block(block_num)); - - // combine all the futures for that range of blocks - Either::B(join_all(notify_eth_txs_fut)) - }; + // combine all erc20 futures for that range of blocks + Either::A(notify_all_erc20_txs_fut) + } else { + trace!("Settling for ETH transactions"); + let checked_blocks = last_observed_block.low_u64()..=fetch_until.low_u64(); + // for each block create a future which will notify the + // connector about all the transactions in that block that are sent to our account + let notify_eth_txs_fut = checked_blocks + .map(move |block_num| self_clone.notify_eth_txs_in_block(block_num)); + + // combine all the futures for that range of blocks + Either::B(join_all(notify_eth_txs_fut)) + }; - notify_all_txs_fut.and_then(move |ret| { - trace!("Transactions settled {:?}", ret); - // now that all transactions have been processed successfully, we - // can save `fetch_until` as the latest observed block - store_clone.save_recently_observed_block(fetch_until) - }) - }) + notify_all_txs_fut.and_then(move |ret| { + trace!("Transactions settled {:?}", ret); + // now that all transactions have been processed successfully, we + // can save `fetch_until` as the latest observed block + store_clone.save_recently_observed_block(fetch_until) + }) + }), + ) } /// Submits an ERC20 transfer object's data to the connector @@ -365,7 +366,7 @@ where &self, transfer: ERC20Transfer, token_address: Address, - ) -> impl Future { + ) -> Box + Send> { let store = self.store.clone(); let tx_hash = transfer.tx_hash; let self_clone = self.clone(); @@ -374,31 +375,35 @@ where token_address: Some(token_address), }; let amount = transfer.amount; - store - .check_if_tx_processed(tx_hash) - .map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash)) - .and_then(move |processed| { - if !processed { - Either::A( - store - .load_account_id_from_address(addr) - .and_then(move |id| { - self_clone.notify_connector( - id.to_string(), - amount.to_string(), - tx_hash, - ) - }) - .and_then(move |_| { - // only save the transaction hash if the connector - // was successfully notified - store.mark_tx_processed(tx_hash) - }), - ) - } else { - Either::B(ok(())) // return an empty future otherwise since we want to skip this transaction - } - }) + Box::new( + store + .check_if_tx_processed(tx_hash) + .map_err(move |_| { + error!("Error when querying store about transaction: {:?}", tx_hash) + }) + .and_then(move |processed| { + if !processed { + Either::A( + store + .load_account_id_from_address(addr) + .and_then(move |id| { + self_clone.notify_connector( + id.to_string(), + amount.to_string(), + tx_hash, + ) + }) + .and_then(move |_| { + // only save the transaction hash if the connector + // was successfully notified + store.mark_tx_processed(tx_hash) + }), + ) + } else { + Either::B(ok(())) // return an empty future otherwise since we want to skip this transaction + } + }), + ) } fn notify_eth_txs_in_block(&self, block_number: u64) -> impl Future { @@ -438,18 +443,17 @@ where }) } - fn notify_eth_transfer(&self, tx_hash: H256) -> impl Future { + fn notify_eth_transfer(&self, tx_hash: H256) -> Box + Send> { let our_address = self.address.own_address; let web3 = self.web3.clone(); let store = self.store.clone(); let self_clone = self.clone(); // Skip transactions which have already been processed by the connector - store.check_if_tx_processed(tx_hash) + Box::new(store.check_if_tx_processed(tx_hash) .map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash)) .and_then(move |processed| { if !processed { - Either::A( - web3.eth().transaction(TransactionId::Hash(tx_hash)) + Either::A(web3.eth().transaction(TransactionId::Hash(tx_hash)) .map_err(move |err| error!("Could not fetch transaction data from transaction hash: {:?}. Got error: {:?}", tx_hash, err)) .and_then(move |maybe_tx| { // Unlikely to error out since we only call this on @@ -467,8 +471,7 @@ where own_address: from, token_address, }; - Either::A( - store.load_account_id_from_address(addr) + Either::A(store.load_account_id_from_address(addr) .and_then(move |id| { self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash) }) @@ -485,7 +488,7 @@ where } else { Either::B(ok(())) // return an empty future otherwise since we want to skip this transaction } - }) + })) } fn notify_connector( @@ -504,72 +507,84 @@ where debug!("Making POST to {:?} {:?} about {:?}", url, amount, tx_hash); // settle for amount + leftovers - let self_clone = self.clone(); - self.store + let account_id_clone = account_id.clone(); + let full_amount_fut = self + .store .pop_leftovers(account_id.clone()) .and_then(move |leftovers| { - debug!("POPPED LEFTOVERS {:?}", leftovers); - result(BigUint::from_str(&amount).map_err(move |err| { + let full_amount_fut2 = result(BigUint::from_str(&amount).map_err(move |err| { let error_msg = format!("Error converting to BigUint {:?}", err); error!("{:?}", error_msg); })) .and_then(move |amount| { debug!("Got uncredited amount {}", amount); let full_amount = amount + leftovers; - let client = Client::new(); debug!( "Notifying accounting system about full amount: {}", full_amount ); - let url = url.clone(); - let account_id_clone = account_id.clone(); - let full_amount_clone = full_amount.clone(); - let account_id_clone2 = account_id.clone(); - let full_amount_clone2 = full_amount.clone(); + ok(full_amount) + }); + ok(full_amount_fut2) + }) + .flatten(); - let action = move || { - let account_id = account_id.clone(); - let full_amount = full_amount.clone(); - client - .post(url.clone()) - .header("Idempotency-Key", tx_hash.to_string()) - .json(&json!(Quantity::new(full_amount.clone(), engine_scale))) - .send() - .map_err(move |err| { - error!( - "Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}", - account_id.clone(), full_amount.clone(), err - ); - }) - }; - Retry::spawn( - ExponentialBackoff::from_millis(10).take(MAX_RETRIES), - action, - ) - .map_err(move |_| { - error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone2, full_amount_clone2, tx_hash) + let self_clone = self.clone(); + let ping_connector_fut = full_amount_fut.and_then(move |full_amount| { + let url = url.clone(); + let account_id = account_id_clone.clone(); + let account_id2 = account_id_clone.clone(); + let full_amount2 = full_amount.clone(); + + let action = move || { + let client = Client::new(); + let account_id = account_id.clone(); + let full_amount = full_amount.clone(); + let full_amount_clone = full_amount.clone(); + client + .post(url.clone()) + .header("Idempotency-Key", tx_hash.to_string()) + .json(&json!(Quantity::new(full_amount.clone(), engine_scale))) + .send() + .map_err(move |err| { + error!( + "Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}", + account_id, full_amount_clone, err + ); }) - .and_then(move |response| { - trace!("Accounting system responded with {:?}", response); - // Ok(()) // This call causes the type_length_error - self_clone.process_connector_response(account_id_clone, response, full_amount_clone) + .and_then(move |ret| { + ok((ret, full_amount)) }) - }) + }; + Retry::spawn( + ExponentialBackoff::from_millis(10).take(MAX_RETRIES), + action, + ) + .map_err(move |_| { + error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id2, full_amount2, tx_hash) }) + }); + + ping_connector_fut.and_then(move |ret| { + trace!("Accounting system responded with {:?}", ret.0); + self_clone.process_connector_response(account_id, ret.0, ret.1) + }) } + /// Parses a response from a connector into a Quantity type and calls a + /// function to further process the parsed data to check if the store's + /// leftovers should be updated. fn process_connector_response( &self, account_id: String, response: HttpResponse, engine_amount: BigUint, - ) -> impl Future { - let engine_scale = self.asset_scale; - let store = self.store.clone(); + ) -> Box + Send> { + let self_clone = self.clone(); if !response.status().is_success() { - return Either::A(err(())); + return Box::new(err(())); } - Either::B( + Box::new( response .into_body() .concat2() @@ -586,32 +601,46 @@ where }) }) .and_then(move |quantity: Quantity| { - result(BigUint::from_str(&quantity.amount)) - .map_err(|err| { - let error_msg = format!("Error converting to BigUint {:?}", err); - error!("{:?}", error_msg); - }) - .and_then(move |connector_amount: BigUint| { - // Scale the amount settled by the - // connector back up to our scale - result(connector_amount.normalize_scale(ConvertDetails { - from: quantity.scale, - to: engine_scale, - })) - .and_then( - move |scaled_connector_amount| { - let diff: BigUint = engine_amount - scaled_connector_amount; - if diff > Zero::zero() { - // connector settled less than we - // instructed it to, so we must save - // the difference for the leftovers - Either::A(store.save_leftovers(account_id, diff)) - } else { - Either::B(ok(())) - } - }, - ) - }) + self_clone.process_received_quantity(account_id, quantity, engine_amount) + }), + ) + } + + // Normalizes a received Quantity object against the local engine scale, and + // if the normalized value is less than what the engine originally sent, it + // stores it as leftovers in the store. + fn process_received_quantity( + &self, + account_id: String, + quantity: Quantity, + engine_amount: BigUint, + ) -> Box + Send> { + let store = self.store.clone(); + let engine_scale = self.asset_scale; + Box::new( + result(BigUint::from_str(&quantity.amount)) + .map_err(|err| { + let error_msg = format!("Error converting to BigUint {:?}", err); + error!("{:?}", error_msg); + }) + .and_then(move |connector_amount: BigUint| { + // Scale the amount settled by the + // connector back up to our scale + result(connector_amount.normalize_scale(ConvertDetails { + from: quantity.scale, + to: engine_scale, + })) + .and_then(move |scaled_connector_amount| { + if engine_amount > scaled_connector_amount { + let diff = engine_amount - scaled_connector_amount; + // connector settled less than we + // instructed it to, so we must save + // the difference for the leftovers + store.save_leftovers(account_id, diff) + } else { + Box::new(ok(())) + } + }) }), ) } @@ -1059,7 +1088,7 @@ mod tests { "{\"amount\": \"100000000000\", \"scale\": 18 }".to_string(), )) .with_status(200) - .with_body("OK".to_string()) + .with_body("{\"amount\": \"100000000000\", \"scale\": 9 }".to_string()) .create(); let bob_connector_url = mockito::server_url(); diff --git a/crates/interledger-settlement-engines/src/main.rs b/crates/interledger-settlement-engines/src/main.rs index 8d1c69c9a..a4f7601e8 100644 --- a/crates/interledger-settlement-engines/src/main.rs +++ b/crates/interledger-settlement-engines/src/main.rs @@ -1,5 +1,3 @@ -#![type_length_limit = "3396805"] - use clap::{value_t, App, Arg, SubCommand}; use hex; use std::str::FromStr; From cf1dbd617fd191a205e471410d2a3186e3134864 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 12 Aug 2019 19:14:10 +0300 Subject: [PATCH 08/11] rename leftovers to uncredited_settlement_amount --- .../src/engines/ethereum_ledger/eth_engine.rs | 20 +++--- .../engines/ethereum_ledger/test_helpers.rs | 16 ++--- .../src/stores/mod.rs | 6 +- .../src/stores/redis_ethereum_ledger/store.rs | 64 ++++++++++++------- crates/interledger-settlement/src/lib.rs | 4 +- 5 files changed, 65 insertions(+), 45 deletions(-) diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs index e54d959d8..655a58f18 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs @@ -297,6 +297,9 @@ where let our_address = self.address.own_address; let token_address = self.address.token_address; + // We `Box` futures in these functions due to + // https://github.com/rust-lang/rust/issues/54540#issuecomment-494749912. + // Otherwise, we get `type_length_limit` errors. // get the current block number Box::new( web3.eth() @@ -306,7 +309,6 @@ where trace!("Current block {}", current_block); // get the safe number of blocks to avoid reorgs let fetch_until = current_block - confirmations; - // U256 does not implement IntoFuture so we must wrap it Ok((Ok(fetch_until), store.load_recently_observed_block())) }) .flatten() @@ -506,19 +508,19 @@ where .push("settlements"); debug!("Making POST to {:?} {:?} about {:?}", url, amount, tx_hash); - // settle for amount + leftovers + // settle for amount + uncredited_settlement_amount let account_id_clone = account_id.clone(); let full_amount_fut = self .store - .pop_leftovers(account_id.clone()) - .and_then(move |leftovers| { + .load_uncredited_settlement_amount(account_id.clone()) + .and_then(move |uncredited_settlement_amount| { let full_amount_fut2 = result(BigUint::from_str(&amount).map_err(move |err| { let error_msg = format!("Error converting to BigUint {:?}", err); error!("{:?}", error_msg); })) .and_then(move |amount| { debug!("Got uncredited amount {}", amount); - let full_amount = amount + leftovers; + let full_amount = amount + uncredited_settlement_amount; debug!( "Notifying accounting system about full amount: {}", full_amount @@ -573,7 +575,7 @@ where /// Parses a response from a connector into a Quantity type and calls a /// function to further process the parsed data to check if the store's - /// leftovers should be updated. + /// uncredited settlement amount should be updated. fn process_connector_response( &self, account_id: String, @@ -608,7 +610,7 @@ where // Normalizes a received Quantity object against the local engine scale, and // if the normalized value is less than what the engine originally sent, it - // stores it as leftovers in the store. + // stores it as uncredited settlement amount in the store. fn process_received_quantity( &self, account_id: String, @@ -635,8 +637,8 @@ where let diff = engine_amount - scaled_connector_amount; // connector settled less than we // instructed it to, so we must save - // the difference for the leftovers - store.save_leftovers(account_id, diff) + // the difference + store.save_uncredited_settlement_amount(account_id, diff) } else { Box::new(ok(())) } diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs index 564a427ae..5c65081db 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs @@ -62,7 +62,7 @@ pub struct TestStore { pub last_observed_block: Arc>, pub saved_hashes: Arc>>, pub cache_hits: Arc>, - pub leftovers: Arc>>, + pub uncredited_settlement_amount: Arc>>, } use crate::stores::LeftoversStore; @@ -71,21 +71,21 @@ use num_bigint::BigUint; impl LeftoversStore for TestStore { type AssetType = BigUint; - fn save_leftovers( + fn save_uncredited_settlement_amount( &self, account_id: String, - leftovers: Self::AssetType, + uncredited_settlement_amount: Self::AssetType, ) -> Box + Send> { - let mut guard = self.leftovers.write(); - (*guard).insert(account_id, leftovers); + let mut guard = self.uncredited_settlement_amount.write(); + (*guard).insert(account_id, uncredited_settlement_amount); Box::new(ok(())) } - fn pop_leftovers( + fn load_uncredited_settlement_amount( &self, account_id: String, ) -> Box + Send> { - let mut guard = self.leftovers.write(); + let mut guard = self.uncredited_settlement_amount.write(); if let Some(l) = guard.get(&account_id) { let l = l.clone(); (*guard).insert(account_id, Zero::zero()); @@ -269,7 +269,7 @@ impl TestStore { cache_hits: Arc::new(RwLock::new(0)), last_observed_block: Arc::new(RwLock::new(U256::from(0))), saved_hashes: Arc::new(RwLock::new(HashMap::new())), - leftovers: Arc::new(RwLock::new(HashMap::new())), + uncredited_settlement_amount: Arc::new(RwLock::new(HashMap::new())), } } } diff --git a/crates/interledger-settlement-engines/src/stores/mod.rs b/crates/interledger-settlement-engines/src/stores/mod.rs index cffacd623..79bd5af36 100644 --- a/crates/interledger-settlement-engines/src/stores/mod.rs +++ b/crates/interledger-settlement-engines/src/stores/mod.rs @@ -33,14 +33,14 @@ pub trait LeftoversStore { type AssetType; /// Saves the leftover data - fn save_leftovers( + fn save_uncredited_settlement_amount( &self, account_id: String, - leftovers: Self::AssetType, + uncredited_settlement_amount: Self::AssetType, ) -> Box + Send>; /// Clears the leftover data in the database and returns the cleared value - fn pop_leftovers( + fn load_uncredited_settlement_amount( &self, account_id: String, ) -> Box + Send>; diff --git a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs index aa65777a3..07a07a3a4 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs @@ -112,46 +112,64 @@ impl EthereumLedgerRedisStore { impl LeftoversStore for EthereumLedgerRedisStore { type AssetType = BigUint; - fn save_leftovers( + fn save_uncredited_settlement_amount( &self, account_id: String, - leftovers: Self::AssetType, + uncredited_settlement_amount: Self::AssetType, ) -> Box + Send> { - trace!("Saving leftovers {:?} {:?}", account_id, leftovers); + trace!( + "Saving uncredited_settlement_amount {:?} {:?}", + account_id, + uncredited_settlement_amount + ); let mut pipe = redis::pipe(); - pipe.set(format!("leftovers:{}", account_id), leftovers.to_string()) - .ignore(); + pipe.set( + format!("uncredited_settlement_amount:{}", account_id), + uncredited_settlement_amount.to_string(), + ) + .ignore(); Box::new( pipe.query_async(self.connection.clone()) - .map_err(move |err| error!("Error saving leftovers {:?}: {:?}", leftovers, err)) + .map_err(move |err| { + error!( + "Error saving uncredited_settlement_amount {:?}: {:?}", + uncredited_settlement_amount, err + ) + }) .and_then(move |(_conn, _ret): (_, Value)| Ok(())), ) } - fn pop_leftovers( + fn load_uncredited_settlement_amount( &self, account_id: String, ) -> Box + Send> { - trace!("Loading leftovers {:?}", account_id); + trace!("Loading uncredited_settlement_amount {:?}", account_id); let mut pipe = redis::pipe(); // Loads the value and resets it to 0 - pipe.getset(format!("leftovers:{}", account_id), 0); + pipe.getset(format!("uncredited_settlement_amount:{}", account_id), 0); Box::new( pipe.query_async(self.connection.clone()) - .map_err(move |err| error!("Error loading leftovers {:?}: ", err)) - .and_then(move |(_conn, leftovers): (_, Vec)| { - // redis.rs returns a bulk value for some reason, length is - // always 1 - if leftovers.len() == 1 { - if let Ok(leftovers) = BigUint::from_str(&leftovers[0]) { - Box::new(ok(leftovers)) + .map_err(move |err| { + error!("Error loading uncredited_settlement_amount {:?}: ", err) + }) + .and_then( + move |(_conn, uncredited_settlement_amount): (_, Vec)| { + // redis.rs returns a bulk value for some reason, length is + // always 1 + if uncredited_settlement_amount.len() == 1 { + if let Ok(uncredited_settlement_amount) = + BigUint::from_str(&uncredited_settlement_amount[0]) + { + Box::new(ok(uncredited_settlement_amount)) + } else { + Box::new(ok(Zero::zero())) + } } else { Box::new(ok(Zero::zero())) } - } else { - Box::new(ok(Zero::zero())) - } - }), + }, + ), ) } } @@ -337,16 +355,16 @@ mod tests { use std::str::FromStr; #[test] - fn saves_and_pops_leftovers_properly() { + fn saves_and_pops_uncredited_settlement_amount_properly() { let amount = BigUint::from(100u64); let acc = "0".to_string(); block_on(test_store().and_then(|(store, context)| { store - .save_leftovers(acc.clone(), amount.clone()) + .save_uncredited_settlement_amount(acc.clone(), amount.clone()) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |_| { store - .pop_leftovers(acc) + .load_uncredited_settlement_amount(acc) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |ret| { assert_eq!(amount, ret); diff --git a/crates/interledger-settlement/src/lib.rs b/crates/interledger-settlement/src/lib.rs index 4a9e3f19b..afa42e6f1 100644 --- a/crates/interledger-settlement/src/lib.rs +++ b/crates/interledger-settlement/src/lib.rs @@ -186,7 +186,7 @@ mod tests { .unwrap(), 1 ); - // there's leftovers for all number slots which do not increase in + // there's uncredited_settlement_amount for all number slots which do not increase in // increments of 10^abs(to_scale-from_scale) assert_eq!( 1u64.normalize_scale(ConvertDetails { from: 2, to: 1 }) @@ -219,7 +219,7 @@ mod tests { .unwrap(), 100 ); - // 299 units with base 3 is 29 units with base 2 (0.9 leftovers) + // 299 units with base 3 is 29 units with base 2 (0.9 uncredited_settlement_amount) assert_eq!( 299u64 .normalize_scale(ConvertDetails { from: 3, to: 2 }) From 19e0cd19bfa8039677c8d71146bfb0e38d92569b Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Tue, 13 Aug 2019 17:14:06 +0300 Subject: [PATCH 09/11] use an array of strings to store multiple BigUint uncredited values --- .../src/stores/redis_ethereum_ledger/store.rs | 70 ++++++++++++------- 1 file changed, 43 insertions(+), 27 deletions(-) diff --git a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs index 07a07a3a4..9cf0569f9 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs @@ -123,7 +123,7 @@ impl LeftoversStore for EthereumLedgerRedisStore { uncredited_settlement_amount ); let mut pipe = redis::pipe(); - pipe.set( + pipe.lpush( format!("uncredited_settlement_amount:{}", account_id), uncredited_settlement_amount.to_string(), ) @@ -147,24 +147,36 @@ impl LeftoversStore for EthereumLedgerRedisStore { trace!("Loading uncredited_settlement_amount {:?}", account_id); let mut pipe = redis::pipe(); // Loads the value and resets it to 0 - pipe.getset(format!("uncredited_settlement_amount:{}", account_id), 0); + pipe.lrange( + format!("uncredited_settlement_amount:{}", account_id), + 0, + -1, + ); + pipe.del(format!("uncredited_settlement_amount:{}", account_id)) + .ignore(); Box::new( pipe.query_async(self.connection.clone()) .map_err(move |err| { error!("Error loading uncredited_settlement_amount {:?}: ", err) }) .and_then( - move |(_conn, uncredited_settlement_amount): (_, Vec)| { - // redis.rs returns a bulk value for some reason, length is - // always 1 - if uncredited_settlement_amount.len() == 1 { - if let Ok(uncredited_settlement_amount) = - BigUint::from_str(&uncredited_settlement_amount[0]) - { - Box::new(ok(uncredited_settlement_amount)) - } else { - Box::new(ok(Zero::zero())) + move |(_conn, uncredited_settlement_amounts): (_, Vec>)| { + if uncredited_settlement_amounts.len() == 1 { + let uncredited_settlement_amounts = + uncredited_settlement_amounts[0].clone(); + let mut total_amount = BigUint::zero(); + for uncredited_settlement_amount in uncredited_settlement_amounts { + let amount = if let Ok(amount) = + BigUint::from_str(&uncredited_settlement_amount) + { + amount + } else { + // could not convert to bigint + return Box::new(err(())); + }; + total_amount += amount; } + Box::new(ok(total_amount)) } else { Box::new(ok(Zero::zero())) } @@ -351,27 +363,31 @@ mod tests { block_on, test_eth_store as test_store, }; use super::*; + use futures::future::join_all; use std::iter::FromIterator; use std::str::FromStr; #[test] fn saves_and_pops_uncredited_settlement_amount_properly() { - let amount = BigUint::from(100u64); - let acc = "0".to_string(); block_on(test_store().and_then(|(store, context)| { - store - .save_uncredited_settlement_amount(acc.clone(), amount.clone()) - .map_err(|err| eprintln!("Redis error: {:?}", err)) - .and_then(move |_| { - store - .load_uncredited_settlement_amount(acc) - .map_err(|err| eprintln!("Redis error: {:?}", err)) - .and_then(move |ret| { - assert_eq!(amount, ret); - let _ = context; - Ok(()) - }) - }) + let amount = BigUint::from(100u64); + let acc = "0".to_string(); + join_all(vec![ + store.save_uncredited_settlement_amount(acc.clone(), amount.clone()), + store.save_uncredited_settlement_amount(acc.clone(), amount.clone()), + store.save_uncredited_settlement_amount(acc.clone(), amount.clone()), + ]) + .map_err(|err| eprintln!("Redis error: {:?}", err)) + .and_then(move |_| { + store + .load_uncredited_settlement_amount(acc) + .map_err(|err| eprintln!("Redis error: {:?}", err)) + .and_then(move |ret| { + assert_eq!(ret, BigUint::from(300u64)); + let _ = context; + Ok(()) + }) + }) })) .unwrap() } From 00a30dd5b1837777d7973b57926abbfc90dc3181 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Tue, 13 Aug 2019 19:36:41 +0300 Subject: [PATCH 10/11] test(eth-engine): ensure that the test works with numbers larger than u64 --- .../src/stores/redis_ethereum_ledger/store.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs index 9cf0569f9..0178d4863 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs @@ -370,7 +370,8 @@ mod tests { #[test] fn saves_and_pops_uncredited_settlement_amount_properly() { block_on(test_store().and_then(|(store, context)| { - let amount = BigUint::from(100u64); + let amount = BigUint::from_str("10000000000000000000").unwrap(); + let ret_amount = BigUint::from_str("30000000000000000000").unwrap(); let acc = "0".to_string(); join_all(vec![ store.save_uncredited_settlement_amount(acc.clone(), amount.clone()), @@ -383,7 +384,7 @@ mod tests { .load_uncredited_settlement_amount(acc) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |ret| { - assert_eq!(ret, BigUint::from(300u64)); + assert_eq!(ret, ret_amount); let _ = context; Ok(()) }) From 0d5a4771896dad7f58690a425d7d073cd732575b Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Thu, 15 Aug 2019 19:35:15 +0300 Subject: [PATCH 11/11] chore: prefix uncredited amount key --- .../src/stores/redis_ethereum_ledger/store.rs | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs index 899c31b49..dd0203e5e 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs @@ -26,6 +26,7 @@ static SAVED_TRANSACTIONS_KEY: &str = "transactions"; static SETTLEMENT_ENGINES_KEY: &str = "settlement"; static LEDGER_KEY: &str = "ledger"; static ETHEREUM_KEY: &str = "eth"; +static UNCREDITED_AMOUNT_KEY: &str = "uncredited_settlement_amount"; #[derive(Clone, Debug, Serialize)] pub struct Account { @@ -56,6 +57,13 @@ fn ethereum_ledger_key(account_id: u64) -> String { ) } +fn ethereum_uncredited_amount_key(account_id: String) -> String { + format!( + "{}:{}:{}:{}", + ETHEREUM_KEY, LEDGER_KEY, UNCREDITED_AMOUNT_KEY, account_id, + ) +} + impl EthereumAccount for Account { fn token_address(&self) -> Option { self.token_address @@ -123,8 +131,12 @@ impl LeftoversStore for EthereumLedgerRedisStore { uncredited_settlement_amount ); let mut pipe = redis::pipe(); + // We store these amounts as lists of strings + // because we cannot do BigNumber arithmetic in the store + // When loading the amounts, we convert them to the appropriate data + // type and sum them up. pipe.lpush( - format!("uncredited_settlement_amount:{}", account_id), + ethereum_uncredited_amount_key(account_id.clone()), uncredited_settlement_amount.to_string(), ) .ignore(); @@ -147,11 +159,7 @@ impl LeftoversStore for EthereumLedgerRedisStore { trace!("Loading uncredited_settlement_amount {:?}", account_id); let mut pipe = redis::pipe(); // Loads the value and resets it to 0 - pipe.lrange( - format!("uncredited_settlement_amount:{}", account_id), - 0, - -1, - ); + pipe.lrange(ethereum_uncredited_amount_key(account_id.clone()), 0, -1); pipe.del(format!("uncredited_settlement_amount:{}", account_id)) .ignore(); Box::new(