Skip to content

Commit 6ed79d9

Browse files
joostjager and claude
committed
Defer MonitorUpdatingPersister writes to flush()
Update MonitorUpdatingPersister and MonitorUpdatingPersisterAsync to queue persist operations in memory instead of writing immediately to disk. The Persist trait methods now return ChannelMonitorUpdateStatus::InProgress and the actual writes happen when flush() is called. This fixes a race condition that could cause channel force closures: previously, if the node crashed after writing channel monitors but before writing the channel manager, the monitors would be ahead of the manager on restart. By deferring monitor writes until after the channel manager is persisted (via flush()), we ensure the manager is always at least as up-to-date as the monitors. The flush() method takes a count parameter specifying how many queued writes to flush. The background processor captures the queue size before persisting the channel manager, then flushes exactly that many writes afterward. This prevents flushing monitor updates that arrived after the manager state was captured. Key changes: - Add PendingWrite enum with FullMonitor and Update variants for queued writes - Add pending_writes queue to MonitorUpdatingPersisterAsyncInner - Add pending_write_count() and flush(count) to Persist trait and ChainMonitor - ChainMonitor::flush() calls channel_monitor_updated for each completed write - Stale update cleanup happens in flush() after full monitor is written - Call flush() in background processor after channel manager persistence Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent b4fb555 commit 6ed79d9

File tree

4 files changed

+314
-220
lines changed

4 files changed

+314
-220
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,6 +1152,11 @@ where
11521152

11531153
let mut futures = Joiner::new();
11541154

1155+
// Capture the number of pending monitor writes before persisting the channel manager.
1156+
// We'll only flush this many writes after the manager is persisted, to avoid flushing
1157+
// monitor updates that arrived after the manager state was captured.
1158+
let pending_monitor_writes = chain_monitor.pending_write_count();
1159+
11551160
if channel_manager.get_cm().get_and_clear_needs_persistence() {
11561161
log_trace!(logger, "Persisting ChannelManager...");
11571162

@@ -1349,6 +1354,15 @@ where
13491354
res?;
13501355
}
13511356

1357+
// Flush the monitor writes that were pending before we persisted the channel manager.
1358+
// Any writes that arrived after are left in the queue for the next iteration.
1359+
if pending_monitor_writes > 0 {
1360+
match chain_monitor.flush(pending_monitor_writes) {
1361+
Ok(()) => log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes),
1362+
Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e),
1363+
}
1364+
}
1365+
13521366
match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
13531367
sleeper(ONION_MESSAGE_HANDLER_TIMER)
13541368
}) {
@@ -1413,6 +1427,16 @@ where
14131427
channel_manager.get_cm().encode(),
14141428
)
14151429
.await?;
1430+
1431+
// Flush all pending monitor writes after final channel manager persistence.
1432+
let pending_monitor_writes = chain_monitor.pending_write_count();
1433+
if pending_monitor_writes > 0 {
1434+
match chain_monitor.flush(pending_monitor_writes) {
1435+
Ok(()) => log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes),
1436+
Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e),
1437+
}
1438+
}
1439+
14161440
if let Some(ref scorer) = scorer {
14171441
kv_store
14181442
.write(
@@ -1722,6 +1746,9 @@ impl BackgroundProcessor {
17221746
channel_manager.get_cm().timer_tick_occurred();
17231747
last_freshness_call = Instant::now();
17241748
}
1749+
// Capture the number of pending monitor writes before persisting the channel manager.
1750+
let pending_monitor_writes = chain_monitor.pending_write_count();
1751+
17251752
if channel_manager.get_cm().get_and_clear_needs_persistence() {
17261753
log_trace!(logger, "Persisting ChannelManager...");
17271754
(kv_store.write(
@@ -1733,6 +1760,16 @@ impl BackgroundProcessor {
17331760
log_trace!(logger, "Done persisting ChannelManager.");
17341761
}
17351762

1763+
// Flush the monitor writes that were pending before we persisted the channel manager.
1764+
if pending_monitor_writes > 0 {
1765+
match chain_monitor.flush(pending_monitor_writes) {
1766+
Ok(()) => {
1767+
log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes)
1768+
},
1769+
Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e),
1770+
}
1771+
}
1772+
17361773
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
17371774
log_trace!(logger, "Persisting LiquidityManager...");
17381775
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1853,6 +1890,18 @@ impl BackgroundProcessor {
18531890
CHANNEL_MANAGER_PERSISTENCE_KEY,
18541891
channel_manager.get_cm().encode(),
18551892
)?;
1893+
1894+
// Flush all pending monitor writes after final channel manager persistence.
1895+
let pending_monitor_writes = chain_monitor.pending_write_count();
1896+
if pending_monitor_writes > 0 {
1897+
match chain_monitor.flush(pending_monitor_writes) {
1898+
Ok(()) => {
1899+
log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes)
1900+
},
1901+
Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e),
1902+
}
1903+
}
1904+
18561905
if let Some(ref scorer) = scorer {
18571906
kv_store.write(
18581907
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,

lightning/src/chain/chainmonitor.rs

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::chain::channelmonitor::{
3939
use crate::chain::transaction::{OutPoint, TransactionData};
4040
use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput};
4141
use crate::events::{self, Event, EventHandler, ReplayEvent};
42+
use crate::io;
4243
use crate::ln::channel_state::ChannelDetails;
4344
#[cfg(peer_storage)]
4445
use crate::ln::msgs::PeerStorage;
@@ -198,16 +199,23 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
198199
/// the monitor already exists in the archive.
199200
fn archive_persisted_channel(&self, monitor_name: MonitorName);
200201

201-
/// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
202-
/// [`Self::update_persisted_channel`], which have completed.
202+
/// Returns the number of pending writes in the queue.
203203
///
204-
/// Returning an update here is equivalent to calling
205-
/// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
206-
/// hidden in the docs.
207-
#[doc(hidden)]
208-
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
209-
Vec::new()
204+
/// This can be used to capture the queue size before persisting the channel manager,
205+
/// then pass that count to [`Self::flush`] to only flush those specific updates.
206+
fn pending_write_count(&self) -> usize {
207+
0
210208
}
209+
210+
/// Flushes pending writes to the underlying storage.
211+
///
212+
/// The `count` parameter specifies how many pending writes to flush.
213+
///
214+
/// For implementations that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
215+
/// from persist methods), this method should write queued data to storage.
216+
///
217+
/// Returns the list of completed monitor updates (channel_id, update_id) that were flushed.
218+
fn flush(&self, count: usize) -> Result<Vec<(ChannelId, u64)>, io::Error>;
211219
}
212220

213221
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -272,7 +280,6 @@ pub struct AsyncPersister<
272280
FE::Target: FeeEstimator,
273281
{
274282
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
275-
event_notifier: Arc<Notifier>,
276283
}
277284

278285
impl<
@@ -320,26 +327,28 @@ where
320327
&self, monitor_name: MonitorName,
321328
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
322329
) -> ChannelMonitorUpdateStatus {
323-
let notifier = Arc::clone(&self.event_notifier);
324-
self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier);
330+
self.persister.queue_new_channel(monitor_name, monitor);
325331
ChannelMonitorUpdateStatus::InProgress
326332
}
327333

328334
fn update_persisted_channel(
329335
&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
330336
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
331337
) -> ChannelMonitorUpdateStatus {
332-
let notifier = Arc::clone(&self.event_notifier);
333-
self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier);
338+
self.persister.queue_channel_update(monitor_name, monitor_update, monitor);
334339
ChannelMonitorUpdateStatus::InProgress
335340
}
336341

337342
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
338343
self.persister.spawn_async_archive_persisted_channel(monitor_name);
339344
}
340345

341-
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
342-
self.persister.get_and_clear_completed_updates()
346+
fn pending_write_count(&self) -> usize {
347+
self.persister.pending_write_count()
348+
}
349+
350+
fn flush(&self, count: usize) -> Result<Vec<(ChannelId, u64)>, io::Error> {
351+
crate::util::persist::poll_sync_future(self.persister.flush(count))
343352
}
344353
}
345354

@@ -440,7 +449,6 @@ impl<
440449
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
441450
_our_peerstorage_encryption_key: PeerStorageKey,
442451
) -> Self {
443-
let event_notifier = Arc::new(Notifier::new());
444452
Self {
445453
monitors: RwLock::new(new_hash_map()),
446454
chain_source,
@@ -450,8 +458,8 @@ impl<
450458
_entropy_source,
451459
pending_monitor_events: Mutex::new(Vec::new()),
452460
highest_chain_height: AtomicUsize::new(0),
453-
event_notifier: Arc::clone(&event_notifier),
454-
persister: AsyncPersister { persister, event_notifier },
461+
event_notifier: Arc::new(Notifier::new()),
462+
persister: AsyncPersister { persister },
455463
pending_send_only_events: Mutex::new(Vec::new()),
456464
#[cfg(peer_storage)]
457465
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
@@ -742,6 +750,30 @@ where
742750
.collect()
743751
}
744752

753+
/// Returns the number of pending writes in the persister queue.
754+
///
755+
/// This can be used to capture the queue size before persisting the channel manager,
756+
/// then pass that count to [`Self::flush`] to only flush those specific updates.
757+
pub fn pending_write_count(&self) -> usize {
758+
self.persister.pending_write_count()
759+
}
760+
761+
/// Flushes pending writes to the underlying storage.
762+
///
763+
/// The `count` parameter specifies how many pending writes to flush.
764+
/// Only the first `count` writes in the queue are flushed; any later writes remain queued.
765+
///
766+
/// For persisters that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
767+
/// from persist methods), this method writes queued data to storage and signals
768+
/// completion to the channel manager via [`Self::channel_monitor_updated`].
769+
pub fn flush(&self, count: usize) -> Result<(), io::Error> {
770+
let completed = self.persister.flush(count)?;
771+
for (channel_id, update_id) in completed {
772+
let _ = self.channel_monitor_updated(channel_id, update_id);
773+
}
774+
Ok(())
775+
}
776+
745777
#[cfg(any(test, feature = "_test_utils"))]
746778
pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor<ChannelSigner> {
747779
self.monitors.write().unwrap().remove(channel_id).unwrap().monitor
@@ -1497,9 +1529,6 @@ where
14971529
fn release_pending_monitor_events(
14981530
&self,
14991531
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
1500-
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
1501-
let _ = self.channel_monitor_updated(channel_id, update_id);
1502-
}
15031532
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
15041533
for monitor_state in self.monitors.read().unwrap().values() {
15051534
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();

0 commit comments

Comments
 (0)