diff --git a/payjoin-ffi/src/receive/mod.rs b/payjoin-ffi/src/receive/mod.rs index 47d9833f0..38d1c86aa 100644 --- a/payjoin-ffi/src/receive/mod.rs +++ b/payjoin-ffi/src/receive/mod.rs @@ -711,6 +711,12 @@ pub trait CanBroadcast: Send + Sync { fn callback(&self, tx: Vec) -> Result; } +#[uniffi::export(with_foreign)] +#[async_trait::async_trait] +pub trait CanBroadcastAsync: Send + Sync { + async fn callback(&self, tx: Vec) -> Result; +} + #[uniffi::export] impl UncheckedOriginalPayload { pub fn check_broadcast_suitability( @@ -728,6 +734,28 @@ impl UncheckedOriginalPayload { ))))) } + pub async fn check_broadcast_suitability_async( + &self, + min_fee_rate: Option, + can_broadcast: Arc, + ) -> Result { + let min_fee_rate = validate_fee_rate_sat_per_kwu_opt(min_fee_rate)?; + Ok(UncheckedOriginalPayloadTransition(Arc::new(RwLock::new(Some( + self.0 + .clone() + .check_broadcast_suitability_async(min_fee_rate, |transaction| { + let bytes = payjoin::bitcoin::consensus::encode::serialize(transaction); + async { + can_broadcast + .callback(bytes) + .await + .map_err(|e| ImplementationError::new(e).into()) + } + }) + .await, + ))))) + } + /// Call this method if the only way to initiate a Payjoin with this receiver /// requires manual intervention, as in most consumer wallets. /// @@ -775,6 +803,12 @@ pub trait IsScriptOwned: Send + Sync { fn callback(&self, script: Vec) -> Result; } +#[uniffi::export(with_foreign)] +#[async_trait::async_trait] +pub trait IsScriptOwnedAsync: Send + Sync { + async fn callback(&self, script: Vec) -> Result; +} + #[uniffi::export] impl MaybeInputsOwned { ///The Sender’s Original PSBT @@ -783,6 +817,7 @@ impl MaybeInputsOwned { &self.0.clone().extract_tx_to_schedule_broadcast(), ) } + pub fn check_inputs_not_owned( &self, is_owned: Arc, @@ -793,6 +828,26 @@ impl MaybeInputsOwned { }), )))) } + + pub async fn check_inputs_not_owned_async( + &self, + is_owned: Arc, + ) -> MaybeInputsOwnedTransition { + MaybeInputsOwnedTransition(Arc::new(RwLock::new(Some( + self.0 + .clone() + .check_inputs_not_owned_async(&mut |input| { + let bytes = input.to_bytes(); + async { + is_owned + .callback(bytes) + .await + .map_err(|e| ImplementationError::new(e).into()) + } + }) + .await, + )))) + } } #[derive(Clone, uniffi::Object)] @@ -830,6 +885,12 @@ pub trait IsOutputKnown: Send + Sync { fn callback(&self, outpoint: OutPoint) -> Result; } +#[uniffi::export(with_foreign)] +#[async_trait::async_trait] +pub trait IsOutputKnownAsync: Send + Sync { + async fn callback(&self, outpoint: OutPoint) -> Result; +} + #[uniffi::export] impl MaybeInputsSeen { pub fn check_no_inputs_seen_before( @@ -844,6 +905,26 @@ impl MaybeInputsSeen { }), )))) } + + pub async fn check_no_inputs_seen_before_async( + &self, + is_known: Arc, + ) -> MaybeInputsSeenTransition { + MaybeInputsSeenTransition(Arc::new(RwLock::new(Some( + self.0 + .clone() + .check_no_inputs_seen_before_async(&mut |outpoint| { + let plain_outpoint = OutPoint::from(*outpoint); + async { + is_known + .callback(plain_outpoint) + .await + .map_err(|e| ImplementationError::new(e).into()) + } + }) + .await, + )))) + } } /// The receiver has not yet identified which outputs belong to the receiver. @@ -893,6 +974,26 @@ impl OutputsUnknown { }), )))) } + + pub async fn identify_receiver_outputs_async( + &self, + is_receiver_output: Arc, + ) -> OutputsUnknownTransition { + OutputsUnknownTransition(Arc::new(RwLock::new(Some( + self.0 + .clone() + .identify_receiver_outputs_async(&mut |input| { + let bytes = input.to_bytes(); + async { + is_receiver_output + .callback(bytes) + .await + .map_err(|e| ImplementationError::new(e).into()) + } + }) + .await, + )))) + } } #[derive(uniffi::Object)] @@ -1160,6 +1261,12 @@ pub trait ProcessPsbt: Send + Sync { fn callback(&self, psbt: String) -> Result; } +#[uniffi::export(with_foreign)] +#[async_trait::async_trait] +pub trait ProcessPsbtAsync: Send + Sync { + async fn callback(&self, psbt: String) -> Result; +} + #[uniffi::export] impl ProvisionalProposal { pub fn finalize_proposal( @@ -1176,6 +1283,27 @@ impl ProvisionalProposal { )))) } + pub async fn finalize_proposal_async( + &self, + process_psbt: Arc, + ) -> ProvisionalProposalTransition { + ProvisionalProposalTransition(Arc::new(RwLock::new(Some( + self.0 + .clone() + .finalize_proposal_async(|pre_processed| { + let string = pre_processed.to_string(); + async { + let psbt = process_psbt + .callback(string) + .await + .map_err(ImplementationError::new)?; + Ok(Psbt::from_str(&psbt).map_err(ImplementationError::new)?) + } + }) + .await, + )))) + } + pub fn psbt_to_sign(&self) -> String { self.0.clone().psbt_to_sign().to_string() } } @@ -1358,6 +1486,12 @@ pub trait TransactionExists: Send + Sync { fn callback(&self, txid: String) -> Result>, ForeignError>; } +#[uniffi::export(with_foreign)] +#[async_trait::async_trait] +pub trait TransactionExistsAsync: Send + Sync { + async fn callback(&self, txid: String) -> Result>, ForeignError>; +} + #[allow(clippy::type_complexity)] #[derive(uniffi::Object)] pub struct MonitorTransition( @@ -1433,6 +1567,27 @@ impl Monitor { .map_err(|e| ImplementationError::new(e).into()) }))))) } + + pub async fn monitor_async( + &self, + transaction_exists: Arc, + ) -> MonitorTransition { + MonitorTransition(Arc::new(RwLock::new(Some( + self.0 + .clone() + .check_payment_async(|txid| { + let string = txid.to_string(); + async { + transaction_exists + .callback(string) + .await + .and_then(|buf| buf.map(try_deserialize_tx).transpose()) + .map_err(|e| ImplementationError::new(e).into()) + } + }) + .await, + )))) + } } /// Session persister that should save and load events as JSON strings. diff --git a/payjoin/src/core/psbt/mod.rs b/payjoin/src/core/psbt/mod.rs index c02eaef59..c4adf031a 100644 --- a/payjoin/src/core/psbt/mod.rs +++ b/payjoin/src/core/psbt/mod.rs @@ -39,7 +39,7 @@ pub(crate) trait PsbtExt: Sized { ) -> &mut BTreeMap; fn proprietary_mut(&mut self) -> &mut BTreeMap>; fn unknown_mut(&mut self) -> &mut BTreeMap>; - fn input_pairs(&self) -> Box> + '_>; + fn input_pairs(&self) -> Box> + Send + '_>; // guarantees that length of psbt input matches that of unsigned_tx inputs and same /// thing for outputs. fn validate(self) -> Result; @@ -63,7 +63,7 @@ impl PsbtExt for Psbt { fn unknown_mut(&mut self) -> &mut BTreeMap> { &mut self.unknown } - fn input_pairs(&self) -> Box> + '_> { + fn input_pairs(&self) -> Box> + Send + '_> { Box::new( self.unsigned_tx .input diff --git a/payjoin/src/core/receive/mod.rs b/payjoin/src/core/receive/mod.rs index f102d743a..f2e618471 100644 --- a/payjoin/src/core/receive/mod.rs +++ b/payjoin/src/core/receive/mod.rs @@ -10,6 +10,7 @@ //! version 1, refer to the `receive::v1` module documentation after enabling the `v1` feature. use std::collections::BTreeMap; +use std::future::Future; use std::str::FromStr; use bitcoin::transaction::InputWeightPrediction; @@ -351,6 +352,55 @@ impl PsbtContext { let payjoin_proposal = self.prepare_psbt(signed_psbt); Ok(payjoin_proposal) } + + /// Finalizes the Payjoin proposal into a PSBT which the sender will find acceptable before + /// they sign the transaction and broadcast it to the network. + /// + /// Finalization consists of two steps: + /// 1. Remove all sender signatures which were received with the original PSBT as these signatures are now invalid. + /// 2. Sign and finalize the resulting PSBT using the passed `wallet_process_psbt` signing function. + async fn to_sync_wallet_process_psbt( + &self, + wallet_process_psbt: impl Fn(&Psbt) -> F, + ) -> Result Result, ImplementationError> + where + F: Future>, + { + let psbt = self.psbt_to_sign(); + let signed_psbt = wallet_process_psbt(&psbt).await?; + Ok(move |_: &Psbt| Ok(signed_psbt.clone())) + } +} + +pub(crate) async fn compute_async_results( + values: Vec, + async_fn: &mut impl FnMut(&T) -> F, +) -> Result, ImplementationError> +where + F: Future>, +{ + let mut results = vec![]; + for value in values { + let result = async_fn(&value).await?; + results.push((value, result)); + } + Ok(results) +} + +pub(crate) fn make_sync_fn( + results: Vec<(T, O)>, + map: impl Fn(&T) -> &U, +) -> impl Fn(&U) -> Result +where + U: PartialEq + ?Sized, + O: Clone, +{ + move |value| { + results + .iter() + .find_map(|(v, r)| if map(v) == value { Some((*r).clone()) } else { None }) + .ok_or(ImplementationError::from("All values should exist")) + } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -368,20 +418,25 @@ impl OriginalPayload { Ok(original_psbt_fee / self.psbt.clone().extract_tx_unchecked_fee_rate().weight()) } + fn check_min_fee_rate(&self, min_fee_rate: FeeRate) -> Result<(), Error> { + let original_psbt_fee_rate = self.psbt_fee_rate()?; + if original_psbt_fee_rate < min_fee_rate { + return Err(InternalPayloadError::PsbtBelowFeeRate( + original_psbt_fee_rate, + min_fee_rate, + ) + .into()); + } + Ok(()) + } + pub fn check_broadcast_suitability( &self, min_fee_rate: Option, can_broadcast: impl Fn(&bitcoin::Transaction) -> Result, ) -> Result<(), Error> { - let original_psbt_fee_rate = self.psbt_fee_rate()?; if let Some(min_fee_rate) = min_fee_rate { - if original_psbt_fee_rate < min_fee_rate { - return Err(InternalPayloadError::PsbtBelowFeeRate( - original_psbt_fee_rate, - min_fee_rate, - ) - .into()); - } + self.check_min_fee_rate(min_fee_rate)? } if can_broadcast(&self.psbt.clone().extract_tx_unchecked_fee_rate()) .map_err(Error::Implementation)? @@ -392,6 +447,41 @@ impl OriginalPayload { } } + pub(crate) async fn to_sync_can_broadcast( + &self, + can_broadcast: impl Fn(&bitcoin::Transaction) -> F, + ) -> Result Result, ImplementationError> + where + F: Future>, + { + let tx = self.psbt.clone().extract_tx_unchecked_fee_rate(); + let result = can_broadcast(&tx).await?; + Ok(move |_: &Transaction| Ok(result)) + } + + pub async fn check_broadcast_suitability_async( + &self, + min_fee_rate: Option, + can_broadcast: impl Fn(&bitcoin::Transaction) -> F, + ) -> Result<(), Error> + where + F: Future>, + { + let can_broadcast_sync = + self.to_sync_can_broadcast(can_broadcast).await.map_err(Error::Implementation)?; + self.check_broadcast_suitability(min_fee_rate, can_broadcast_sync) + } + + fn input_scripts(&self) -> Result, InternalPayloadError> { + self.psbt + .input_pairs() + .map(|input| match input.previous_txout() { + Ok(txout) => Ok(txout.script_pubkey.to_owned()), + Err(e) => Err(InternalPayloadError::PrevTxOut(e)), + }) + .collect() + } + /// Check that the original PSBT has no receiver-owned inputs. /// /// An attacker can try to spend the receiver's own inputs. This check prevents that. @@ -399,64 +489,93 @@ impl OriginalPayload { &self, is_owned: &mut impl FnMut(&Script) -> Result, ) -> Result<(), Error> { - let mut err: Result<(), Error> = Ok(()); - if let Some(e) = self - .psbt - .input_pairs() - .scan(&mut err, |err, input| match input.previous_txout() { - Ok(txout) => Some(txout.script_pubkey.to_owned()), - Err(e) => { - **err = Err(InternalPayloadError::PrevTxOut(e).into()); - None - } - }) - .find_map(|script| match is_owned(&script) { - Ok(false) => None, - Ok(true) => Some(InternalPayloadError::InputOwned(script).into()), - Err(e) => Some(Error::Implementation(e)), - }) - { - return Err(e); + for script in self.input_scripts()? { + match is_owned(&script) { + Ok(false) => continue, + Ok(true) => return Err(InternalPayloadError::InputOwned(script).into()), + Err(e) => return Err(Error::Implementation(e)), + } } - err?; Ok(()) } + pub(crate) async fn to_sync_is_owned( + &self, + is_owned: &mut impl FnMut(&Script) -> F, + ) -> Result Result, Error> + where + F: Future>, + { + let results = + compute_async_results(self.input_scripts()?, &mut |s: &ScriptBuf| is_owned(s.as_ref())) + .await + .map_err(Error::Implementation)?; + Ok(make_sync_fn(results, |s: &ScriptBuf| s.as_script())) + } + + /// Check that the original PSBT has no receiver-owned inputs. + /// + /// An attacker can try to spend the receiver's own inputs. This check prevents that. + pub async fn check_inputs_not_owned_async( + &self, + is_owned: &mut impl FnMut(&Script) -> F, + ) -> Result<(), Error> + where + F: Future>, + { + let mut is_owned_sync = self.to_sync_is_owned(is_owned).await?; + self.check_inputs_not_owned(&mut is_owned_sync) + } + + fn input_outpoints(&self) -> Vec { + self.psbt.input_pairs().map(|input| input.txin.previous_output).collect() + } + pub fn check_no_inputs_seen_before( &self, is_known: &mut impl FnMut(&OutPoint) -> Result, ) -> Result<(), Error> { - self.psbt.input_pairs().try_for_each(|input| { - match is_known(&input.txin.previous_output) { - Ok(false) => Ok::<(), Error>(()), - Ok(true) => { - tracing::warn!("Request contains an input we've seen before: {}. Preventing possible probing attack.", input.txin.previous_output); - Err(InternalPayloadError::InputSeen(input.txin.previous_output))? - }, + for outpoint in self.input_outpoints() { + match is_known(&outpoint) { + Ok(false) => continue, + Ok(true) => { + tracing::warn!("Request contains an input we've seen before: {}. Preventing possible probing attack.", outpoint); + Err(InternalPayloadError::InputSeen(outpoint))? + } Err(e) => Err(Error::Implementation(e))?, } - })?; + } Ok(()) } - pub fn identify_receiver_outputs( - self, - is_receiver_output: &mut impl FnMut(&Script) -> Result, - ) -> Result { - let owned_vouts: Vec = self - .psbt - .unsigned_tx - .output - .iter() - .enumerate() - .filter_map(|(vout, txo)| match is_receiver_output(&txo.script_pubkey) { - Ok(true) => Some(Ok(vout)), - Ok(false) => None, - Err(e) => Some(Err(e)), - }) - .collect::, _>>() - .map_err(Error::Implementation)?; + pub(crate) async fn to_sync_is_known( + &self, + is_known: &mut impl FnMut(&OutPoint) -> F, + ) -> Result Result, ImplementationError> + where + F: Future>, + { + let results = compute_async_results(self.input_outpoints(), is_known).await?; + Ok(make_sync_fn(results, |o| o)) + } + + pub async fn check_no_inputs_seen_before_async( + &self, + is_known: &mut impl FnMut(&OutPoint) -> F, + ) -> Result<(), Error> + where + F: Future>, + { + let mut is_known_sync = + self.to_sync_is_known(is_known).await.map_err(Error::Implementation)?; + self.check_no_inputs_seen_before(&mut is_known_sync) + } + fn output_scripts(&self) -> Vec { + self.psbt.unsigned_tx.output.iter().map(|output| output.script_pubkey.to_owned()).collect() + } + + fn process_owned_outputs(self, owned_vouts: Vec) -> Result { if owned_vouts.is_empty() { return Err(InternalPayloadError::MissingPayment.into()); } @@ -473,6 +592,50 @@ impl OriginalPayload { let original_payload = OriginalPayload { params, ..self.clone() }; Ok(common::WantsOutputs::new(original_payload, owned_vouts)) } + + pub fn identify_receiver_outputs( + self, + is_receiver_output: &mut impl FnMut(&Script) -> Result, + ) -> Result { + let mut owned_vouts: Vec = Vec::new(); + for (vout, script_buf) in self.output_scripts().iter().enumerate() { + match is_receiver_output(script_buf) { + Ok(true) => owned_vouts.push(vout), + Ok(false) => continue, + Err(e) => return Err(Error::Implementation(e)), + } + } + + self.process_owned_outputs(owned_vouts) + } + + pub(crate) async fn to_sync_is_receiver_output( + &self, + is_receiver_output: &mut impl FnMut(&Script) -> F, + ) -> Result Result, ImplementationError> + where + F: Future>, + { + let results = compute_async_results(self.output_scripts(), &mut |s: &ScriptBuf| { + is_receiver_output(s.as_ref()) + }) + .await?; + Ok(make_sync_fn(results, |s: &ScriptBuf| s.as_script())) + } + + pub async fn identify_receiver_outputs_async( + self, + is_receiver_output: &mut impl FnMut(&Script) -> F, + ) -> Result + where + F: Future>, + { + let mut is_receiver_output_sync = self + .to_sync_is_receiver_output(is_receiver_output) + .await + .map_err(Error::Implementation)?; + self.identify_receiver_outputs(&mut is_receiver_output_sync) + } } #[cfg(test)] @@ -486,7 +649,9 @@ pub(crate) mod tests { witness, Amount, PubkeyHash, ScriptBuf, ScriptHash, Sequence, Txid, WScriptHash, XOnlyPublicKey, }; - use payjoin_test_utils::{DUMMY20, DUMMY32, PARSED_ORIGINAL_PSBT, QUERY_PARAMS}; + use payjoin_test_utils::{ + DUMMY20, DUMMY32, PARSED_ORIGINAL_PSBT, PARSED_PAYJOIN_PROPOSAL, QUERY_PARAMS, + }; use super::*; use crate::psbt::InternalPsbtInputError::InvalidScriptPubKey; @@ -498,6 +663,24 @@ pub(crate) mod tests { OriginalPayload { psbt: PARSED_ORIGINAL_PSBT.clone(), params } } + pub(crate) fn original_missing_prevtxout_from_test_vector() -> OriginalPayload { + let params = Params::from_query_str(QUERY_PARAMS, &[Version::One]) + .expect("Could not parse params from query str"); + let mut psbt: Psbt = PARSED_ORIGINAL_PSBT.clone(); + for psbtin in psbt.inputs_mut() { + psbtin.non_witness_utxo = None; + psbtin.witness_utxo = None; + } + OriginalPayload { psbt: psbt.clone(), params } + } + + pub(crate) fn psbt_context_from_test_vector() -> PsbtContext { + PsbtContext { + payjoin_psbt: PARSED_PAYJOIN_PROPOSAL.clone(), + original_psbt: PARSED_ORIGINAL_PSBT.clone(), + } + } + #[test] fn input_pair_with_expected_weight() { let p2wsh_txout = TxOut { @@ -832,6 +1015,141 @@ pub(crate) mod tests { assert_eq!(err, PsbtInputError::from(InternalPsbtInputError::ProvidedUnnecessaryWeight)); } + #[test] + fn test_check_broadcast_suitability() { + let original = original_from_test_vector(); + + // Outcome 1: min_fee_rate too high → PsbtBelowFeeRate error + let err = original + .clone() + .check_broadcast_suitability(Some(FeeRate::MAX), |_| Ok(true)) + .expect_err("Should fail when fee rate is below minimum"); + match err { + Error::Protocol(ProtocolError::OriginalPayload(PayloadError( + InternalPayloadError::PsbtBelowFeeRate(original_fee_rate, min_fee_rate), + ))) => { + assert_eq!(original_fee_rate, original.psbt_fee_rate().unwrap()); + assert_eq!(min_fee_rate, FeeRate::MAX); + } + _ => panic!("Expected PsbtBelowFeeRate error, got: {err:?}"), + } + + // Outcome 2: can_broadcast returns false → OriginalPsbtNotBroadcastable error + let err = original + .clone() + .check_broadcast_suitability(None, |_| Ok(false)) + .expect_err("Should fail when can_broadcast returns false"); + match err { + Error::Protocol(ProtocolError::OriginalPayload(PayloadError( + InternalPayloadError::OriginalPsbtNotBroadcastable, + ))) => {} + _ => panic!("Expected OriginalPsbtNotBroadcastable error, got: {err:?}"), + } + + // Outcome 3: can_broadcast returns an implementation error → Error::Implementation + let err = original + .clone() + .check_broadcast_suitability(None, |_| { + Err(ImplementationError::from("broadcast check failed")) + }) + .expect_err("Should fail when can_broadcast returns an implementation error"); + match err { + Error::Implementation(error_message) => { + assert_eq!(error_message.to_string(), "broadcast check failed".to_string()) + } + _ => panic!("Expected Error::Implementation, got: {err:?}"), + } + + // Outcome 4: success + original + .check_broadcast_suitability(None, |_| Ok(true)) + .expect("Should succeed when fee rate is acceptable and can_broadcast returns true"); + } + + #[test] + fn test_check_inputs_not_owned() { + let original = original_from_test_vector(); + let original_missing_prevtxout = original_missing_prevtxout_from_test_vector(); + + // Outcome 1: input_scripts returns a PrevTxOut error → Protocol error + let err = original_missing_prevtxout + .check_inputs_not_owned(&mut |_| Ok(false)) + .expect_err("Should fail when previous txout is missing"); + match err { + Error::Protocol(ProtocolError::OriginalPayload(PayloadError( + InternalPayloadError::PrevTxOut(_), + ))) => {} + _ => panic!("Expected PrevTxOut error, got: {err:?}"), + } + + // Outcome 2: is_owned returns true → InputOwned error + let err = original + .clone() + .check_inputs_not_owned(&mut |_| Ok(true)) + .expect_err("Should fail when input is owned"); + match err { + Error::Protocol(ProtocolError::OriginalPayload(PayloadError( + InternalPayloadError::InputOwned(_), + ))) => {} + _ => panic!("Expected InputOwned error, got: {err:?}"), + } + + // Outcome 3: is_owned returns an implementation error → Error::Implementation + let err = original + .clone() + .check_inputs_not_owned(&mut |_| { + Err(ImplementationError::from("ownership check failed")) + }) + .expect_err("Should fail when is_owned returns an implementation error"); + match err { + Error::Implementation(error_message) => { + assert_eq!(error_message.to_string(), "ownership check failed".to_string()) + } + _ => panic!("Expected Error::Implementation, got: {err:?}"), + } + + // Outcome 4: is_owned returns false → success + original + .check_inputs_not_owned(&mut |_| Ok(false)) + .expect("Should succeed when no inputs are owned"); + } + + #[test] + fn test_check_no_inputs_seen_before() { + let original = original_from_test_vector(); + + // Outcome 1: is_known returns true → InputSeen error + let err = original + .clone() + .check_no_inputs_seen_before(&mut |_| Ok(true)) + .expect_err("Should fail when input has been seen before"); + match err { + Error::Protocol(ProtocolError::OriginalPayload(PayloadError( + InternalPayloadError::InputSeen(_), + ))) => {} + _ => panic!("Expected InputSeen error, got: {err:?}"), + } + + // Outcome 2: is_known returns an implementation error → Error::Implementation + let err = original + .clone() + .check_no_inputs_seen_before(&mut |_| { + Err(ImplementationError::from("input seen check failed")) + }) + .expect_err("Should fail when is_known returns an implementation error"); + match err { + Error::Implementation(error_message) => { + assert_eq!(error_message.to_string(), "input seen check failed".to_string()) + } + _ => panic!("Expected Error::Implementation, got: {err:?}"), + } + + // Outcome 3: is_known returns false → success + original + .check_no_inputs_seen_before(&mut |_| Ok(false)) + .expect("Should succeed when no inputs have been seen before"); + } + #[test] fn test_identify_receiver_outputs() { let original = original_from_test_vector(); @@ -866,4 +1184,32 @@ pub(crate) mod tests { assert_eq!(wants_outputs.owned_vouts, vec![0, 1]); assert_eq!(wants_outputs.params.additional_fee_contribution, None); } + + #[test] + fn test_finalize_proposal() { + let psbt_context = psbt_context_from_test_vector(); + + // Outcome 1: wallet_process_psbt returns an implementation error → ImplementationError + let err = psbt_context + .clone() + .finalize_proposal(|_| Err(ImplementationError::from("wallet signing failed"))) + .expect_err("Should fail when wallet_process_psbt returns an error"); + assert_eq!(err.to_string(), "wallet signing failed"); + + // Outcome 2: wallet_process_psbt returns a psbt with mismatched ntxid → ImplementationError + let psbt_context = psbt_context_from_test_vector(); + let err = psbt_context + .clone() + .finalize_proposal(|_| { + // return a totally different psbt to trigger ntxid mismatch + Ok(PARSED_ORIGINAL_PSBT.clone()) + }) + .expect_err("Should fail when ntxid mismatches"); + assert!(err.to_string().contains("Ntxid mismatch")); + + // Outcome 3: wallet_process_psbt succeeds → Ok(Psbt) + let _psbt = psbt_context + .finalize_proposal(|_| Ok(PARSED_PAYJOIN_PROPOSAL.clone())) + .expect("Should succeed when wallet_process_psbt returns a valid signed psbt"); + } } diff --git a/payjoin/src/core/receive/v1/mod.rs b/payjoin/src/core/receive/v1/mod.rs index 8c659564e..c68da95c8 100644 --- a/payjoin/src/core/receive/v1/mod.rs +++ b/payjoin/src/core/receive/v1/mod.rs @@ -77,16 +77,20 @@ impl UncheckedOriginalPayload { /// received from the sender is broadcastable to the network in the case of a payjoin failure. /// /// The recommended usage of this typestate differs based on whether you are implementing an -/// interactive (where the receiver takes manual actions to respond to the -/// payjoin proposal) or a non-interactive (ex. a donation page which automatically generates a new QR code -/// for each visit) payment receiver. For the latter, you should call [`Self::check_broadcast_suitability`] to check -/// that the proposal is actually broadcastable (and, optionally, whether the fee rate is above the -/// minimum limit you have set). These mechanisms protect the receiver against probing attacks, where -/// a malicious sender can repeatedly send proposals to have the non-interactive receiver reveal the UTXOs -/// it owns with the proposals it modifies. +/// a non-interactive payment receiver (ex. a donation page which automatically generates a new QR code +/// for each visit) or interactive (where the receiver takes manual actions to respond to the +/// payjoin proposal). /// -/// If you are implementing an interactive payment receiver, then such checks are not necessary, and you -/// can go ahead with calling [`Self::assume_interactive_receiver`] to move on to the next typestate. +/// For the former, you should call [`Self::check_broadcast_suitability`] +/// or [`Self::check_broadcast_suitability_async`] to check that the proposal is actually +/// broadcastable (and, optionally, whether the fee rate is above the minimum limit you have set). +/// These mechanisms protect the receiver against probing attacks, where a malicious sender +/// can repeatedly send proposals to have the non-interactive receiver reveal the UTXOs +/// it owns with the proposals it modifies. The difference between the two checks is that +/// the async version takes an asynchronous `can_broadcast` callback function. +/// +/// For the latter, such checks are not necessary and you can go ahead with calling +/// [`Self::assume_interactive_receiver`] to move on to the next typestate. #[derive(Debug, Clone)] pub struct UncheckedOriginalPayload { original: OriginalPayload, @@ -100,6 +104,9 @@ impl UncheckedOriginalPayload { /// as a fallback mechanism in case the payjoin fails. This validation would be equivalent to /// `testmempoolaccept` Bitcoin Core RPC call returning `{"allowed": true,...}`. /// + /// If the `can_broadcast` validation callback needs to be asynchronous + /// [`Self::check_broadcast_suitability_async`] should be used instead. + /// /// Receiver can optionally set a minimum fee rate which will be enforced on the original PSBT in the proposal. /// This can be used to further prevent probing attacks since the attacker would now need to probe the receiver /// with transactions which are both broadcastable and pay high fee. Unrelated to the probing attack scenario, @@ -113,6 +120,33 @@ impl UncheckedOriginalPayload { Ok(MaybeInputsOwned { original: self.original }) } + /// Checks that the original PSBT in the proposal can be broadcasted. + /// + /// If the receiver is a non-interactive payment processor (ex. a donation page which generates + /// a new QR code for each visit), then it should make sure that the original PSBT is broadcastable + /// as a fallback mechanism in case the payjoin fails. This validation would be equivalent to + /// `testmempoolaccept` Bitcoin Core RPC call returning `{"allowed": true,...}`. + /// + /// Receiver can optionally set a minimum fee rate which will be enforced on the original PSBT in the proposal. + /// This can be used to further prevent probing attacks since the attacker would now need to probe the receiver + /// with transactions which are both broadcastable and pay high fee. Unrelated to the probing attack scenario, + /// this parameter also makes operating in a high fee environment easier for the receiver. + pub async fn check_broadcast_suitability_async( + self, + min_fee_rate: Option, + can_broadcast: impl Fn(&bitcoin::Transaction) -> F, + ) -> Result + where + F: Future>, + { + let can_broadcast_sync = self + .original + .to_sync_can_broadcast(can_broadcast) + .await + .map_err(Error::Implementation)?; + self.check_broadcast_suitability(min_fee_rate, can_broadcast_sync) + } + /// Moves on to the next typestate without any of the current typestate's validations. /// /// Use this for interactive payment receivers, where there is no risk of a probing attack since the @@ -128,7 +162,7 @@ impl UncheckedOriginalPayload { /// typestate. The receiver can call [`Self::extract_tx_to_schedule_broadcast`] /// to extract the signed original PSBT to schedule a fallback in case the Payjoin process fails. /// -/// Call [`Self::check_inputs_not_owned`] to proceed. +/// Call [`Self::check_inputs_not_owned`] or [`Self::check_inputs_not_owned_async`] to proceed. #[derive(Debug, Clone)] pub struct MaybeInputsOwned { pub(crate) original: OriginalPayload, @@ -147,6 +181,9 @@ impl MaybeInputsOwned { /// Check that the original PSBT has no receiver-owned inputs. /// /// An attacker can try to spend the receiver's own inputs. This check prevents that. + /// + /// If the `can_broadcast` validation callback needs to be asynchronous + /// [`Self::check_inputs_not_owned_async`] should be used instead. pub fn check_inputs_not_owned( self, is_owned: &mut impl FnMut(&Script) -> Result, @@ -154,11 +191,25 @@ impl MaybeInputsOwned { self.original.check_inputs_not_owned(is_owned)?; Ok(MaybeInputsSeen { original: self.original }) } + + /// Check that the original PSBT has no receiver-owned inputs. + /// + /// An attacker can try to spend the receiver's own inputs. This check prevents that. + pub async fn check_inputs_not_owned_async( + self, + is_owned: &mut impl FnMut(&Script) -> F, + ) -> Result + where + F: Future>, + { + let mut is_owned_sync = self.original.to_sync_is_owned(is_owned).await?; + self.check_inputs_not_owned(&mut is_owned_sync) + } } /// Typestate to check that the original PSBT has no inputs that the receiver has seen before. /// -/// Call [`Self::check_no_inputs_seen_before`] to proceed. +/// Call [`Self::check_no_inputs_seen_before`] or [`Self::check_no_inputs_seen_before_async`] to proceed. #[derive(Debug, Clone)] pub struct MaybeInputsSeen { original: OriginalPayload, @@ -172,6 +223,9 @@ impl MaybeInputsSeen { /// and sending them back to the receiver. /// 2. Re-entrant payjoin, where the sender uses the payjoin PSBT of a previous payjoin as the /// original proposal PSBT of the current, new payjoin. + /// + /// If the `is_known` validation callback needs to be asynchronous + /// [`Self::check_no_inputs_seen_before_async`] should be used instead. pub fn check_no_inputs_seen_before( self, is_known: &mut impl FnMut(&OutPoint) -> Result, @@ -179,6 +233,26 @@ impl MaybeInputsSeen { self.original.check_no_inputs_seen_before(is_known)?; Ok(OutputsUnknown { original: self.original }) } + + /// Check that the receiver has never seen the inputs in the original proposal before. + /// + /// This check prevents the following attacks: + /// 1. Probing attacks, where the sender can use the exact same proposal (or with minimal change) + /// to have the receiver reveal their UTXO set by contributing to all proposals with different inputs + /// and sending them back to the receiver. + /// 2. Re-entrant payjoin, where the sender uses the payjoin PSBT of a previous payjoin as the + /// original proposal PSBT of the current, new payjoin. + pub async fn check_no_inputs_seen_before_async( + self, + is_known: &mut impl FnMut(&OutPoint) -> F, + ) -> Result + where + F: Future>, + { + let mut is_known_sync = + self.original.to_sync_is_known(is_known).await.map_err(Error::Implementation)?; + self.check_no_inputs_seen_before(&mut is_known_sync) + } } /// Typestate to check that the outputs of the original PSBT actually pay to the receiver. @@ -186,7 +260,7 @@ impl MaybeInputsSeen { /// The receiver should only accept the original PSBTs from the sender if it actually sends them /// money. /// -/// Call [`Self::identify_receiver_outputs`] to proceed. +/// Call [`Self::identify_receiver_outputs`] or [`Self::identify_receiver_outputs_async`] to proceed. #[derive(Debug, Clone)] pub struct OutputsUnknown { original: OriginalPayload, @@ -203,6 +277,9 @@ impl OutputsUnknown { /// function sets that parameter to None so that it is ignored in subsequent steps of the /// receiver flow. This protects the receiver from accidentally subtracting fees from their own /// outputs. + /// + /// If the `is_known` validation callback needs to be asynchronous + /// [`Self::identify_receiver_outputs_async`] should be used instead. #[cfg_attr(not(feature = "v1"), allow(dead_code))] pub fn identify_receiver_outputs( self, @@ -210,6 +287,32 @@ impl OutputsUnknown { ) -> Result { self.original.identify_receiver_outputs(is_receiver_output) } + + /// Validates whether the original PSBT contains outputs which pay to the receiver and only + /// then proceeds to the next typestate. + /// + /// Additionally, this function also protects the receiver from accidentally subtracting fees + /// from their own outputs: when a sender is sending a proposal, + /// they can select an output which they want the receiver to subtract fees from to account for + /// the increased transaction size. If a sender specifies a receiver output for this purpose, this + /// function sets that parameter to None so that it is ignored in subsequent steps of the + /// receiver flow. This protects the receiver from accidentally subtracting fees from their own + /// outputs. + #[cfg_attr(not(feature = "v1"), allow(dead_code))] + pub async fn identify_receiver_outputs_async( + self, + is_receiver_output: &mut impl FnMut(&Script) -> F, + ) -> Result + where + F: Future>, + { + let mut is_receiver_output_sync = self + .original + .to_sync_is_receiver_output(is_receiver_output) + .await + .map_err(Error::Implementation)?; + self.identify_receiver_outputs(&mut is_receiver_output_sync) + } } /// Validate the request headers for a Payjoin request @@ -275,7 +378,8 @@ impl crate::receive::common::WantsFeeRange { /// by the receiver. The receiver may sign and finalize the Payjoin proposal which will be sent to /// the sender for their signature. /// -/// Call [`Self::finalize_proposal`] to return a finalized [`PayjoinProposal`]. +/// Call [`Self::finalize_proposal`] or [`Self::finalize_proposal_async`] to return a +/// finalized [`PayjoinProposal`]. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ProvisionalProposal { psbt_context: PsbtContext, @@ -288,6 +392,9 @@ impl ProvisionalProposal { /// Finalization consists of two steps: /// 1. Remove all sender signatures which were received with the original PSBT as these signatures are now invalid. /// 2. Sign and finalize the resulting PSBT using the passed `wallet_process_psbt` signing function. + /// + /// If the `wallet_process_psbt` validation callback needs to be asynchronous + /// [`Self::finalize_proposal_async`] should be used instead. pub fn finalize_proposal( self, wallet_process_psbt: impl Fn(&Psbt) -> Result, @@ -299,6 +406,27 @@ impl ProvisionalProposal { Ok(PayjoinProposal { payjoin_psbt: finalized_psbt }) } + /// Finalizes the Payjoin proposal into a PSBT which the sender will find acceptable before + /// they sign the transaction and broadcast it to the network. + /// + /// Finalization consists of two steps: + /// 1. Remove all sender signatures which were received with the original PSBT as these signatures are now invalid. + /// 2. Sign and finalize the resulting PSBT using the passed `wallet_process_psbt` signing function. + pub async fn finalize_proposal_async( + self, + wallet_process_psbt: impl Fn(&Psbt) -> F, + ) -> Result + where + F: Future>, + { + let wallet_process_psbt_sync = self + .psbt_context + .to_sync_wallet_process_psbt(wallet_process_psbt) + .await + .map_err(|e| Error::Implementation(ImplementationError::new(e)))?; + self.finalize_proposal(wallet_process_psbt_sync) + } + /// The Payjoin proposal PSBT that the receiver needs to sign /// /// In some applications the entity that progresses the typestate @@ -477,6 +605,37 @@ mod tests { assert_eq!(call_count, 4); } + #[tokio::test] + async fn test_mutable_receiver_state_closures_async() { + let call_count = std::cell::Cell::new(0usize); + let maybe_inputs_owned = maybe_inputs_owned_from_test_vector(); + + let mock_callback = |ret: bool| { + call_count.set(call_count.get() + 1); + async move { Ok::(ret) } + }; + + let maybe_inputs_seen = + maybe_inputs_owned.check_inputs_not_owned_async(&mut |_| mock_callback(false)).await; + assert_eq!(call_count.get(), 1); + + let outputs_unknown = maybe_inputs_seen + .map_err(|_| "Check inputs owned closure failed".to_string()) + .expect("Next receiver state should be accessible") + .check_no_inputs_seen_before_async(&mut |_| mock_callback(false)) + .await; + assert_eq!(call_count.get(), 2); + + let _wants_outputs = outputs_unknown + .map_err(|_| "Check no inputs seen closure failed".to_string()) + .expect("Next receiver state should be accessible") + .identify_receiver_outputs_async(&mut |_| mock_callback(true)) + .await; + // there are 2 receiver outputs so we should expect this callback to run twice incrementing + // call count twice + assert_eq!(call_count.get(), 4); + } + #[test] fn is_output_substitution_disabled() { let mut proposal = unchecked_proposal_from_test_vector(); @@ -516,6 +675,36 @@ mod tests { } } + #[tokio::test] + async fn unchecked_proposal_min_fee_async() { + let proposal = unchecked_proposal_from_test_vector(); + + let min_fee_rate = + proposal.original.psbt_fee_rate().expect("Feerate calculation should not fail"); + let _ = proposal + .clone() + .check_broadcast_suitability_async(Some(min_fee_rate), |_| async { Ok(true) }) + .await + .expect("Broadcast suitability check with appropriate min_fee_rate should succeed"); + assert_eq!(proposal.original.psbt_fee_rate().unwrap(), min_fee_rate); + + let min_fee_rate = FeeRate::MAX; + let proposal_below_min_fee = proposal + .clone() + .check_broadcast_suitability_async(Some(min_fee_rate), |_| async { Ok(true) }) + .await + .expect_err("Broadcast suitability with min_fee_rate below minimum should fail"); + match proposal_below_min_fee { + Error::Protocol(ProtocolError::OriginalPayload(PayloadError( + InternalPayloadError::PsbtBelowFeeRate(original_fee_rate, min_fee_rate_param), + ))) => { + assert_eq!(original_fee_rate, proposal.original.psbt_fee_rate().unwrap()); + assert_eq!(min_fee_rate_param, min_fee_rate); + } + _ => panic!("Expected PsbtBelowFeeRate error, got: {proposal_below_min_fee:?}"), + } + } + #[test] fn test_finalize_proposal_invalid_payjoin_proposal() { let proposal = unchecked_proposal_from_test_vector(); @@ -538,6 +727,32 @@ mod tests { ); } + #[tokio::test] + async fn test_finalize_proposal_invalid_payjoin_proposal_async() { + let proposal = unchecked_proposal_from_test_vector(); + let provisional = provisional_proposal_from_test_vector(proposal); + let empty_tx = Transaction { + version: bitcoin::transaction::Version::TWO, + lock_time: LockTime::Seconds(Time::MIN), + input: vec![], + output: vec![], + }; + let other_psbt = Psbt::from_unsigned_tx(empty_tx).expect("Valid unsigned tx"); + let err = provisional + .clone() + .finalize_proposal_async(|_| async { Ok(other_psbt.clone()) }) + .await + .unwrap_err(); + assert_eq!( + err.to_string(), + format!( + "Implementation error: Ntxid mismatch: expected {}, got {}", + provisional.psbt_context.payjoin_psbt.unsigned_tx.compute_txid(), + other_psbt.unsigned_tx.compute_txid() + ) + ); + } + #[test] fn test_getting_psbt_to_sign() { let provisional_proposal = ProvisionalProposal { diff --git a/payjoin/src/core/receive/v2/mod.rs b/payjoin/src/core/receive/v2/mod.rs index 7ff08f0ee..583bc342b 100644 --- a/payjoin/src/core/receive/v2/mod.rs +++ b/payjoin/src/core/receive/v2/mod.rs @@ -24,6 +24,7 @@ //! Note: Even fresh requests may be linkable via metadata (e.g. client IP, request timing), //! but request reuse makes correlation trivial for the relay. +use std::future::Future; use std::str::FromStr; #[cfg(not(target_arch = "wasm32"))] use std::time::Duration; @@ -57,7 +58,9 @@ use crate::persist::{ MaybeFatalOrSuccessTransition, MaybeFatalTransition, MaybeFatalTransitionWithNoResults, MaybeSuccessTransition, MaybeTransientTransition, NextStateTransition, TerminalTransition, }; -use crate::receive::{parse_payload, InputPair, OriginalPayload, PsbtContext}; +use crate::receive::{ + compute_async_results, make_sync_fn, parse_payload, InputPair, OriginalPayload, PsbtContext, +}; use crate::time::Time; use crate::uri::ShortId; use crate::{ImplementationError, IntoUrl, IntoUrlError, Request, Version}; @@ -608,23 +611,30 @@ pub struct UncheckedOriginalPayload { /// received from the sender is broadcastable to the network in the case of a payjoin failure. /// /// The recommended usage of this typestate differs based on whether you are implementing an -/// interactive (where the receiver takes manual actions to respond to the -/// payjoin proposal) or a non-interactive (ex. a donation page which automatically generates a new QR code -/// for each visit) payment receiver. For the latter, you should call [`Receiver::check_broadcast_suitability`] to check -/// that the proposal is actually broadcastable (and, optionally, whether the fee rate is above the -/// minimum limit you have set). These mechanisms protect the receiver against probing attacks, where -/// a malicious sender can repeatedly send proposals to have the non-interactive receiver reveal the UTXOs -/// it owns with the proposals it modifies. +/// a non-interactive payment receiver (ex. a donation page which automatically generates a new QR code +/// for each visit) or interactive (where the receiver takes manual actions to respond to the +/// payjoin proposal). +/// +/// For the former, you should call [`Receiver::check_broadcast_suitability`] +/// or [`Receiver::check_broadcast_suitability_async`] to check that the +/// proposal is actually broadcastable (and, optionally, whether the fee rate is above the minimum +/// limit you have set). These mechanisms protect the receiver against probing attacks, where a malicious +/// sender can repeatedly send proposals to have the non-interactive receiver reveal the UTXOs +/// it owns with the proposals it modifies. The difference between the two checks is that +/// the async version takes an asynchronous `can_broadcast` callback function. /// -/// If you are implementing an interactive payment receiver, then such checks are not necessary, and you -/// can go ahead with calling [`Receiver::assume_interactive_receiver`] to move on to the next typestate. +/// For the latter, such checks are not necessary and you can go ahead with calling +/// [`Receiver::assume_interactive_receiver`] to move on to the next typestate. impl Receiver { /// Checks that the original PSBT in the proposal can be broadcasted. /// /// If the receiver is a non-interactive payment processor (ex. a donation page which generates /// a new QR code for each visit), then it should make sure that the original PSBT is broadcastable /// as a fallback mechanism in case the payjoin fails. This validation would be equivalent to - /// `testmempoolaccept` RPC call returning `{"allowed": true,...}`. + /// `testmempoolaccept` Bitcoin Core RPC call returning `{"allowed": true,...}`. + /// + /// If the `can_broadcast` validation callback needs to be asynchronous + /// [`Receiver::check_broadcast_suitability_async`] should be used instead. /// /// Receiver can optionally set a minimum fee rate which will be enforced on the original PSBT in the proposal. /// This can be used to further prevent probing attacks since the attacker would now need to probe the receiver @@ -661,6 +671,38 @@ impl Receiver { } } + /// Checks that the original PSBT in the proposal can be broadcasted. + /// + /// If the receiver is a non-interactive payment processor (ex. a donation page which generates + /// a new QR code for each visit), then it should make sure that the original PSBT is broadcastable + /// as a fallback mechanism in case the payjoin fails. This validation would be equivalent to + /// `testmempoolaccept` RPC call returning `{"allowed": true,...}`. + /// + /// Receiver can optionally set a minimum fee rate which will be enforced on the original PSBT in the proposal. + /// This can be used to further prevent probing attacks since the attacker would now need to probe the receiver + /// with transactions which are both broadcastable and pay high fee. Unrelated to the probing attack scenario, + /// this parameter also makes operating in a high fee environment easier for the receiver. + pub async fn check_broadcast_suitability_async( + self, + min_fee_rate: Option, + can_broadcast: impl Fn(&bitcoin::Transaction) -> F, + ) -> MaybeFatalTransition< + SessionEvent, + Receiver, + Error, + Receiver, + > + where + F: Future>, + { + let can_broadcast_sync = self.state.original.to_sync_can_broadcast(can_broadcast).await; + match can_broadcast_sync { + Ok(can_broadcast_sync) => + self.check_broadcast_suitability(min_fee_rate, can_broadcast_sync), + Err(e) => MaybeFatalTransition::transient(Error::Implementation(e)), + } + } + /// Moves on to the next typestate without any of the current typestate's validations. /// /// Use this for interactive payment receivers, where there is no risk of a probing attack since the @@ -697,7 +739,8 @@ pub struct MaybeInputsOwned { /// typestate. The receiver can call [`Receiver::extract_tx_to_schedule_broadcast`] /// to extract the signed original PSBT to schedule a fallback in case the Payjoin process fails. /// -/// Call [`Receiver::check_inputs_not_owned`] to proceed. +/// Call [`Receiver::check_inputs_not_owned`] or +/// [`Receiver::check_inputs_not_owned_async`] to proceed. impl Receiver { /// Extracts the original transaction received from the sender. /// @@ -711,6 +754,9 @@ impl Receiver { /// Check that the original PSBT has no receiver-owned inputs. /// /// An attacker can try to spend the receiver's own inputs. This check prevents that. + /// + /// If the `can_broadcast` validation callback needs to be asynchronous + /// [`Receiver::check_inputs_not_owned_async`] should be used instead. pub fn check_inputs_not_owned( self, is_owned: &mut impl FnMut(&Script) -> Result, @@ -721,30 +767,57 @@ impl Receiver { Receiver, > { match self.state.original.check_inputs_not_owned(is_owned) { - Ok(inner) => inner, + Ok(()) => MaybeFatalTransition::success( + SessionEvent::CheckedInputsNotOwned(), + Receiver { + state: MaybeInputsSeen { original: self.original.clone() }, + session_context: self.session_context, + }, + ), Err(e) => match e { - Error::Implementation(_) => { - return MaybeFatalTransition::transient(e); - } - _ => { - return MaybeFatalTransition::replyable_error( - SessionEvent::GotReplyableError((&e).into()), - Receiver { - state: HasReplyableError { error_reply: (&e).into() }, - session_context: self.session_context, - }, - e, - ); - } + Error::Implementation(_) => MaybeFatalTransition::transient(e), + _ => MaybeFatalTransition::replyable_error( + SessionEvent::GotReplyableError((&e).into()), + Receiver { + state: HasReplyableError { error_reply: (&e).into() }, + session_context: self.session_context, + }, + e, + ), }, - }; - MaybeFatalTransition::success( - SessionEvent::CheckedInputsNotOwned(), - Receiver { - state: MaybeInputsSeen { original: self.original.clone() }, - session_context: self.session_context, + } + } + + /// Check that the original PSBT has no receiver-owned inputs. + /// + /// An attacker can try to spend the receiver's own inputs. This check prevents that. + pub async fn check_inputs_not_owned_async( + self, + is_owned: &mut impl FnMut(&Script) -> F, + ) -> MaybeFatalTransition< + SessionEvent, + Receiver, + Error, + Receiver, + > + where + F: Future>, + { + let is_owned_sync = self.state.original.to_sync_is_owned(is_owned).await; + match is_owned_sync { + Ok(mut is_owned_sync) => self.check_inputs_not_owned(&mut is_owned_sync), + Err(e) => match e { + Error::Implementation(_) => MaybeFatalTransition::transient(e), + _ => MaybeFatalTransition::replyable_error( + SessionEvent::GotReplyableError((&e).into()), + Receiver { + state: HasReplyableError { error_reply: (&e).into() }, + session_context: self.session_context, + }, + e, + ), }, - ) + } } pub(crate) fn apply_checked_inputs_not_owned(self) -> ReceiveSession { @@ -763,7 +836,8 @@ pub struct MaybeInputsSeen { /// Typestate to check that the original PSBT has no inputs that the receiver has seen before. /// -/// Call [`Receiver::check_no_inputs_seen_before`] to proceed. +/// Call [`Receiver::check_no_inputs_seen_before`] or +/// [`Receiver::check_no_inputs_seen_before_async`] to proceed. impl Receiver { /// Check that the receiver has never seen the inputs in the original proposal before. /// @@ -773,6 +847,9 @@ impl Receiver { /// and sending them back to the receiver. /// 2. Re-entrant payjoin, where the sender uses the payjoin PSBT of a previous payjoin as the /// original proposal PSBT of the current, new payjoin. + /// + /// If the `is_known` validation callback needs to be asynchronous + /// [`Receiver::check_no_inputs_seen_before_async`] should be used instead. pub fn check_no_inputs_seen_before( self, is_known: &mut impl FnMut(&OutPoint) -> Result, @@ -783,30 +860,52 @@ impl Receiver { Receiver, > { match self.state.original.check_no_inputs_seen_before(is_known) { - Ok(inner) => inner, + Ok(()) => MaybeFatalTransition::success( + SessionEvent::CheckedNoInputsSeenBefore(), + Receiver { + state: OutputsUnknown { original: self.original.clone() }, + session_context: self.session_context, + }, + ), Err(e) => match e { - Error::Implementation(_) => { - return MaybeFatalTransition::transient(e); - } - _ => { - return MaybeFatalTransition::replyable_error( - SessionEvent::GotReplyableError((&e).into()), - Receiver { - state: HasReplyableError { error_reply: (&e).into() }, - session_context: self.session_context, - }, - e, - ); - } - }, - }; - MaybeFatalTransition::success( - SessionEvent::CheckedNoInputsSeenBefore(), - Receiver { - state: OutputsUnknown { original: self.original.clone() }, - session_context: self.session_context, + Error::Implementation(_) => MaybeFatalTransition::transient(e), + _ => MaybeFatalTransition::replyable_error( + SessionEvent::GotReplyableError((&e).into()), + Receiver { + state: HasReplyableError { error_reply: (&e).into() }, + session_context: self.session_context, + }, + e, + ), }, - ) + } + } + + /// Check that the receiver has never seen the inputs in the original proposal before. + /// + /// This check prevents the following attacks: + /// 1. Probing attacks, where the sender can use the exact same proposal (or with minimal change) + /// to have the receiver reveal their UTXO set by contributing to all proposals with different inputs + /// and sending them back to the receiver. + /// 2. Re-entrant payjoin, where the sender uses the payjoin PSBT of a previous payjoin as the + /// original proposal PSBT of the current, new payjoin. + pub async fn check_no_inputs_seen_before_async( + self, + is_known: &mut impl FnMut(&OutPoint) -> F, + ) -> MaybeFatalTransition< + SessionEvent, + Receiver, + Error, + Receiver, + > + where + F: Future>, + { + let is_known_sync = self.state.original.to_sync_is_known(is_known).await; + match is_known_sync { + Ok(mut is_known_sync) => self.check_no_inputs_seen_before(&mut is_known_sync), + Err(e) => MaybeFatalTransition::transient(Error::Implementation(e)), + } } pub(crate) fn apply_checked_no_inputs_seen_before(self) -> ReceiveSession { @@ -828,7 +927,8 @@ pub struct OutputsUnknown { /// The receiver should only accept the original PSBTs from the sender which actually send them /// money. /// -/// Call [`Receiver::identify_receiver_outputs`] to proceed. +/// Call [`Receiver::identify_receiver_outputs`] or +/// [`Receiver::identify_receiver_outputs_async`] to proceed. impl Receiver { /// Validates whether the original PSBT contains outputs which pay to the receiver and only /// then proceeds to the next typestate. @@ -840,6 +940,9 @@ impl Receiver { /// function sets that parameter to None so that it is ignored in subsequent steps of the /// receiver flow. This protects the receiver from accidentally subtracting fees from their own /// outputs. + /// + /// If the `is_known` validation callback needs to be asynchronous + /// [`Receiver::identify_receiver_outputs_async`] should be used instead. pub fn identify_receiver_outputs( self, is_receiver_output: &mut impl FnMut(&Script) -> Result, @@ -849,28 +952,54 @@ impl Receiver { Error, Receiver, > { - let inner = match self.state.original.identify_receiver_outputs(is_receiver_output) { - Ok(inner) => inner, + match self.state.original.identify_receiver_outputs(is_receiver_output) { + Ok(inner) => MaybeFatalTransition::success( + SessionEvent::IdentifiedReceiverOutputs(inner.owned_vouts.clone()), + Receiver { state: WantsOutputs { inner }, session_context: self.session_context }, + ), Err(e) => match e { - Error::Implementation(_) => { - return MaybeFatalTransition::transient(e); - } - _ => { - return MaybeFatalTransition::replyable_error( - SessionEvent::GotReplyableError((&e).into()), - Receiver { - state: HasReplyableError { error_reply: (&e).into() }, - session_context: self.session_context, - }, - e, - ); - } + Error::Implementation(_) => MaybeFatalTransition::transient(e), + _ => MaybeFatalTransition::replyable_error( + SessionEvent::GotReplyableError((&e).into()), + Receiver { + state: HasReplyableError { error_reply: (&e).into() }, + session_context: self.session_context, + }, + e, + ), }, - }; - MaybeFatalTransition::success( - SessionEvent::IdentifiedReceiverOutputs(inner.owned_vouts.clone()), - Receiver { state: WantsOutputs { inner }, session_context: self.session_context }, - ) + } + } + + /// Validates whether the original PSBT contains outputs which pay to the receiver and only + /// then proceeds to the next typestate. + /// + /// Additionally, this function also protects the receiver from accidentally subtracting fees + /// from their own outputs: when a sender is sending a proposal, + /// they can select an output which they want the receiver to subtract fees from to account for + /// the increased transaction size. If a sender specifies a receiver output for this purpose, this + /// function sets that parameter to None so that it is ignored in subsequent steps of the + /// receiver flow. This protects the receiver from accidentally subtracting fees from their own + /// outputs. + pub async fn identify_receiver_outputs_async( + self, + is_receiver_output: &mut impl FnMut(&Script) -> F, + ) -> MaybeFatalTransition< + SessionEvent, + Receiver, + Error, + Receiver, + > + where + F: Future>, + { + let is_receiver_output_sync = + self.state.original.to_sync_is_receiver_output(is_receiver_output).await; + match is_receiver_output_sync { + Ok(mut is_receiver_output_sync) => + self.identify_receiver_outputs(&mut is_receiver_output_sync), + Err(e) => MaybeFatalTransition::transient(Error::Implementation(e)), + } } pub(crate) fn apply_identified_receiver_outputs( @@ -1054,23 +1183,20 @@ impl Receiver { ) -> MaybeFatalTransition, ProtocolError> { let max_effective_fee_rate = max_effective_fee_rate.or(Some(self.session_context.max_fee_rate)); - let psbt_context = match self + match self .state .inner .calculate_psbt_context_with_fee_range(min_fee_rate, max_effective_fee_rate) { - Ok(inner) => inner, - Err(e) => { - return MaybeFatalTransition::transient(ProtocolError::OriginalPayload(e.into())); - } - }; - MaybeFatalTransition::success( - SessionEvent::AppliedFeeRange(psbt_context.clone()), - Receiver { - state: ProvisionalProposal { psbt_context }, - session_context: self.session_context, - }, - ) + Ok(psbt_context) => MaybeFatalTransition::success( + SessionEvent::AppliedFeeRange(psbt_context.clone()), + Receiver { + state: ProvisionalProposal { psbt_context }, + session_context: self.session_context, + }, + ), + Err(e) => MaybeFatalTransition::transient(ProtocolError::OriginalPayload(e.into())), + } } pub(crate) fn apply_applied_fee_range(self, psbt_context: PsbtContext) -> ReceiveSession { @@ -1099,26 +1225,50 @@ impl Receiver { /// Finalization consists of two steps: /// 1. Remove all sender signatures which were received with the original PSBT as these signatures are now invalid. /// 2. Sign and finalize the resulting PSBT using the passed `wallet_process_psbt` signing function. + /// + /// If the `wallet_process_psbt` validation callback needs to be asynchronous + /// [`Receiver::finalize_proposal_async`] should be used instead. pub fn finalize_proposal( self, wallet_process_psbt: impl Fn(&Psbt) -> Result, ) -> MaybeTransientTransition, ImplementationError> { let original_psbt = self.state.psbt_context.original_psbt.clone(); - let inner = match self.state.psbt_context.finalize_proposal(wallet_process_psbt) { - Ok(inner) => inner, + let payjoin_psbt = match self.state.psbt_context.finalize_proposal(wallet_process_psbt) { + Ok(payjoin_psbt) => payjoin_psbt, Err(e) => { return MaybeTransientTransition::transient(e); } }; - let psbt_context = PsbtContext { payjoin_psbt: inner.clone(), original_psbt }; + let psbt_context = PsbtContext { payjoin_psbt: payjoin_psbt.clone(), original_psbt }; let payjoin_proposal = PayjoinProposal { psbt_context: psbt_context.clone() }; MaybeTransientTransition::success( - SessionEvent::FinalizedProposal(inner), + SessionEvent::FinalizedProposal(payjoin_psbt), Receiver { state: payjoin_proposal, session_context: self.session_context }, ) } + /// Finalizes the Payjoin proposal into a PSBT which the sender will find acceptable before + /// they re-sign the transaction and broadcast it to the network. + /// + /// Finalization consists of two steps: + /// 1. Remove all sender signatures which were received with the original PSBT as these signatures are now invalid. + /// 2. Sign and finalize the resulting PSBT using the passed `wallet_process_psbt` signing function. + pub async fn finalize_proposal_async( + self, + wallet_process_psbt: impl Fn(&Psbt) -> F, + ) -> MaybeTransientTransition, ImplementationError> + where + F: Future>, + { + let wallet_process_psbt_sync = + self.state.psbt_context.to_sync_wallet_process_psbt(wallet_process_psbt).await; + match wallet_process_psbt_sync { + Ok(wallet_process_psbt_sync) => self.finalize_proposal(wallet_process_psbt_sync), + Err(e) => MaybeTransientTransition::transient(e), + } + } + /// The Payjoin proposal PSBT that the receiver needs to sign /// /// In some applications the entity that progresses the typestate @@ -1315,6 +1465,35 @@ pub struct Monitor { /// Call [`Receiver::check_payment`] to confirm the status of the transaction in the /// network and conclude the Payjoin session. impl Receiver { + fn handle_broadcasted_payjoin_tx( + &self, + payjoin_txid: Txid, + tx: bitcoin::Transaction, + ) -> MaybeFatalOrSuccessTransition { + let tx_id = tx.compute_txid(); + if tx_id != payjoin_txid { + return MaybeFatalOrSuccessTransition::transient(Error::Implementation( + ImplementationError::from( + format!( + "Payjoin transaction ID mismatch. Expected: {payjoin_txid}, Got: {tx_id}" + ) + .as_str(), + ), + )); + } + // TODO: should we check for witness and scriptsig on the tx? + let mut sender_witnesses = vec![]; + + for i in self.state.psbt_context.sender_input_indexes() { + let input = tx.input.get(i).expect("sender_input_indexes should return valid indices"); + sender_witnesses.push((input.script_sig.clone(), input.witness.clone())); + } + // Payjoin transaction with SegWit inputs was detected. Log the signatures and complete the session. + MaybeFatalOrSuccessTransition::success(SessionEvent::Closed(SessionOutcome::Success( + sender_witnesses, + ))) + } + /// Checks the network for the Payjoin proposal or the fallback transaction using the passed /// `transaction_exists` closure. Concludes the Payjoin session with a Success if the /// transaction satisfies the condition. @@ -1354,26 +1533,7 @@ impl Receiver { // is not going to change when the sender signs it. So we can use the TXID to check the // network for the Payjoin proposal. match transaction_exists(payjoin_txid) { - Ok(Some(tx)) => { - let tx_id = tx.compute_txid(); - if tx_id != payjoin_txid { - return MaybeFatalOrSuccessTransition::transient(Error::Implementation( - ImplementationError::from(format!("Payjoin transaction ID mismatch. Expected: {payjoin_txid}, Got: {tx_id}").as_str()), - )); - } - // TODO: should we check for witness and scriptsig on the tx? - let mut sender_witnesses = vec![]; - - for i in self.state.psbt_context.sender_input_indexes() { - let input = - tx.input.get(i).expect("sender_input_indexes should return valid indices"); - sender_witnesses.push((input.script_sig.clone(), input.witness.clone())); - } - // Payjoin transaction with SegWit inputs was detected. Log the signatures and complete the session. - return MaybeFatalOrSuccessTransition::success(SessionEvent::Closed( - SessionOutcome::Success(sender_witnesses), - )); - } + Ok(Some(tx)) => return self.handle_broadcasted_payjoin_tx(payjoin_txid, tx), Ok(None) => {} Err(e) => return MaybeFatalOrSuccessTransition::transient(Error::Implementation(e)), } @@ -1391,6 +1551,45 @@ impl Receiver { MaybeFatalOrSuccessTransition::no_results(self.clone()) } + + /// Checks the network for the Payjoin proposal or the fallback transaction using the passed + /// `transaction_exists` closure. Concludes the Payjoin session with a Success if the + /// transaction satisfies the condition. + /// + /// For example, the condition can be if the transaction has been broadcast to the + /// network, or if it has some number of confirmations on the blockchain. + /// + /// If the receiver input address type in the fallback transaction is non-SegWit, then this + /// function will directly conclude the Payjoin session with a Success without running the + /// provided `transaction_exists` closure. `transaction_exists` uses the transaction ID to + /// search for the transaction in the network. Since a non-SegWit input signature is going to + /// change the TXID of the Payjoin proposal, it cannot be monitored. + pub async fn check_payment_async( + &self, + transaction_exists: impl Fn(Txid) -> F, + ) -> MaybeFatalOrSuccessTransition + where + F: Future, ImplementationError>>, + { + let fallback_txid = self + .state + .psbt_context + .original_psbt + .clone() + .extract_tx_unchecked_fee_rate() + .compute_txid(); + + let payjoin_txid = self.state.psbt_context.payjoin_psbt.unsigned_tx.compute_txid(); + let txids = vec![fallback_txid, payjoin_txid]; + let results = compute_async_results(txids, &mut |&txid| transaction_exists(txid)).await; + match results { + Ok(results) => { + let transaction_exists_sync = make_sync_fn(results, |t| t); + self.check_payment(|txid| transaction_exists_sync(&txid)) + } + Err(e) => MaybeFatalOrSuccessTransition::transient(Error::Implementation(e)), + } + } } /// Derive a mailbox endpoint on a directory given a [`ShortId`]. @@ -1588,6 +1787,114 @@ pub mod test { Ok(()) } + #[tokio::test] + async fn test_monitor_typestate_async() -> Result<(), BoxError> { + let psbt_ctx = PsbtContext { + original_psbt: PARSED_ORIGINAL_PSBT.clone(), + payjoin_psbt: PARSED_PAYJOIN_PROPOSAL.clone(), + }; + let monitor = Receiver { + state: Monitor { psbt_context: psbt_ctx }, + session_context: SHARED_CONTEXT.clone(), + }; + + let payjoin_tx = PARSED_PAYJOIN_PROPOSAL.clone().unsigned_tx; + let original_tx = PARSED_ORIGINAL_PSBT.clone().extract_tx().expect("valid tx"); + + // Nothing was spent, should be in the same state + let persister = InMemoryPersister::default(); + let res = monitor + .check_payment_async(|_| async { Ok(None) }) + .await + .save(&persister) + .expect("InMemoryPersister shouldn't fail"); + assert!(matches!(res, OptionalTransitionOutcome::Stasis(_))); + assert!(!persister.inner.read().expect("Shouldn't be poisoned").is_closed); + assert_eq!(persister.inner.read().expect("Shouldn't be poisoned").events.len(), 0); + + // Payjoin was broadcasted, should progress to success + let persister = InMemoryPersister::default(); + let res = monitor + .check_payment_async(|_| async { Ok(Some(payjoin_tx.clone())) }) + .await + .save(&persister) + .expect("InMemoryPersister shouldn't fail"); + + assert!(matches!(res, OptionalTransitionOutcome::Progress(_))); + assert!(persister.inner.read().expect("Shouldn't be poisoned").is_closed); + assert_eq!(persister.inner.read().expect("Shouldn't be poisoned").events.len(), 1); + assert_eq!( + persister.inner.read().expect("Shouldn't be poisoned").events.last(), + Some(&SessionEvent::Closed(SessionOutcome::Success(vec![( + ScriptBuf::default(), + Witness::default() + )]))) + ); + + // Fallback was broadcasted, should progress to success + let persister = InMemoryPersister::default(); + let res = monitor + .check_payment_async(|txid| { + let txid = txid.to_owned(); + let original_tx = original_tx.to_owned(); + async move { + // Emulate if one of the fallback outpoints was double spent + if txid == original_tx.compute_txid() { + Ok(Some(original_tx.clone())) + } else { + Ok(None) + } + } + }) + .await + .save(&persister) + .expect("InMemoryPersister shouldn't fail"); + + assert!(matches!(res, OptionalTransitionOutcome::Progress(_))); + assert!(persister.inner.read().expect("Shouldn't be poisoned").is_closed); + assert_eq!(persister.inner.read().expect("Shouldn't be poisoned").events.len(), 1); + assert_eq!( + persister.inner.read().expect("Shouldn't be poisoned").events.last(), + Some(&SessionEvent::Closed(SessionOutcome::FallbackBroadcasted)) + ); + + // NOTE: due to shoe horning async `transaction_exists` into sync version this now fails + // // Fallback transaction is non-SegWit address type, should end the session without checking + // // the network for broadcasts. + // // Not using the test-utils vectors here as they are SegWit. + // let parsed_original_psbt_p2pkh = Psbt::from_str("cHNidP8BAFICAAAAAd5tU7sqAGa46oUVdEfV1HTeVVPYqvSvxy8/dvF3dwpZAQAAAAD9////AUTxBSoBAAAAFgAUhV1NWa6seBB5g6VZC2lnduxfEaUAAAAAAAEA/QoBAgAAAAIT2eO393FPqJ4fw6NH0rXALebtTCderecX0y6DumtjNgAAAAAA/f///5hrwcRiTXqXScbvk3APDdzy162Yj+6JD/iSEO9KYQl+AQAAAGpHMEQCIGcFm57xH5tQvJMipWfzxS7OGRi7+JfTT6WA27kOt8fVAiAp2I3WGdLk3/dVhoVxN6Jl9Wp/xeCIZZ1OTukSs8jszgEhAjjEq9kNnhvQbdVlWsE9QTIe4h39UPQ8flvU5Ivq6DFm/f///wIo3gUqAQAAABl2qRTWng6zTFWPZX1k12UqqBI6kLz8z4isAPIFKgEAAAAZdqkUIz2wzl605b3cg3j72nXReQuXXaWIrGcAAAABB2pHMEQCIEP33+9X/ecNmaiydM54HS+HoHfZygAQ/vMlc5r1IWkeAiA9oKjOVmp+RnrDF4zzHHGtoG1yy1+UWXBNaDiwd0LokgEhAmfCwbIv1mi5psiB3HFqXN1bFAo+goNUPWIso60J1matAAA=").expect("known psbt should parse"); + // let parsed_payjoin_proposal_p2pkh: Psbt = + // Psbt::from_str("cHNidP8BAHsCAAAAAphrwcRiTXqXScbvk3APDdzy162Yj+6JD/iSEO9KYQl+AAAAAAD9////3m1TuyoAZrjqhRV0R9XUdN5VU9iq9K/HLz928Xd3ClkBAAAAAP3///8BsOILVAIAAAAWABSFXU1Zrqx4EHmDpVkLaWd27F8RpQAAAAAAAQCgAgAAAAJgEjBIihNzFXar4wIYepzXJwQVpbqZep9GCY8pQCqh3wAAAAAA/f///x8caN/onT7AOPRWJz7vnT6yiNxcsAIs/U3RcgU4kiq4AAAAAAD9////AgDyBSoBAAAAGXapFDGh2kOIa5aNVHT2bHSoFfcawEMiiKyk6QUqAQAAABl2qRQY8AsQvx+jg9NdGUwCuShS3qk2KYisZwAAAAEBIgDyBSoBAAAAGXapFDGh2kOIa5aNVHT2bHSoFfcawEMiiKwBB2pHMEQCICQEE2dMDzlyH3ojsc0l98Da0yd2ARuy5AcWQjlgHHjkAiA70WPB+yQhW5zhsOBTg6qLsi0KzoofRAj1BZFpKT2QwAEhA68L99Q+xdIIp0rinuVDs+4qmqMZwg4E+aqbTQ8RClXLAAEA/QoBAgAAAAIT2eO393FPqJ4fw6NH0rXALebtTCderecX0y6DumtjNgAAAAAA/f///5hrwcRiTXqXScbvk3APDdzy162Yj+6JD/iSEO9KYQl+AQAAAGpHMEQCIGcFm57xH5tQvJMipWfzxS7OGRi7+JfTT6WA27kOt8fVAiAp2I3WGdLk3/dVhoVxN6Jl9Wp/xeCIZZ1OTukSs8jszgEhAjjEq9kNnhvQbdVlWsE9QTIe4h39UPQ8flvU5Ivq6DFm/f///wIo3gUqAQAAABl2qRTWng6zTFWPZX1k12UqqBI6kLz8z4isAPIFKgEAAAAZdqkUIz2wzl605b3cg3j72nXReQuXXaWIrGcAAAAAAA==").expect("known psbt should parse"); + // + // let psbt_ctx_p2pkh = PsbtContext { + // original_psbt: parsed_original_psbt_p2pkh.clone(), + // payjoin_psbt: parsed_payjoin_proposal_p2pkh.clone(), + // }; + // let monitor = Receiver { + // state: Monitor { psbt_context: psbt_ctx_p2pkh }, + // session_context: SHARED_CONTEXT.clone(), + // }; + // + // let persister = InMemoryPersister::default(); + // let res = monitor + // .check_payment_async(|_| async { + // panic!("check_payment should return before this closure is called") + // }) + // .await + // .save(&persister) + // .expect("InMemoryPersister shouldn't fail"); + // + // assert!(matches!(res, OptionalTransitionOutcome::Progress(_))); + // assert!(persister.inner.read().expect("Shouldn't be poisoned").is_closed); + // assert_eq!(persister.inner.read().expect("Shouldn't be poisoned").events.len(), 1); + // assert_eq!( + // persister.inner.read().expect("Shouldn't be poisoned").events.last(), + // Some(&SessionEvent::Closed(SessionOutcome::PayjoinProposalSent)) + // ); + + Ok(()) + } + #[test] fn test_v2_mutable_receiver_state_closures() { let persister = InMemoryPersister::default(); @@ -1601,25 +1908,64 @@ pub mod test { Ok(ret) } - let maybe_inputs_seen = - receiver.check_inputs_not_owned(&mut |_| mock_callback(&mut call_count, false)); + let maybe_inputs_seen = receiver + .check_inputs_not_owned(&mut |_| mock_callback(&mut call_count, false)) + .save(&persister) + .expect("Persister shouldn't fail"); assert_eq!(call_count, 1); let outputs_unknown = maybe_inputs_seen - .save(&persister) - .expect("Persister shouldn't fail") .check_no_inputs_seen_before(&mut |_| mock_callback(&mut call_count, false)) .save(&persister) .expect("Persister shouldn't fail"); assert_eq!(call_count, 2); let _wants_outputs = outputs_unknown - .identify_receiver_outputs(&mut |_| mock_callback(&mut call_count, true)); + .identify_receiver_outputs(&mut |_| mock_callback(&mut call_count, true)) + .save(&persister) + .expect("Persister shouldn't fail"); // there are 2 receiver outputs so we should expect this callback to run twice incrementing // call count twice assert_eq!(call_count, 4); } + #[tokio::test] + async fn test_v2_mutable_receiver_state_closures_async() { + let persister = InMemoryPersister::default(); + let call_count = std::cell::Cell::new(0usize); + let maybe_inputs_owned = maybe_inputs_owned_v2_from_test_vector(); + let receiver = + v2::Receiver { state: maybe_inputs_owned, session_context: SHARED_CONTEXT.clone() }; + + let mock_callback = |ret: bool| { + call_count.set(call_count.get() + 1); + async move { Ok::(ret) } + }; + + let maybe_inputs_seen = receiver + .check_inputs_not_owned_async(&mut |_| mock_callback(false)) + .await + .save(&persister) + .expect("Persister shouldn't fail"); + assert_eq!(call_count.get(), 1); + + let outputs_unknown = maybe_inputs_seen + .check_no_inputs_seen_before_async(&mut |_| mock_callback(false)) + .await + .save(&persister) + .expect("Persister shouldn't fail"); + assert_eq!(call_count.get(), 2); + + let _wants_outputs = outputs_unknown + .identify_receiver_outputs_async(&mut |_| mock_callback(true)) + .await + .save(&persister) + .expect("Persister shouldn't fail"); + // there are 2 receiver outputs so we should expect this callback to run twice incrementing + // call count twice + assert_eq!(call_count.get(), 4); + } + #[test] fn test_unchecked_proposal_transient_error() -> Result<(), BoxError> { let unchecked_proposal = unchecked_proposal_v2_from_test_vector(); @@ -1643,6 +1989,31 @@ pub mod test { Ok(()) } + #[tokio::test] + async fn test_unchecked_proposal_transient_error_async() -> Result<(), BoxError> { + let unchecked_proposal = unchecked_proposal_v2_from_test_vector(); + let receiver = + v2::Receiver { state: unchecked_proposal, session_context: SHARED_CONTEXT.clone() }; + + let unchecked_proposal = receiver + .check_broadcast_suitability_async(Some(FeeRate::MIN), |_| async { + Err(ImplementationError::new(Error::Implementation("mock error".into()))) + }) + .await; + + match unchecked_proposal { + MaybeFatalTransition(Err(Rejection::Transient(RejectTransient( + Error::Implementation(error), + )))) => assert_eq!( + error.to_string(), + Error::Implementation("mock error".into()).to_string() + ), + _ => panic!("Expected Implementation error"), + } + + Ok(()) + } + #[test] fn test_unchecked_proposal_fatal_error() -> Result<(), BoxError> { let persister = InMemoryPersister::default(); @@ -1660,6 +2031,24 @@ pub mod test { Ok(()) } + #[tokio::test] + async fn test_unchecked_proposal_fatal_error_async() -> Result<(), BoxError> { + let persister = InMemoryPersister::default(); + let unchecked_proposal = unchecked_proposal_v2_from_test_vector(); + let receiver = + v2::Receiver { state: unchecked_proposal, session_context: SHARED_CONTEXT.clone() }; + + let unchecked_proposal_err = receiver + .check_broadcast_suitability_async(Some(FeeRate::MIN), |_| async { Ok(false) }) + .await + .save(&persister) + .expect_err("should have replyable error"); + let has_error = unchecked_proposal_err.error_state().expect("should have state"); + + let _err_req = has_error.create_error_request(EXAMPLE_URL)?; + Ok(()) + } + #[test] fn test_maybe_inputs_seen_transient_error() -> Result<(), BoxError> { let persister = InMemoryPersister::default(); @@ -1688,6 +2077,36 @@ pub mod test { Ok(()) } + #[tokio::test] + async fn test_maybe_inputs_seen_transient_error_async() -> Result<(), BoxError> { + let persister = InMemoryPersister::default(); + let unchecked_proposal = unchecked_proposal_v2_from_test_vector(); + let receiver = + v2::Receiver { state: unchecked_proposal, session_context: SHARED_CONTEXT.clone() }; + + let maybe_inputs_owned = receiver + .assume_interactive_receiver() + .save(&persister) + .expect("Persister shouldn't fail"); + let maybe_inputs_seen = maybe_inputs_owned + .check_inputs_not_owned_async(&mut |_| async { + Err(ImplementationError::new(Error::Implementation("mock error".into()))) + }) + .await; + + match maybe_inputs_seen { + MaybeFatalTransition(Err(Rejection::Transient(RejectTransient( + Error::Implementation(error), + )))) => assert_eq!( + error.to_string(), + Error::Implementation("mock error".into()).to_string() + ), + _ => panic!("Expected Implementation error"), + } + + Ok(()) + } + #[test] fn test_outputs_unknown_transient_error() -> Result<(), BoxError> { let persister = InMemoryPersister::default(); @@ -1719,6 +2138,40 @@ pub mod test { Ok(()) } + #[tokio::test] + async fn test_outputs_unknown_transient_error_async() -> Result<(), BoxError> { + let persister = InMemoryPersister::default(); + let unchecked_proposal = unchecked_proposal_v2_from_test_vector(); + let receiver = + v2::Receiver { state: unchecked_proposal, session_context: SHARED_CONTEXT.clone() }; + + let maybe_inputs_owned = receiver + .assume_interactive_receiver() + .save(&persister) + .expect("Persister shouldn't fail"); + let maybe_inputs_seen = maybe_inputs_owned + .check_inputs_not_owned_async(&mut |_| async { Ok(false) }) + .await + .save(&persister) + .expect("Persister shouldn't fail"); + let outputs_unknown = maybe_inputs_seen + .check_no_inputs_seen_before_async(&mut |_| async { + Err(ImplementationError::new(Error::Implementation("mock error".into()))) + }) + .await; + match outputs_unknown { + MaybeFatalTransition(Err(Rejection::Transient(RejectTransient( + Error::Implementation(error), + )))) => assert_eq!( + error.to_string(), + Error::Implementation("mock error".into()).to_string() + ), + _ => panic!("Expected Implementation error"), + } + + Ok(()) + } + #[test] fn test_wants_outputs_transient_error() -> Result<(), BoxError> { let persister = InMemoryPersister::default(); @@ -1754,6 +2207,45 @@ pub mod test { Ok(()) } + #[tokio::test] + async fn test_wants_outputs_transient_error_async() -> Result<(), BoxError> { + let persister = InMemoryPersister::default(); + let unchecked_proposal = unchecked_proposal_v2_from_test_vector(); + let receiver = + v2::Receiver { state: unchecked_proposal, session_context: SHARED_CONTEXT.clone() }; + + let maybe_inputs_owned = receiver + .assume_interactive_receiver() + .save(&persister) + .expect("Persister shouldn't fail"); + let maybe_inputs_seen = maybe_inputs_owned + .check_inputs_not_owned_async(&mut |_| async { Ok(false) }) + .await + .save(&persister) + .expect("Persister shouldn't fail"); + let outputs_unknown = maybe_inputs_seen + .check_no_inputs_seen_before_async(&mut |_| async { Ok(false) }) + .await + .save(&persister) + .expect("Persister shouldn't fail"); + let wants_outputs = outputs_unknown + .identify_receiver_outputs_async(&mut |_| async { + Err(ImplementationError::new(Error::Implementation("mock error".into()))) + }) + .await; + match wants_outputs { + MaybeFatalTransition(Err(Rejection::Transient(RejectTransient( + Error::Implementation(error), + )))) => assert_eq!( + error.to_string(), + Error::Implementation("mock error".into()).to_string() + ), + _ => panic!("Expected Implementation error"), + } + + Ok(()) + } + #[test] fn test_create_error_request() -> Result<(), BoxError> { let mock_err = mock_err(); diff --git a/payjoin/tests/integration.rs b/payjoin/tests/integration.rs index a72f44ed6..ce32f706b 100644 --- a/payjoin/tests/integration.rs +++ b/payjoin/tests/integration.rs @@ -50,6 +50,22 @@ mod integration { do_v1_to_v1(sender, receiver, expected_weight) } + #[tokio::test] + async fn v1_to_v1_p2pkh_async() -> Result<(), BoxError> { + init_tracing(); + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver( + Some(AddressType::Legacy), + Some(AddressType::Legacy), + )?; + let expected_weight = Weight::from_wu( + TX_HEADER_LEGACY_WEIGHT + (P2PKH_INPUT_WEIGHT * 2) + (P2WPKH_OUTPUT_WEIGHT * 2), + ) + // bitcoin-cli wallet uses signature grinding to save one vbyte on the original PSBT. + // subtract it here + - Weight::from_vb_unchecked(1); + do_v1_to_v1_async(sender, receiver, expected_weight).await + } + #[test] fn v1_to_v1_nested_p2wpkh() -> Result<(), BoxError> { init_tracing(); @@ -63,6 +79,19 @@ mod integration { do_v1_to_v1(sender, receiver, expected_weight) } + #[tokio::test] + async fn v1_to_v1_nested_p2wpkh_async() -> Result<(), BoxError> { + init_tracing(); + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver( + Some(AddressType::P2shSegwit), + Some(AddressType::P2shSegwit), + )?; + let expected_weight = Weight::from_wu( + TX_HEADER_WEIGHT + (NESTED_P2WPKH_INPUT_WEIGHT * 2) + (P2WPKH_OUTPUT_WEIGHT * 2), + ); + do_v1_to_v1_async(sender, receiver, expected_weight).await + } + #[test] fn v1_to_v1_p2wpkh() -> Result<(), BoxError> { init_tracing(); @@ -76,6 +105,19 @@ mod integration { do_v1_to_v1(sender, receiver, expected_weight) } + #[tokio::test] + async fn v1_to_v1_p2wpkh_async() -> Result<(), BoxError> { + init_tracing(); + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver( + Some(AddressType::Bech32), + Some(AddressType::Bech32), + )?; + let expected_weight = Weight::from_wu( + TX_HEADER_WEIGHT + (P2WPKH_INPUT_WEIGHT * 2) + (P2WPKH_OUTPUT_WEIGHT * 2), + ); + do_v1_to_v1_async(sender, receiver, expected_weight).await + } + #[test] fn v1_to_v1_taproot() -> Result<(), BoxError> { init_tracing(); @@ -95,6 +137,25 @@ mod integration { do_v1_to_v1(sender, receiver, expected_weight) } + #[tokio::test] + async fn v1_to_v1_taproot_async() -> Result<(), BoxError> { + init_tracing(); + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver( + Some(AddressType::Bech32m), + Some(AddressType::Bech32m), + )?; + let expected_weight = Weight::from_wu( + TX_HEADER_WEIGHT + + (P2TR_INPUT_WEIGHT * 2) + + (P2WPKH_OUTPUT_WEIGHT * 2), + ) + // bitcoin-cli wallet overestimates taproot inputs in the original PSBT by one vbyte: + // https://github.com/payjoin/rust-payjoin/issues/369#issuecomment-2657539591 + // add it here + + Weight::from_vb_unchecked(1); + do_v1_to_v1_async(sender, receiver, expected_weight).await + } + fn do_v1_to_v1( sender: corepc_node::Client, receiver: corepc_node::Client, @@ -151,6 +212,63 @@ mod integration { Ok(()) } + async fn do_v1_to_v1_async( + sender: corepc_node::Client, + receiver: corepc_node::Client, + expected_weight: Weight, + ) -> Result<(), BoxError> { + // Receiver creates the payjoin URI + let pj_receiver_address = receiver.new_address()?; + let mut pj_uri = + build_v1_pj_uri(&pj_receiver_address, EXAMPLE_URL, OutputSubstitution::Enabled)?; + pj_uri.amount = Some(Amount::ONE_BTC); + + // ********************** + // Inside the Sender: + // Sender create a funded PSBT (not broadcasted) to address with amount given in the pj_uri + let uri = Uri::from_str(&pj_uri.to_string()) + .map_err(|e| e.to_string())? + .assume_checked() + .check_pj_supported() + .map_err(|e| e.to_string())?; + let psbt = build_original_psbt(&sender, &uri)?; + debug!("Original psbt: {psbt:#?}"); + let (req, ctx) = SenderBuilder::new(psbt, uri) + .build_with_additional_fee(Amount::from_sat(10000), None, FeeRate::ZERO, false)? + .create_v1_post_request(); + let headers = HeaderMock::new(&req.body, req.content_type); + + // ********************** + // Inside the Receiver: + // this data would transit from one party to another over the network in production + let response = + handle_v1_pj_request_async(req, headers, &receiver, None, None, None).await?; + // this response would be returned as http response to the sender + + // ********************** + // Inside the Sender: + // Sender checks, signs, finalizes, extracts, and broadcasts + let checked_payjoin_proposal_psbt = ctx.process_response(response.as_bytes())?; + let network_fees = checked_payjoin_proposal_psbt.fee()?; + let expected_fee = expected_weight * FeeRate::BROADCAST_MIN; + assert_eq!(network_fees, expected_fee); + let payjoin_tx = extract_pj_tx(&sender, checked_payjoin_proposal_psbt)?; + sender.send_raw_transaction(&payjoin_tx)?; + + // Check resulting transaction and balances + assert_eq!(payjoin_tx.input.len(), 2); + assert_eq!(payjoin_tx.output.len(), 2); + assert_eq!( + receiver.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(51.0)? + ); + assert_eq!( + sender.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(49.0)? - network_fees + ); + Ok(()) + } + #[test] fn allow_mixed_input_scripts() -> Result<(), BoxError> { init_tracing(); @@ -186,6 +304,44 @@ mod integration { assert!(handle_v1_pj_request(req, headers, &receiver, None, None, None).is_ok()); Ok(()) } + + #[tokio::test] + async fn allow_mixed_input_scripts_async() -> Result<(), BoxError> { + init_tracing(); + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver( + Some(AddressType::Bech32), + Some(AddressType::P2shSegwit), + )?; + + // Receiver creates the payjoin URI + let pj_receiver_address = receiver.new_address()?; + let mut pj_uri = + build_v1_pj_uri(&pj_receiver_address, EXAMPLE_URL, OutputSubstitution::Enabled)?; + pj_uri.amount = Some(Amount::ONE_BTC); + + // ********************** + // Inside the Sender: + // Sender create a funded PSBT (not broadcasted) to address with amount given in the pj_uri + let uri = Uri::from_str(&pj_uri.to_string()) + .map_err(|e| e.to_string())? + .assume_checked() + .check_pj_supported() + .map_err(|e| e.to_string())?; + let psbt = build_original_psbt(&sender, &uri)?; + debug!("Original psbt: {psbt:#?}"); + let (req, _ctx) = SenderBuilder::new(psbt, uri) + .build_with_additional_fee(Amount::from_sat(10000), None, FeeRate::ZERO, false)? + .create_v1_post_request(); + let headers = HeaderMock::new(&req.body, req.content_type); + + // ********************** + // Inside the Receiver: + // This should NOT error because the receiver is attempting to introduce mixed input script types + assert!(handle_v1_pj_request_async(req, headers, &receiver, None, None, None) + .await + .is_ok()); + Ok(()) + } } // not all needs v1 @@ -335,6 +491,15 @@ mod integration { assert!(result.is_ok(), "v2 send receive failed: {:#?}", result.unwrap_err()); + let mut services = TestServices::initialize().await?; + let result = tokio::select!( + err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err), + err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err), + res = do_err_test_async(&services) => res + ); + + assert!(result.is_ok(), "v2 send receive async failed: {:#?}", result.unwrap_err()); + async fn do_err_test(services: &TestServices) -> Result<(), BoxError> { let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver(None, None)?; let agent = services.http_agent(); @@ -489,43 +654,198 @@ mod integration { Ok(()) } - Ok(()) - } + async fn do_err_test_async(services: &TestServices) -> Result<(), BoxError> { + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver(None, None)?; + let agent = services.http_agent(); + services.wait_for_services_ready().await?; + let ohttp_keys = services.fetch_ohttp_keys().await?; + let persister = InMemoryPersister::default(); + let sender_persister = InMemoryPersister::default(); + // ********************** + // Inside the Receiver: + let address = receiver.new_address()?; - #[tokio::test] - async fn v2_to_v2_p2pkh() -> Result<(), BoxSendSyncError> { - init_tracing(); - let mut services = TestServices::initialize().await?; - let expected_weight = Weight::from_wu( - TX_HEADER_LEGACY_WEIGHT + (P2PKH_INPUT_WEIGHT * 2) + P2WPKH_OUTPUT_WEIGHT, - ) - // bitcoin-cli wallet uses signature grinding to save one vbyte on the original PSBT. - // subtract it here - - Weight::from_vb_unchecked(1); - let expected_fee = expected_weight * FeeRate::BROADCAST_MIN; + let session = + ReceiverBuilder::new(address, services.directory_url().as_str(), ohttp_keys)? + .build() + .save(&persister)?; + println!("session: {:#?}", session); + // Poll receive request + let (req, ctx) = + session.create_poll_request(services.ohttp_relay_url().as_str())?; + let response = agent + .post(req.url) + .header("Content-Type", req.content_type) + .body(req.body) + .send() + .await?; + assert!(response.status().is_success(), "error response: {}", response.status()); + let response_body = session + .process_response(response.bytes().await?.to_vec().as_slice(), ctx) + .save(&persister)?; + // No proposal yet since sender has not responded + let session = + if let OptionalTransitionOutcome::Stasis(current_state) = response_body { + current_state + } else { + panic!("Should still be in initialized state") + }; - let (_bitcoind, sender, receiver) = - init_bitcoind_sender_receiver(Some(AddressType::Legacy), Some(AddressType::Legacy)) - .expect("should be able to initialize the sender and the receiver"); - let recv_persister = InMemoryPersister::default(); - let send_persister = InMemoryPersister::default(); + // ********************** + // Inside the Sender: + // Create a funded PSBT (not broadcasted) to address with amount given in the pj_uri + let pj_uri = Uri::from_str(&session.pj_uri().to_string()) + .map_err(|e| e.to_string())? + .assume_checked() + .check_pj_supported() + .map_err(|e| e.to_string())?; + let psbt = build_sweep_psbt(&sender, &pj_uri)?; + let req_ctx = SenderBuilder::new(psbt, pj_uri) + .build_recommended(FeeRate::BROADCAST_MIN)? + .save(&sender_persister)?; + let (Request { url, body, content_type, .. }, send_ctx) = + req_ctx.create_v2_post_request(services.ohttp_relay_url().as_str())?; + let response = + agent.post(url).header("Content-Type", content_type).body(body).send().await?; + tracing::info!("Response: {:#?}", &response); + assert!(response.status().is_success(), "error response: {}", response.status()); + let req_ctx = req_ctx + .process_response(&response.bytes().await?, send_ctx) + .save(&sender_persister)?; - let result = tokio::select!( - err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err), - err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err), - res = do_v2_to_v2(&services, &receiver, &sender, &recv_persister, &send_persister, SenderFinalAction::SignAndBroadcastPayjoinProposal) => res - ); + // POST Original PSBT - assert!(result.is_ok(), "v2 p2pkh send receive failed: {:#?}", result.unwrap_err()); + // ********************** + // Inside the Receiver: - let (broadcasted_transaction, monitoring_payment) = result.unwrap(); + // GET fallback psbt + let (req, ctx) = + session.create_poll_request(services.ohttp_relay_url().as_str())?; + let response = agent + .post(req.url) + .header("Content-Type", req.content_type) + .body(req.body) + .send() + .await?; + // POST payjoin + let outcome = session + .process_response(response.bytes().await?.to_vec().as_slice(), ctx) + .save(&persister)?; + let proposal = if let OptionalTransitionOutcome::Progress(psbt) = outcome { + psbt + } else { + panic!("proposal should exist"); + }; - // Sender should have sent the entire value of their UTXO to receiver (minus fees). - assert_eq!(broadcasted_transaction.input.len(), 2); - assert_eq!(broadcasted_transaction.output.len(), 1); - assert_eq!( - receiver.get_balances()?.into_model()?.mine.untrusted_pending, - Amount::from_btc(100.0)? - expected_fee + // Progress past the first typestate so we can send a encrypted error response + // TODO: when the reply key is being persisted as its own session event we can fail at the + // unchecked original typestate + let proposal = proposal.assume_interactive_receiver().save(&persister)?; + + // Generate replyable error + let server_error = proposal + .clone() + .check_inputs_not_owned_async(&mut |_| async { Ok(true) }) + .await + .save(&persister) + .expect_err("should fail") + .api_error() + .expect("expected api error"); + // TODO: this should be replaced by comparing the error itself once the error types impl PartialEq + // Issue: https://github.com/payjoin/rust-payjoin/issues/645 + assert_eq!( + server_error.to_string(), + "Protocol error: The receiver rejected the original PSBT." + ); + + let (session, session_history) = replay_receiver_event_log(&persister)?; + assert_eq!(session_history.status(), SessionStatus::Active); + let has_error = match session { + ReceiveSession::HasReplyableError(r) => r, + _ => panic!("Expected HasError"), + }; + let (err_req, err_ctx) = + has_error.create_error_request(services.ohttp_relay_url().as_str())?; + let err_response = agent + .post(err_req.url) + .header("Content-Type", err_req.content_type) + .body(err_req.body) + .send() + .await?; + + let err_bytes = err_response.bytes().await?; + has_error.process_error_response(&err_bytes, err_ctx).save(&persister)?; + + // Ensure the session is closed properly + let (_, session_history) = replay_receiver_event_log(&persister)?; + assert_eq!(session_history.status(), SessionStatus::Failed); + + // Check that we can read the error response as a sender + let (req, ctx) = + req_ctx.create_poll_request(services.ohttp_relay_url().as_str())?; + let response = agent + .post(req.url) + .header("Content-Type", req.content_type) + .body(req.body) + .send() + .await?; + assert!(response.status().is_success(), "error response: {}", response.status()); + let reply_error = req_ctx + .process_response(&response.bytes().await?, ctx) + .save(&sender_persister) + .expect_err("Should be a fatal error"); + + let api_error = reply_error.api_error().expect("expecting error from API"); + match api_error { + ResponseError::WellKnown(well_known_error) => { + assert_eq!( + well_known_error.to_string(), + "The receiver rejected the original PSBT." + ); + } + _ => panic!("Expected Unrecognized error, got {:?}", api_error), + } + + Ok(()) + } + + Ok(()) + } + + #[tokio::test] + async fn v2_to_v2_p2pkh() -> Result<(), BoxSendSyncError> { + init_tracing(); + let mut services = TestServices::initialize().await?; + let expected_weight = Weight::from_wu( + TX_HEADER_LEGACY_WEIGHT + (P2PKH_INPUT_WEIGHT * 2) + P2WPKH_OUTPUT_WEIGHT, + ) + // bitcoin-cli wallet uses signature grinding to save one vbyte on the original PSBT. + // subtract it here + - Weight::from_vb_unchecked(1); + let expected_fee = expected_weight * FeeRate::BROADCAST_MIN; + + let (_bitcoind, sender, receiver) = + init_bitcoind_sender_receiver(Some(AddressType::Legacy), Some(AddressType::Legacy)) + .expect("should be able to initialize the sender and the receiver"); + let recv_persister = InMemoryPersister::default(); + let send_persister = InMemoryPersister::default(); + + let result = tokio::select!( + err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err), + err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err), + res = do_v2_to_v2(&services, &receiver, &sender, &recv_persister, &send_persister, SenderFinalAction::SignAndBroadcastPayjoinProposal) => res + ); + + assert!(result.is_ok(), "v2 p2pkh send receive failed: {:#?}", result.unwrap_err()); + + let (broadcasted_transaction, monitoring_payment) = result.unwrap(); + + // Sender should have sent the entire value of their UTXO to receiver (minus fees). + assert_eq!(broadcasted_transaction.input.len(), 2); + assert_eq!(broadcasted_transaction.output.len(), 1); + assert_eq!( + receiver.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(100.0)? - expected_fee ); assert_eq!( sender.get_balances()?.into_model()?.mine.untrusted_pending, @@ -552,6 +872,75 @@ mod integration { Ok(()) } + #[tokio::test] + async fn v2_to_v2_p2pkh_async() -> Result<(), BoxSendSyncError> { + init_tracing(); + let mut services = TestServices::initialize().await?; + let expected_weight = Weight::from_wu( + TX_HEADER_LEGACY_WEIGHT + (P2PKH_INPUT_WEIGHT * 2) + P2WPKH_OUTPUT_WEIGHT, + ) + // bitcoin-cli wallet uses signature grinding to save one vbyte on the original PSBT. + // subtract it here + - Weight::from_vb_unchecked(1); + let expected_fee = expected_weight * FeeRate::BROADCAST_MIN; + + let (_bitcoind, sender, receiver) = + init_bitcoind_sender_receiver(Some(AddressType::Legacy), Some(AddressType::Legacy)) + .expect("should be able to initialize the sender and the receiver"); + let recv_persister = InMemoryPersister::default(); + let send_persister = InMemoryPersister::default(); + + let result = tokio::select!( + err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err), + err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err), + res = do_v2_to_v2_async(&services, &receiver, &sender, &recv_persister, &send_persister, SenderFinalAction::SignAndBroadcastPayjoinProposal) => res + ); + + assert!( + result.is_ok(), + "v2 p2pkh send receive async failed: {:#?}", + result.unwrap_err() + ); + + let (broadcasted_transaction, monitoring_payment) = result.unwrap(); + + // Sender should have sent the entire value of their UTXO to receiver (minus fees). + assert_eq!(broadcasted_transaction.input.len(), 2); + assert_eq!(broadcasted_transaction.output.len(), 1); + assert_eq!( + receiver.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(100.0)? - expected_fee + ); + assert_eq!( + sender.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(0.0)? + ); + + // Receiver cannot validate that the sender has broadcasted the Payjoin proposal or the fallback transaction. + // The sender is using a non-SegWit address, so their signature is going to change the TXID. So we test whether the + // function exists early and does not call the closure. + monitoring_payment + .check_payment_async(|_| async { + // NOTE: this was changed from panic! to None bc the async + // transaction_exists will precompute the results for payjoin + // proposal and fallback, and when fallback tx is not found the + // panic will cause this test to fail. + Ok(None) + }) + .await + .save(&recv_persister) + .expect("receiver should successfully monitor for the payment"); + + let (_session, session_history) = replay_receiver_event_log(&recv_persister)?; + assert_eq!( + recv_persister.load().unwrap().last(), + Some(payjoin::receive::v2::SessionEvent::Closed(payjoin::receive::v2::SessionOutcome::PayjoinProposalSent)), + "The last event of the persister should be a SessionOutcome::PayjoinProposalSent since the sender is going to change the TXID when they sign the Payjoin proposal", + ); + assert_eq!(session_history.status(), SessionStatus::Completed); + Ok(()) + } + #[tokio::test] async fn v2_to_v2_p2wpkh() -> Result<(), BoxSendSyncError> { init_tracing(); @@ -626,6 +1015,87 @@ mod integration { Ok(()) } + #[tokio::test] + async fn v2_to_v2_p2wpkh_async() -> Result<(), BoxSendSyncError> { + init_tracing(); + let mut services = TestServices::initialize().await?; + let expected_weight = Weight::from_wu( + TX_HEADER_WEIGHT + (P2WPKH_INPUT_WEIGHT * 2) + P2WPKH_OUTPUT_WEIGHT, + ); + let expected_fee = expected_weight * FeeRate::BROADCAST_MIN; + + let (_bitcoind, sender, receiver) = + init_bitcoind_sender_receiver(Some(AddressType::Bech32), Some(AddressType::Bech32)) + .expect("should be able to initialize the sender and the receiver"); + let recv_persister = InMemoryPersister::default(); + let send_persister = InMemoryPersister::default(); + + let result = tokio::select!( + err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err), + err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err), + res = do_v2_to_v2_async(&services, &receiver, &sender, &recv_persister, &send_persister, SenderFinalAction::SignAndBroadcastPayjoinProposal) => res + ); + + assert!(result.is_ok(), "v2 p2wpkh send receive failed: {:#?}", result.unwrap_err()); + + let (broadcasted_transaction, monitoring_payment) = result.unwrap(); + + // Sender should have sent the entire value of their UTXO to receiver (minus fees). + assert_eq!(broadcasted_transaction.input.len(), 2); + assert_eq!(broadcasted_transaction.output.len(), 1); + assert_eq!( + receiver.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(100.0)? - expected_fee + ); + assert_eq!( + sender.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(0.0)? + ); + + // Receiver should be able to validate that the sender has broadcasted the Payjoin proposal. + monitoring_payment + .check_payment_async(|txid| { + let get_tx_result = receiver.get_raw_transaction(txid); + async { + match get_tx_result { + Ok(tx) => + Ok(Some(tx.transaction().expect("transaction should be decodable"))), + Err(_) => { + // NOTE: this was changed from panic! to None bc the async + // transaction_exists will precompute the results for payjoin + // proposal and fallback, and when fallback tx is not found the + // panic will cause this test to fail. + Ok(None) + } + } + } + }) + .await + .save(&recv_persister) + .expect("receiver should successfully monitor for the payment"); + + // Receiver session should have completed with a Success, along with information on the + // sender signatures on the Payjoin that was broadcasted. + let (_session, session_history) = replay_receiver_event_log(&recv_persister)?; + let sender_outpoint = session_history.fallback_tx().unwrap().input[0].previous_output; + let sender_signatures = { + let sender_txin = broadcasted_transaction + .input + .iter() + .find(|txin| txin.previous_output == sender_outpoint) + .expect("sender input must be present in payjoin_tx") + .clone(); + vec![(sender_txin.clone().script_sig, sender_txin.clone().witness)] + }; + assert_eq!( + recv_persister.load().unwrap().last(), + Some(payjoin::receive::v2::SessionEvent::Closed(payjoin::receive::v2::SessionOutcome::Success(sender_signatures))), + "The last event of the persister should be a SessionOutcome::Success with the correct sender signature", + ); + assert_eq!(session_history.status(), SessionStatus::Completed); + Ok(()) + } + #[tokio::test] async fn v2_to_v2_taproot() -> Result<(), BoxSendSyncError> { init_tracing(); @@ -709,28 +1179,119 @@ mod integration { } #[tokio::test] - async fn v2_to_v2_fallback_tx_broadcast() -> Result<(), BoxSendSyncError> { + async fn v2_to_v2_taproot_async() -> Result<(), BoxSendSyncError> { init_tracing(); let mut services = TestServices::initialize().await?; - let expected_weight = - Weight::from_wu(TX_HEADER_WEIGHT + P2WPKH_INPUT_WEIGHT + P2WPKH_OUTPUT_WEIGHT); + let expected_weight = Weight::from_wu( + TX_HEADER_WEIGHT + + (P2TR_INPUT_WEIGHT * 2) + + P2WPKH_OUTPUT_WEIGHT, + ) + // bitcoin-cli wallet overestimates taproot inputs in the original PSBT by one vbyte: + // https://github.com/payjoin/rust-payjoin/issues/369#issuecomment-2657539591 + // add it here + + Weight::from_vb_unchecked(1); let expected_fee = expected_weight * FeeRate::BROADCAST_MIN; - let (_bitcoind, sender, receiver) = - init_bitcoind_sender_receiver(Some(AddressType::Bech32), Some(AddressType::Bech32)) - .expect("should be able to initialize the sender and the receiver"); + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver( + Some(AddressType::Bech32m), + Some(AddressType::Bech32m), + ) + .expect("should be able to initialize the sender and the receiver"); let recv_persister = InMemoryPersister::default(); let send_persister = InMemoryPersister::default(); let result = tokio::select!( err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err), err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err), - res = do_v2_to_v2(&services, &receiver, &sender, &recv_persister, &send_persister, SenderFinalAction::BroadcastFallbackTransaction) => res + res = do_v2_to_v2_async(&services, &receiver, &sender, &recv_persister, &send_persister, SenderFinalAction::SignAndBroadcastPayjoinProposal) => res ); assert!( result.is_ok(), - "v2 send receive with fallback broadcast failed: {:#?}", + "v2 taproot send receive async failed: {:#?}", + result.unwrap_err() + ); + + let (broadcasted_transaction, monitoring_payment) = result.unwrap(); + + // Sender should have sent the entire value of their UTXO to receiver (minus fees). + assert_eq!(broadcasted_transaction.input.len(), 2); + assert_eq!(broadcasted_transaction.output.len(), 1); + assert_eq!( + receiver.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(100.0)? - expected_fee + ); + assert_eq!( + sender.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(0.0)? + ); + + // Receiver should be able to validate that the sender has broadcasted the Payjoin proposal. + monitoring_payment + .check_payment_async(|txid| { + let get_tx_result = receiver.get_raw_transaction(txid); + async move { + match get_tx_result { + Ok(tx) => + Ok(Some(tx.transaction().expect("transaction should be decodable"))), + // NOTE: this was changed from panic! to None bc the async + // transaction_exists will precompute the results for payjoin + // proposal and fallback, and when fallback tx is not found the + // panic will cause this test to fail. + Err(_) => Ok(None) + } + } + }) + .await + .save(&recv_persister) + .expect("receiver should successfully monitor for the payment"); + + // Receiver session should have completed with a Success, along with information on the + // sender signatures on the Payjoin that was broadcasted. + let (_session, session_history) = replay_receiver_event_log(&recv_persister)?; + let sender_outpoint = session_history.fallback_tx().unwrap().input[0].previous_output; + let sender_signatures = { + let sender_txin = broadcasted_transaction + .input + .iter() + .find(|txin| txin.previous_output == sender_outpoint) + .expect("sender input must be present in payjoin_tx") + .clone(); + vec![(sender_txin.clone().script_sig, sender_txin.clone().witness)] + }; + assert_eq!( + recv_persister.load().unwrap().last(), + Some(payjoin::receive::v2::SessionEvent::Closed(payjoin::receive::v2::SessionOutcome::Success(sender_signatures))), + "The last event of the persister should be a SessionOutcome::Success with the correct sender signature", + ); + assert_eq!(session_history.status(), SessionStatus::Completed); + Ok(()) + } + + #[tokio::test] + async fn v2_to_v2_fallback_tx_broadcast() -> Result<(), BoxSendSyncError> { + init_tracing(); + let mut services = TestServices::initialize().await?; + let expected_weight = + Weight::from_wu(TX_HEADER_WEIGHT + P2WPKH_INPUT_WEIGHT + P2WPKH_OUTPUT_WEIGHT); + let expected_fee = expected_weight * FeeRate::BROADCAST_MIN; + + let (_bitcoind, sender, receiver) = + init_bitcoind_sender_receiver(Some(AddressType::Bech32), Some(AddressType::Bech32)) + .expect("should be able to initialize the sender and the receiver"); + let recv_persister = InMemoryPersister::default(); + let send_persister = InMemoryPersister::default(); + + let result = tokio::select!( + err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err), + err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err), + res = do_v2_to_v2(&services, &receiver, &sender, &recv_persister, &send_persister, SenderFinalAction::BroadcastFallbackTransaction) => res + ); + + assert!( + result.is_ok(), + "v2 send receive with fallback broadcast failed: {:#?}", result.unwrap_err() ); @@ -775,6 +1336,76 @@ mod integration { Ok(()) } + #[tokio::test] + async fn v2_to_v2_fallback_tx_broadcast_async() -> Result<(), BoxSendSyncError> { + init_tracing(); + let mut services = TestServices::initialize().await?; + let expected_weight = + Weight::from_wu(TX_HEADER_WEIGHT + P2WPKH_INPUT_WEIGHT + P2WPKH_OUTPUT_WEIGHT); + let expected_fee = expected_weight * FeeRate::BROADCAST_MIN; + + let (_bitcoind, sender, receiver) = + init_bitcoind_sender_receiver(Some(AddressType::Bech32), Some(AddressType::Bech32)) + .expect("should be able to initialize the sender and the receiver"); + let recv_persister = InMemoryPersister::default(); + let send_persister = InMemoryPersister::default(); + + let result = tokio::select!( + err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err), + err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err), + res = do_v2_to_v2_async(&services, &receiver, &sender, &recv_persister, &send_persister, SenderFinalAction::BroadcastFallbackTransaction) => res + ); + + assert!( + result.is_ok(), + "v2 send receive async with fallback broadcast failed: {:#?}", + result.unwrap_err() + ); + + let (broadcasted_transaction, monitoring_payment) = result.unwrap(); + + // Fallback transaction was broadcasted, so there will only be a single input. + assert_eq!(broadcasted_transaction.input.len(), 1); + assert_eq!(broadcasted_transaction.output.len(), 1); + assert_eq!( + receiver.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(50.0)? - expected_fee + ); + assert_eq!( + sender.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(0.0)? + ); + + // Receiver should be able to validate that the sender has broadcasted the fallback transaction. + // The check_payment closure should be called twice: first for the Payjoin proposal, which will not be found, + // and then for the fallback transaction, which will be found.. + monitoring_payment + .check_payment_async(|txid| { + let get_tx_result = receiver.get_raw_transaction(txid); + async move { + match get_tx_result { + Ok(tx) => + Ok(Some(tx.transaction().expect("transaction should be decodable"))), + Err(_) => Ok(None), + } + } + }) + .await + .save(&recv_persister) + .expect("receiver should successfully monitor for the payment"); + + // Receiver session should have completed with a Success and a fallback session + // outcome. + let (_session, session_history) = replay_receiver_event_log(&recv_persister)?; + assert_eq!( + recv_persister.load().unwrap().last(), + Some(payjoin::receive::v2::SessionEvent::Closed(payjoin::receive::v2::SessionOutcome::FallbackBroadcasted)), + "The last event of the persister should be a SessionOutcome::Success with the correct sender signature", + ); + assert_eq!(session_history.status(), SessionStatus::FallbackBroadcasted); + Ok(()) + } + /// Helper function for running a Payjoin v2 session. Uses the `sender_final_action` /// parameter to determine what action the sender will take after they receive the Payjoin /// proposal from the receiver. @@ -914,6 +1545,145 @@ mod integration { Ok((broadcasted_transaction, monitoring_payment)) } + /// Helper function for running a Payjoin v2 session. Uses the `sender_final_action` + /// parameter to determine what action the sender will take after they receive the Payjoin + /// proposal from the receiver. + /// + /// Returns the transaction which the sender broadcasts and the state of the Receiver + /// before they begin monitoring ([`Receiver`]) so that different tests can modify + /// how the receiver is going to validate the action the sender takes. + async fn do_v2_to_v2_async( + services: &TestServices, + receiver: &corepc_node::Client, + sender: &corepc_node::Client, + recv_persister: &R, + send_persister: &S, + sender_final_action: SenderFinalAction, + ) -> Result<(Transaction, Receiver), BoxError> + where + R: SessionPersister + Clone, + S: SessionPersister + Clone, + { + let agent = services.http_agent(); + services.wait_for_services_ready().await?; + let ohttp_keys = services.fetch_ohttp_keys().await?; + // ********************** + // Inside the Receiver: + let address = receiver.new_address()?; + + // test session with expiration in the future + let session = + ReceiverBuilder::new(address, services.directory_url().as_str(), ohttp_keys)? + .build() + .save(recv_persister)?; + println!("session: {:#?}", session); + // Poll receive request + let (req, ctx) = session.create_poll_request(services.ohttp_relay_url().as_str())?; + let response = agent + .post(req.url) + .header("Content-Type", req.content_type) + .body(req.body) + .send() + .await?; + assert!(response.status().is_success(), "error response: {}", response.status()); + let response_body = session + .process_response(response.bytes().await?.to_vec().as_slice(), ctx) + .save(recv_persister)?; + // No proposal yet since sender has not responded + let session = if let OptionalTransitionOutcome::Stasis(current_state) = response_body { + current_state + } else { + panic!("Should still be in initialized state") + }; + + // ********************** + // Inside the Sender: + // Create a funded PSBT (not broadcasted) to address with amount given in the pj_uri + let pj_uri = Uri::from_str(&session.pj_uri().to_string()) + .map_err(|e| e.to_string())? + .assume_checked() + .check_pj_supported() + .map_err(|e| e.to_string())?; + let psbt = build_sweep_psbt(sender, &pj_uri)?; + let req_ctx = SenderBuilder::new(psbt, pj_uri) + .build_recommended(FeeRate::BROADCAST_MIN)? + .save(send_persister)?; + let (Request { url, body, content_type, .. }, send_ctx) = + req_ctx.create_v2_post_request(services.ohttp_relay_url().as_str())?; + let response = + agent.post(url).header("Content-Type", content_type).body(body).send().await?; + tracing::info!("Response: {:#?}", &response); + assert!(response.status().is_success(), "error response: {}", response.status()); + let send_ctx = req_ctx + .process_response(&response.bytes().await?, send_ctx) + .save(send_persister)?; + // POST Original PSBT + + // ********************** + // Inside the Receiver: + + // GET fallback psbt + let (req, ctx) = session.create_poll_request(services.ohttp_relay_url().as_str())?; + let response = agent + .post(req.url) + .header("Content-Type", req.content_type) + .body(req.body) + .send() + .await?; + // POST payjoin + let outcome = session + .process_response(response.bytes().await?.to_vec().as_slice(), ctx) + .save(recv_persister)?; + let proposal = if let OptionalTransitionOutcome::Progress(psbt) = outcome { + psbt + } else { + panic!("proposal should exist"); + }; + let payjoin_proposal = + handle_directory_proposal_async(receiver, proposal, recv_persister, None).await?; + let (req, ctx) = + payjoin_proposal.create_post_request(services.ohttp_relay_url().as_str())?; + let response = agent + .post(req.url) + .header("Content-Type", req.content_type) + .body(req.body) + .send() + .await?; + let monitoring_payment = payjoin_proposal + .process_response(&response.bytes().await?, ctx) + .save(recv_persister)?; + + // ********************** + // Inside the Sender: + // Sender checks, signs, finalizes, constructs, and broadcasts + // Replay post fallback to get the response + let (Request { url, body, content_type, .. }, ohttp_ctx) = + send_ctx.create_poll_request(services.ohttp_relay_url().as_str())?; + let response = + agent.post(url).header("Content-Type", content_type).body(body).send().await?; + tracing::info!("Response: {:#?}", &response); + let response = send_ctx + .process_response(&response.bytes().await?, ohttp_ctx) + .save(send_persister) + .expect("psbt should exist"); + + let checked_payjoin_proposal_psbt = + if let OptionalTransitionOutcome::Progress(psbt) = response { + psbt + } else { + panic!("psbt should exist"); + }; + + let broadcasted_transaction = match sender_final_action { + SenderFinalAction::SignAndBroadcastPayjoinProposal => + extract_pj_tx(sender, checked_payjoin_proposal_psbt.clone())?, + SenderFinalAction::BroadcastFallbackTransaction => + replay_sender_event_log(send_persister)?.1.fallback_tx(), + }; + sender.send_raw_transaction(&broadcasted_transaction)?; + Ok((broadcasted_transaction, monitoring_payment)) + } + #[test] fn v2_to_v1() -> Result<(), BoxError> { init_tracing(); @@ -961,33 +1731,239 @@ mod integration { let payjoin_tx = extract_pj_tx(&sender, checked_payjoin_proposal_psbt)?; sender.send_raw_transaction(&payjoin_tx)?; - // Check resulting transaction and balances - assert_eq!(payjoin_tx.input.len(), 2); - assert_eq!(payjoin_tx.output.len(), 2); - assert_eq!( - receiver.get_balances()?.into_model()?.mine.untrusted_pending, - Amount::from_btc(51.0)? - ); - assert_eq!( - sender.get_balances()?.into_model()?.mine.untrusted_pending, - Amount::from_btc(49.0)? - network_fees - ); - Ok(()) - } + // Check resulting transaction and balances + assert_eq!(payjoin_tx.input.len(), 2); + assert_eq!(payjoin_tx.output.len(), 2); + assert_eq!( + receiver.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(51.0)? + ); + assert_eq!( + sender.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(49.0)? - network_fees + ); + Ok(()) + } + + #[tokio::test] + async fn v2_to_v1_async() -> Result<(), BoxError> { + init_tracing(); + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver(None, None)?; + // Receiver creates the payjoin URI + let pj_receiver_address = receiver.new_address()?; + let mut pj_uri = + build_v1_pj_uri(&pj_receiver_address, EXAMPLE_URL, OutputSubstitution::Enabled)?; + pj_uri.amount = Some(Amount::ONE_BTC); + + // ********************** + // Inside the Sender: + // Create a funded PSBT (not broadcasted) to address with amount given in the pj_uri + let pj_uri = Uri::from_str(&pj_uri.to_string()) + .map_err(|e| e.to_string())? + .assume_checked() + .check_pj_supported() + .map_err(|e| e.to_string())?; + // FIXME this test no longer sends v2 to v1 because that concept is gone and should now be + // Handled by the implementation. Therefore, the e2e test should now test v2-capable sender + // successfully sending to v1. + assert!(matches!(pj_uri.extras.pj_param(), payjoin::PjParam::V1(_))); + let psbt = build_original_psbt(&sender, &pj_uri)?; + let req_ctx = payjoin::send::v1::SenderBuilder::new(psbt, pj_uri) + .build_recommended(FeeRate::BROADCAST_MIN)?; + let (req, ctx) = req_ctx.create_v1_post_request(); + let headers = HeaderMock::new(&req.body, req.content_type); + + // ********************** + // Inside the Receiver: + // this data would transit from one party to another over the network in production + let response = + handle_v1_pj_request_async(req, headers, &receiver, None, None, None).await?; + // this response would be returned as http response to the sender + + // ********************** + // Inside the Sender: + // Sender checks, signs, finalizes, constructs, and broadcasts + let checked_payjoin_proposal_psbt = ctx.process_response(response.as_bytes())?; + let network_fees = checked_payjoin_proposal_psbt.fee()?; + let expected_weight = Weight::from_wu( + TX_HEADER_WEIGHT + (P2WPKH_INPUT_WEIGHT * 2) + (P2WPKH_OUTPUT_WEIGHT * 2), + ); + let expected_fee = expected_weight * FeeRate::BROADCAST_MIN; + assert_eq!(network_fees, expected_fee); + let payjoin_tx = extract_pj_tx(&sender, checked_payjoin_proposal_psbt)?; + sender.send_raw_transaction(&payjoin_tx)?; + + // Check resulting transaction and balances + assert_eq!(payjoin_tx.input.len(), 2); + assert_eq!(payjoin_tx.output.len(), 2); + assert_eq!( + receiver.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(51.0)? + ); + assert_eq!( + sender.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(49.0)? - network_fees + ); + Ok(()) + } + + #[tokio::test] + async fn v1_to_v2() -> Result<(), BoxSendSyncError> { + init_tracing(); + let mut services = TestServices::initialize().await?; + let result = tokio::select!( + err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err), + err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err), + res = do_v1_to_v2(&services) => res + ); + + assert!(result.is_ok(), "v2 send receive failed: {:#?}", result.unwrap_err()); + + let mut services = TestServices::initialize().await?; + let result = tokio::select!( + err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err), + err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err), + res = do_v1_to_v2_async(&services) => res + ); + + assert!(result.is_ok(), "v2 send receive async failed: {:#?}", result.unwrap_err()); + + async fn do_v1_to_v2(services: &TestServices) -> Result<(), BoxError> { + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver(None, None)?; + let agent = services.http_agent(); + services.wait_for_services_ready().await?; + let ohttp_keys = services.fetch_ohttp_keys().await?; + let recv_persister = InMemoryPersister::default(); + let address = receiver.new_address()?; + let session = ReceiverBuilder::new( + address, + services.directory_url().as_str(), + ohttp_keys.clone(), + )? + .build() + .save(&recv_persister)?; + + // ********************** + // Inside the V1 Sender: + // Create a funded PSBT (not broadcasted) to address with amount given in the pj_uri + let pj_uri = Uri::from_str(&session.pj_uri().to_string()) + .map_err(|e| e.to_string())? + .assume_checked() + .check_pj_supported() + .map_err(|e| e.to_string())?; + let psbt = build_original_psbt(&sender, &pj_uri)?; + let req_ctx = payjoin::send::v1::SenderBuilder::new(psbt, pj_uri) + .build_with_additional_fee( + Amount::from_sat(10000), + None, + FeeRate::ZERO, + false, + )?; + let (Request { url, body, content_type, .. }, send_ctx) = + req_ctx.create_v1_post_request(); + tracing::info!("send fallback v1 to offline receiver fail"); + let res = agent + .post(url.clone()) + .header("Content-Type", content_type) + .body(body.clone()) + .send() + .await; + assert!(res?.status() == StatusCode::SERVICE_UNAVAILABLE); + + // ********************** + // Inside the Receiver: + let agent_clone: Arc = agent.clone(); + let receiver: Arc = Arc::new(receiver); + let receiver_clone = receiver.clone(); + let ohttp_relay = services.ohttp_relay_url().to_string(); + let receiver_loop = tokio::task::spawn(async move { + let agent_clone = agent_clone.clone(); + let proposal = loop { + let (req, ctx) = session.create_poll_request(&ohttp_relay)?; + let response = agent_clone + .post(req.url) + .header("Content-Type", req.content_type) + .body(req.body) + .send() + .await?; + + if response.status() == 200 { + let proposal = session + .clone() + .process_response(response.bytes().await?.to_vec().as_slice(), ctx) + .save(&recv_persister)?; + if let OptionalTransitionOutcome::Progress(unchecked_proposal) = + proposal + { + break unchecked_proposal.clone(); + } else { + tracing::info!( + "No response yet for POST payjoin request, retrying some seconds" + ); + } + } else { + tracing::error!("Unexpected response status: {}", response.status()); + panic!("Unexpected response status: {}", response.status()) + } + }; + let payjoin_proposal = + handle_directory_proposal(&receiver_clone, proposal, &recv_persister, None) + .map_err(|e| e.to_string())?; + // Respond with payjoin psbt within the time window the sender is willing to wait + // this response would be returned as http response to the sender + let (req, ctx) = payjoin_proposal.create_post_request(ohttp_relay)?; + let response = agent_clone + .post(req.url) + .header("Content-Type", req.content_type) + .body(req.body) + .send() + .await?; + payjoin_proposal + .process_response(&response.bytes().await?, ctx) + .save(&recv_persister) + .map_err(|e| e.to_string())?; + Ok::<_, BoxSendSyncError>(()) + }); + + // ********************** + // send fallback v1 to online receiver + tracing::info!("send fallback v1 to online receiver should succeed"); + let response = + agent.post(url).header("Content-Type", content_type).body(body).send().await?; + tracing::info!("Response: {:#?}", &response); + assert!(response.status().is_success(), "error response: {}", response.status()); - #[tokio::test] - async fn v1_to_v2() -> Result<(), BoxSendSyncError> { - init_tracing(); - let mut services = TestServices::initialize().await?; - let result = tokio::select!( - err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err), - err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err), - res = do_v1_to_v2(&services) => res - ); + let checked_payjoin_proposal_psbt = + send_ctx.process_response(&response.bytes().await?)?; + let network_fees = checked_payjoin_proposal_psbt.fee()?; + let expected_weight = Weight::from_wu( + TX_HEADER_WEIGHT + (P2WPKH_INPUT_WEIGHT * 2) + (P2WPKH_OUTPUT_WEIGHT * 2), + ); + let expected_fee = expected_weight * FeeRate::BROADCAST_MIN; + assert_eq!(network_fees, expected_fee); + let payjoin_tx = extract_pj_tx(&sender, checked_payjoin_proposal_psbt)?; + sender.send_raw_transaction(&payjoin_tx)?; + tracing::info!("sent"); + assert!( + receiver_loop.await.is_ok(), + "The spawned task panicked or returned an error" + ); - assert!(result.is_ok(), "v2 send receive failed: {:#?}", result.unwrap_err()); + // Check resulting transaction and balances + assert_eq!(payjoin_tx.input.len(), 2); + assert_eq!(payjoin_tx.output.len(), 2); + assert_eq!( + receiver.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(51.0)? + ); + assert_eq!( + sender.get_balances()?.into_model()?.mine.untrusted_pending, + Amount::from_btc(49.0)? - network_fees + ); + Ok(()) + } - async fn do_v1_to_v2(services: &TestServices) -> Result<(), BoxError> { + async fn do_v1_to_v2_async(services: &TestServices) -> Result<(), BoxError> { let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver(None, None)?; let agent = services.http_agent(); services.wait_for_services_ready().await?; @@ -1065,9 +2041,14 @@ mod integration { panic!("Unexpected response status: {}", response.status()) } }; - let payjoin_proposal = - handle_directory_proposal(&receiver_clone, proposal, &recv_persister, None) - .map_err(|e| e.to_string())?; + let payjoin_proposal = handle_directory_proposal_async( + &receiver_clone, + proposal, + &recv_persister, + None, + ) + .await + .map_err(|e| e.to_string())?; // Respond with payjoin psbt within the time window the sender is willing to wait // this response would be returned as http response to the sender let (req, ctx) = payjoin_proposal.create_post_request(ohttp_relay)?; @@ -1121,7 +2102,6 @@ mod integration { ); Ok(()) } - Ok(()) } @@ -1228,6 +2208,123 @@ mod integration { Ok(payjoin) } + async fn handle_directory_proposal_async( + receiver: &corepc_node::Client, + proposal: Receiver, + recv_persister: &impl SessionPersister, + custom_inputs: Option>, + ) -> Result, BoxError> { + // Receive Check 1: Can Broadcast + let proposal = proposal + .check_broadcast_suitability_async(None, |tx| { + let tx = tx.clone(); + async move { + Ok(receiver + .test_mempool_accept(std::slice::from_ref(&tx)) + .map_err(ImplementationError::new)? + .0 + .first() + .ok_or(ImplementationError::from( + "testmempoolaccept should return a result", + ))? + .allowed) + } + }) + .await + .save(recv_persister)?; + + // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx + let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast(); + + // Receive Check 2: receiver can't sign for proposal inputs + let proposal = proposal + .check_inputs_not_owned_async(&mut |input| { + let address = bitcoin::Address::from_script(input, bitcoin::Network::Regtest) + .map_err(ImplementationError::new); + async move { + let address = address?; + receiver + .get_address_info(&address) + .map(|info| info.is_mine) + .map_err(ImplementationError::new) + } + }) + .await + .save(recv_persister)?; + + // Receive Check 3: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers. + let payjoin = proposal + .check_no_inputs_seen_before_async(&mut |_| async { Ok(false) }) + .await + .save(recv_persister)? + .identify_receiver_outputs_async(&mut |output_script| { + let address = + bitcoin::Address::from_script(output_script, bitcoin::Network::Regtest) + .map_err(ImplementationError::new); + async move { + let address = address?; + receiver + .get_address_info(&address) + .map(|info| info.is_mine) + .map_err(ImplementationError::new) + } + }) + .await + .save(recv_persister)?; + + let payjoin = payjoin.commit_outputs().save(recv_persister)?; + + let inputs = match custom_inputs { + Some(inputs) => inputs, + None => { + let candidate_inputs = receiver + .list_unspent() + .map_err(ImplementationError::new)? + .0 + .into_iter() + .map(input_pair_from_list_unspent); + let selected_input = + payjoin.try_preserving_privacy(candidate_inputs).map_err(|e| { + format!("Failed to make privacy preserving selection: {e:?}") + })?; + vec![selected_input] + } + }; + let payjoin = payjoin + .contribute_inputs(inputs) + .map_err(|e| format!("Failed to contribute inputs: {e:?}"))? + .commit_inputs() + .save(recv_persister)?; + + let payjoin = payjoin + .apply_fee_range( + Some(FeeRate::BROADCAST_MIN), + Some(FeeRate::from_sat_per_vb_u32(2)), + ) + .save(recv_persister)?; + + // Sign and finalize the proposal PSBT + let payjoin = payjoin + .finalize_proposal_async(|psbt: &Psbt| { + let result = receiver + .call::( + "walletprocesspsbt", + &[ + json!(psbt.to_string()), + json!(None as Option), + json!(None as Option<&str>), + json!(Some(true)), + ], + ) + .map(|res| Psbt::from_str(&res.psbt).expect("psbt should be valid")) + .map_err(ImplementationError::new); + async move { result } + }) + .await + .save(recv_persister)?; + Ok(payjoin) + } + pub fn build_sweep_psbt( sender: &corepc_node::Client, pj_uri: &PjUri, @@ -1497,6 +2594,28 @@ mod integration { Ok(psbt.to_string()) } + async fn handle_v1_pj_request_async( + req: Request, + headers: impl payjoin::receive::v1::Headers, + receiver: &corepc_node::Client, + custom_outputs: Option>, + drain_script: Option<&bitcoin::Script>, + custom_inputs: Option>, + ) -> Result { + // Receiver receive payjoin proposal, IRL it will be an HTTP request (over ssl or onion) + let proposal = payjoin::receive::v1::UncheckedOriginalPayload::from_request( + req.body.as_slice(), + Url::from_str(&req.url).expect("Could not parse url").query().unwrap_or(""), + headers, + )?; + let proposal = + handle_proposal_async(proposal, receiver, custom_outputs, drain_script, custom_inputs) + .await?; + let psbt = proposal.psbt(); + tracing::debug!("Receiver's Payjoin proposal PSBT: {psbt:#?}"); + Ok(psbt.to_string()) + } + fn handle_proposal( proposal: payjoin::receive::v1::UncheckedOriginalPayload, receiver: &corepc_node::Client, @@ -1585,6 +2704,108 @@ mod integration { Ok(payjoin_proposal) } + async fn handle_proposal_async( + proposal: payjoin::receive::v1::UncheckedOriginalPayload, + receiver: &corepc_node::Client, + custom_outputs: Option>, + drain_script: Option<&bitcoin::Script>, + custom_inputs: Option>, + ) -> Result { + // Receive Check 1: Can Broadcast + let proposal = proposal + .check_broadcast_suitability_async(None, |tx| { + let tx = tx.clone(); + async move { + Ok(receiver + .test_mempool_accept(std::slice::from_ref(&tx)) + .map_err(ImplementationError::new)? + .0 + .first() + .ok_or(ImplementationError::from( + "testmempoolaccept should return a result", + ))? + .allowed) + } + }) + .await?; + // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx + let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast(); + // Receive Check 2: receiver can't sign for proposal inputs + let proposal = proposal + .check_inputs_not_owned_async(&mut |input| { + let address = bitcoin::Address::from_script(input, bitcoin::Network::Regtest) + .map_err(ImplementationError::new); + async move { + let address = address?; + receiver + .get_address_info(&address) + .map(|info| info.is_mine) + .map_err(ImplementationError::new) + } + }) + .await?; + // Receive Check 3: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers. + let payjoin = proposal + .check_no_inputs_seen_before_async(&mut |_| async { Ok(false) }) + .await? + .identify_receiver_outputs_async(&mut |output_script| { + let address = + bitcoin::Address::from_script(output_script, bitcoin::Network::Regtest) + .map_err(ImplementationError::new); + async move { + let address = address?; + receiver + .get_address_info(&address) + .map(|info| info.is_mine) + .map_err(ImplementationError::new) + } + }) + .await?; + let payjoin = match custom_outputs { + Some(txos) => payjoin.replace_receiver_outputs( + txos, + drain_script.expect("drain_script should be provided with custom_outputs"), + )?, + None => payjoin.substitute_receiver_script(&receiver.new_address()?.script_pubkey())?, + } + .commit_outputs(); + let inputs = match custom_inputs { + Some(inputs) => inputs, + None => { + let candidate_inputs = + receiver.list_unspent()?.0.into_iter().map(input_pair_from_list_unspent); + let selected_input = payjoin + .try_preserving_privacy(candidate_inputs) + .map_err(|e| format!("Failed to make privacy preserving selection: {e:?}"))?; + vec![selected_input] + } + }; + let payjoin = payjoin + .contribute_inputs(inputs) + .map_err(|e| format!("Failed to contribute inputs: {e:?}"))? + .commit_inputs(); + let payjoin = payjoin + .apply_fee_range(Some(FeeRate::BROADCAST_MIN), Some(FeeRate::from_sat_per_vb_u32(2)))?; + let payjoin_proposal = payjoin + .finalize_proposal_async(|psbt: &Psbt| { + let result = receiver + .call::( + "walletprocesspsbt", + &[ + json!(psbt.to_string()), + json!(None as Option), + json!(None as Option<&str>), + json!(Some(true)), + ], + ) + .map(|res| Psbt::from_str(&res.psbt).expect("psbt should be valid")) + .map_err(ImplementationError::new); + async move { result } + }) + .await?; + Ok(payjoin_proposal) + } + fn extract_pj_tx( sender: &corepc_node::Client, psbt: Psbt,