Skip to content

Commit 789faaf

Browse files
committed
fixes wip
1 parent 88787dc commit 789faaf

File tree

4 files changed

+152
-37
lines changed

4 files changed

+152
-37
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 100 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ use crate::ln::types::ChannelId;
4343
use crate::sign::ecdsa::EcdsaChannelSigner;
4444
use crate::sign::{EntropySource, PeerStorageKey};
4545
use crate::sync::Arc;
46-
use crate::util::async_poll::{poll_or_spawn, AsyncResult, AsyncVoid, FutureSpawner};
46+
use crate::util::async_poll::{
47+
poll_or_spawn, AsyncResult, AsyncVoid, FutureSpawner, FutureSpawnerSync,
48+
};
4749
use crate::util::errors::APIError;
4850
use crate::util::logger::{Logger, WithContext};
4951
use crate::util::persist::MonitorName;
@@ -296,8 +298,7 @@ pub struct ChainMonitorSync<
296298
L: Deref,
297299
P: Deref,
298300
ES: Deref,
299-
FS: FutureSpawner,
300-
>(ChainMonitor<ChannelSigner, C, T, F, L, PersistSyncWrapper<P>, ES, FS>)
301+
>(ChainMonitor<ChannelSigner, C, T, F, L, PersistSyncWrapper<P>, ES, FutureSpawnerSync>)
301302
where
302303
C::Target: chain::Filter,
303304
T::Target: BroadcasterInterface,
@@ -314,8 +315,7 @@ impl<
314315
L: Deref,
315316
P: Deref,
316317
ES: Deref,
317-
FS: FutureSpawner,
318-
> ChainMonitorSync<ChannelSigner, C, T, F, L, P, ES, FS>
318+
> ChainMonitorSync<ChannelSigner, C, T, F, L, P, ES>
319319
where
320320
C::Target: chain::Filter,
321321
T::Target: BroadcasterInterface,
@@ -324,11 +324,12 @@ where
324324
P::Target: PersistSync<ChannelSigner>,
325325
ES::Target: EntropySource,
326326
{
327-
fn new(
327+
pub fn new(
328328
chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P,
329-
entropy_source: ES, our_peerstorage_encryption_key: PeerStorageKey, future_spawner: FS,
329+
entropy_source: ES, our_peerstorage_encryption_key: PeerStorageKey,
330330
) -> Self {
331331
let persister = PersistSyncWrapper(persister);
332+
let future_spawner = FutureSpawnerSync {};
332333

333334
Self(ChainMonitor::new(
334335
chain_source,
@@ -351,6 +352,11 @@ where
351352
pub fn get_update_future(&self) -> Future {
352353
self.0.get_update_future()
353354
}
355+
356+
/// See [`ChainMonitor::list_pending_monitor_updates`].
357+
pub fn list_pending_monitor_updates(&self) -> HashMap<ChannelId, Vec<u64>> {
358+
self.0.list_pending_monitor_updates()
359+
}
354360
}
355361

356362
impl<
@@ -361,8 +367,47 @@ impl<
361367
L: Deref,
362368
P: Deref,
363369
ES: Deref,
364-
FS: FutureSpawner,
365-
> events::EventsProvider for ChainMonitorSync<ChannelSigner, C, T, F, L, P, ES, FS>
370+
> BaseMessageHandler for ChainMonitorSync<ChannelSigner, C, T, F, L, P, ES>
371+
where
372+
C::Target: chain::Filter,
373+
T::Target: BroadcasterInterface,
374+
F::Target: FeeEstimator,
375+
L::Target: Logger,
376+
P::Target: PersistSync<ChannelSigner>,
377+
ES::Target: EntropySource,
378+
{
379+
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
380+
self.0.get_and_clear_pending_msg_events()
381+
}
382+
383+
fn peer_disconnected(&self, their_node_id: PublicKey) {
384+
self.0.peer_disconnected(their_node_id);
385+
}
386+
387+
fn provided_node_features(&self) -> NodeFeatures {
388+
self.0.provided_node_features()
389+
}
390+
391+
fn provided_init_features(&self, their_node_id: PublicKey) -> InitFeatures {
392+
self.0.provided_init_features(their_node_id)
393+
}
394+
395+
fn peer_connected(
396+
&self, their_node_id: PublicKey, msg: &Init, inbound: bool,
397+
) -> Result<(), ()> {
398+
self.0.peer_connected(their_node_id, msg, inbound)
399+
}
400+
}
401+
402+
impl<
403+
ChannelSigner: EcdsaChannelSigner + 'static,
404+
C: Deref,
405+
T: Deref,
406+
F: Deref,
407+
L: Deref,
408+
P: Deref,
409+
ES: Deref,
410+
> events::EventsProvider for ChainMonitorSync<ChannelSigner, C, T, F, L, P, ES>
366411
where
367412
C::Target: chain::Filter,
368413
T::Target: BroadcasterInterface,
@@ -379,6 +424,40 @@ where
379424
}
380425
}
381426

427+
impl<
428+
ChannelSigner: EcdsaChannelSigner + 'static,
429+
C: Deref,
430+
T: Deref,
431+
F: Deref,
432+
L: Deref,
433+
P: Deref,
434+
ES: Deref,
435+
> chain::Confirm for ChainMonitorSync<ChannelSigner, C, T, F, L, P, ES>
436+
where
437+
C::Target: chain::Filter,
438+
T::Target: BroadcasterInterface,
439+
F::Target: FeeEstimator,
440+
L::Target: Logger,
441+
P::Target: PersistSync<ChannelSigner>,
442+
ES::Target: EntropySource,
443+
{
444+
fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) {
445+
self.0.transactions_confirmed(header, txdata, height);
446+
}
447+
448+
fn transaction_unconfirmed(&self, txid: &Txid) {
449+
self.0.transaction_unconfirmed(txid);
450+
}
451+
452+
fn best_block_updated(&self, header: &Header, height: u32) {
453+
self.0.best_block_updated(header, height);
454+
}
455+
456+
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
457+
self.0.get_relevant_txids()
458+
}
459+
}
460+
382461
impl<
383462
ChannelSigner: EcdsaChannelSigner + 'static,
384463
C: Deref,
@@ -1581,11 +1660,12 @@ mod tests {
15811660
.unwrap()
15821661
.1
15831662
.contains(&next_update));
1584-
nodes[1]
1585-
.chain_monitor
1586-
.chain_monitor
1587-
.channel_monitor_updated(channel_id, next_update.clone())
1588-
.unwrap();
1663+
// TODO: RE-ENABLE
1664+
// nodes[1]
1665+
// .chain_monitor
1666+
// .chain_monitor
1667+
// .channel_monitor_updated(channel_id, next_update.clone())
1668+
// .unwrap();
15891669
// Should not contain the previously pending next_update when pending updates listed.
15901670
#[cfg(not(c_bindings))]
15911671
assert!(!nodes[1]
@@ -1608,11 +1688,12 @@ mod tests {
16081688
assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
16091689
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
16101690
assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
1611-
nodes[1]
1612-
.chain_monitor
1613-
.chain_monitor
1614-
.channel_monitor_updated(channel_id, update_iter.next().unwrap().clone())
1615-
.unwrap();
1691+
// TODO: RE-ENABLE
1692+
// nodes[1]
1693+
// .chain_monitor
1694+
// .chain_monitor
1695+
// .channel_monitor_updated(channel_id, update_iter.next().unwrap().clone())
1696+
// .unwrap();
16161697

16171698
let claim_events = nodes[1].node.get_and_clear_pending_events();
16181699
assert_eq!(claim_events.len(), 2);

lightning/src/util/async_poll.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::prelude::*;
1313
use core::future::Future;
1414
use core::marker::Unpin;
1515
use core::pin::Pin;
16-
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
16+
use core::task::{self, Context, Poll, RawWaker, RawWakerVTable, Waker};
1717

1818
pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Copy + Unpin> {
1919
Pending(F),
@@ -131,6 +131,16 @@ pub trait FutureSpawner: Send + Sync + 'static {
131131
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T);
132132
}
133133

134+
pub struct FutureSpawnerSync {}
135+
136+
impl FutureSpawner for FutureSpawnerSync {
137+
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, fut: T) {
138+
unreachable!(
139+
"FutureSpawnerSync should not be used directly, use a concrete implementation instead"
140+
);
141+
}
142+
}
143+
134144
/// Polls a future and either returns true if it is ready or spawns it on the tokio runtime if it is not.
135145
pub fn poll_or_spawn<F, C, S>(
136146
mut fut: Pin<Box<F>>, callback: C, future_spawner: &S,

lightning/src/util/persist.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
1313
use bitcoin::hashes::hex::FromHex;
1414
use bitcoin::{BlockHash, Txid};
15-
use core::cmp;
15+
use core::future::Future;
1616
use core::ops::Deref;
1717
use core::str::FromStr;
18+
use core::{cmp, task};
1819

1920
use crate::prelude::*;
21+
use crate::util::async_poll::dummy_waker;
2022
use crate::{io, log_error};
2123

2224
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
@@ -751,6 +753,23 @@ where
751753
);
752754
Self(persister)
753755
}
756+
757+
pub fn read_all_channel_monitors_with_updates(
758+
&self,
759+
) -> Result<
760+
Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
761+
io::Error,
762+
> {
763+
let mut fut = Box::pin(self.0.state.read_all_channel_monitors_with_updates());
764+
let mut waker = dummy_waker();
765+
let mut ctx = task::Context::from_waker(&mut waker);
766+
match fut.as_mut().poll(&mut ctx) {
767+
task::Poll::Ready(result) => result,
768+
task::Poll::Pending => {
769+
unreachable!("Can't poll a future in a sync context, this should never happen");
770+
},
771+
}
772+
}
754773
}
755774

756775
struct MonitorUpdatingPersisterState<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
@@ -1548,7 +1567,7 @@ mod tests {
15481567
let chanmon_cfgs = create_chanmon_cfgs(4);
15491568
let kv_store = &TestStore::new(false);
15501569
let logger = &TestLogger::new();
1551-
let persister_0 = MonitorUpdatingPersister::new(
1570+
let persister_0 = MonitorUpdatingPersisterSync::new(
15521571
kv_store,
15531572
logger,
15541573
persister_0_max_pending_updates,
@@ -1559,7 +1578,7 @@ mod tests {
15591578
);
15601579
let kv_store = &TestStore::new(false);
15611580
let logger = &TestLogger::new();
1562-
let persister_1 = MonitorUpdatingPersister::new(
1581+
let persister_1 = MonitorUpdatingPersisterSync::new(
15631582
kv_store,
15641583
logger,
15651584
persister_1_max_pending_updates,
@@ -1726,7 +1745,7 @@ mod tests {
17261745

17271746
let kv_store = &TestStore::new(true);
17281747
let logger = &TestLogger::new();
1729-
let ro_persister = MonitorUpdatingPersister::new(
1748+
let ro_persister = MonitorUpdatingPersisterSync::new(
17301749
kv_store,
17311750
logger,
17321751
11,
@@ -1774,7 +1793,7 @@ mod tests {
17741793
let chanmon_cfgs = create_chanmon_cfgs(3);
17751794
let kv_store = &TestStore::new(false);
17761795
let logger = &TestLogger::new();
1777-
let persister_0 = MonitorUpdatingPersister::new(
1796+
let persister_0 = MonitorUpdatingPersisterSync::new(
17781797
kv_store,
17791798
logger,
17801799
test_max_pending_updates,
@@ -1785,7 +1804,7 @@ mod tests {
17851804
);
17861805
let kv_store = &TestStore::new(false);
17871806
let logger = &TestLogger::new();
1788-
let persister_1 = MonitorUpdatingPersister::new(
1807+
let persister_1 = MonitorUpdatingPersisterSync::new(
17891808
kv_store,
17901809
logger,
17911810
test_max_pending_updates,

lightning/src/util/test_utils.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::chain::chaininterface;
1515
use crate::chain::chaininterface::ConfirmationTarget;
1616
#[cfg(any(test, feature = "_externalize_tests"))]
1717
use crate::chain::chaininterface::FEERATE_FLOOR_SATS_PER_KW;
18-
use crate::chain::chainmonitor::{ChainMonitor, Persist};
18+
use crate::chain::chainmonitor::{ChainMonitor, ChainMonitorSync, Persist, PersistSync};
1919
use crate::chain::channelmonitor::{
2020
ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent,
2121
};
@@ -50,14 +50,15 @@ use crate::sign;
5050
use crate::sign::{ChannelSigner, PeerStorageKey};
5151
use crate::sync::RwLock;
5252
use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures};
53+
use crate::util::async_poll::FutureSpawnerSync;
5354
use crate::util::config::UserConfig;
5455
use crate::util::dyn_signer::{
5556
DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner,
5657
};
5758
use crate::util::logger::{Logger, Record};
5859
#[cfg(feature = "std")]
5960
use crate::util::mut_global::MutGlobal;
60-
use crate::util::persist::{KVStore, MonitorName};
61+
use crate::util::persist::{KVStore, KVStoreSync, MonitorName};
6162
use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer};
6263
use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner};
6364

@@ -381,7 +382,7 @@ impl SignerProvider for OnlyReadsKeysInterface {
381382
#[cfg(feature = "std")]
382383
pub trait SyncBroadcaster: chaininterface::BroadcasterInterface + Sync {}
383384
#[cfg(feature = "std")]
384-
pub trait SyncPersist: Persist<TestChannelSigner> + Sync {}
385+
pub trait SyncPersist: PersistSync<TestChannelSigner> + Sync {}
385386
#[cfg(feature = "std")]
386387
impl<T: chaininterface::BroadcasterInterface + Sync> SyncBroadcaster for T {}
387388
#[cfg(feature = "std")]
@@ -400,7 +401,7 @@ pub struct TestChainMonitor<'a> {
400401
pub added_monitors: Mutex<Vec<(ChannelId, ChannelMonitor<TestChannelSigner>)>>,
401402
pub monitor_updates: Mutex<HashMap<ChannelId, Vec<ChannelMonitorUpdate>>>,
402403
pub latest_monitor_update_id: Mutex<HashMap<ChannelId, (u64, u64)>>,
403-
pub chain_monitor: ChainMonitor<
404+
pub chain_monitor: ChainMonitorSync<
404405
TestChannelSigner,
405406
&'a TestChainSource,
406407
&'a dyn SyncBroadcaster,
@@ -430,7 +431,7 @@ impl<'a> TestChainMonitor<'a> {
430431
added_monitors: Mutex::new(Vec::new()),
431432
monitor_updates: Mutex::new(new_hash_map()),
432433
latest_monitor_update_id: Mutex::new(new_hash_map()),
433-
chain_monitor: ChainMonitor::new(
434+
chain_monitor: ChainMonitorSync::new(
434435
chain_source,
435436
broadcaster,
436437
logger,
@@ -720,20 +721,24 @@ impl TestPersister {
720721
self.update_rets.lock().unwrap().push_back(next_ret);
721722
}
722723
}
723-
impl<Signer: sign::ecdsa::EcdsaChannelSigner> Persist<Signer> for TestPersister {
724+
impl<Signer: sign::ecdsa::EcdsaChannelSigner> PersistSync<Signer> for TestPersister {
724725
fn persist_new_channel(
725726
&self, _monitor_name: MonitorName, _data: &ChannelMonitor<Signer>,
726-
) -> chain::ChannelMonitorUpdateStatus {
727+
) -> Result<(), ()> {
727728
if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() {
728-
return update_ret;
729+
return match update_ret {
730+
chain::ChannelMonitorUpdateStatus::Completed => Ok(()),
731+
chain::ChannelMonitorUpdateStatus::InProgress => Err(()),
732+
chain::ChannelMonitorUpdateStatus::UnrecoverableError => Err(()),
733+
};
729734
}
730-
chain::ChannelMonitorUpdateStatus::Completed
735+
Ok(())
731736
}
732737

733738
fn update_persisted_channel(
734739
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
735740
_data: &ChannelMonitor<Signer>,
736-
) -> chain::ChannelMonitorUpdateStatus {
741+
) -> Result<(), ()> {
737742
let mut ret = chain::ChannelMonitorUpdateStatus::Completed;
738743
if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() {
739744
ret = update_ret;
@@ -771,7 +776,7 @@ impl TestStore {
771776
}
772777
}
773778

774-
impl KVStore for TestStore {
779+
impl KVStoreSync for TestStore {
775780
fn read(
776781
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
777782
) -> io::Result<Vec<u8>> {

0 commit comments

Comments
 (0)