Skip to content

Commit 0b8ee3f

Browse files
committed
feat(engine): Add leftovers value when settling to connector
1 parent 2422594 commit 0b8ee3f

File tree

5 files changed

+83
-32
lines changed

5 files changed

+83
-32
lines changed

crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ where
374374
store
375375
.load_account_id_from_address(addr)
376376
.and_then(move |id| {
377-
self_clone.notify_connector(id.to_string(), amount, tx_hash)
377+
self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash)
378378
})
379379
.and_then(move |_| {
380380
// only save the transaction hash if the connector
@@ -457,7 +457,7 @@ where
457457
Either::A(
458458
store.load_account_id_from_address(addr)
459459
.and_then(move |id| {
460-
self_clone.notify_connector(id.to_string(), amount, tx_hash)
460+
self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash)
461461
})
462462
.and_then(move |_| {
463463
// only save the transaction hash if the connector
@@ -478,7 +478,7 @@ where
478478
fn notify_connector(
479479
&self,
480480
account_id: String,
481-
amount: U256,
481+
amount: String,
482482
tx_hash: H256,
483483
) -> impl Future<Item = (), Error = ()> {
484484
let mut url = self.connector_url.clone();
@@ -489,45 +489,65 @@ where
489489
.push("accounts")
490490
.push(&account_id.clone())
491491
.push("settlements");
492-
let client = Client::new();
493492
debug!("Making POST to {:?} {:?} about {:?}", url, amount, tx_hash);
494493
let self_clone = self.clone();
494+
let store = self.store.clone();
495+
let amount_clone = amount.clone();
495496
let action = move || {
496497
// need to make 2 clones, one to own the variables in the function
497498
// and one for the retry closure..
498499
let self_clone = self_clone.clone();
500+
let store = store.clone();
499501
let account_id = account_id.clone();
500502
let account_id_clone2 = account_id.clone();
501-
client
502-
.post(url.clone())
503-
.header("Idempotency-Key", tx_hash.to_string())
504-
.json(&json!({ "amount": amount.to_string(), "scale" : engine_scale }))
505-
.send()
506-
.map_err(move |err| {
507-
error!(
508-
"Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}",
509-
account_id, amount, err
510-
)
511-
})
512-
.and_then(move |response| {
513-
trace!("Accounting system responded with {:?}", response);
514-
self_clone.process_connector_response(account_id_clone2, response, amount)
503+
let amount = amount.clone();
504+
let url = url.clone();
505+
506+
// settle for amount + leftovers
507+
store.load_leftovers(account_id.clone())
508+
.and_then(move |leftovers| {
509+
result(BigUint::from_str(&amount.clone()).map_err(move |err| {
510+
let error_msg = format!("Error converting to BigUint {:?}", err);
511+
error!("{:?}", error_msg);
512+
}))
513+
.and_then(move |amount| {
514+
Ok(amount + leftovers)
515515
})
516+
})
517+
.and_then(move |full_amount| {
518+
let client = Client::new();
519+
let full_amount_clone = full_amount.clone();
520+
client
521+
.post(url.clone())
522+
.header("Idempotency-Key", tx_hash.to_string())
523+
.json(&json!({ "amount": full_amount.clone().to_string(), "scale" : engine_scale }))
524+
.send()
525+
.map_err(move |err| {
526+
error!(
527+
"Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}",
528+
account_id, full_amount_clone, err
529+
)
530+
})
531+
.and_then(move |response| {
532+
trace!("Accounting system responded with {:?}", response);
533+
self_clone.process_connector_response(account_id_clone2, response, full_amount.clone())
534+
})
535+
})
516536
};
517537
Retry::spawn(
518538
ExponentialBackoff::from_millis(10).take(MAX_RETRIES),
519539
action,
520540
)
521541
.map_err(move |_| {
522-
error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, amount, tx_hash)
542+
error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, amount_clone, tx_hash)
523543
})
524544
}
525545

526546
fn process_connector_response(
527547
&self,
528548
account_id: String,
529549
response: HttpResponse,
530-
amount: U256,
550+
engine_amount: BigUint,
531551
) -> impl Future<Item = (), Error = ()> {
532552
let engine_scale = self.asset_scale;
533553
let store = self.store.clone();
@@ -551,18 +571,12 @@ where
551571
})
552572
})
553573
.and_then(move |quantity: Quantity| {
554-
// Convert both to BigUInts so we can do arithmetic
555-
join_all(vec![
556-
result(BigUint::from_str(&quantity.amount)),
557-
result(BigUint::from_str(&amount.to_string())),
558-
])
574+
result(BigUint::from_str(&quantity.amount))
559575
.map_err(|err| {
560-
let error_msg = format!("Error converting to BigUints {:?}", err);
576+
let error_msg = format!("Error converting to BigUint {:?}", err);
561577
error!("{:?}", error_msg);
562578
})
563-
.and_then(move |amounts: Vec<BigUint>| {
564-
let connector_amount = amounts[0].clone();
565-
let engine_amount = amounts[1].clone();
579+
.and_then(move |connector_amount: BigUint| {
566580
// Scale the amount settled by the
567581
// connector back up to our scale
568582
result(connector_amount.normalize_scale(ConvertDetails {

crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,12 @@ pub struct TestStore {
6161
pub last_observed_block: Arc<RwLock<U256>>,
6262
pub saved_hashes: Arc<RwLock<HashMap<H256, bool>>>,
6363
pub cache_hits: Arc<RwLock<u64>>,
64-
pub leftovers: Arc<RwLock<HashMap<String, String>>>,
64+
pub leftovers: Arc<RwLock<HashMap<String, BigUint>>>,
6565
}
6666

6767
use crate::stores::LeftoversStore;
6868
use num_bigint::BigUint;
69+
use num_traits::Zero;
6970

7071
impl LeftoversStore for TestStore {
7172
fn save_leftovers(
@@ -74,9 +75,18 @@ impl LeftoversStore for TestStore {
7475
leftovers: BigUint,
7576
) -> Box<Future<Item = (), Error = ()> + Send> {
7677
let mut guard = self.leftovers.write();
77-
(*guard).insert(account_id, leftovers.to_string());
78+
(*guard).insert(account_id, leftovers);
7879
Box::new(ok(()))
7980
}
81+
82+
fn load_leftovers(&self, account_id: String) -> Box<Future<Item = BigUint, Error = ()> + Send> {
83+
let mut guard = self.leftovers.read();
84+
if let Some(l) = guard.get(&account_id) {
85+
Box::new(ok(l.clone()))
86+
} else {
87+
Box::new(err(()))
88+
}
89+
}
8090
}
8191

8292
impl EthereumStore for TestStore {

crates/interledger-settlement-engines/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#![type_length_limit = "1739269"]
1+
#![type_length_limit = "3396805"]
22

33
use clap::{value_t, App, Arg, SubCommand};
44
use hex;

crates/interledger-settlement-engines/src/stores/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,10 @@ pub trait LeftoversStore {
3737
account_id: String,
3838
leftovers: BigUint,
3939
) -> Box<dyn Future<Item = (), Error = ()> + Send>;
40+
41+
/// Saves the leftover data
42+
fn load_leftovers(
43+
&self,
44+
account_id: String,
45+
) -> Box<dyn Future<Item = BigUint, Error = ()> + Send>;
4046
}

crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use futures::{
66
use ethereum_tx_sign::web3::types::{Address as EthAddress, H256, U256};
77
use interledger_service::Account as AccountTrait;
88
use std::collections::HashMap;
9+
use std::str::FromStr;
910

1011
use crate::engines::ethereum_ledger::{EthereumAccount, EthereumAddresses, EthereumStore};
1112
use redis::{self, cmd, r#async::SharedConnection, ConnectionInfo, PipelineCommands, Value};
@@ -122,6 +123,26 @@ impl LeftoversStore for EthereumLedgerRedisStore {
122123
.and_then(move |(_conn, _ret): (_, Value)| Ok(())),
123124
)
124125
}
126+
127+
fn load_leftovers(
128+
&self,
129+
account_id: String,
130+
) -> Box<dyn Future<Item = BigUint, Error = ()> + Send> {
131+
let mut pipe = redis::pipe();
132+
pipe.get(format!("leftovers:{}", account_id))
133+
.ignore();
134+
Box::new(
135+
pipe.query_async(self.connection.clone())
136+
.map_err(move |err| error!("Error loading leftovers {:?}: ", err))
137+
.and_then(move |(_conn, leftovers): (_, String)| {
138+
if let Ok(leftovers) = BigUint::from_str(&leftovers) {
139+
Box::new(ok(leftovers))
140+
} else {
141+
Box::new(err(()))
142+
}
143+
})
144+
)
145+
}
125146
}
126147

127148
impl EthereumStore for EthereumLedgerRedisStore {

0 commit comments

Comments
 (0)