Skip to content

Commit cbc89da

Browse files
Initiate and refresh async receive offers
As an async recipient, we need to interactively build static invoices that an always-online node will serve to payers on our behalf. At the start of this process, we send a requests for paths to include in our offers to the always-online node on startup and refresh the cached offers when they start to get stale.
1 parent f2909d0 commit cbc89da

File tree

4 files changed

+234
-1
lines changed

4 files changed

+234
-1
lines changed

lightning/src/blinded_path/message.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,21 @@ pub enum OffersContext {
404404
/// [`AsyncPaymentsMessage`]: crate::onion_message::async_payments::AsyncPaymentsMessage
405405
#[derive(Clone, Debug)]
406406
pub enum AsyncPaymentsContext {
407+
/// Context used by a reply path to an [`OfferPathsRequest`], provided back to us as an async
408+
/// recipient in corresponding [`OfferPaths`] messages from the static invoice server.
409+
///
410+
/// [`OfferPathsRequest`]: crate::onion_message::async_payments::OfferPathsRequest
411+
/// [`OfferPaths`]: crate::onion_message::async_payments::OfferPaths
412+
OfferPaths {
413+
/// The time as duration since the Unix epoch at which this path expires and messages sent over
414+
/// it should be ignored.
415+
///
416+
/// This avoids the situation where the [`OfferPaths`] message is very delayed and thus
417+
/// outdated.
418+
///
419+
/// [`OfferPaths`]: crate::onion_message::async_payments::OfferPaths
420+
path_absolute_expiry: core::time::Duration,
421+
},
407422
/// Context contained within the reply [`BlindedMessagePath`] we put in outbound
408423
/// [`HeldHtlcAvailable`] messages, provided back to us in corresponding [`ReleaseHeldHtlc`]
409424
/// messages.
@@ -486,6 +501,9 @@ impl_writeable_tlv_based_enum!(AsyncPaymentsContext,
486501
(2, hmac, required),
487502
(4, path_absolute_expiry, required),
488503
},
504+
(2, OfferPaths) => {
505+
(0, path_absolute_expiry, required),
506+
},
489507
);
490508

491509
/// Contains a simple nonce for use in a blinded path's context.

lightning/src/ln/channelmanager.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5248,6 +5248,20 @@ where
52485248
)
52495249
}
52505250

5251+
#[cfg(async_payments)]
5252+
fn check_refresh_async_receive_offer_cache(&self, timer_tick_occurred: bool) {
5253+
let peers = self.get_peers_for_blinded_path();
5254+
match self.flow.check_refresh_async_receive_offer_cache(peers, timer_tick_occurred) {
5255+
Err(()) => {
5256+
log_error!(
5257+
self.logger,
5258+
"Failed to create blinded paths when requesting async receive offer paths"
5259+
);
5260+
},
5261+
Ok(()) => {},
5262+
}
5263+
}
5264+
52515265
#[cfg(async_payments)]
52525266
fn initiate_async_payment(
52535267
&self, invoice: &StaticInvoice, payment_id: PaymentId,
@@ -7242,6 +7256,9 @@ where
72427256
duration_since_epoch, &self.pending_events
72437257
);
72447258

7259+
#[cfg(async_payments)]
7260+
self.check_refresh_async_receive_offer_cache(true);
7261+
72457262
// Technically we don't need to do this here, but if we have holding cell entries in a
72467263
// channel that need freeing, it's better to do that here and block a background task
72477264
// than block the message queueing pipeline.
@@ -11978,6 +11995,13 @@ where
1197811995
return NotifyOption::SkipPersistHandleEvents;
1197911996
//TODO: Also re-broadcast announcement_signatures
1198011997
});
11998+
11999+
// While we usually refresh the AsyncReceiveOfferCache on a timer, we also want to start
12000+
// interactively building offers as soon as we can after startup. We can't start building offers
12001+
// until we have some peer connection(s) to send onion messages over, so as a minor optimization
12002+
// refresh the cache when a peer connects.
12003+
#[cfg(async_payments)]
12004+
self.check_refresh_async_receive_offer_cache(false);
1198112005
res
1198212006
}
1198312007

lightning/src/offers/async_receive_offer_cache.rs

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,133 @@ impl AsyncReceiveOfferCache {
172172
#[cfg(async_payments)]
173173
const MAX_CACHED_OFFERS_TARGET: usize = 10;
174174

175+
// The max number of times we'll attempt to request offer paths per timer tick.
176+
#[cfg(async_payments)]
177+
const MAX_UPDATE_ATTEMPTS: u8 = 3;
178+
179+
// If we have an offer that is replaceable and its invoice was confirmed as persisted more than 2
180+
// hours ago, we can go ahead and refresh it because we always want to have the freshest offer
181+
// possible when a user goes to retrieve a cached offer.
182+
//
183+
// We avoid replacing unused offers too quickly -- this prevents the case where we send multiple
184+
// invoices from different offers competing for the same slot to the server, messages are received
185+
// delayed or out-of-order, and we end up providing an offer to the user that the server just
186+
// deleted and replaced.
187+
#[cfg(async_payments)]
188+
const OFFER_REFRESH_THRESHOLD: Duration = Duration::from_secs(2 * 60 * 60);
189+
190+
#[cfg(async_payments)]
191+
impl AsyncReceiveOfferCache {
192+
/// Remove expired offers from the cache, returning whether new offers are needed.
193+
pub(super) fn prune_expired_offers(
194+
&mut self, duration_since_epoch: Duration, force_reset_request_attempts: bool,
195+
) -> bool {
196+
// Remove expired offers from the cache.
197+
let mut offer_was_removed = false;
198+
for offer_opt in self.offers.iter_mut() {
199+
let offer_is_expired = offer_opt
200+
.as_ref()
201+
.map_or(false, |offer| offer.offer.is_expired_no_std(duration_since_epoch));
202+
if offer_is_expired {
203+
offer_opt.take();
204+
offer_was_removed = true;
205+
}
206+
}
207+
208+
// Allow up to `MAX_UPDATE_ATTEMPTS` offer paths requests to be sent out roughly once per
209+
// minute, or if an offer was removed.
210+
if force_reset_request_attempts || offer_was_removed {
211+
self.reset_offer_paths_request_attempts()
212+
}
213+
214+
self.needs_new_offer_idx(duration_since_epoch).is_some()
215+
&& self.offer_paths_request_attempts < MAX_UPDATE_ATTEMPTS
216+
}
217+
218+
/// If we have any empty slots in the cache or offers that can and should be replaced with a fresh
219+
/// offer, here we return the index of the slot that needs a new offer. The index is used for
220+
/// setting [`ServeStaticInvoice::invoice_slot`] when sending the corresponding new static invoice
221+
/// to the server, so the server knows which existing persisted invoice is being replaced, if any.
222+
///
223+
/// Returns `None` if the cache is full and no offers can currently be replaced.
224+
///
225+
/// [`ServeStaticInvoice::invoice_slot`]: crate::onion_message::async_payments::ServeStaticInvoice::invoice_slot
226+
fn needs_new_offer_idx(&self, duration_since_epoch: Duration) -> Option<usize> {
227+
// If we have any empty offer slots, return the first one we find
228+
let empty_slot_idx_opt = self.offers.iter().position(|offer_opt| offer_opt.is_none());
229+
if empty_slot_idx_opt.is_some() {
230+
return empty_slot_idx_opt;
231+
}
232+
233+
// If all of our offers are already used or pending, then none are available to be replaced
234+
let no_replaceable_offers = self
235+
.offers_with_idx()
236+
.all(|(_, offer)| matches!(offer.status, OfferStatus::Used | OfferStatus::Pending));
237+
if no_replaceable_offers {
238+
return None;
239+
}
240+
241+
// All offers are pending except for one, so we shouldn't request an update of the only usable
242+
// offer
243+
let num_payable_offers = self
244+
.offers_with_idx()
245+
.filter(|(_, offer)| {
246+
matches!(offer.status, OfferStatus::Used | OfferStatus::Ready { .. })
247+
})
248+
.count();
249+
if num_payable_offers <= 1 {
250+
return None;
251+
}
252+
253+
// Filter for unused offers where longer than OFFER_REFRESH_THRESHOLD time has passed since they
254+
// were last updated, so they are stale enough to warrant replacement.
255+
let awhile_ago = duration_since_epoch.saturating_sub(OFFER_REFRESH_THRESHOLD);
256+
self.unused_offers()
257+
.filter(|(_, _, invoice_confirmed_persisted_at)| {
258+
*invoice_confirmed_persisted_at < awhile_ago
259+
})
260+
// Get the stalest offer and return its index
261+
.min_by(|(_, _, persisted_at_a), (_, _, persisted_at_b)| {
262+
persisted_at_a.cmp(&persisted_at_b)
263+
})
264+
.map(|(idx, _, _)| idx)
265+
}
266+
267+
/// Returns an iterator over (offer_idx, offer)
268+
fn offers_with_idx(&self) -> impl Iterator<Item = (usize, &AsyncReceiveOffer)> {
269+
self.offers.iter().enumerate().filter_map(|(idx, offer_opt)| {
270+
if let Some(offer) = offer_opt {
271+
Some((idx, offer))
272+
} else {
273+
None
274+
}
275+
})
276+
}
277+
278+
/// Returns an iterator over (offer_idx, offer, invoice_confirmed_persisted_at)
279+
/// where all returned offers are `OfferStatus::Ready`
280+
fn unused_offers(&self) -> impl Iterator<Item = (usize, &AsyncReceiveOffer, Duration)> {
281+
self.offers_with_idx().filter_map(|(idx, offer)| match offer.status {
282+
OfferStatus::Ready { invoice_confirmed_persisted_at } => {
283+
Some((idx, offer, invoice_confirmed_persisted_at))
284+
},
285+
_ => None,
286+
})
287+
}
288+
289+
// Indicates that onion messages requesting new offer paths have been sent to the static invoice
290+
// server. Calling this method allows the cache to self-limit how many requests are sent.
291+
pub(super) fn new_offers_requested(&mut self) {
292+
self.offer_paths_request_attempts += 1;
293+
}
294+
295+
/// Called on timer tick (roughly once per minute) to allow another MAX_UPDATE_ATTEMPTS offer
296+
/// paths requests to go out.
297+
fn reset_offer_paths_request_attempts(&mut self) {
298+
self.offer_paths_request_attempts = 0;
299+
}
300+
}
301+
175302
impl Writeable for AsyncReceiveOfferCache {
176303
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
177304
write_tlv_fields!(w, {

lightning/src/offers/flow.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ use {
6666
crate::offers::offer::Amount,
6767
crate::offers::signer,
6868
crate::offers::static_invoice::{StaticInvoice, StaticInvoiceBuilder},
69-
crate::onion_message::async_payments::HeldHtlcAvailable,
69+
crate::onion_message::async_payments::{HeldHtlcAvailable, OfferPathsRequest},
7070
};
7171

7272
#[cfg(feature = "dnssec")]
@@ -238,6 +238,11 @@ where
238238
/// even if multiple invoices are received.
239239
const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
240240

241+
/// The default relative expiry for reply paths where a quick response is expected and the reply
242+
/// path is single-use.
243+
#[cfg(async_payments)]
244+
const TEMP_REPLY_PATH_RELATIVE_EXPIRY: Duration = Duration::from_secs(7200);
245+
241246
impl<MR: Deref> OffersMessageFlow<MR>
242247
where
243248
MR::Target: MessageRouter,
@@ -1126,6 +1131,65 @@ where
11261131
core::mem::take(&mut self.pending_dns_onion_messages.lock().unwrap())
11271132
}
11281133

1134+
/// Sends out [`OfferPathsRequest`] onion messages if we are an often-offline recipient and are
1135+
/// configured to interactively build offers and static invoices with a static invoice server.
1136+
///
1137+
/// # Usage
1138+
///
1139+
/// This method should be called on peer connection and once per minute or so, to keep the offers
1140+
/// cache updated. When calling this method once per minute, SHOULD set `timer_tick_occurred` so
1141+
/// the cache can self-regulate the number of messages sent out.
1142+
///
1143+
/// Errors if we failed to create blinded reply paths when sending an [`OfferPathsRequest`] message.
1144+
#[cfg(async_payments)]
1145+
pub(crate) fn check_refresh_async_receive_offer_cache(
1146+
&self, peers: Vec<MessageForwardNode>, timer_tick_occurred: bool,
1147+
) -> Result<(), ()> {
1148+
// Terminate early if this node does not intend to receive async payments.
1149+
if self.paths_to_static_invoice_server.lock().unwrap().is_empty() {
1150+
return Ok(());
1151+
}
1152+
1153+
let duration_since_epoch = self.duration_since_epoch();
1154+
1155+
// Update the cache to remove expired offers, and check to see whether we need new offers to be
1156+
// interactively built with the static invoice server.
1157+
let needs_new_offers = self
1158+
.async_receive_offer_cache
1159+
.lock()
1160+
.unwrap()
1161+
.prune_expired_offers(duration_since_epoch, timer_tick_occurred);
1162+
1163+
// If we need new offers, send out offer paths request messages to the static invoice server.
1164+
if needs_new_offers {
1165+
let context = MessageContext::AsyncPayments(AsyncPaymentsContext::OfferPaths {
1166+
path_absolute_expiry: duration_since_epoch
1167+
.saturating_add(TEMP_REPLY_PATH_RELATIVE_EXPIRY),
1168+
});
1169+
let reply_paths = match self.create_blinded_paths(peers, context) {
1170+
Ok(paths) => paths,
1171+
Err(()) => {
1172+
return Err(());
1173+
},
1174+
};
1175+
1176+
// We can't fail past this point, so indicate to the cache that we've requested new offers.
1177+
self.async_receive_offer_cache.lock().unwrap().new_offers_requested();
1178+
1179+
let mut pending_async_payments_messages =
1180+
self.pending_async_payments_messages.lock().unwrap();
1181+
let message = AsyncPaymentsMessage::OfferPathsRequest(OfferPathsRequest {});
1182+
enqueue_onion_message_with_reply_paths(
1183+
message,
1184+
&self.paths_to_static_invoice_server.lock().unwrap()[..],
1185+
reply_paths,
1186+
&mut pending_async_payments_messages,
1187+
);
1188+
}
1189+
1190+
Ok(())
1191+
}
1192+
11291193
/// Get the `AsyncReceiveOfferCache` for persistence.
11301194
pub(crate) fn writeable_async_receive_offer_cache(&self) -> impl Writeable + '_ {
11311195
&self.async_receive_offer_cache

0 commit comments

Comments
 (0)