Skip to content

Commit d988c46

Browse files
Initiate and refresh async receive offers
As an async recipient, we need to interactively build static invoices that an always-online node will serve to payers on our behalf. At the start of this process, we send a requests for paths to include in our offers to the always-online node on startup and refresh the cached offers when they start to get stale.
1 parent 5cd2b7d commit d988c46

File tree

4 files changed

+232
-1
lines changed

4 files changed

+232
-1
lines changed

lightning/src/blinded_path/message.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,19 @@ pub enum OffersContext {
404404
/// [`AsyncPaymentsMessage`]: crate::onion_message::async_payments::AsyncPaymentsMessage
405405
#[derive(Clone, Debug)]
406406
pub enum AsyncPaymentsContext {
407+
/// Context used by a reply path to an [`OfferPathsRequest`], provided back to us as an async
408+
/// recipient in corresponding [`OfferPaths`] messages from the static invoice server.
409+
///
410+
/// [`OfferPathsRequest`]: crate::onion_message::async_payments::OfferPathsRequest
411+
/// [`OfferPaths`]: crate::onion_message::async_payments::OfferPaths
412+
OfferPaths {
413+
/// The time as duration since the Unix epoch at which this path expires and messages sent over
414+
/// it should be ignored.
415+
///
416+
/// As an async recipient we use this field to time out a static invoice server from sending us
417+
/// offer paths if we are no longer configured to accept paths from them.
418+
path_absolute_expiry: core::time::Duration,
419+
},
407420
/// Context contained within the reply [`BlindedMessagePath`] we put in outbound
408421
/// [`HeldHtlcAvailable`] messages, provided back to us in corresponding [`ReleaseHeldHtlc`]
409422
/// messages.
@@ -486,6 +499,9 @@ impl_writeable_tlv_based_enum!(AsyncPaymentsContext,
486499
(2, hmac, required),
487500
(4, path_absolute_expiry, required),
488501
},
502+
(2, OfferPaths) => {
503+
(0, path_absolute_expiry, required),
504+
},
489505
);
490506

491507
/// Contains a simple nonce for use in a blinded path's context.

lightning/src/ln/channelmanager.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5139,6 +5139,20 @@ where
51395139
)
51405140
}
51415141

5142+
#[cfg(async_payments)]
5143+
fn check_refresh_async_receive_offers(&self, timer_tick_occurred: bool) {
5144+
let peers = self.get_peers_for_blinded_path();
5145+
match self.flow.check_refresh_async_receive_offers(peers, timer_tick_occurred) {
5146+
Err(()) => {
5147+
log_error!(
5148+
self.logger,
5149+
"Failed to create blinded paths when requesting async receive offer paths"
5150+
);
5151+
},
5152+
Ok(()) => {},
5153+
}
5154+
}
5155+
51425156
#[cfg(async_payments)]
51435157
fn initiate_async_payment(
51445158
&self, invoice: &StaticInvoice, payment_id: PaymentId,
@@ -7133,6 +7147,9 @@ where
71337147
duration_since_epoch, &self.pending_events
71347148
);
71357149

7150+
#[cfg(async_payments)]
7151+
self.check_refresh_async_receive_offers(true);
7152+
71367153
// Technically we don't need to do this here, but if we have holding cell entries in a
71377154
// channel that need freeing, it's better to do that here and block a background task
71387155
// than block the message queueing pipeline.
@@ -11664,6 +11681,13 @@ where
1166411681
return NotifyOption::SkipPersistHandleEvents;
1166511682
//TODO: Also re-broadcast announcement_signatures
1166611683
});
11684+
11685+
// While we usually refresh the AsyncReceiveOfferCache on a timer, we also want to start
11686+
// interactively building offers as soon as we can after startup. We can't start building offers
11687+
// until we have some peer connection(s) to send onion messages over, so as a minor optimization
11688+
// refresh the cache when a peer connects.
11689+
#[cfg(async_payments)]
11690+
self.check_refresh_async_receive_offers(false);
1166711691
res
1166811692
}
1166911693

lightning/src/offers/async_receive_offer_cache.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,131 @@ impl AsyncReceiveOfferCache {
171171
#[cfg(async_payments)]
172172
const MAX_CACHED_OFFERS_TARGET: usize = 10;
173173

174+
// The max number of times we'll attempt to request offer paths per timer tick.
175+
#[cfg(async_payments)]
176+
const MAX_UPDATE_ATTEMPTS: u8 = 3;
177+
178+
// If we have an offer that is replaceable and its invoice was confirmed as persisted more than 2
179+
// hours ago, we can go ahead and refresh it because we always want to have the freshest offer
180+
// possible when a user goes to retrieve a cached offer.
181+
//
182+
// We avoid replacing unused offers too quickly -- this prevents the case where we send multiple
183+
// invoices from different offers competing for the same slot to the server, messages are received
184+
// delayed or out-of-order, and we end up providing an offer to the user that the server just
185+
// deleted and replaced.
186+
#[cfg(async_payments)]
187+
const OFFER_REFRESH_THRESHOLD: Duration = Duration::from_secs(2 * 60 * 60);
188+
189+
#[cfg(async_payments)]
190+
impl AsyncReceiveOfferCache {
191+
/// Remove expired offers from the cache, returning whether new offers are needed.
192+
pub(super) fn prune_expired_offers(
193+
&mut self, duration_since_epoch: Duration, timer_tick_occurred: bool,
194+
) -> bool {
195+
// Remove expired offers from the cache.
196+
let mut offer_was_removed = false;
197+
for offer_opt in self.offers.iter_mut() {
198+
let offer_is_expired = offer_opt
199+
.as_ref()
200+
.map_or(false, |offer| offer.offer.is_expired_no_std(duration_since_epoch));
201+
if offer_is_expired {
202+
offer_opt.take();
203+
offer_was_removed = true;
204+
}
205+
}
206+
207+
// Allow more offer paths requests to be sent out in a burst roughly once per minute, or if an
208+
// offer was removed.
209+
if timer_tick_occurred || offer_was_removed {
210+
self.reset_offer_paths_request_attempts()
211+
}
212+
213+
self.needs_new_offer_idx(duration_since_epoch).is_some()
214+
&& self.offer_paths_request_attempts < MAX_UPDATE_ATTEMPTS
215+
}
216+
217+
/// If we have any empty slots in the cache or offers where the offer can and should be replaced
218+
/// with a fresh offer, here we return the index of the slot that needs a new offer. The index is
219+
/// used for setting [`ServeStaticInvoice::invoice_slot`] when sending the corresponding new
220+
/// static invoice to the server, so the server knows which existing persisted invoice is being
221+
/// replaced, if any.
222+
///
223+
/// Returns `None` if the cache is full and no offers can currently be replaced.
224+
///
225+
/// [`ServeStaticInvoice::invoice_slot`]: crate::onion_message::async_payments::ServeStaticInvoice::invoice_slot
226+
fn needs_new_offer_idx(&self, duration_since_epoch: Duration) -> Option<usize> {
227+
// If we have any empty offer slots, return the first one we find
228+
let mut offers_opt_iter = self.offers.iter().enumerate();
229+
let empty_slot_idx_opt =
230+
offers_opt_iter.find_map(|(idx, offer_opt)| offer_opt.is_none().then(|| idx));
231+
if empty_slot_idx_opt.is_some() {
232+
return empty_slot_idx_opt;
233+
}
234+
235+
// If all of our offers are already used or pending, then none are available to be replaced
236+
let no_replaceable_offers = self.offers_with_idx().all(|(_, offer)| {
237+
matches!(offer.status, OfferStatus::Used)
238+
|| matches!(offer.status, OfferStatus::Pending)
239+
});
240+
if no_replaceable_offers {
241+
return None;
242+
}
243+
244+
// If we only have 1 offer that is available for payments, then none are available to be
245+
// replaced
246+
let num_payable_offers = self
247+
.offers_with_idx()
248+
.filter(|(_, offer)| {
249+
matches!(offer.status, OfferStatus::Used)
250+
|| matches!(offer.status, OfferStatus::Ready { .. })
251+
})
252+
.count();
253+
if num_payable_offers <= 1 {
254+
return None;
255+
}
256+
257+
// Filter for unused offers that were last updated more than two hours ago, so they are stale
258+
// enough to warrant replacement.
259+
let two_hours_ago = duration_since_epoch.saturating_sub(OFFER_REFRESH_THRESHOLD);
260+
self.offers_with_idx()
261+
.filter_map(|(idx, offer)| match offer.status {
262+
OfferStatus::Ready { invoice_confirmed_persisted_at } => {
263+
Some((idx, offer, invoice_confirmed_persisted_at))
264+
},
265+
_ => None,
266+
})
267+
.filter(|(_, _, invoice_confirmed_persisted_at)| {
268+
*invoice_confirmed_persisted_at < two_hours_ago
269+
})
270+
// Get the stalest offer and return its index
271+
.min_by(|a, b| a.2.cmp(&b.2))
272+
.map(|(idx, _, _)| idx)
273+
}
274+
275+
/// Returns an iterator over (offer_idx, offer)
276+
fn offers_with_idx(&self) -> impl Iterator<Item = (usize, &AsyncReceiveOffer)> {
277+
self.offers.iter().enumerate().filter_map(|(idx, offer_opt)| {
278+
if let Some(offer) = offer_opt {
279+
Some((idx, offer))
280+
} else {
281+
None
282+
}
283+
})
284+
}
285+
286+
// Indicates that onion messages requesting new offer paths have been sent to the static invoice
287+
// server. Calling this method allows the cache to self-limit how many requests are sent.
288+
pub(super) fn new_offers_requested(&mut self) {
289+
self.offer_paths_request_attempts += 1;
290+
}
291+
292+
/// Called on timer tick (roughly once per minute) to allow another MAX_UPDATE_ATTEMPTS offer
293+
/// paths requests to go out.
294+
fn reset_offer_paths_request_attempts(&mut self) {
295+
self.offer_paths_request_attempts = 0;
296+
}
297+
}
298+
174299
impl Writeable for AsyncReceiveOfferCache {
175300
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
176301
write_tlv_fields!(w, {

lightning/src/offers/flow.rs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ use {
6666
crate::offers::offer::Amount,
6767
crate::offers::signer,
6868
crate::offers::static_invoice::{StaticInvoice, StaticInvoiceBuilder},
69-
crate::onion_message::async_payments::HeldHtlcAvailable,
69+
crate::onion_message::async_payments::{HeldHtlcAvailable, OfferPathsRequest},
7070
};
7171

7272
#[cfg(feature = "dnssec")]
@@ -235,6 +235,11 @@ where
235235
/// even if multiple invoices are received.
236236
const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
237237

238+
/// The default relative expiry for reply paths where a quick response is expected and the reply
239+
/// path is single-use.
240+
#[cfg(async_payments)]
241+
const TEMP_REPLY_PATH_RELATIVE_EXPIRY: Duration = Duration::from_secs(7200);
242+
238243
impl<MR: Deref> OffersMessageFlow<MR>
239244
where
240245
MR::Target: MessageRouter,
@@ -1123,6 +1128,67 @@ where
11231128
core::mem::take(&mut self.pending_dns_onion_messages.lock().unwrap())
11241129
}
11251130

1131+
/// Sends out [`OfferPathsRequest`] onion messages if we are an often-offline recipient and are
1132+
/// configured to interactively build offers and static invoices with a static invoice server.
1133+
///
1134+
/// # Usage
1135+
///
1136+
/// This method should be called on peer connection and once per minute or so, to keep the offers
1137+
/// cache updated.
1138+
///
1139+
/// SHOULD NOT be called much more than once per minute or too many messages may be sent to the
1140+
/// server node.
1141+
///
1142+
/// Errors if we failed to create blinded reply paths when sending an [`OfferPathsRequest`] message.
1143+
#[cfg(async_payments)]
1144+
pub(crate) fn check_refresh_async_receive_offers(
1145+
&self, peers: Vec<MessageForwardNode>, timer_tick_occurred: bool,
1146+
) -> Result<(), ()> {
1147+
// Terminate early if this node does not intend to receive async payments.
1148+
if self.paths_to_static_invoice_server.lock().unwrap().is_empty() {
1149+
return Ok(());
1150+
}
1151+
1152+
let duration_since_epoch = self.duration_since_epoch();
1153+
1154+
// Update the cache to remove expired offers, and check to see whether we need new offers to be
1155+
// interactively built with the static invoice server.
1156+
let needs_new_offers = self
1157+
.async_receive_offer_cache
1158+
.lock()
1159+
.unwrap()
1160+
.prune_expired_offers(duration_since_epoch, timer_tick_occurred);
1161+
1162+
// If we need new offers, send out offer paths request messages to the static invoice server.
1163+
if needs_new_offers {
1164+
let context = MessageContext::AsyncPayments(AsyncPaymentsContext::OfferPaths {
1165+
path_absolute_expiry: duration_since_epoch
1166+
.saturating_add(TEMP_REPLY_PATH_RELATIVE_EXPIRY),
1167+
});
1168+
let reply_paths = match self.create_blinded_paths(peers, context) {
1169+
Ok(paths) => paths,
1170+
Err(()) => {
1171+
return Err(());
1172+
},
1173+
};
1174+
1175+
// We can't fail past this point, so indicate to the cache that we've requested new offers.
1176+
self.async_receive_offer_cache.lock().unwrap().new_offers_requested();
1177+
1178+
let mut pending_async_payments_messages =
1179+
self.pending_async_payments_messages.lock().unwrap();
1180+
let message = AsyncPaymentsMessage::OfferPathsRequest(OfferPathsRequest {});
1181+
enqueue_onion_message_with_reply_paths(
1182+
message,
1183+
&self.paths_to_static_invoice_server.lock().unwrap()[..],
1184+
reply_paths,
1185+
&mut pending_async_payments_messages,
1186+
);
1187+
}
1188+
1189+
Ok(())
1190+
}
1191+
11261192
/// Get the `AsyncReceiveOfferCache` for persistence.
11271193
pub(crate) fn writeable_async_receive_offer_cache(&self) -> impl Writeable + '_ {
11281194
&self.async_receive_offer_cache

0 commit comments

Comments
 (0)