Skip to content

Commit 8fb4a3d

Browse files
authored
Merge pull request #649 from jkczyz/2020-06-refactor-chain-listener
Refactor chain monitoring
2 parents 8640173 + 6cd6816 commit 8fb4a3d

26 files changed

+1242
-1425
lines changed

ARCH.md

+23-23
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ need to use are `ChannelManager` and `ChannelMonitor`. `ChannelManager` holds mu
66
channels, routes payments between them, and exposes a simple API to make and receive
77
payments. Individual `ChannelMonitor`s monitor the on-chain state of a channel, punish
88
counterparties if they misbehave, and force-close channels if they contain unresolved
9-
HTLCs which are near expiration. The `ManyChannelMonitor` API provides a way for you to
9+
HTLCs which are near expiration. The `chain::Watch` interface provides a way for you to
1010
receive `ChannelMonitorUpdate`s from `ChannelManager` and persist them to disk before the
1111
channel steps forward.
1212

@@ -37,26 +37,26 @@ At a high level, some of the common interfaces fit together as follows:
3737
-----------------
3838
| KeysInterface | --------------
3939
----------------- | UserConfig |
40-
-------------------- | --------------
41-
/------| MessageSendEvent | | | ----------------
42-
| -------------------- | | | FeeEstimator |
43-
| (as MessageSendEventsProvider) | | ----------------
44-
| ^ | | / | ------------------------
45-
| \ | | / ---------> | BroadcasterInterface |
46-
| \ | | / / | ------------------------
47-
| \ v v v / v ^
48-
| (as ------------------ ----------------------
49-
| ChannelMessageHandler)-> | ChannelManager | ----> | ManyChannelMonitor |
50-
v / ------------------ ----------------------
51-
--------------- / ^ (as EventsProvider) ^
52-
| PeerManager |- | \ / /
53-
--------------- | -------\---/----------
54-
| ----------------------- / \ /
55-
| | ChainWatchInterface | - v
56-
| ----------------------- ---------
57-
| | | Event |
58-
(as RoutingMessageHandler) v ---------
59-
\ --------------------
60-
-----------------> | NetGraphMsgHandler |
61-
--------------------
40+
-------------------- ^ --------------
41+
------| MessageSendEvent | | ^ ----------------
42+
/ -------------------- | | | FeeEstimator | <-----------------------
43+
| (as MessageSendEventsProvider) | | ---------------- \
44+
| ^ | | ^ ------------------------ |
45+
| \ | | / ---------> | BroadcasterInterface | |
46+
| \ | | / / ------------------------ |
47+
| \ | | / / ^ |
48+
| (as ------------------ ---------------- | |
49+
| ChannelMessageHandler)-> | ChannelManager | ----> | chain::Watch | | |
50+
v / ------------------ ---------------- | |
51+
--------------- / (as EventsProvider) ^ | |
52+
| PeerManager |- \ | | |
53+
--------------- \ | (is-a) | |
54+
| ----------------- \ _---------------- / /
55+
| | chain::Access | \ / | ChainMonitor |---------------
56+
| ----------------- \ / ----------------
57+
| ^ \ / |
58+
(as RoutingMessageHandler) | v v
59+
\ ---------------------- --------- -----------------
60+
-----------------> | NetGraphMsgHandler | | Event | | chain::Filter |
61+
---------------------- --------- -----------------
6262
```

fuzz/src/chanmon_consistency.rs

+25-31
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ use bitcoin::hashes::Hash as TraitImport;
2828
use bitcoin::hashes::sha256::Hash as Sha256;
2929
use bitcoin::hash_types::{BlockHash, WPubkeyHash};
3030

31-
use lightning::chain::chaininterface;
31+
use lightning::chain;
32+
use lightning::chain::chainmonitor;
33+
use lightning::chain::channelmonitor;
34+
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, MonitorEvent};
3235
use lightning::chain::transaction::OutPoint;
33-
use lightning::chain::chaininterface::{BroadcasterInterface,ConfirmationTarget,ChainListener,FeeEstimator,ChainWatchInterfaceUtil,ChainWatchInterface};
36+
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
3437
use lightning::chain::keysinterface::{KeysInterface, InMemoryChannelKeys};
35-
use lightning::ln::channelmonitor;
36-
use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, MonitorEvent};
3738
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret, ChannelManagerReadArgs};
3839
use lightning::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
3940
use lightning::ln::msgs::{CommitmentUpdate, ChannelMessageHandler, ErrorAction, UpdateAddHTLC, Init};
@@ -81,9 +82,9 @@ impl Writer for VecWriter {
8182
}
8283
}
8384

84-
struct TestChannelMonitor {
85+
struct TestChainMonitor {
8586
pub logger: Arc<dyn Logger>,
86-
pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<dyn ChainWatchInterface>>>,
87+
pub chain_monitor: Arc<chainmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
8788
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
8889
// If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
8990
// logic will automatically force-close our channels for us (as we don't have an up-to-date
@@ -93,32 +94,32 @@ struct TestChannelMonitor {
9394
pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
9495
pub should_update_manager: atomic::AtomicBool,
9596
}
96-
impl TestChannelMonitor {
97-
pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>) -> Self {
97+
impl TestChainMonitor {
98+
pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>) -> Self {
9899
Self {
99-
simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger.clone(), feeest)),
100+
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(None, broadcaster, logger.clone(), feeest)),
100101
logger,
101102
update_ret: Mutex::new(Ok(())),
102103
latest_monitors: Mutex::new(HashMap::new()),
103104
should_update_manager: atomic::AtomicBool::new(false),
104105
}
105106
}
106107
}
107-
impl channelmonitor::ManyChannelMonitor for TestChannelMonitor {
108+
impl chain::Watch for TestChainMonitor {
108109
type Keys = EnforcingChannelKeys;
109110

110-
fn add_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
111+
fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
111112
let mut ser = VecWriter(Vec::new());
112113
monitor.write_for_disk(&mut ser).unwrap();
113114
if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
114-
panic!("Already had monitor pre-add_monitor");
115+
panic!("Already had monitor pre-watch_channel");
115116
}
116117
self.should_update_manager.store(true, atomic::Ordering::Relaxed);
117-
assert!(self.simple_monitor.add_monitor(funding_txo, monitor).is_ok());
118+
assert!(self.chain_monitor.watch_channel(funding_txo, monitor).is_ok());
118119
self.update_ret.lock().unwrap().clone()
119120
}
120121

121-
fn update_monitor(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
122+
fn update_channel(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
122123
let mut map_lock = self.latest_monitors.lock().unwrap();
123124
let mut map_entry = match map_lock.entry(funding_txo) {
124125
hash_map::Entry::Occupied(entry) => entry,
@@ -134,8 +135,8 @@ impl channelmonitor::ManyChannelMonitor for TestChannelMonitor {
134135
self.update_ret.lock().unwrap().clone()
135136
}
136137

137-
fn get_and_clear_pending_monitor_events(&self) -> Vec<MonitorEvent> {
138-
return self.simple_monitor.get_and_clear_pending_monitor_events();
138+
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
139+
return self.chain_monitor.release_pending_monitor_events();
139140
}
140141
}
141142

@@ -191,8 +192,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
191192
macro_rules! make_node {
192193
($node_id: expr) => { {
193194
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
194-
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin));
195-
let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone()));
195+
let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), fee_est.clone()));
196196

197197
let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU8::new(0) });
198198
let mut config = UserConfig::default();
@@ -207,8 +207,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
207207
macro_rules! reload_node {
208208
($ser: expr, $node_id: expr, $old_monitors: expr) => { {
209209
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
210-
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin));
211-
let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone()));
210+
let chain_monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), fee_est.clone()));
212211

213212
let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU8::new(0) });
214213
let mut config = UserConfig::default();
@@ -220,7 +219,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
220219
let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
221220
for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() {
222221
monitors.insert(outpoint, <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut Cursor::new(&monitor_ser)).expect("Failed to read monitor").1);
223-
monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
222+
chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
224223
}
225224
let mut monitor_refs = HashMap::new();
226225
for (outpoint, monitor) in monitors.iter_mut() {
@@ -230,14 +229,14 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
230229
let read_args = ChannelManagerReadArgs {
231230
keys_manager,
232231
fee_estimator: fee_est.clone(),
233-
monitor: monitor.clone(),
232+
chain_monitor: chain_monitor.clone(),
234233
tx_broadcaster: broadcast.clone(),
235234
logger,
236235
default_config: config,
237236
channel_monitors: monitor_refs,
238237
};
239238

240-
(<(BlockHash, ChannelManager<EnforcingChannelKeys, Arc<TestChannelMonitor>, Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor)
239+
(<(BlockHash, ChannelManager<EnforcingChannelKeys, Arc<TestChainMonitor>, Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, chain_monitor)
241240
} }
242241
}
243242

@@ -308,16 +307,11 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
308307
macro_rules! confirm_txn {
309308
($node: expr) => { {
310309
let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
311-
let mut txn = Vec::with_capacity(channel_txn.len());
312-
let mut posn = Vec::with_capacity(channel_txn.len());
313-
for i in 0..channel_txn.len() {
314-
txn.push(&channel_txn[i]);
315-
posn.push(i + 1);
316-
}
317-
$node.block_connected(&header, 1, &txn, &posn);
310+
let txdata: Vec<_> = channel_txn.iter().enumerate().map(|(i, tx)| (i + 1, tx)).collect();
311+
$node.block_connected(&header, &txdata, 1);
318312
for i in 2..100 {
319313
header = BlockHeader { version: 0x20000000, prev_blockhash: header.block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
320-
$node.block_connected(&header, i, &Vec::new(), &[0; 0]);
314+
$node.block_connected(&header, &[], i);
321315
}
322316
} }
323317
}

fuzz/src/chanmon_deser.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
use bitcoin::hash_types::BlockHash;
55

6+
use lightning::chain::channelmonitor;
67
use lightning::util::enforcing_trait_impls::EnforcingChannelKeys;
7-
use lightning::ln::channelmonitor;
88
use lightning::util::ser::{Readable, Writer};
99

1010
use utils::test_logger;

0 commit comments

Comments
 (0)