@@ -35,6 +35,7 @@ use url::Url;
35
35
use uuid:: Uuid ;
36
36
37
37
use crate :: stores:: redis_ethereum_ledger:: * ;
38
+ use crate :: stores:: LeftoversStore ;
38
39
use crate :: { ApiResponse , CreateAccount , SettlementEngine , SettlementEngineApi } ;
39
40
use interledger_settlement:: { Convert , ConvertDetails , Quantity } ;
40
41
@@ -100,7 +101,7 @@ pub struct EthereumLedgerSettlementEngineBuilder<'a, S, Si, A> {
100
101
101
102
impl < ' a , S , Si , A > EthereumLedgerSettlementEngineBuilder < ' a , S , Si , A >
102
103
where
103
- S : EthereumStore < Account = A > + Clone + Send + Sync + ' static ,
104
+ S : EthereumStore < Account = A > + LeftoversStore + Clone + Send + Sync + ' static ,
104
105
Si : EthereumLedgerTxSigner + Clone + Send + Sync + ' static ,
105
106
A : EthereumAccount + Send + Sync + ' static ,
106
107
{
@@ -222,7 +223,7 @@ where
222
223
223
224
impl < S , Si , A > EthereumLedgerSettlementEngine < S , Si , A >
224
225
where
225
- S : EthereumStore < Account = A > + Clone + Send + Sync + ' static ,
226
+ S : EthereumStore < Account = A > + LeftoversStore + Clone + Send + Sync + ' static ,
226
227
Si : EthereumLedgerTxSigner + Clone + Send + Sync + ' static ,
227
228
A : EthereumAccount + Send + Sync + ' static ,
228
229
{
@@ -489,8 +490,13 @@ where
489
490
. push ( "settlements" ) ;
490
491
let client = Client :: new ( ) ;
491
492
debug ! ( "Making POST to {:?} {:?} about {:?}" , url, amount, tx_hash) ;
493
+ let store = self . store . clone ( ) ;
492
494
let action = move || {
495
+ // need to make 2 clones, one to own the store in the function
496
+ // and one for the retry closure..
497
+ let store = store. clone ( ) ;
493
498
let account_id = account_id. clone ( ) ;
499
+ let account_id_clone2 = account_id. clone ( ) ;
494
500
client
495
501
. post ( url. clone ( ) )
496
502
. header ( "Idempotency-Key" , tx_hash. to_string ( ) )
@@ -504,11 +510,64 @@ where
504
510
} )
505
511
. and_then ( move |response| {
506
512
trace ! ( "Accounting system responded with {:?}" , response) ;
507
- if response. status ( ) . is_success ( ) {
508
- Ok ( ( ) )
509
- } else {
510
- Err ( ( ) )
513
+ if !response. status ( ) . is_success ( ) {
514
+ return Either :: A ( err ( ( ) ) ) ;
511
515
}
516
+ Either :: B (
517
+ response
518
+ . into_body ( )
519
+ . concat2 ( )
520
+ . map_err ( |err| {
521
+ let err = format ! ( "Couldn't retrieve body {:?}" , err) ;
522
+ error ! ( "{}" , err) ;
523
+ } )
524
+ . and_then ( move |body| {
525
+ // Get the amount stored by the connector and
526
+ // check for any precision loss / overflow
527
+ serde_json:: from_slice :: < Quantity > ( & body) . map_err ( |err| {
528
+ let err = format ! (
529
+ "Couldn't parse body {:?} into Quantity {:?}" ,
530
+ body, err
531
+ ) ;
532
+ error ! ( "{}" , err) ;
533
+ } )
534
+ } )
535
+ . and_then ( move |quantity : Quantity | {
536
+ // Convert both to BigUInts so we can do arithmetic
537
+ join_all ( vec ! [
538
+ result( BigUint :: from_str( & quantity. amount) ) ,
539
+ result( BigUint :: from_str( & amount. to_string( ) ) ) ,
540
+ ] )
541
+ . map_err ( |err| {
542
+ let error_msg =
543
+ format ! ( "Error converting to BigUints {:?}" , err) ;
544
+ error ! ( "{:?}" , error_msg) ;
545
+ } )
546
+ . and_then (
547
+ move |amounts : Vec < BigUint > | {
548
+ let connector_amount = amounts[ 0 ] . clone ( ) ;
549
+ let engine_amount = amounts[ 1 ] . clone ( ) ;
550
+ // Scale the amount settled by the
551
+ // connector back up to our scale
552
+ let scaled_connector_amount = connector_amount
553
+ . normalize_scale ( ConvertDetails {
554
+ from : quantity. scale ,
555
+ to : engine_scale,
556
+ } ) ;
557
+
558
+ let diff: BigUint = engine_amount - scaled_connector_amount;
559
+ if diff > BigUint :: from ( 0u32 ) {
560
+ // connector settled less than we
561
+ // instructed it to, so we must save
562
+ // the difference for the leftovers
563
+ Either :: A ( store. save_leftovers ( account_id_clone2, diff) )
564
+ } else {
565
+ Either :: B ( ok ( ( ) ) )
566
+ }
567
+ } ,
568
+ )
569
+ } ) ,
570
+ )
512
571
} )
513
572
} ;
514
573
Retry :: spawn (
@@ -611,7 +670,7 @@ where
611
670
612
671
impl < S , Si , A > SettlementEngine for EthereumLedgerSettlementEngine < S , Si , A >
613
672
where
614
- S : EthereumStore < Account = A > + Clone + Send + Sync + ' static ,
673
+ S : EthereumStore < Account = A > + LeftoversStore + Clone + Send + Sync + ' static ,
615
674
Si : EthereumLedgerTxSigner + Clone + Send + Sync + ' static ,
616
675
A : EthereumAccount + Send + Sync + ' static ,
617
676
{
0 commit comments