Skip to content

Commit ff4e60b

Browse files
Initiate and refresh async receive offers
As an async recipient, we need to interactively build static invoices that an always-online node will serve to payers on our behalf. At the start of this process, we send a requests for paths to include in our offers to the always-online node on startup and refresh the cached offers when they start to get stale.
1 parent 6dd4fce commit ff4e60b

File tree

4 files changed

+229
-1
lines changed

4 files changed

+229
-1
lines changed

lightning/src/blinded_path/message.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,21 @@ pub enum OffersContext {
404404
/// [`AsyncPaymentsMessage`]: crate::onion_message::async_payments::AsyncPaymentsMessage
405405
#[derive(Clone, Debug)]
406406
pub enum AsyncPaymentsContext {
407+
/// Context used by a reply path to an [`OfferPathsRequest`], provided back to us as an async
408+
/// recipient in corresponding [`OfferPaths`] messages from the static invoice server.
409+
///
410+
/// [`OfferPathsRequest`]: crate::onion_message::async_payments::OfferPathsRequest
411+
/// [`OfferPaths`]: crate::onion_message::async_payments::OfferPaths
412+
OfferPaths {
413+
/// The time as duration since the Unix epoch at which this path expires and messages sent over
414+
/// it should be ignored.
415+
///
416+
/// This avoids the situation where the [`OfferPaths`] message is very delayed and thus
417+
/// outdated.
418+
///
419+
/// [`OfferPaths`]: crate::onion_message::async_payments::OfferPaths
420+
path_absolute_expiry: core::time::Duration,
421+
},
407422
/// Context contained within the reply [`BlindedMessagePath`] we put in outbound
408423
/// [`HeldHtlcAvailable`] messages, provided back to us in corresponding [`ReleaseHeldHtlc`]
409424
/// messages.
@@ -486,6 +501,9 @@ impl_writeable_tlv_based_enum!(AsyncPaymentsContext,
486501
(2, hmac, required),
487502
(4, path_absolute_expiry, required),
488503
},
504+
(2, OfferPaths) => {
505+
(0, path_absolute_expiry, required),
506+
},
489507
);
490508

491509
/// Contains a simple nonce for use in a blinded path's context.

lightning/src/ln/channelmanager.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5248,6 +5248,20 @@ where
52485248
)
52495249
}
52505250

5251+
#[cfg(async_payments)]
5252+
fn check_refresh_async_receive_offers(&self, timer_tick_occurred: bool) {
5253+
let peers = self.get_peers_for_blinded_path();
5254+
match self.flow.check_refresh_async_receive_offers(peers, timer_tick_occurred) {
5255+
Err(()) => {
5256+
log_error!(
5257+
self.logger,
5258+
"Failed to create blinded paths when requesting async receive offer paths"
5259+
);
5260+
},
5261+
Ok(()) => {},
5262+
}
5263+
}
5264+
52515265
#[cfg(async_payments)]
52525266
fn initiate_async_payment(
52535267
&self, invoice: &StaticInvoice, payment_id: PaymentId,
@@ -7242,6 +7256,9 @@ where
72427256
duration_since_epoch, &self.pending_events
72437257
);
72447258

7259+
#[cfg(async_payments)]
7260+
self.check_refresh_async_receive_offers(true);
7261+
72457262
// Technically we don't need to do this here, but if we have holding cell entries in a
72467263
// channel that need freeing, it's better to do that here and block a background task
72477264
// than block the message queueing pipeline.
@@ -11978,6 +11995,13 @@ where
1197811995
return NotifyOption::SkipPersistHandleEvents;
1197911996
//TODO: Also re-broadcast announcement_signatures
1198011997
});
11998+
11999+
// While we usually refresh the AsyncReceiveOfferCache on a timer, we also want to start
12000+
// interactively building offers as soon as we can after startup. We can't start building offers
12001+
// until we have some peer connection(s) to send onion messages over, so as a minor optimization
12002+
// refresh the cache when a peer connects.
12003+
#[cfg(async_payments)]
12004+
self.check_refresh_async_receive_offers(false);
1198112005
res
1198212006
}
1198312007

lightning/src/offers/async_receive_offer_cache.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,129 @@ impl AsyncReceiveOfferCache {
171171
#[cfg(async_payments)]
172172
const MAX_CACHED_OFFERS_TARGET: usize = 10;
173173

174+
// The max number of times we'll attempt to request offer paths per timer tick.
175+
#[cfg(async_payments)]
176+
const MAX_UPDATE_ATTEMPTS: u8 = 3;
177+
178+
// If we have an offer that is replaceable and its invoice was confirmed as persisted more than 2
179+
// hours ago, we can go ahead and refresh it because we always want to have the freshest offer
180+
// possible when a user goes to retrieve a cached offer.
181+
//
182+
// We avoid replacing unused offers too quickly -- this prevents the case where we send multiple
183+
// invoices from different offers competing for the same slot to the server, messages are received
184+
// delayed or out-of-order, and we end up providing an offer to the user that the server just
185+
// deleted and replaced.
186+
#[cfg(async_payments)]
187+
const OFFER_REFRESH_THRESHOLD: Duration = Duration::from_secs(2 * 60 * 60);
188+
189+
#[cfg(async_payments)]
190+
impl AsyncReceiveOfferCache {
191+
/// Remove expired offers from the cache, returning whether new offers are needed.
192+
pub(super) fn prune_expired_offers(
193+
&mut self, duration_since_epoch: Duration, timer_tick_occurred: bool,
194+
) -> bool {
195+
// Remove expired offers from the cache.
196+
let mut offer_was_removed = false;
197+
for offer_opt in self.offers.iter_mut() {
198+
let offer_is_expired = offer_opt
199+
.as_ref()
200+
.map_or(false, |offer| offer.offer.is_expired_no_std(duration_since_epoch));
201+
if offer_is_expired {
202+
offer_opt.take();
203+
offer_was_removed = true;
204+
}
205+
}
206+
207+
// Allow more offer paths requests to be sent out in a burst roughly once per minute, or if an
208+
// offer was removed.
209+
if timer_tick_occurred || offer_was_removed {
210+
self.reset_offer_paths_request_attempts()
211+
}
212+
213+
self.needs_new_offer_idx(duration_since_epoch).is_some()
214+
&& self.offer_paths_request_attempts < MAX_UPDATE_ATTEMPTS
215+
}
216+
217+
/// If we have any empty slots in the cache or offers where the offer can and should be replaced
218+
/// with a fresh offer, here we return the index of the slot that needs a new offer. The index is
219+
/// used for setting [`ServeStaticInvoice::invoice_slot`] when sending the corresponding new
220+
/// static invoice to the server, so the server knows which existing persisted invoice is being
221+
/// replaced, if any.
222+
///
223+
/// Returns `None` if the cache is full and no offers can currently be replaced.
224+
///
225+
/// [`ServeStaticInvoice::invoice_slot`]: crate::onion_message::async_payments::ServeStaticInvoice::invoice_slot
226+
fn needs_new_offer_idx(&self, duration_since_epoch: Duration) -> Option<usize> {
227+
// If we have any empty offer slots, return the first one we find
228+
let empty_slot_idx_opt = self.offers.iter().position(|offer_opt| offer_opt.is_none());
229+
if empty_slot_idx_opt.is_some() {
230+
return empty_slot_idx_opt;
231+
}
232+
233+
// If all of our offers are already used or pending, then none are available to be replaced
234+
let no_replaceable_offers = self.offers_with_idx().all(|(_, offer)| {
235+
matches!(offer.status, OfferStatus::Used)
236+
|| matches!(offer.status, OfferStatus::Pending)
237+
});
238+
if no_replaceable_offers {
239+
return None;
240+
}
241+
242+
// All offers are pending except for one, so we shouldn't request an update of the only usable
243+
// offer
244+
let num_payable_offers = self
245+
.offers_with_idx()
246+
.filter(|(_, offer)| {
247+
matches!(offer.status, OfferStatus::Used)
248+
|| matches!(offer.status, OfferStatus::Ready { .. })
249+
})
250+
.count();
251+
if num_payable_offers <= 1 {
252+
return None;
253+
}
254+
255+
// Filter for unused offers that were last updated more than two hours ago, so they are stale
256+
// enough to warrant replacement.
257+
let two_hours_ago = duration_since_epoch.saturating_sub(OFFER_REFRESH_THRESHOLD);
258+
self.offers_with_idx()
259+
.filter_map(|(idx, offer)| match offer.status {
260+
OfferStatus::Ready { invoice_confirmed_persisted_at } => {
261+
Some((idx, offer, invoice_confirmed_persisted_at))
262+
},
263+
_ => None,
264+
})
265+
.filter(|(_, _, invoice_confirmed_persisted_at)| {
266+
*invoice_confirmed_persisted_at < two_hours_ago
267+
})
268+
// Get the stalest offer and return its index
269+
.min_by(|a, b| a.2.cmp(&b.2))
270+
.map(|(idx, _, _)| idx)
271+
}
272+
273+
/// Returns an iterator over (offer_idx, offer)
274+
fn offers_with_idx(&self) -> impl Iterator<Item = (usize, &AsyncReceiveOffer)> {
275+
self.offers.iter().enumerate().filter_map(|(idx, offer_opt)| {
276+
if let Some(offer) = offer_opt {
277+
Some((idx, offer))
278+
} else {
279+
None
280+
}
281+
})
282+
}
283+
284+
// Indicates that onion messages requesting new offer paths have been sent to the static invoice
285+
// server. Calling this method allows the cache to self-limit how many requests are sent.
286+
pub(super) fn new_offers_requested(&mut self) {
287+
self.offer_paths_request_attempts += 1;
288+
}
289+
290+
/// Called on timer tick (roughly once per minute) to allow another MAX_UPDATE_ATTEMPTS offer
291+
/// paths requests to go out.
292+
fn reset_offer_paths_request_attempts(&mut self) {
293+
self.offer_paths_request_attempts = 0;
294+
}
295+
}
296+
174297
impl Writeable for AsyncReceiveOfferCache {
175298
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
176299
write_tlv_fields!(w, {

lightning/src/offers/flow.rs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ use {
6666
crate::offers::offer::Amount,
6767
crate::offers::signer,
6868
crate::offers::static_invoice::{StaticInvoice, StaticInvoiceBuilder},
69-
crate::onion_message::async_payments::HeldHtlcAvailable,
69+
crate::onion_message::async_payments::{HeldHtlcAvailable, OfferPathsRequest},
7070
};
7171

7272
#[cfg(feature = "dnssec")]
@@ -238,6 +238,11 @@ where
238238
/// even if multiple invoices are received.
239239
const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
240240

241+
/// The default relative expiry for reply paths where a quick response is expected and the reply
242+
/// path is single-use.
243+
#[cfg(async_payments)]
244+
const TEMP_REPLY_PATH_RELATIVE_EXPIRY: Duration = Duration::from_secs(7200);
245+
241246
impl<MR: Deref> OffersMessageFlow<MR>
242247
where
243248
MR::Target: MessageRouter,
@@ -1126,6 +1131,64 @@ where
11261131
core::mem::take(&mut self.pending_dns_onion_messages.lock().unwrap())
11271132
}
11281133

1134+
/// Sends out [`OfferPathsRequest`] onion messages if we are an often-offline recipient and are
1135+
/// configured to interactively build offers and static invoices with a static invoice server.
1136+
///
1137+
/// # Usage
1138+
///
1139+
/// This method should be called on peer connection and once per minute or so, to keep the offers
1140+
/// cache updated. When calling this method once per minute, SHOULD set `timer_tick_occurred`.
1141+
///
1142+
/// Errors if we failed to create blinded reply paths when sending an [`OfferPathsRequest`] message.
1143+
#[cfg(async_payments)]
1144+
pub(crate) fn check_refresh_async_receive_offers(
1145+
&self, peers: Vec<MessageForwardNode>, timer_tick_occurred: bool,
1146+
) -> Result<(), ()> {
1147+
// Terminate early if this node does not intend to receive async payments.
1148+
if self.paths_to_static_invoice_server.lock().unwrap().is_empty() {
1149+
return Ok(());
1150+
}
1151+
1152+
let duration_since_epoch = self.duration_since_epoch();
1153+
1154+
// Update the cache to remove expired offers, and check to see whether we need new offers to be
1155+
// interactively built with the static invoice server.
1156+
let needs_new_offers = self
1157+
.async_receive_offer_cache
1158+
.lock()
1159+
.unwrap()
1160+
.prune_expired_offers(duration_since_epoch, timer_tick_occurred);
1161+
1162+
// If we need new offers, send out offer paths request messages to the static invoice server.
1163+
if needs_new_offers {
1164+
let context = MessageContext::AsyncPayments(AsyncPaymentsContext::OfferPaths {
1165+
path_absolute_expiry: duration_since_epoch
1166+
.saturating_add(TEMP_REPLY_PATH_RELATIVE_EXPIRY),
1167+
});
1168+
let reply_paths = match self.create_blinded_paths(peers, context) {
1169+
Ok(paths) => paths,
1170+
Err(()) => {
1171+
return Err(());
1172+
},
1173+
};
1174+
1175+
// We can't fail past this point, so indicate to the cache that we've requested new offers.
1176+
self.async_receive_offer_cache.lock().unwrap().new_offers_requested();
1177+
1178+
let mut pending_async_payments_messages =
1179+
self.pending_async_payments_messages.lock().unwrap();
1180+
let message = AsyncPaymentsMessage::OfferPathsRequest(OfferPathsRequest {});
1181+
enqueue_onion_message_with_reply_paths(
1182+
message,
1183+
&self.paths_to_static_invoice_server.lock().unwrap()[..],
1184+
reply_paths,
1185+
&mut pending_async_payments_messages,
1186+
);
1187+
}
1188+
1189+
Ok(())
1190+
}
1191+
11291192
/// Get the `AsyncReceiveOfferCache` for persistence.
11301193
pub(crate) fn writeable_async_receive_offer_cache(&self) -> impl Writeable + '_ {
11311194
&self.async_receive_offer_cache

0 commit comments

Comments
 (0)