@@ -17,6 +17,7 @@ use hyper::StatusCode;
1717use interledger_store_redis:: RedisStoreBuilder ;
1818use log:: info;
1919use num_bigint:: BigUint ;
20+ use num_traits:: Zero ;
2021use redis:: IntoConnectionInfo ;
2122use reqwest:: r#async:: { Client , Response as HttpResponse } ;
2223use ring:: { digest, hmac} ;
@@ -490,11 +491,11 @@ where
490491 . push ( "settlements" ) ;
491492 let client = Client :: new ( ) ;
492493 debug ! ( "Making POST to {:?} {:?} about {:?}" , url, amount, tx_hash) ;
493- let store = self . store . clone ( ) ;
494+ let self_clone = self . clone ( ) ;
494495 let action = move || {
495- // need to make 2 clones, one to own the store in the function
496+ // need to make 2 clones, one to own the variables in the function
496497 // and one for the retry closure..
497- let store = store . clone ( ) ;
498+ let self_clone = self_clone . clone ( ) ;
498499 let account_id = account_id. clone ( ) ;
499500 let account_id_clone2 = account_id. clone ( ) ;
500501 client
@@ -510,64 +511,7 @@ where
510511 } )
511512 . and_then ( move |response| {
512513 trace ! ( "Accounting system responded with {:?}" , response) ;
513- if !response. status ( ) . is_success ( ) {
514- return Either :: A ( err ( ( ) ) ) ;
515- }
516- Either :: B (
517- response
518- . into_body ( )
519- . concat2 ( )
520- . map_err ( |err| {
521- let err = format ! ( "Couldn't retrieve body {:?}" , err) ;
522- error ! ( "{}" , err) ;
523- } )
524- . and_then ( move |body| {
525- // Get the amount stored by the connector and
526- // check for any precision loss / overflow
527- serde_json:: from_slice :: < Quantity > ( & body) . map_err ( |err| {
528- let err = format ! (
529- "Couldn't parse body {:?} into Quantity {:?}" ,
530- body, err
531- ) ;
532- error ! ( "{}" , err) ;
533- } )
534- } )
535- . and_then ( move |quantity : Quantity | {
536- // Convert both to BigUInts so we can do arithmetic
537- join_all ( vec ! [
538- result( BigUint :: from_str( & quantity. amount) ) ,
539- result( BigUint :: from_str( & amount. to_string( ) ) ) ,
540- ] )
541- . map_err ( |err| {
542- let error_msg =
543- format ! ( "Error converting to BigUints {:?}" , err) ;
544- error ! ( "{:?}" , error_msg) ;
545- } )
546- . and_then (
547- move |amounts : Vec < BigUint > | {
548- let connector_amount = amounts[ 0 ] . clone ( ) ;
549- let engine_amount = amounts[ 1 ] . clone ( ) ;
550- // Scale the amount settled by the
551- // connector back up to our scale
552- let scaled_connector_amount = connector_amount
553- . normalize_scale ( ConvertDetails {
554- from : quantity. scale ,
555- to : engine_scale,
556- } ) ;
557-
558- let diff: BigUint = engine_amount - scaled_connector_amount;
559- if diff > BigUint :: from ( 0u32 ) {
560- // connector settled less than we
561- // instructed it to, so we must save
562- // the difference for the leftovers
563- Either :: A ( store. save_leftovers ( account_id_clone2, diff) )
564- } else {
565- Either :: B ( ok ( ( ) ) )
566- }
567- } ,
568- )
569- } ) ,
570- )
514+ self_clone. process_connector_response ( account_id_clone2, response, amount)
571515 } )
572516 } ;
573517 Retry :: spawn (
@@ -579,6 +523,68 @@ where
579523 } )
580524 }
581525
526+ fn process_connector_response (
527+ & self ,
528+ account_id : String ,
529+ response : HttpResponse ,
530+ amount : U256 ,
531+ ) -> impl Future < Item = ( ) , Error = ( ) > {
532+ let engine_scale = self . asset_scale ;
533+ let store = self . store . clone ( ) ;
534+ if !response. status ( ) . is_success ( ) {
535+ return Either :: A ( err ( ( ) ) ) ;
536+ }
537+ Either :: B (
538+ response
539+ . into_body ( )
540+ . concat2 ( )
541+ . map_err ( |err| {
542+ let err = format ! ( "Couldn't retrieve body {:?}" , err) ;
543+ error ! ( "{}" , err) ;
544+ } )
545+ . and_then ( move |body| {
546+ // Get the amount stored by the connector and
547+ // check for any precision loss / overflow
548+ serde_json:: from_slice :: < Quantity > ( & body) . map_err ( |err| {
549+ let err = format ! ( "Couldn't parse body {:?} into Quantity {:?}" , body, err) ;
550+ error ! ( "{}" , err) ;
551+ } )
552+ } )
553+ . and_then ( move |quantity : Quantity | {
554+ // Convert both to BigUInts so we can do arithmetic
555+ join_all ( vec ! [
556+ result( BigUint :: from_str( & quantity. amount) ) ,
557+ result( BigUint :: from_str( & amount. to_string( ) ) ) ,
558+ ] )
559+ . map_err ( |err| {
560+ let error_msg = format ! ( "Error converting to BigUints {:?}" , err) ;
561+ error ! ( "{:?}" , error_msg) ;
562+ } )
563+ . and_then ( move |amounts : Vec < BigUint > | {
564+ let connector_amount = amounts[ 0 ] . clone ( ) ;
565+ let engine_amount = amounts[ 1 ] . clone ( ) ;
566+ // Scale the amount settled by the
567+ // connector back up to our scale
568+ result ( connector_amount. normalize_scale ( ConvertDetails {
569+ from : quantity. scale ,
570+ to : engine_scale,
571+ } ) )
572+ . and_then ( move |scaled_connector_amount| {
573+ let diff: BigUint = engine_amount - scaled_connector_amount;
574+ if diff > Zero :: zero ( ) {
575+ // connector settled less than we
576+ // instructed it to, so we must save
577+ // the difference for the leftovers
578+ Either :: A ( store. save_leftovers ( account_id, diff) )
579+ } else {
580+ Either :: B ( ok ( ( ) ) )
581+ }
582+ } )
583+ } )
584+ } ) ,
585+ )
586+ }
587+
582588 /// Helper function which submits an Ethereum ledger transaction to `to` for `amount`.
583589 /// If called with `token_address`, it makes an ERC20 transaction instead.
584590 /// Due to the lack of an API to create and sign the transaction
0 commit comments