Skip to content

Commit 0aebb10

Browse files
joostjager and claude
committed
Add deferred monitor write support to chanmon_consistency fuzz target
Add a `deferred` flag to `TestChainMonitor` that controls whether the underlying `ChainMonitor` queues operations instead of executing them immediately. The flag is derived from the fuzz input alongside the existing monitor style bits, so each of the three nodes can independently run in deferred or immediate mode. In deferred mode, `watch_channel` and `update_channel` always return `InProgress`. A new `flush_and_update_latest_monitors` method drains the queued operations and, when the persister reports `Completed`, promotes the pending shadow monitor snapshots to persisted state. This method is called before `release_pending_monitor_events` and at each point where the fuzzer completes pending monitor updates. On node reload, deferred monitors are flushed immediately after `watch_channel` so the node starts with a consistent state. AI tools were used in preparing this commit. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent dcf0c20 commit 0aebb10

File tree

1 file changed

+62
-3
lines changed

1 file changed

+62
-3
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 62 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -266,12 +266,13 @@ struct TestChainMonitor {
266266
Arc<KeyProvider>,
267267
>,
268268
>,
269+
pub deferred: bool,
269270
pub latest_monitors: Mutex<HashMap<ChannelId, LatestMonitorState>>,
270271
}
271272
impl TestChainMonitor {
272273
pub fn new(
273274
broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>,
274-
persister: Arc<TestPersister>, keys: Arc<KeyProvider>,
275+
persister: Arc<TestPersister>, keys: Arc<KeyProvider>, deferred: bool,
275276
) -> Self {
276277
Self {
277278
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
@@ -282,14 +283,44 @@ impl TestChainMonitor {
282283
Arc::clone(&persister),
283284
Arc::clone(&keys),
284285
keys.get_peer_storage_key(),
285-
false,
286+
deferred,
286287
)),
287288
logger,
288289
keys,
289290
persister,
291+
deferred,
290292
latest_monitors: Mutex::new(new_hash_map()),
291293
}
292294
}
295+
296+
/// Flushes all deferred monitor operations and, if the persister reports success, promotes
297+
/// pending monitor states to persisted in our shadow records. `TestChainMonitor` maintains
298+
/// its own `latest_monitors` map that tracks serialized monitor snapshots independently of
299+
/// `ChainMonitor`, so that the fuzzer can simulate node restarts by deserializing from these
300+
/// snapshots rather than relying on the persister's storage.
301+
///
302+
/// This simulates the pattern of snapshotting the pending count, persisting the
303+
/// `ChannelManager`, then flushing the queued monitor writes.
304+
fn flush_and_update_latest_monitors(&self) {
305+
let count = self.chain_monitor.pending_operation_count();
306+
if count == 0 {
307+
return;
308+
}
309+
// Execute all queued watch_channel/update_channel operations inside the ChainMonitor.
310+
self.chain_monitor.flush(count, &self.logger);
311+
let persister_res = *self.persister.update_ret.lock().unwrap();
312+
// Only update our local tracking state when the persister signals completion. When
313+
// persistence is still in-progress, the monitors stay in the pending set so that a
314+
// simulated restart can still reload from the last fully-persisted snapshot.
315+
if persister_res == chain::ChannelMonitorUpdateStatus::Completed {
316+
for (_channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() {
317+
if let Some((id, data)) = state.pending_monitors.drain(..).last() {
318+
state.persisted_monitor_id = id;
319+
state.persisted_monitor = data;
320+
}
321+
}
322+
}
323+
}
293324
}
294325
impl chain::Watch<TestChannelSigner> for TestChainMonitor {
295326
fn watch_channel(
@@ -299,6 +330,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
299330
monitor.write(&mut ser).unwrap();
300331
let monitor_id = monitor.get_latest_update_id();
301332
let res = self.chain_monitor.watch_channel(channel_id, monitor);
333+
if self.deferred {
334+
assert_eq!(res, Ok(chain::ChannelMonitorUpdateStatus::InProgress));
335+
}
302336
let state = match res {
303337
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
304338
persisted_monitor_id: monitor_id,
@@ -348,6 +382,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
348382
let mut ser = VecWriter(Vec::new());
349383
deserialized_monitor.write(&mut ser).unwrap();
350384
let res = self.chain_monitor.update_channel(channel_id, update);
385+
if self.deferred {
386+
assert_eq!(res, chain::ChannelMonitorUpdateStatus::InProgress);
387+
}
351388
match res {
352389
chain::ChannelMonitorUpdateStatus::Completed => {
353390
map_entry.persisted_monitor_id = update.update_id;
@@ -364,6 +401,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
364401
fn release_pending_monitor_events(
365402
&self,
366403
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
404+
if self.deferred {
405+
self.flush_and_update_latest_monitors();
406+
}
367407
return self.chain_monitor.release_pending_monitor_events();
368408
}
369409
}
@@ -893,6 +933,11 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
893933
ChannelMonitorUpdateStatus::Completed
894934
}),
895935
];
936+
let deferred = [
937+
initial_mon_styles & 0b001_000 != 0,
938+
initial_mon_styles & 0b010_000 != 0,
939+
initial_mon_styles & 0b100_000 != 0,
940+
];
896941

897942
let mut chain_state = ChainState::new();
898943
let mut node_height_a: u32 = 0;
@@ -921,6 +966,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
921966
update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()),
922967
}),
923968
Arc::clone(&keys_manager),
969+
deferred[$node_id as usize],
924970
));
925971

926972
let mut config = UserConfig::default();
@@ -973,6 +1019,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
9731019
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
9741020
}),
9751021
Arc::clone(keys),
1022+
deferred[node_id as usize],
9761023
));
9771024

9781025
let mut config = UserConfig::default();
@@ -1039,18 +1086,28 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
10391086
let manager =
10401087
<(BlockHash, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager");
10411088
let res = (manager.1, chain_monitor.clone());
1089+
let expected_status = if deferred[node_id as usize] {
1090+
ChannelMonitorUpdateStatus::InProgress
1091+
} else {
1092+
ChannelMonitorUpdateStatus::Completed
1093+
};
10421094
for (channel_id, mon) in monitors.drain() {
10431095
assert_eq!(
10441096
chain_monitor.chain_monitor.watch_channel(channel_id, mon),
1045-
Ok(ChannelMonitorUpdateStatus::Completed)
1097+
Ok(expected_status)
10461098
);
10471099
}
1100+
if deferred[node_id as usize] {
1101+
let count = chain_monitor.chain_monitor.pending_operation_count();
1102+
chain_monitor.chain_monitor.flush(count, &chain_monitor.logger);
1103+
}
10481104
*chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow();
10491105
res
10501106
};
10511107

10521108
macro_rules! complete_all_pending_monitor_updates {
10531109
($monitor: expr) => {{
1110+
$monitor.flush_and_update_latest_monitors();
10541111
for (channel_id, state) in $monitor.latest_monitors.lock().unwrap().iter_mut() {
10551112
for (id, data) in state.pending_monitors.drain(..) {
10561113
$monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap();
@@ -1978,6 +2035,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
19782035
|monitor: &Arc<TestChainMonitor>,
19792036
chan_funding,
19802037
compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>| {
2038+
monitor.flush_and_update_latest_monitors();
19812039
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
19822040
assert!(
19832041
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
@@ -1994,6 +2052,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
19942052
};
19952053

19962054
let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_id| {
2055+
monitor.flush_and_update_latest_monitors();
19972056
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_id) {
19982057
assert!(
19992058
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),

0 commit comments

Comments (0)