Commit 6ad40f9

Merge pull request #3436 from tnull/2024-12-add-lightning-liquidity-crate
Add `lightning-liquidity` crate to the workspace
2 parents f53a09d + f68c6c5 commit 6ad40f9

39 files changed, +8056 -4 lines

Cargo.toml (+1)

@@ -16,6 +16,7 @@ members = [
     "lightning-transaction-sync",
     "lightning-macros",
     "lightning-dns-resolver",
+    "lightning-liquidity",
     "possiblyrandom",
 ]

ci/ci-tests.sh (+5, -1)

@@ -38,6 +38,9 @@ PIN_RELEASE_DEPS # pin the release dependencies in our main workspace
 # Starting with version 0.5.9 (there is no .6-.8), the `home` crate has an MSRV of rustc 1.70.0.
 [ "$RUSTC_MINOR_VERSION" -lt 70 ] && cargo update -p home --precise "0.5.5" --verbose

+# proptest 1.3.0 requires rustc 1.64.0
+[ "$RUSTC_MINOR_VERSION" -lt 64 ] && cargo update -p proptest --precise "1.2.0" --verbose
+
 export RUST_BACKTRACE=1

 echo -e "\n\nChecking the full workspace."
@@ -58,6 +61,7 @@ WORKSPACE_MEMBERS=(
 	lightning-transaction-sync
 	lightning-macros
 	lightning-dns-resolver
+	lightning-liquidity
 	possiblyrandom
 )

@@ -110,7 +114,7 @@ echo -e "\n\nTest backtrace-debug builds"
 cargo test -p lightning --verbose --color always --features backtrace

 echo -e "\n\nTesting no_std builds"
-for DIR in lightning-invoice lightning-rapid-gossip-sync; do
+for DIR in lightning-invoice lightning-rapid-gossip-sync lightning-liquidity; do
 	cargo test -p $DIR --verbose --color always --no-default-features
 done

lightning-liquidity/Cargo.toml (new file, +50)

@@ -0,0 +1,50 @@
[package]
name = "lightning-liquidity"
version = "0.1.0-alpha.6"
authors = ["John Cantrell <[email protected]>", "Elias Rohrer <[email protected]>"]
homepage = "https://lightningdevkit.org/"
license = "MIT OR Apache-2.0"
edition = "2021"
description = "Types and primitives to integrate a spec-compliant LSP with an LDK-based node."
repository = "https://github.com/lightningdevkit/lightning-liquidity/"
readme = "README.md"
keywords = ["bitcoin", "lightning", "ldk", "bdk"]
categories = ["cryptography::cryptocurrencies"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["std"]
std = ["lightning/std"]
backtrace = ["dep:backtrace"]

[dependencies]
lightning = { version = "0.0.124", path = "../lightning", default-features = false }
lightning-types = { version = "0.1", path = "../lightning-types", default-features = false }
lightning-invoice = { version = "0.32.0", path = "../lightning-invoice", default-features = false, features = ["serde"] }

bitcoin = { version = "0.32.2", default-features = false, features = ["serde"] }

chrono = { version = "0.4", default-features = false, features = ["serde", "alloc"] }
serde = { version = "1.0", default-features = false, features = ["derive", "alloc"] }
serde_json = "1.0"
backtrace = { version = "0.3", optional = true }

[dev-dependencies]
lightning = { version = "0.0.124", path = "../lightning", default-features = false, features = ["_test_utils"] }
lightning-invoice = { version = "0.32.0", path = "../lightning-invoice", default-features = false, features = ["serde", "std"] }
lightning-persister = { version = "0.0.124", path = "../lightning-persister", default-features = false }
lightning-background-processor = { version = "0.0.124", path = "../lightning-background-processor", default-features = false, features = ["std"] }

proptest = "1.0.0"
tokio = { version = "1.35", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros" ] }

[lints.rust.unexpected_cfgs]
level = "forbid"
# When adding a new cfg attribute, ensure that it is added to this list.
check-cfg = [
	"cfg(lsps1_service)",
	"cfg(c_bindings)",
	"cfg(backtrace)",
	"cfg(ldk_bench)",
]

lightning-liquidity/README.md (new file, +20)

@@ -0,0 +1,20 @@
# lightning-liquidity

The goal of this crate is to provide types and primitives to integrate a spec-compliant LSP with an LDK-based node. To this end, this crate provides client-side as well as service-side logic to implement the [LSP specifications].

Currently the following specifications are supported:
- [LSPS0] defines the transport protocol with the LSP over which the other protocols communicate.
- [LSPS1] allows ordering Lightning channels from an LSP. This is useful when the client needs inbound Lightning liquidity for which they are willing and able to pay in bitcoin.
- [LSPS2] allows generating a special invoice for which, when paid, an LSP will open a "just-in-time" channel. This is useful for the initial on-boarding of clients as the channel opening fees are deducted from the incoming payment, i.e., no funds are required client-side to initiate this flow.

To get started, you'll want to set up a `LiquidityManager` and configure it to be the `CustomMessageHandler` of your LDK node. You can then call `LiquidityManager::lsps1_client_handler` / `LiquidityManager::lsps2_client_handler`, or `LiquidityManager::lsps2_service_handler`, to access the respective client-side or service-side handlers.

`LiquidityManager` uses an eventing system to notify the user about important updates to the protocol flow. To this end, you will need to handle events emitted via one of the event handling methods provided by `LiquidityManager`, e.g., `LiquidityManager::next_event`.

[LSP specifications]: https://github.com/BitcoinAndLightningLayerSpecs/lsp
[LSPS0]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/tree/main/LSPS0
[LSPS1]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/tree/main/LSPS1
[LSPS2]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/tree/main/LSPS2
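
As a quick orientation to the integration flow the README describes, here is a minimal, self-contained sketch of the consumer-side dispatch loop. It deliberately does not depend on the crate: the variant names mirror the `Event` enum added in `src/events.rs` below, but the payload types, the `process_pending_events` helper, and the sample data are illustrative stand-ins. With the real crate you would instead drain `LiquidityManager::next_event()` or `LiquidityManager::get_and_clear_pending_events()`.

```rust
// Illustrative stand-in for the crate's `Event` enum (see src/events.rs below).
// Payload types are simplified so this sketch compiles on its own; the real enum
// also has an `LSPS1Service` variant gated behind `cfg(lsps1_service)`.
#[derive(Debug)]
enum Event {
	LSPS0Client(String),
	LSPS1Client(String),
	LSPS2Client(String),
	LSPS2Service(String),
}

// In a real node each arm would call back into the respective LSPS client- or
// service-side handler obtained from `LiquidityManager`; here we just log.
fn process_pending_events(events: Vec<Event>) {
	for event in events {
		match event {
			Event::LSPS0Client(e) => println!("LSPS0 client event: {e}"),
			Event::LSPS1Client(e) => println!("LSPS1 client event: {e}"),
			Event::LSPS2Client(e) => println!("LSPS2 client event: {e}"),
			Event::LSPS2Service(e) => println!("LSPS2 service event: {e}"),
		}
	}
}

fn main() {
	// Stand-in event source; a real application would drain `LiquidityManager`'s queue.
	process_pending_events(vec![Event::LSPS2Client("opening fee params ready".to_owned())]);
}
```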

lightning-liquidity/src/events.rs (new file, +245)

@@ -0,0 +1,245 @@
// This file is Copyright its original authors, visible in version control
// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.

//! Events are surfaced by the library to indicate some action must be taken
//! by the end-user.
//!
//! Because we don't have a built-in runtime, it's up to the end-user to poll
//! [`LiquidityManager::get_and_clear_pending_events`] to receive events.
//!
//! [`LiquidityManager::get_and_clear_pending_events`]: crate::LiquidityManager::get_and_clear_pending_events

use crate::lsps0;
use crate::lsps1;
use crate::lsps2;
use crate::prelude::{Vec, VecDeque};
use crate::sync::{Arc, Mutex};

use core::future::Future;
use core::task::{Poll, Waker};

/// The maximum queue size we allow before starting to drop events.
pub const MAX_EVENT_QUEUE_SIZE: usize = 1000;

pub(crate) struct EventQueue {
	queue: Arc<Mutex<VecDeque<Event>>>,
	waker: Arc<Mutex<Option<Waker>>>,
	#[cfg(feature = "std")]
	condvar: crate::sync::Condvar,
}

impl EventQueue {
	pub fn new() -> Self {
		let queue = Arc::new(Mutex::new(VecDeque::new()));
		let waker = Arc::new(Mutex::new(None));
		#[cfg(feature = "std")]
		{
			let condvar = crate::sync::Condvar::new();
			Self { queue, waker, condvar }
		}
		#[cfg(not(feature = "std"))]
		Self { queue, waker }
	}

	pub fn enqueue(&self, event: Event) {
		{
			let mut queue = self.queue.lock().unwrap();
			if queue.len() < MAX_EVENT_QUEUE_SIZE {
				queue.push_back(event);
			} else {
				return;
			}
		}

		if let Some(waker) = self.waker.lock().unwrap().take() {
			waker.wake();
		}
		#[cfg(feature = "std")]
		self.condvar.notify_one();
	}

	pub fn next_event(&self) -> Option<Event> {
		self.queue.lock().unwrap().pop_front()
	}

	pub async fn next_event_async(&self) -> Event {
		EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
	}

	#[cfg(feature = "std")]
	pub fn wait_next_event(&self) -> Event {
		let mut queue = self
			.condvar
			.wait_while(self.queue.lock().unwrap(), |queue: &mut VecDeque<Event>| queue.is_empty())
			.unwrap();

		let event = queue.pop_front().expect("non-empty queue");
		let should_notify = !queue.is_empty();

		drop(queue);

		if should_notify {
			if let Some(waker) = self.waker.lock().unwrap().take() {
				waker.wake();
			}

			self.condvar.notify_one();
		}

		event
	}

	pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
		self.queue.lock().unwrap().split_off(0).into()
	}
}

/// An event which you should probably take some action in response to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
	/// An LSPS0 client event.
	LSPS0Client(lsps0::event::LSPS0ClientEvent),
	/// An LSPS1 (Channel Request) client event.
	LSPS1Client(lsps1::event::LSPS1ClientEvent),
	/// An LSPS1 (Channel Request) server event.
	#[cfg(lsps1_service)]
	LSPS1Service(lsps1::event::LSPS1ServiceEvent),
	/// An LSPS2 (JIT Channel) client event.
	LSPS2Client(lsps2::event::LSPS2ClientEvent),
	/// An LSPS2 (JIT Channel) server event.
	LSPS2Service(lsps2::event::LSPS2ServiceEvent),
}

struct EventFuture {
	event_queue: Arc<Mutex<VecDeque<Event>>>,
	waker: Arc<Mutex<Option<Waker>>>,
}

impl Future for EventFuture {
	type Output = Event;

	fn poll(
		self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
	) -> core::task::Poll<Self::Output> {
		if let Some(event) = self.event_queue.lock().unwrap().pop_front() {
			Poll::Ready(event)
		} else {
			*self.waker.lock().unwrap() = Some(cx.waker().clone());
			Poll::Pending
		}
	}
}

#[cfg(test)]
mod tests {
	#[tokio::test]
	#[cfg(feature = "std")]
	async fn event_queue_works() {
		use super::*;
		use crate::lsps0::event::LSPS0ClientEvent;
		use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
		use core::sync::atomic::{AtomicU16, Ordering};
		use std::sync::Arc;
		use std::time::Duration;

		let event_queue = Arc::new(EventQueue::new());
		assert_eq!(event_queue.next_event(), None);

		let secp_ctx = Secp256k1::new();
		let counterparty_node_id =
			PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap());
		let expected_event = Event::LSPS0Client(LSPS0ClientEvent::ListProtocolsResponse {
			counterparty_node_id,
			protocols: Vec::new(),
		});

		for _ in 0..3 {
			event_queue.enqueue(expected_event.clone());
		}

		assert_eq!(event_queue.wait_next_event(), expected_event);
		assert_eq!(event_queue.next_event_async().await, expected_event);
		assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
		assert_eq!(event_queue.next_event(), None);

		// Check `next_event_async` won't return if the queue is empty and always rather timeout.
		tokio::select! {
			_ = tokio::time::sleep(Duration::from_millis(10)) => {
				// Timeout
			}
			_ = event_queue.next_event_async() => {
				panic!();
			}
		}
		assert_eq!(event_queue.next_event(), None);

		// Check we get the expected number of events when polling/enqueuing concurrently.
		let enqueued_events = AtomicU16::new(0);
		let received_events = AtomicU16::new(0);
		let mut delayed_enqueue = false;

		for _ in 0..25 {
			event_queue.enqueue(expected_event.clone());
			enqueued_events.fetch_add(1, Ordering::SeqCst);
		}

		loop {
			tokio::select! {
				_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
					event_queue.enqueue(expected_event.clone());
					enqueued_events.fetch_add(1, Ordering::SeqCst);
					delayed_enqueue = true;
				}
				e = event_queue.next_event_async() => {
					assert_eq!(e, expected_event);
					received_events.fetch_add(1, Ordering::SeqCst);

					event_queue.enqueue(expected_event.clone());
					enqueued_events.fetch_add(1, Ordering::SeqCst);
				}
				e = event_queue.next_event_async() => {
					assert_eq!(e, expected_event);
					received_events.fetch_add(1, Ordering::SeqCst);
				}
			}

			if delayed_enqueue
				&& received_events.load(Ordering::SeqCst) == enqueued_events.load(Ordering::SeqCst)
			{
				break;
			}
		}
		assert_eq!(event_queue.next_event(), None);

		// Check we operate correctly, even when mixing and matching blocking and async API calls.
		let (tx, mut rx) = tokio::sync::watch::channel(());
		let thread_queue = Arc::clone(&event_queue);
		let thread_event = expected_event.clone();
		std::thread::spawn(move || {
			let e = thread_queue.wait_next_event();
			assert_eq!(e, thread_event);
			tx.send(()).unwrap();
		});

		let thread_queue = Arc::clone(&event_queue);
		let thread_event = expected_event.clone();
		std::thread::spawn(move || {
			// Sleep a bit before we enqueue the events everybody is waiting for.
			std::thread::sleep(Duration::from_millis(20));
			thread_queue.enqueue(thread_event.clone());
			thread_queue.enqueue(thread_event.clone());
		});

		let e = event_queue.next_event_async().await;
		assert_eq!(e, expected_event.clone());

		rx.changed().await.unwrap();
		assert_eq!(event_queue.next_event(), None);
	}
}
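
For readers following the concurrency details, below is a distilled, self-contained sketch of the waker pattern that `EventQueue` and `EventFuture` implement above: `enqueue` pushes onto a mutex-guarded `VecDeque` and wakes whichever task is parked in the async getter, while the future either pops an event or stores the current task's `Waker`. The `MiniQueue`/`MiniFuture` names and the `u32` payload are illustrative stand-ins, and the example assumes a tokio runtime with the `rt`, `macros`, and `time` features (the crate's own tests use tokio similarly).

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

// Mirrors the shape of `EventQueue`: the queue and the waker slot are shared
// via `Arc` so the future can hold its own handles to both.
struct MiniQueue {
	queue: Arc<Mutex<VecDeque<u32>>>,
	waker: Arc<Mutex<Option<Waker>>>,
}

impl MiniQueue {
	fn new() -> Self {
		Self { queue: Arc::new(Mutex::new(VecDeque::new())), waker: Arc::new(Mutex::new(None)) }
	}

	fn enqueue(&self, event: u32) {
		self.queue.lock().unwrap().push_back(event);
		// Wake the task (if any) currently awaiting the next event.
		if let Some(waker) = self.waker.lock().unwrap().take() {
			waker.wake();
		}
	}

	fn next_event_async(&self) -> MiniFuture {
		MiniFuture { queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }
	}
}

struct MiniFuture {
	queue: Arc<Mutex<VecDeque<u32>>>,
	waker: Arc<Mutex<Option<Waker>>>,
}

impl Future for MiniFuture {
	type Output = u32;

	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
		if let Some(event) = self.queue.lock().unwrap().pop_front() {
			return Poll::Ready(event);
		}
		// No event yet: store the waker so `enqueue` can resume this task, then
		// re-check once in case an event slipped in between the two locks.
		*self.waker.lock().unwrap() = Some(cx.waker().clone());
		match self.queue.lock().unwrap().pop_front() {
			Some(event) => Poll::Ready(event),
			None => Poll::Pending,
		}
	}
}

#[tokio::main]
async fn main() {
	let queue = Arc::new(MiniQueue::new());

	let producer = Arc::clone(&queue);
	tokio::spawn(async move {
		tokio::time::sleep(std::time::Duration::from_millis(10)).await;
		producer.enqueue(42);
	});

	// The future stays pending until the producer task enqueues and wakes it.
	assert_eq!(queue.next_event_async().await, 42);
	println!("received the event");
}
```

The re-check after storing the waker is this sketch's guard against an event being enqueued between the two lock acquisitions; the crate additionally pairs the waker with a `Condvar` under `std` so that the blocking `wait_next_event` path can be notified as well.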
