From a3967b57580c3a33751adca9c91fcafe4898386c Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 10 Jun 2026 13:30:15 +0200 Subject: [PATCH] Queue async monitor update writes synchronously Monitor update persistence relies on KVStore write calls being queued in call order. The incremental async update path deferred that write until executor polling, allowing async scheduling to reorder persistence operations relative to later updates or cleanup. Add a regression test that builds an incremental update future without polling it and verifies that the write has already been queued. Co-Authored-By: HAL 9000 This finding was discovered by Project Loupe --- lightning/src/util/persist.rs | 64 +++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index bf8a0cf8342..f5127600c9f 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -1579,9 +1579,15 @@ impl< // future for the completion of the write. This ensures monitor persistence // ordering is preserved. let encoded = update.encode(); - res_a = Some(async move { - self.kv_store.write(primary, &monitor_key, update_name.as_str(), encoded).await - }); + let write_fut: Pin< + Box> + 'static>, + > = Box::pin(self.kv_store.write( + primary, + &monitor_key, + update_name.as_str(), + encoded, + )); + res_a = Some(write_fut); } else { // We could write this update, but it meets criteria of our design that calls for a full monitor write. // Note that this is NOT an async function, but rather calls the *sync* KVStore @@ -2107,6 +2113,58 @@ mod tests { do_persister_with_real_monitors(4, 2); } + #[test] + fn async_update_persist_queues_write_synchronously() { + // Build, but do not poll, an incremental monitor-update persistence future. The + // underlying KVStore::write call must queue the write synchronously so that queue order + // is set by call order rather than executor poll order. + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let _ = create_announced_chan_between_nodes(&nodes, 0, 1); + + send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000); + send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000); + + let cmu_map = nodes[0].chain_monitor.monitor_updates.lock().unwrap(); + let (channel_id, updates) = + cmu_map.iter().next().expect("expected a channel with monitor updates"); + let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(*channel_id).unwrap(); + let monitor_name = monitor.persistence_key(); + let cmu = updates + .iter() + .find(|u| u.update_id != 0 && u.update_id != u64::MAX) + .expect("expected a normal monitor update"); + let update_id = cmu.update_id; + + let store = TestStore::new(false); + let persister = MonitorUpdatingPersisterAsync::new( + &store, + PanicingSpawner, + node_cfgs[0].logger, + update_id + 1, + node_cfgs[0].keys_manager, + node_cfgs[0].keys_manager, + node_cfgs[0].tx_broadcaster, + node_cfgs[0].fee_estimator, + ); + + let _future = + Arc::clone(&persister.0).update_persisted_channel(monitor_name, Some(cmu), &monitor); + + let update_name = UpdateName::from(update_id); + let pending = store.list_pending_async_writes( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + &monitor_name.to_string(), + update_name.as_str(), + ); + assert!( + !pending.is_empty(), + "monitor update write was not queued synchronously by update_persisted_channel" + ); + } + // Test that if the `MonitorUpdatingPersister`'s can't actually write, trying to persist a // monitor or update with it results in the persister returning an UnrecoverableError status. #[test]