Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 87 additions & 1 deletion lightning-persister/src/fs_store/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ use std::collections::HashMap;
use std::fs;
use std::io::{ErrorKind, Read, Write};
use std::path::{Path, PathBuf};
#[cfg(test)]
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
#[cfg(test)]
use std::sync::mpsc;
use std::sync::{Arc, Mutex, RwLock};

#[cfg(target_os = "windows")]
Expand Down Expand Up @@ -91,14 +95,20 @@ impl FilesystemStoreState {
}

fn get_new_version_and_lock_ref(&self, dest_file_path: PathBuf) -> (Arc<RwLock<u64>>, u64) {
let mut outer_lock = self.inner.locks.lock().unwrap();

// Allocate the version while holding the lock map mutex so that clean_locks cannot remove the entry after a
// version has been reserved but before its lock reference is cloned.
let version = self.next_version.fetch_add(1, Ordering::Relaxed);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be atomic anymore now, and can be included in the mutex?

@tnull tnull Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not turn this into a much more invasive change and risk introducing further edge cases, tbh.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, to me it seems riskier to leave open the option of modifying the version outside of the lock.

if version == u64::MAX {
panic!("FilesystemStore version counter overflowed");
}
#[cfg(test)]
maybe_pause_after_version_allocation(&self.inner, &dest_file_path);

// Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for
// cleaning up unused locks.
let inner_lock_ref = self.inner.get_inner_lock_ref(dest_file_path);
let inner_lock_ref = Arc::clone(&outer_lock.entry(dest_file_path).or_default());

(inner_lock_ref, version)
}
Expand Down Expand Up @@ -851,3 +861,79 @@ pub(crate) fn get_key_from_dir_entry_path(
},
}
}

/// Test-only hook consulted by `maybe_pause_after_version_allocation`, which is called right after a new
/// version number has been allocated. It lets a test observe that moment for one specific destination
/// path and, depending on the locking state, hold the caller there until told to continue.
#[cfg(test)]
struct VersionAllocatedHook {
/// The hook only fires for calls whose destination path equals this one.
dest_file_path: PathBuf,
/// Signalled with `()` when the hook fires.
version_allocated: mpsc::Sender<()>,
/// Waited on after signalling, but only when the lock map mutex was found not to be held at that point.
continue_write: Mutex<mpsc::Receiver<()>>,
/// Set to `true` by the first matching call so that every later matching call returns immediately.
fired: AtomicBool,
}

/// Globally installed [`VersionAllocatedHook`], if any. `maybe_pause_after_version_allocation` clones the
/// `Arc` out of this slot on each call; a test installs a hook by storing `Some(..)` and removes it again
/// by storing `None`.
#[cfg(test)]
static VERSION_ALLOCATED_HOOK: Mutex<Option<Arc<VersionAllocatedHook>>> = Mutex::new(None);

/// Test-only synchronization point invoked right after a version number has been allocated.
///
/// Does nothing unless a [`VersionAllocatedHook`] is installed in `VERSION_ALLOCATED_HOOK`, its
/// `dest_file_path` equals `dest_file_path`, and it has not fired before. When it fires it:
///
/// 1. probes whether `inner.locks` is currently locked (a failed `try_lock` is taken to mean that
///    version allocation happens under the lock map mutex),
/// 2. notifies the test through `version_allocated`,
/// 3. blocks on `continue_write` only if the lock map mutex was *not* held, i.e. only when there
///    is a window in which another writer could interleave.
///
/// Panics if the global hook mutex or the receiver mutex is poisoned, or if either channel peer
/// has been dropped.
#[cfg(test)]
fn maybe_pause_after_version_allocation(inner: &FilesystemStoreInner, dest_file_path: &Path) {
    // Clone the `Arc` out in a single statement so the global guard is released before we
    // potentially block below.
    let installed = VERSION_ALLOCATED_HOOK.lock().unwrap().clone();
    let hook = match installed {
        Some(hook) => hook,
        None => return,
    };

    // Only the write targeting the configured path is of interest.
    if hook.dest_file_path.as_path() != dest_file_path {
        return;
    }

    // Fire at most once; the flag is only touched for matching paths.
    let already_fired = hook.fired.swap(true, Ordering::AcqRel);
    if already_fired {
        return;
    }

    // Any guard obtained by the probe is a temporary and is released at the end of this statement.
    let lock_map_is_held = inner.locks.try_lock().is_err();

    hook.version_allocated.send(()).unwrap();

    if lock_map_is_held {
        // No interleaving is possible while the lock map mutex is held, so there is nothing to
        // wait for.
        return;
    }
    hook.continue_write.lock().unwrap().recv().unwrap();
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;
    use std::thread;

    /// Regression test: a writer that reserved its version first must not clobber data written
    /// by a later writer, even if the later write completes in between.
    ///
    /// A background thread allocates a version for the key and then writes `b"stale"`. Once the
    /// hook reports the allocation, the main thread writes `b"newer"` through `write_impl`,
    /// releases the background thread, and finally checks that a read still yields `b"newer"`.
    #[test]
    fn stale_write_after_lock_cleanup_does_not_overwrite_newer_write() {
        // Start from a clean directory; a missing directory is not an error here.
        let store_dir = std::env::temp_dir().join("test_stale_write_after_lock_cleanup");
        let _ = std::fs::remove_dir_all(&store_dir);

        let state = Arc::new(FilesystemStoreState::new(store_dir.clone()));
        let dest_path = state
            .get_checked_dest_file_path("ns", "sub", Some("key"), "write", false)
            .unwrap();

        // One channel reports "version allocated", the other one resumes the stalled writer.
        let (allocated_tx, allocated_rx) = mpsc::channel();
        let (resume_tx, resume_rx) = mpsc::channel();
        let hook = Arc::new(VersionAllocatedHook {
            dest_file_path: dest_path.clone(),
            version_allocated: allocated_tx,
            continue_write: Mutex::new(resume_rx),
            fired: AtomicBool::new(false),
        });
        *VERSION_ALLOCATED_HOOK.lock().unwrap() = Some(hook);

        let stale_write = {
            let state = Arc::clone(&state);
            let dest_path = dest_path.clone();
            thread::spawn(move || {
                let (lock_ref, version) = state.get_new_version_and_lock_ref(dest_path.clone());
                state
                    .inner
                    .write_version(lock_ref, dest_path, b"stale".to_vec(), version, false)
                    .unwrap();
            })
        };

        // Wait until the background writer has its version, then race a newer write past it.
        allocated_rx.recv().unwrap();
        state.write_impl("ns", "sub", "key", b"newer".to_vec(), false).unwrap();
        resume_tx.send(()).unwrap();
        stale_write.join().unwrap();

        // Uninstall the hook before asserting.
        *VERSION_ALLOCATED_HOOK.lock().unwrap() = None;

        assert_eq!(state.read_impl("ns", "sub", "key", false).unwrap(), b"newer");
        let _ = std::fs::remove_dir_all(store_dir);
    }
}