diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index bf8a0cf8342..f5127600c9f 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -1579,9 +1579,15 @@ impl< // future for the completion of the write. This ensures monitor persistence // ordering is preserved. let encoded = update.encode(); - res_a = Some(async move { - self.kv_store.write(primary, &monitor_key, update_name.as_str(), encoded).await - }); + let write_fut: Pin< + Box> + 'static>, + > = Box::pin(self.kv_store.write( + primary, + &monitor_key, + update_name.as_str(), + encoded, + )); + res_a = Some(write_fut); } else { // We could write this update, but it meets criteria of our design that calls for a full monitor write. // Note that this is NOT an async function, but rather calls the *sync* KVStore @@ -2107,6 +2113,58 @@ mod tests { do_persister_with_real_monitors(4, 2); } + #[test] + fn async_update_persist_queues_write_synchronously() { + // Build, but do not poll, an incremental monitor-update persistence future. The + // underlying KVStore::write call must queue the write synchronously so that queue order + // is set by call order rather than executor poll order. + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let _ = create_announced_chan_between_nodes(&nodes, 0, 1); + + send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000); + send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000); + + let cmu_map = nodes[0].chain_monitor.monitor_updates.lock().unwrap(); + let (channel_id, updates) = + cmu_map.iter().next().expect("expected a channel with monitor updates"); + let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(*channel_id).unwrap(); + let monitor_name = monitor.persistence_key(); + let cmu = updates + .iter() + .find(|u| u.update_id != 0 && u.update_id != u64::MAX) + .expect("expected a normal monitor update"); + let update_id = cmu.update_id; + + let store = TestStore::new(false); + let persister = MonitorUpdatingPersisterAsync::new( + &store, + PanicingSpawner, + node_cfgs[0].logger, + update_id + 1, + node_cfgs[0].keys_manager, + node_cfgs[0].keys_manager, + node_cfgs[0].tx_broadcaster, + node_cfgs[0].fee_estimator, + ); + + let _future = + Arc::clone(&persister.0).update_persisted_channel(monitor_name, Some(cmu), &monitor); + + let update_name = UpdateName::from(update_id); + let pending = store.list_pending_async_writes( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + &monitor_name.to_string(), + update_name.as_str(), + ); + assert!( + !pending.is_empty(), + "monitor update write was not queued synchronously by update_persisted_channel" + ); + } + // Test that if the `MonitorUpdatingPersister`'s can't actually write, trying to persist a // monitor or update with it results in the persister returning an UnrecoverableError status. #[test]