diff --git a/lightning-persister/src/fs_store/common.rs b/lightning-persister/src/fs_store/common.rs index 885f806b344..96e58945f84 100644 --- a/lightning-persister/src/fs_store/common.rs +++ b/lightning-persister/src/fs_store/common.rs @@ -11,7 +11,11 @@ use std::collections::HashMap; use std::fs; use std::io::{ErrorKind, Read, Write}; use std::path::{Path, PathBuf}; +#[cfg(test)] +use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +#[cfg(test)] +use std::sync::mpsc; use std::sync::{Arc, Mutex, RwLock}; #[cfg(target_os = "windows")] @@ -91,14 +95,20 @@ impl FilesystemStoreState { } fn get_new_version_and_lock_ref(&self, dest_file_path: PathBuf) -> (Arc>, u64) { + let mut outer_lock = self.inner.locks.lock().unwrap(); + + // Allocate the version while holding the lock map mutex so that clean_locks cannot remove the entry after a + // version has been reserved but before its lock reference is cloned. let version = self.next_version.fetch_add(1, Ordering::Relaxed); if version == u64::MAX { panic!("FilesystemStore version counter overflowed"); } + #[cfg(test)] + maybe_pause_after_version_allocation(&self.inner, &dest_file_path); // Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for // cleaning up unused locks. - let inner_lock_ref = self.inner.get_inner_lock_ref(dest_file_path); + let inner_lock_ref = Arc::clone(&outer_lock.entry(dest_file_path).or_default()); (inner_lock_ref, version) } @@ -851,3 +861,79 @@ pub(crate) fn get_key_from_dir_entry_path( }, } } + +#[cfg(test)] +struct VersionAllocatedHook { + dest_file_path: PathBuf, + version_allocated: mpsc::Sender<()>, + continue_write: Mutex>, + fired: AtomicBool, +} + +#[cfg(test)] +static VERSION_ALLOCATED_HOOK: Mutex>> = Mutex::new(None); + +#[cfg(test)] +fn maybe_pause_after_version_allocation(inner: &FilesystemStoreInner, dest_file_path: &Path) { + let hook = VERSION_ALLOCATED_HOOK.lock().unwrap().clone(); + if let Some(hook) = hook { + if hook.dest_file_path.as_path() != dest_file_path + || hook.fired.swap(true, Ordering::AcqRel) + { + return; + } + + let version_allocation_holds_lock = inner.locks.try_lock().is_err(); + hook.version_allocated.send(()).unwrap(); + if !version_allocation_holds_lock { + hook.continue_write.lock().unwrap().recv().unwrap(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::sync::Arc; + use std::thread; + + #[test] + fn stale_write_after_lock_cleanup_does_not_overwrite_newer_write() { + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_stale_write_after_lock_cleanup"); + let _ = std::fs::remove_dir_all(&temp_path); + + let state = Arc::new(FilesystemStoreState::new(temp_path.clone())); + let path = + state.get_checked_dest_file_path("ns", "sub", Some("key"), "write", false).unwrap(); + let (version_allocated, wait_for_version) = mpsc::channel(); + let (continue_write, wait_to_continue) = mpsc::channel(); + *VERSION_ALLOCATED_HOOK.lock().unwrap() = Some(Arc::new(VersionAllocatedHook { + dest_file_path: path.clone(), + version_allocated, + continue_write: Mutex::new(wait_to_continue), + fired: AtomicBool::new(false), + })); + + let state_for_thread = Arc::clone(&state); + let path_for_thread = path.clone(); + let stale_write = thread::spawn(move || { + let (inner_lock_ref, version) = + state_for_thread.get_new_version_and_lock_ref(path_for_thread.clone()); + state_for_thread + .inner + .write_version(inner_lock_ref, path_for_thread, b"stale".to_vec(), version, false) + .unwrap(); + }); + + wait_for_version.recv().unwrap(); + state.write_impl("ns", "sub", "key", b"newer".to_vec(), false).unwrap(); + continue_write.send(()).unwrap(); + stale_write.join().unwrap(); + *VERSION_ALLOCATED_HOOK.lock().unwrap() = None; + + assert_eq!(state.read_impl("ns", "sub", "key", false).unwrap(), b"newer"); + let _ = std::fs::remove_dir_all(temp_path); + } +}