diff --git a/crates/interledger-settlement-engines/Cargo.toml b/crates/interledger-settlement-engines/Cargo.toml index 7ed62b447..30cae9d1f 100644 --- a/crates/interledger-settlement-engines/Cargo.toml +++ b/crates/interledger-settlement-engines/Cargo.toml @@ -35,6 +35,7 @@ clap = "2.32.0" clarity = { git = "https://github.com/gakonst/clarity" } sha3 = "0.8.2" num-bigint = "0.2.2" +num-traits = "0.2.8" [dev-dependencies] lazy_static = "1.3" diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs index b7d25c02e..1d8cf7b49 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs @@ -34,7 +34,7 @@ use tokio_retry::{strategy::ExponentialBackoff, Retry}; use url::Url; use uuid::Uuid; -use crate::stores::redis_ethereum_ledger::*; +use crate::stores::{redis_ethereum_ledger::*, LeftoversStore}; use crate::{ApiResponse, CreateAccount, SettlementEngine, SettlementEngineApi}; use interledger_settlement::{Convert, ConvertDetails, Quantity}; @@ -100,7 +100,12 @@ pub struct EthereumLedgerSettlementEngineBuilder<'a, S, Si, A> { impl<'a, S, Si, A> EthereumLedgerSettlementEngineBuilder<'a, S, Si, A> where - S: EthereumStore + Clone + Send + Sync + 'static, + S: EthereumStore + + LeftoversStore + + Clone + + Send + + Sync + + 'static, Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, A: EthereumAccount + Send + Sync + 'static, { @@ -222,7 +227,12 @@ where impl EthereumLedgerSettlementEngine where - S: EthereumStore + Clone + Send + Sync + 'static, + S: EthereumStore + + LeftoversStore + + Clone + + Send + + Sync + + 'static, Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, A: EthereumAccount + Send + Sync + 'static, { @@ -283,6 +293,9 @@ where let our_address = self.address.own_address; let token_address = self.address.token_address; + // We `Box` futures in these functions due to + // https://github.com/rust-lang/rust/issues/54540#issuecomment-494749912. + // Otherwise, we get `type_length_limit` errors. // get the current block number web3.eth() .block_number() @@ -358,7 +371,7 @@ where &self, transfer: ERC20Transfer, token_address: Address, - ) -> impl Future { + ) -> Box + Send> { let store = self.store.clone(); let tx_hash = transfer.tx_hash; let self_clone = self.clone(); @@ -367,7 +380,7 @@ where token_address: Some(token_address), }; let amount = transfer.amount; - store + Box::new(store .check_if_tx_processed(tx_hash) .map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash)) .and_then(move |processed| { @@ -377,7 +390,7 @@ where .load_account_id_from_address(addr) .and_then(move |id| { debug!("Notifying connector about incoming ERC20 transaction for account {} for amount: {} (tx hash: {})", id, amount, tx_hash); - self_clone.notify_connector(id.to_string(), amount, tx_hash) + self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash) }) .and_then(move |_| { // only save the transaction hash if the connector @@ -388,7 +401,7 @@ where } else { Either::B(ok(())) // return an empty future otherwise since we want to skip this transaction } - }) + })) } fn notify_eth_txs_in_block(&self, block_number: u64) -> impl Future { @@ -422,18 +435,17 @@ where .and_then(|_| Ok(())) } - fn notify_eth_transfer(&self, tx_hash: H256) -> impl Future { + fn notify_eth_transfer(&self, tx_hash: H256) -> Box + Send> { let our_address = self.address.own_address; let web3 = self.web3.clone(); let store = self.store.clone(); let self_clone = self.clone(); // Skip transactions which have already been processed by the connector - store.check_if_tx_processed(tx_hash) + Box::new(store.check_if_tx_processed(tx_hash) .map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash)) .and_then(move |processed| { if !processed { - Either::A( - web3.eth().transaction(TransactionId::Hash(tx_hash)) + Either::A(web3.eth().transaction(TransactionId::Hash(tx_hash)) .map_err(move |err| error!("Could not fetch transaction data from transaction hash: {:?}. Got error: {:?}", tx_hash, err)) .and_then(move |maybe_tx| { // Unlikely to error out since we only call this on @@ -451,10 +463,10 @@ where own_address: from, token_address: None, }; - return Either::A(store.load_account_id_from_address(addr) + + return Either::A(store.load_account_id_from_address(addr) .and_then(move |id| { - debug!("Notifying connector about incoming ETH transaction for account {} for amount: {} (tx hash: {})", id, amount, tx_hash); - self_clone.notify_connector(id.to_string(), amount, tx_hash) + self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash) }) .and_then(move |_| { // only save the transaction hash if the connector @@ -470,54 +482,161 @@ where } else { Either::B(ok(())) // return an empty future otherwise since we want to skip this transaction } - }) + })) } fn notify_connector( &self, account_id: String, - amount: U256, + amount: String, tx_hash: H256, ) -> impl Future { - let mut url = self.connector_url.clone(); - let account_id_clone = account_id.clone(); let engine_scale = self.asset_scale; + let mut url = self.connector_url.clone(); url.path_segments_mut() .expect("Invalid connector URL") .push("accounts") .push(&account_id.clone()) .push("settlements"); - let client = Client::new(); debug!("Making POST to {:?} {:?} about {:?}", url, amount, tx_hash); - let action = move || { - let account_id = account_id.clone(); - client - .post(url.clone()) - .header("Idempotency-Key", tx_hash.to_string()) - .json(&json!({ "amount": amount.to_string(), "scale" : engine_scale })) - .send() - .map_err(move |err| { - error!( - "Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}", - account_id, amount, err - ) + + // settle for amount + uncredited_settlement_amount + let account_id_clone = account_id.clone(); + let full_amount_fut = self + .store + .load_uncredited_settlement_amount(account_id.clone()) + .and_then(move |uncredited_settlement_amount| { + let full_amount_fut2 = result(BigUint::from_str(&amount).map_err(move |err| { + let error_msg = format!("Error converting to BigUint {:?}", err); + error!("{:?}", error_msg); + })) + .and_then(move |amount| { + debug!("Got uncredited amount {}", amount); + let full_amount = amount + uncredited_settlement_amount; + debug!( + "Notifying accounting system about full amount: {}", + full_amount + ); + ok(full_amount) + }); + ok(full_amount_fut2) + }) + .flatten(); + + let self_clone = self.clone(); + let ping_connector_fut = full_amount_fut.and_then(move |full_amount| { + let url = url.clone(); + let account_id = account_id_clone.clone(); + let account_id2 = account_id_clone.clone(); + let full_amount2 = full_amount.clone(); + + let action = move || { + let client = Client::new(); + let account_id = account_id.clone(); + let full_amount = full_amount.clone(); + let full_amount_clone = full_amount.clone(); + client + .post(url.clone()) + .header("Idempotency-Key", tx_hash.to_string()) + .json(&json!(Quantity::new(full_amount.clone(), engine_scale))) + .send() + .map_err(move |err| { + error!( + "Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}", + account_id, full_amount_clone, err + ); + }) + .and_then(move |ret| { + ok((ret, full_amount)) + }) + }; + Retry::spawn( + ExponentialBackoff::from_millis(10).take(MAX_RETRIES), + action, + ) + .map_err(move |_| { + error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id2, full_amount2, tx_hash) + }) + }); + + ping_connector_fut.and_then(move |ret| { + trace!("Accounting system responded with {:?}", ret.0); + self_clone.process_connector_response(account_id, ret.0, ret.1) + }) + } + + /// Parses a response from a connector into a Quantity type and calls a + /// function to further process the parsed data to check if the store's + /// uncredited settlement amount should be updated. + fn process_connector_response( + &self, + account_id: String, + response: HttpResponse, + engine_amount: BigUint, + ) -> Box + Send> { + let self_clone = self.clone(); + if !response.status().is_success() { + return Box::new(err(())); + } + Box::new( + response + .into_body() + .concat2() + .map_err(|err| { + let err = format!("Couldn't retrieve body {:?}", err); + error!("{}", err); }) - .and_then(move |response| { - trace!("Accounting system responded with {:?}", response); - if response.status().is_success() { - Ok(()) - } else { - Err(()) - } + .and_then(move |body| { + // Get the amount stored by the connector and + // check for any precision loss / overflow + serde_json::from_slice::(&body).map_err(|err| { + let err = format!("Couldn't parse body {:?} into Quantity {:?}", body, err); + error!("{}", err); + }) }) - }; - Retry::spawn( - ExponentialBackoff::from_millis(10).take(MAX_RETRIES), - action, + .and_then(move |quantity: Quantity| { + self_clone.process_received_quantity(account_id, quantity, engine_amount) + }), + ) + } + + // Normalizes a received Quantity object against the local engine scale, and + // if the normalized value is less than what the engine originally sent, it + // stores it as uncredited settlement amount in the store. + fn process_received_quantity( + &self, + account_id: String, + quantity: Quantity, + engine_amount: BigUint, + ) -> Box + Send> { + let store = self.store.clone(); + let engine_scale = self.asset_scale; + Box::new( + result(BigUint::from_str(&quantity.amount)) + .map_err(|err| { + let error_msg = format!("Error converting to BigUint {:?}", err); + error!("{:?}", error_msg); + }) + .and_then(move |connector_amount: BigUint| { + // Scale the amount settled by the + // connector back up to our scale + result(connector_amount.normalize_scale(ConvertDetails { + from: quantity.scale, + to: engine_scale, + })) + .and_then(move |scaled_connector_amount| { + if engine_amount > scaled_connector_amount { + let diff = engine_amount - scaled_connector_amount; + // connector settled less than we + // instructed it to, so we must save + // the difference + store.save_uncredited_settlement_amount(account_id, diff) + } else { + Box::new(ok(())) + } + }) + }), ) - .map_err(move |_| { - error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, amount, tx_hash) - }) } /// Helper function which submits an Ethereum ledger transaction to `to` for `amount`. @@ -624,7 +743,12 @@ where impl SettlementEngine for EthereumLedgerSettlementEngine where - S: EthereumStore + Clone + Send + Sync + 'static, + S: EthereumStore + + LeftoversStore + + Clone + + Send + + Sync + + 'static, Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, A: EthereumAccount + Send + Sync + 'static, { @@ -980,7 +1104,7 @@ mod tests { "{\"amount\": \"100000000000\", \"scale\": 18 }".to_string(), )) .with_status(200) - .with_body("OK".to_string()) + .with_body("{\"amount\": \"100000000000\", \"scale\": 9 }".to_string()) .create(); let bob_connector_url = mockito::server_url(); diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs index 92fc0bc70..a9a3a34a9 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs @@ -7,6 +7,7 @@ use std::collections::HashMap; use std::sync::Arc; use hyper::StatusCode; +use num_traits::Zero; use std::process::Command; use std::str::FromStr; use std::thread::sleep; @@ -61,6 +62,38 @@ pub struct TestStore { pub last_observed_block: Arc>, pub saved_hashes: Arc>>, pub cache_hits: Arc>, + pub uncredited_settlement_amount: Arc>>, +} + +use crate::stores::LeftoversStore; +use num_bigint::BigUint; + +impl LeftoversStore for TestStore { + type AssetType = BigUint; + + fn save_uncredited_settlement_amount( + &self, + account_id: String, + uncredited_settlement_amount: Self::AssetType, + ) -> Box + Send> { + let mut guard = self.uncredited_settlement_amount.write(); + (*guard).insert(account_id, uncredited_settlement_amount); + Box::new(ok(())) + } + + fn load_uncredited_settlement_amount( + &self, + account_id: String, + ) -> Box + Send> { + let mut guard = self.uncredited_settlement_amount.write(); + if let Some(l) = guard.get(&account_id) { + let l = l.clone(); + (*guard).insert(account_id, Zero::zero()); + Box::new(ok(l.clone())) + } else { + Box::new(ok(Zero::zero())) + } + } } impl EthereumStore for TestStore { @@ -238,6 +271,7 @@ impl TestStore { cache_hits: Arc::new(RwLock::new(0)), last_observed_block: Arc::new(RwLock::new(U256::from(0))), saved_hashes: Arc::new(RwLock::new(HashMap::new())), + uncredited_settlement_amount: Arc::new(RwLock::new(HashMap::new())), } } } @@ -265,7 +299,13 @@ pub fn test_engine( ) -> EthereumLedgerSettlementEngine where Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static, - S: EthereumStore + IdempotentStore + Clone + Send + Sync + 'static, + S: EthereumStore + + LeftoversStore + + IdempotentStore + + Clone + + Send + + Sync + + 'static, A: EthereumAccount + Send + Sync + 'static, { EthereumLedgerSettlementEngineBuilder::new(store, key) diff --git a/crates/interledger-settlement-engines/src/stores/mod.rs b/crates/interledger-settlement-engines/src/stores/mod.rs index 26f97f009..79bd5af36 100644 --- a/crates/interledger-settlement-engines/src/stores/mod.rs +++ b/crates/interledger-settlement-engines/src/stores/mod.rs @@ -28,3 +28,20 @@ pub trait IdempotentEngineStore { data: Bytes, ) -> Box + Send>; } + +pub trait LeftoversStore { + type AssetType; + + /// Saves the leftover data + fn save_uncredited_settlement_amount( + &self, + account_id: String, + uncredited_settlement_amount: Self::AssetType, + ) -> Box + Send>; + + /// Clears the leftover data in the database and returns the cleared value + fn load_uncredited_settlement_amount( + &self, + account_id: String, + ) -> Box + Send>; +} diff --git a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs index 2e3bc79a6..dd0203e5e 100644 --- a/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs +++ b/crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs @@ -6,13 +6,17 @@ use futures::{ use ethereum_tx_sign::web3::types::{Address as EthAddress, H256, U256}; use interledger_service::Account as AccountTrait; use std::collections::HashMap; +use std::str::FromStr; use crate::engines::ethereum_ledger::{EthereumAccount, EthereumAddresses, EthereumStore}; +use num_traits::Zero; use redis::{self, cmd, r#async::SharedConnection, ConnectionInfo, PipelineCommands, Value}; use log::{error, trace}; use crate::stores::redis_store_common::{EngineRedisStore, EngineRedisStoreBuilder}; +use crate::stores::LeftoversStore; +use num_bigint::BigUint; // Key for the latest observed block and balance. The data is stored in order to // avoid double crediting transactions which have already been processed, and in @@ -22,6 +26,7 @@ static SAVED_TRANSACTIONS_KEY: &str = "transactions"; static SETTLEMENT_ENGINES_KEY: &str = "settlement"; static LEDGER_KEY: &str = "ledger"; static ETHEREUM_KEY: &str = "eth"; +static UNCREDITED_AMOUNT_KEY: &str = "uncredited_settlement_amount"; #[derive(Clone, Debug, Serialize)] pub struct Account { @@ -52,6 +57,13 @@ fn ethereum_ledger_key(account_id: u64) -> String { ) } +fn ethereum_uncredited_amount_key(account_id: String) -> String { + format!( + "{}:{}:{}:{}", + ETHEREUM_KEY, LEDGER_KEY, UNCREDITED_AMOUNT_KEY, account_id, + ) +} + impl EthereumAccount for Account { fn token_address(&self) -> Option { self.token_address @@ -105,6 +117,83 @@ impl EthereumLedgerRedisStore { } } +impl LeftoversStore for EthereumLedgerRedisStore { + type AssetType = BigUint; + + fn save_uncredited_settlement_amount( + &self, + account_id: String, + uncredited_settlement_amount: Self::AssetType, + ) -> Box + Send> { + trace!( + "Saving uncredited_settlement_amount {:?} {:?}", + account_id, + uncredited_settlement_amount + ); + let mut pipe = redis::pipe(); + // We store these amounts as lists of strings + // because we cannot do BigNumber arithmetic in the store + // When loading the amounts, we convert them to the appropriate data + // type and sum them up. + pipe.lpush( + ethereum_uncredited_amount_key(account_id.clone()), + uncredited_settlement_amount.to_string(), + ) + .ignore(); + Box::new( + pipe.query_async(self.connection.clone()) + .map_err(move |err| { + error!( + "Error saving uncredited_settlement_amount {:?}: {:?}", + uncredited_settlement_amount, err + ) + }) + .and_then(move |(_conn, _ret): (_, Value)| Ok(())), + ) + } + + fn load_uncredited_settlement_amount( + &self, + account_id: String, + ) -> Box + Send> { + trace!("Loading uncredited_settlement_amount {:?}", account_id); + let mut pipe = redis::pipe(); + // Loads the value and resets it to 0 + pipe.lrange(ethereum_uncredited_amount_key(account_id.clone()), 0, -1); + pipe.del(format!("uncredited_settlement_amount:{}", account_id)) + .ignore(); + Box::new( + pipe.query_async(self.connection.clone()) + .map_err(move |err| { + error!("Error loading uncredited_settlement_amount {:?}: ", err) + }) + .and_then( + move |(_conn, uncredited_settlement_amounts): (_, Vec>)| { + if uncredited_settlement_amounts.len() == 1 { + let uncredited_settlement_amounts = + uncredited_settlement_amounts[0].clone(); + let mut total_amount = BigUint::zero(); + for uncredited_settlement_amount in uncredited_settlement_amounts { + let amount = if let Ok(amount) = + BigUint::from_str(&uncredited_settlement_amount) + { + amount + } else { + // could not convert to bigint + return Box::new(err(())); + }; + total_amount += amount; + } + Box::new(ok(total_amount)) + } else { + Box::new(ok(Zero::zero())) + } + }, + ), + ) + } +} + impl EthereumStore for EthereumLedgerRedisStore { type Account = Account; @@ -284,9 +373,36 @@ mod tests { block_on, test_eth_store as test_store, }; use super::*; + use futures::future::join_all; use std::iter::FromIterator; use std::str::FromStr; + #[test] + fn saves_and_pops_uncredited_settlement_amount_properly() { + block_on(test_store().and_then(|(store, context)| { + let amount = BigUint::from_str("10000000000000000000").unwrap(); + let ret_amount = BigUint::from_str("30000000000000000000").unwrap(); + let acc = "0".to_string(); + join_all(vec![ + store.save_uncredited_settlement_amount(acc.clone(), amount.clone()), + store.save_uncredited_settlement_amount(acc.clone(), amount.clone()), + store.save_uncredited_settlement_amount(acc.clone(), amount.clone()), + ]) + .map_err(|err| eprintln!("Redis error: {:?}", err)) + .and_then(move |_| { + store + .load_uncredited_settlement_amount(acc) + .map_err(|err| eprintln!("Redis error: {:?}", err)) + .and_then(move |ret| { + assert_eq!(ret, ret_amount); + let _ = context; + Ok(()) + }) + }) + })) + .unwrap() + } + #[test] fn saves_and_loads_ethereum_addreses_properly() { block_on(test_store().and_then(|(store, context)| { diff --git a/crates/interledger-settlement/src/lib.rs b/crates/interledger-settlement/src/lib.rs index 4a9e3f19b..afa42e6f1 100644 --- a/crates/interledger-settlement/src/lib.rs +++ b/crates/interledger-settlement/src/lib.rs @@ -186,7 +186,7 @@ mod tests { .unwrap(), 1 ); - // there's leftovers for all number slots which do not increase in + // there's uncredited_settlement_amount for all number slots which do not increase in // increments of 10^abs(to_scale-from_scale) assert_eq!( 1u64.normalize_scale(ConvertDetails { from: 2, to: 1 }) @@ -219,7 +219,7 @@ mod tests { .unwrap(), 100 ); - // 299 units with base 3 is 29 units with base 2 (0.9 leftovers) + // 299 units with base 3 is 29 units with base 2 (0.9 uncredited_settlement_amount) assert_eq!( 299u64 .normalize_scale(ConvertDetails { from: 3, to: 2 })