Skip to content

Commit 19e6c6e

Browse files
committed
Introduce EventQueueNotifierGuard type
Previously, when enqueuing new events to the `EventQueue`, we'd directly attempt to wake any notifiers/notify any threads waiting on the `Condvar` about the newly available events. This could of course mean we'd notify them while ourselves still holding some locks, e.g., on the peer state. Here, we instead introduce a `EventQueueNotifierGuard` type that will notify about pending events if necesssary, which mitigates any potential lock contention: we now simply have to ensure that any method calling `enqueue` holds the notifier before retrieving any locks.
1 parent 84c6be3 commit 19e6c6e

File tree

5 files changed

+67
-4
lines changed

5 files changed

+67
-4
lines changed

lightning-liquidity/src/events.rs

+45-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub(crate) struct EventQueue {
3131
queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
3232
waker: Arc<Mutex<Option<Waker>>>,
3333
#[cfg(feature = "std")]
34-
condvar: crate::sync::Condvar,
34+
condvar: Arc<crate::sync::Condvar>,
3535
}
3636

3737
impl EventQueue {
@@ -40,7 +40,7 @@ impl EventQueue {
4040
let waker = Arc::new(Mutex::new(None));
4141
#[cfg(feature = "std")]
4242
{
43-
let condvar = crate::sync::Condvar::new();
43+
let condvar = Arc::new(crate::sync::Condvar::new());
4444
Self { queue, waker, condvar }
4545
}
4646
#[cfg(not(feature = "std"))]
@@ -100,6 +100,49 @@ impl EventQueue {
100100
pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
101101
self.queue.lock().unwrap().split_off(0).into()
102102
}
103+
104+
// Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped.
105+
pub fn notifier(&self) -> EventQueueNotifierGuard {
106+
#[cfg(feature = "std")]
107+
{
108+
EventQueueNotifierGuard {
109+
queue: Arc::clone(&self.queue),
110+
waker: Arc::clone(&self.waker),
111+
condvar: Arc::clone(&self.condvar),
112+
}
113+
}
114+
#[cfg(not(feature = "std"))]
115+
{
116+
EventQueueNotifierGuard {
117+
queue: Arc::clone(&self.queue),
118+
waker: Arc::clone(&self.waker),
119+
}
120+
}
121+
}
122+
}
123+
124+
// A guard type that will notify about new events when dropped.
125+
#[must_use]
126+
pub(crate) struct EventQueueNotifierGuard {
127+
queue: Arc<Mutex<VecDeque<Event>>>,
128+
waker: Arc<Mutex<Option<Waker>>>,
129+
#[cfg(feature = "std")]
130+
condvar: Arc<crate::sync::Condvar>,
131+
}
132+
133+
impl Drop for EventQueueNotifierGuard {
134+
fn drop(&mut self) {
135+
let should_notify = !self.queue.lock().unwrap().is_empty();
136+
137+
if should_notify {
138+
if let Some(waker) = self.waker.lock().unwrap().take() {
139+
waker.wake();
140+
}
141+
142+
#[cfg(feature = "std")]
143+
self.condvar.notify_one();
144+
}
145+
}
103146
}
104147

105148
/// An event which you should probably take some action in response to.

lightning-liquidity/src/lsps0/client.rs

+2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ where
6161
fn handle_response(
6262
&self, response: LSPS0Response, counterparty_node_id: &PublicKey,
6363
) -> Result<(), LightningError> {
64+
let _event_queue_notifier = self.pending_events.notifier();
65+
6466
match response {
6567
LSPS0Response::ListProtocols(LSPS0ListProtocolsResponse { protocols }) => {
6668
self.pending_events.enqueue(LSPS0ClientEvent::ListProtocolsResponse {

lightning-liquidity/src/lsps1/client.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,9 @@ where
105105
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
106106
result: LSPS1GetInfoResponse,
107107
) -> Result<(), LightningError> {
108-
let outer_state_lock = self.per_peer_state.write().unwrap();
108+
let _event_queue_notifier = self.pending_events.notifier();
109109

110+
let outer_state_lock = self.per_peer_state.write().unwrap();
110111
match outer_state_lock.get(counterparty_node_id) {
111112
Some(inner_state_lock) => {
112113
let mut peer_state_lock = inner_state_lock.lock().unwrap();
@@ -142,6 +143,8 @@ where
142143
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
143144
error: LSPSResponseError,
144145
) -> Result<(), LightningError> {
146+
let _event_queue_notifier = self.pending_events.notifier();
147+
145148
let outer_state_lock = self.per_peer_state.read().unwrap();
146149
match outer_state_lock.get(counterparty_node_id) {
147150
Some(inner_state_lock) => {
@@ -219,6 +222,8 @@ where
219222
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
220223
response: LSPS1CreateOrderResponse,
221224
) -> Result<(), LightningError> {
225+
let _event_queue_notifier = self.pending_events.notifier();
226+
222227
let outer_state_lock = self.per_peer_state.read().unwrap();
223228
match outer_state_lock.get(counterparty_node_id) {
224229
Some(inner_state_lock) => {
@@ -261,6 +266,8 @@ where
261266
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
262267
error: LSPSResponseError,
263268
) -> Result<(), LightningError> {
269+
let _event_queue_notifier = self.pending_events.notifier();
270+
264271
let outer_state_lock = self.per_peer_state.read().unwrap();
265272
match outer_state_lock.get(counterparty_node_id) {
266273
Some(inner_state_lock) => {
@@ -338,6 +345,8 @@ where
338345
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
339346
response: LSPS1CreateOrderResponse,
340347
) -> Result<(), LightningError> {
348+
let _event_queue_notifier = self.pending_events.notifier();
349+
341350
let outer_state_lock = self.per_peer_state.read().unwrap();
342351
match outer_state_lock.get(counterparty_node_id) {
343352
Some(inner_state_lock) => {
@@ -380,6 +389,8 @@ where
380389
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
381390
error: LSPSResponseError,
382391
) -> Result<(), LightningError> {
392+
let _event_queue_notifier = self.pending_events.notifier();
393+
383394
let outer_state_lock = self.per_peer_state.read().unwrap();
384395
match outer_state_lock.get(counterparty_node_id) {
385396
Some(inner_state_lock) => {

lightning-liquidity/src/lsps2/client.rs

+4
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ where
184184
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
185185
result: LSPS2GetInfoResponse,
186186
) -> Result<(), LightningError> {
187+
let _event_queue_notifier = self.pending_events.notifier();
188+
187189
let outer_state_lock = self.per_peer_state.read().unwrap();
188190
match outer_state_lock.get(counterparty_node_id) {
189191
Some(inner_state_lock) => {
@@ -250,6 +252,8 @@ where
250252
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
251253
result: LSPS2BuyResponse,
252254
) -> Result<(), LightningError> {
255+
let _event_queue_notifier = self.pending_events.notifier();
256+
253257
let outer_state_lock = self.per_peer_state.read().unwrap();
254258
match outer_state_lock.get(counterparty_node_id) {
255259
Some(inner_state_lock) => {

lightning-liquidity/src/lsps2/service.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,8 @@ where
805805
&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
806806
payment_hash: PaymentHash,
807807
) -> Result<(), APIError> {
808+
let _event_queue_notifier = self.pending_events.notifier();
809+
808810
let peer_by_intercept_scid = self.peer_by_intercept_scid.read().unwrap();
809811
if let Some(counterparty_node_id) = peer_by_intercept_scid.get(&intercept_scid) {
810812
let outer_state_lock = self.per_peer_state.read().unwrap();
@@ -1094,6 +1096,7 @@ where
10941096
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
10951097
params: LSPS2GetInfoRequest,
10961098
) -> Result<(), LightningError> {
1099+
let _event_queue_notifier = self.pending_events.notifier();
10971100
let (result, response) = {
10981101
let mut outer_state_lock = self.per_peer_state.write().unwrap();
10991102
let inner_state_lock =
@@ -1113,7 +1116,6 @@ where
11131116
token: params.token,
11141117
};
11151118
self.pending_events.enqueue(event);
1116-
11171119
(Ok(()), msg)
11181120
},
11191121
(e, msg) => (e, msg),
@@ -1130,6 +1132,7 @@ where
11301132
fn handle_buy_request(
11311133
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2BuyRequest,
11321134
) -> Result<(), LightningError> {
1135+
let _event_queue_notifier = self.pending_events.notifier();
11331136
if let Some(payment_size_msat) = params.payment_size_msat {
11341137
if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
11351138
let response = LSPS2Response::BuyError(LSPSResponseError {

0 commit comments

Comments
 (0)