Skip to content

Commit 538e42c

Browse files
committed
f Add PayjoinScheduler
1 parent e726227 commit 538e42c

File tree

6 files changed

+205
-132
lines changed

6 files changed

+205
-132
lines changed

src/builder.rs

+12
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::logger::{log_error, FilesystemLogger, Logger};
1212
use crate::message_handler::NodeCustomMessageHandler;
1313
use crate::payment_store::PaymentStore;
1414
use crate::peer_store::PeerStore;
15+
use crate::pj::{PayjoinExecuter, PayjoinScheduler, PendingChannels};
1516
use crate::sweep::OutputSweeper;
1617
use crate::tx_broadcaster::TransactionBroadcaster;
1718
use crate::types::{
@@ -943,6 +944,15 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
943944
},
944945
};
945946

947+
let payjoin_executer = Arc::new(PayjoinExecuter::new(
948+
Arc::clone(&wallet),
949+
Arc::clone(&logger),
950+
Arc::clone(&peer_manager),
951+
Arc::clone(&channel_manager),
952+
));
953+
954+
let pending_channels = Arc::new(Mutex::new(PendingChannels::new()));
955+
946956
let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
947957

948958
Ok(Node {
@@ -958,6 +968,8 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
958968
channel_manager,
959969
chain_monitor,
960970
output_sweeper,
971+
payjoin_executer,
972+
pending_channels,
961973
peer_manager,
962974
keys_manager,
963975
network_graph,

src/config.rs

+6
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ pub(crate) const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10;
4444
// The length in bytes of our wallets' keys seed.
4545
pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64;
4646

47+
// Port used by the Payjoin HTTP server.
48+
pub(crate) const DEFAULT_PAYJOIN_HTTP_SERVER_PORT: u16 = 3227;
49+
4750
#[derive(Debug, Clone)]
4851
/// Represents the configuration of an [`Node`] instance.
4952
///
@@ -104,6 +107,8 @@ pub struct Config {
104107
///
105108
/// Any messages below this level will be excluded from the logs.
106109
pub log_level: LogLevel,
110+
/// Payjoin server port
111+
pub payjoin_server_port: u16,
107112
}
108113

109114
impl Default for Config {
@@ -120,6 +125,7 @@ impl Default for Config {
120125
trusted_peers_0conf: Vec::new(),
121126
probing_liquidity_limit_multiplier: DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER,
122127
log_level: DEFAULT_LOG_LEVEL,
128+
payjoin_server_port: DEFAULT_PAYJOIN_HTTP_SERVER_PORT,
123129
}
124130
}
125131
}

src/lib.rs

+53-13
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,18 @@ mod wallet;
100100
pub use bip39;
101101
pub use bitcoin;
102102
pub use lightning;
103+
use lightning::routing::gossip::NodeId;
103104
pub use lightning_invoice;
104105

105106
pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance};
106107
pub use config::{default_config, Config};
107108
pub use error::Error as NodeError;
108109
use error::Error;
109110

110-
use ::payjoin::Uri;
111+
use payjoin::Uri;
111112
pub use event::Event;
113+
use pj::{PayjoinExecuter, PayjoinScheduler, PendingChannels};
114+
use tokio::sync::mpsc;
112115
pub use types::ChannelConfig;
113116

114117
pub use io::utils::generate_entropy_mnemonic;
@@ -172,6 +175,8 @@ use std::sync::{Arc, Mutex, RwLock};
172175
use std::time::{Duration, Instant, SystemTime};
173176
use std::{default::Default, str::FromStr};
174177

178+
use crate::pj::{PayjoinRequest, PayjoinResponse, ScheduledChannel};
179+
175180
#[cfg(feature = "uniffi")]
176181
uniffi::include_scaffolding!("ldk_node");
177182

@@ -191,6 +196,8 @@ pub struct Node<K: KVStore + Sync + Send + 'static> {
191196
channel_manager: Arc<ChannelManager<K>>,
192197
chain_monitor: Arc<ChainMonitor<K>>,
193198
output_sweeper: Arc<Sweeper<K>>,
199+
pending_channels: Arc<Mutex<PendingChannels>>,
200+
payjoin_executer: Arc<PayjoinExecuter<K>>,
194201
peer_manager: Arc<PeerManager<K>>,
195202
keys_manager: Arc<KeysManager>,
196203
network_graph: Arc<NetworkGraph>,
@@ -470,21 +477,29 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
470477
use tokio::net::TcpListener;
471478

472479
// Start the HTTP server for payjoin
473-
let wallet = Arc::clone(&self.wallet);
480+
let (payjoin_queue_sender, mut payjoin_queue_receiver) = mpsc::channel::<PayjoinRequest>(1);
481+
let executor = Arc::clone(&self.payjoin_executer);
482+
// listen for payjoin_queue_receiver
483+
runtime.spawn(async move {
484+
loop {
485+
let payjoin_request = payjoin_queue_receiver.recv().await.unwrap();
486+
Self::create_channel_from_pj(payjoin_request, executor.clone()).await;
487+
}
488+
});
489+
490+
let pj_port = self.config.payjoin_server_port;
474491
runtime.spawn(async move {
475-
let addr = SocketAddr::from(([127, 0, 0, 1], PAYJOIN_HTTP_SERVER_PORT));
492+
let addr = SocketAddr::from(([127, 0, 0, 1], pj_port));
476493
let listener = TcpListener::bind(addr).await.unwrap();
477494
dbg!("Started HTTP server on http://{}", addr);
478-
// let our_pubkey= wallet.get_new_address().unwrap().script_pubkey().into_bytes();
479-
let create_channel_request = pj::CreateChannelRequest::init(wallet);
495+
let pj_scheduler = pj::PayjoinScheduler::new(payjoin_queue_sender);
480496
loop {
481497
let (stream, _) = listener.accept().await.unwrap();
482498
let io = TokioIo::new(stream);
483-
let clone_ccr = create_channel_request.clone();
499+
let clone_pj_scheduler = pj_scheduler.clone();
484500
tokio::task::spawn(async move {
485-
if let Err(err) = http1::Builder::new()
486-
.serve_connection(io, clone_ccr)
487-
.await
501+
if let Err(err) =
502+
http1::Builder::new().serve_connection(io, clone_pj_scheduler).await
488503
{
489504
println!("Failed to serve connection: {:?}", err);
490505
}
@@ -705,11 +720,36 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
705720
self.runtime.read().unwrap().is_some()
706721
}
707722

708-
/// Request Payjoin Payment
709-
pub fn payjoin_uri(&self) -> Result<String, Error> {
723+
async fn create_channel_from_pj(
724+
pj_req: PayjoinRequest, _payjoin_executer: Arc<PayjoinExecuter<K>>,
725+
) {
726+
let psbt = pj_req.clone().original_psbt();
727+
let pj_response = PayjoinResponse::new(psbt);
728+
// Send OpenChannel message
729+
// Wait for AcceptChannel message
730+
// Send FundingCreated message
731+
// Wait for FundingSigned message
732+
// Build PayjoinResponse
733+
// Send PayjoinResponse to queue
734+
pj_req.clone().queue(pj_response);
735+
}
736+
737+
/// Request a new channel to be opened with a remote peer.
738+
pub fn payjoin_channel(
739+
&self, channel_amount_sats: u64, push_msat: Option<u64>, announce_channel: bool,
740+
node_id: PublicKey,
741+
) -> Result<String, Error> {
742+
let user_channel_id: u128 = rand::thread_rng().gen::<u128>();
743+
self.pending_channels.lock().unwrap().push(ScheduledChannel::new(
744+
channel_amount_sats,
745+
push_msat,
746+
user_channel_id,
747+
announce_channel,
748+
node_id,
749+
));
710750
let address = self.wallet.get_new_address()?;
711-
let amount = Amount::from_sat(1000);
712-
let pj = "https://0.0.0.0:3227/payjoin";
751+
let amount = Amount::from_sat(channel_amount_sats);
752+
let pj = format!("https://0.0.0.0:{}/payjoin", self.config.payjoin_server_port);
713753
let pj_uri_string = format!("{}?amount={}&pj={}", address.to_qr_uri(), amount.to_btc(), pj);
714754
assert!(Uri::from_str(&pj_uri_string).is_ok());
715755
Ok(pj_uri_string)

0 commit comments

Comments
 (0)