65 changes: 62 additions & 3 deletions fuzz/src/chanmon_consistency.rs
@@ -266,12 +266,13 @@ struct TestChainMonitor {
Arc<KeyProvider>,
>,
>,
pub deferred: bool,
pub latest_monitors: Mutex<HashMap<ChannelId, LatestMonitorState>>,
}
impl TestChainMonitor {
pub fn new(
broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>,
persister: Arc<TestPersister>, keys: Arc<KeyProvider>,
persister: Arc<TestPersister>, keys: Arc<KeyProvider>, deferred: bool,
) -> Self {
Self {
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
@@ -282,14 +283,44 @@ impl TestChainMonitor {
Arc::clone(&persister),
Arc::clone(&keys),
keys.get_peer_storage_key(),
false,
deferred,
)),
logger,
keys,
persister,
deferred,
latest_monitors: Mutex::new(new_hash_map()),
}
}

/// Flushes all deferred monitor operations and, if the persister reports success, promotes
/// pending monitor states to persisted in our shadow records. `TestChainMonitor` maintains
/// its own `latest_monitors` map that tracks serialized monitor snapshots independently of
/// `ChainMonitor`, so that the fuzzer can simulate node restarts by deserializing from these
/// snapshots rather than relying on the persister's storage.
///
/// This simulates the pattern of snapshotting the pending count, persisting the
/// `ChannelManager`, then flushing the queued monitor writes.
fn flush_and_update_latest_monitors(&self) {
let count = self.chain_monitor.pending_operation_count();
if count == 0 {
return;
}
// Execute all queued watch_channel/update_channel operations inside the ChainMonitor.
self.chain_monitor.flush(count, &self.logger);
let persister_res = *self.persister.update_ret.lock().unwrap();
// Only update our local tracking state when the persister signals completion. When
// persistence is still in-progress, the monitors stay in the pending set so that a
// simulated restart can still reload from the last fully-persisted snapshot.
if persister_res == chain::ChannelMonitorUpdateStatus::Completed {
for (_channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() {
if let Some((id, data)) = state.pending_monitors.drain(..).last() {
state.persisted_monitor_id = id;
state.persisted_monitor = data;
}
}
}
}
}
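
For context on the deferred mode this PR threads through the fuzzer: in that mode the `ChainMonitor` queues `watch_channel`/`update_channel` operations and only applies them when explicitly flushed. The sketch below models that queue-and-flush pattern in isolation, using an invented `DeferredMonitor` type; it is illustrative only, not the actual `ChainMonitor` API.

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, PartialEq, Debug)]
enum UpdateStatus {
    Completed,
    InProgress,
}

/// A toy monitor that queues writes instead of applying them immediately.
struct DeferredMonitor {
    queued: VecDeque<String>, // operations waiting for a flush
    applied: Vec<String>,     // operations that have been flushed
}

impl DeferredMonitor {
    fn new() -> Self {
        Self { queued: VecDeque::new(), applied: Vec::new() }
    }

    /// In deferred mode every operation is queued and reports InProgress.
    fn update_channel(&mut self, op: &str) -> UpdateStatus {
        self.queued.push_back(op.to_string());
        UpdateStatus::InProgress
    }

    fn pending_operation_count(&self) -> usize {
        self.queued.len()
    }

    /// Apply exactly `count` queued operations, oldest first. Snapshotting
    /// the count before flushing leaves anything queued afterwards for the
    /// next flush, matching the pattern described in the doc comment above.
    fn flush(&mut self, count: usize) {
        for _ in 0..count {
            if let Some(op) = self.queued.pop_front() {
                self.applied.push(op);
            }
        }
    }
}

fn main() {
    let mut mon = DeferredMonitor::new();
    assert_eq!(mon.update_channel("update-1"), UpdateStatus::InProgress);
    assert_eq!(mon.update_channel("update-2"), UpdateStatus::InProgress);
    let count = mon.pending_operation_count();
    // ...persist the ChannelManager here, then flush the queued writes...
    mon.flush(count);
    assert_eq!(mon.pending_operation_count(), 0);
    assert_eq!(mon.applied, vec!["update-1", "update-2"]);
}
```
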
impl chain::Watch<TestChannelSigner> for TestChainMonitor {
fn watch_channel(
@@ -299,6 +330,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
monitor.write(&mut ser).unwrap();
let monitor_id = monitor.get_latest_update_id();
let res = self.chain_monitor.watch_channel(channel_id, monitor);
if self.deferred {
assert_eq!(res, Ok(chain::ChannelMonitorUpdateStatus::InProgress));
}
let state = match res {
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
persisted_monitor_id: monitor_id,
@@ -348,6 +382,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write(&mut ser).unwrap();
let res = self.chain_monitor.update_channel(channel_id, update);
if self.deferred {
assert_eq!(res, chain::ChannelMonitorUpdateStatus::InProgress);
}
match res {
chain::ChannelMonitorUpdateStatus::Completed => {
map_entry.persisted_monitor_id = update.update_id;
@@ -364,6 +401,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
fn release_pending_monitor_events(
&self,
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
if self.deferred {
self.flush_and_update_latest_monitors();
}
return self.chain_monitor.release_pending_monitor_events();
}
}
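
The promotion step in `flush_and_update_latest_monitors` (only move the newest pending snapshot into the persisted slot when the persister reported `Completed`) can be shown in isolation. This is a hedged sketch with an invented `ShadowState` type standing in for `LatestMonitorState`:

```rust
#[derive(Clone, Copy, PartialEq)]
enum PersistStatus {
    Completed,
    InProgress,
}

/// Shadow record of one channel's serialized monitor snapshots.
struct ShadowState {
    persisted_monitor_id: u64,
    persisted_monitor: Vec<u8>,
    pending_monitors: Vec<(u64, Vec<u8>)>,
}

/// Promote the newest pending snapshot only when the persister signalled
/// success; otherwise a simulated restart must keep reloading from the last
/// fully-persisted snapshot.
fn promote(state: &mut ShadowState, persister_res: PersistStatus) {
    if persister_res == PersistStatus::Completed {
        if let Some((id, data)) = state.pending_monitors.drain(..).last() {
            state.persisted_monitor_id = id;
            state.persisted_monitor = data;
        }
    }
}

fn main() {
    let mut state = ShadowState {
        persisted_monitor_id: 1,
        persisted_monitor: vec![0xAA],
        pending_monitors: vec![(2, vec![0xBB]), (3, vec![0xCC])],
    };
    // While persistence is in progress, nothing is promoted.
    promote(&mut state, PersistStatus::InProgress);
    assert_eq!(state.persisted_monitor_id, 1);
    // On completion the newest pending snapshot becomes the persisted one.
    promote(&mut state, PersistStatus::Completed);
    assert_eq!(state.persisted_monitor_id, 3);
    assert!(state.pending_monitors.is_empty());
}
```
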
@@ -891,6 +931,11 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
ChannelMonitorUpdateStatus::Completed
}),
];
let deferred = [
initial_mon_styles & 0b001_000 != 0,
initial_mon_styles & 0b010_000 != 0,
initial_mon_styles & 0b100_000 != 0,
];

let mut chain_state = ChainState::new();
let mut node_height_a: u32 = 0;
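
The three `deferred` flags are peeled off `initial_mon_styles` one bit per node, next to the existing per-node monitor-style bits. A minimal sketch of the decoding (the `styles` parameter name is illustrative):

```rust
/// Decode one deferred-mode flag per node from bits 3..6 of the fuzz input
/// byte, mirroring the 0b001_000 / 0b010_000 / 0b100_000 masks above.
fn decode_deferred(styles: u8) -> [bool; 3] {
    [
        styles & 0b001_000 != 0,
        styles & 0b010_000 != 0,
        styles & 0b100_000 != 0,
    ]
}

fn main() {
    assert_eq!(decode_deferred(0b000_000), [false, false, false]);
    assert_eq!(decode_deferred(0b101_000), [true, false, true]);
}
```
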
@@ -919,6 +964,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()),
}),
Arc::clone(&keys_manager),
deferred[$node_id as usize],
));

let mut config = UserConfig::default();
@@ -971,6 +1017,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
}),
Arc::clone(keys),
deferred[node_id as usize],
));

let mut config = UserConfig::default();
@@ -1037,18 +1084,28 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
let manager =
<(BlockHash, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager");
let res = (manager.1, chain_monitor.clone());
let expected_status = if deferred[node_id as usize] {
ChannelMonitorUpdateStatus::InProgress
} else {
ChannelMonitorUpdateStatus::Completed
};
for (channel_id, mon) in monitors.drain() {
assert_eq!(
chain_monitor.chain_monitor.watch_channel(channel_id, mon),
Ok(ChannelMonitorUpdateStatus::Completed)
Ok(expected_status)
);
}
if deferred[node_id as usize] {
let count = chain_monitor.chain_monitor.pending_operation_count();
chain_monitor.chain_monitor.flush(count, &chain_monitor.logger);
}
*chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow();
res
};

macro_rules! complete_all_pending_monitor_updates {
($monitor: expr) => {{
$monitor.flush_and_update_latest_monitors();
for (channel_id, state) in $monitor.latest_monitors.lock().unwrap().iter_mut() {
for (id, data) in state.pending_monitors.drain(..) {
$monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap();
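
On reload, a deferred node expects `InProgress` from every re-`watch_channel` call, and each call leaves one queued operation behind, so a single flush sized by `pending_operation_count()` must drain the queue before the persister's configured return value is restored. A condensed sketch of that control flow, with invented names:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Status {
    Completed,
    InProgress,
}

/// Re-watch `n_channels` monitors on a node in the given mode, returning the
/// statuses observed and the number of operations left queued.
fn reload(n_channels: usize, deferred: bool) -> (Vec<Status>, usize) {
    let expected = if deferred { Status::InProgress } else { Status::Completed };
    // In deferred mode each watch_channel call queues one operation.
    let queued = if deferred { n_channels } else { 0 };
    (vec![expected; n_channels], queued)
}

fn main() {
    let (statuses, queued) = reload(2, true);
    assert!(statuses.iter().all(|s| *s == Status::InProgress));
    // flush(pending_operation_count()) then drains everything reload queued.
    assert_eq!(queued, 2);
}
```
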
@@ -2008,6 +2065,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
|monitor: &Arc<TestChainMonitor>,
chan_funding,
compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>| {
monitor.flush_and_update_latest_monitors();
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
assert!(
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
@@ -2024,6 +2082,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
};

let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_id| {
monitor.flush_and_update_latest_monitors();
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_id) {
assert!(
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
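
Both completion helpers assert that queued updates carry strictly increasing update IDs before completing any of them. The `windows(2)` idiom used for that check, shown generically (not fuzzer code):

```rust
/// True when the update IDs in `pending` are strictly increasing, i.e. the
/// queue preserves the order in which the updates were generated.
fn ids_strictly_increasing(pending: &[(u64, Vec<u8>)]) -> bool {
    pending.windows(2).all(|pair| pair[0].0 < pair[1].0)
}

fn main() {
    let ok: Vec<(u64, Vec<u8>)> = vec![(1, vec![]), (2, vec![]), (5, vec![])];
    let bad: Vec<(u64, Vec<u8>)> = vec![(1, vec![]), (3, vec![]), (3, vec![])];
    assert!(ids_strictly_increasing(&ok));
    assert!(!ids_strictly_increasing(&bad));
}
```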