Commit b3544de
Reset LSPS5 persistence_in_flight counter on persist errors
`LSPS5ServiceHandler::persist` incremented `persistence_in_flight` at the top as a single-runner gate, but only decremented it on the success path: each interior `?` on a `kv_store` future propagated the error out of the function while leaving the counter at >= 1.

After one transient I/O failure (disk full, brief unavailability of a remote `KVStore`, EPERM, etc.), every subsequent `persist()` call hit the `fetch_add > 0` short-circuit and silently returned `Ok(false)`. The in-memory `needs_persist` flags then continued to accumulate without ever reaching disk, so webhook state, removals, and notification cooldowns (including the spec-mandated webhook retention/pruning state) were lost on the next process restart, without any error surfaced to the operator. Once wedged, the counter only ever grew, so recovery required a process restart.

Adopt the LSPS1 / LSPS2 pattern: split the body into an inner `do_persist` and an outer `persist` that resets the counter on every exit path, clearing it via `store(0)` before an error propagates and keeping the usual `fetch_sub` "go around again" handling on success. A failed write still propagates `Err`, but the next `persist()` attempt actually retries the write instead of no-op'ing.

Co-Authored-By: HAL 9000
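For readers unfamiliar with the pattern, the following is a minimal, self-contained sketch of the gate described above. The names (`Gate`, `fail_next_write`, `do_persist` as a stand-in for the fallible kv-store writes) are hypothetical, not the handler's real code; the point is that the outer function must reset the counter on the error path too, otherwise the `fetch_add > 0` short-circuit wedges permanently. With the pre-fix behavior (no `store(0)` in the `Err` arm), the second call in `main` below would return `Ok(false)` without retrying:

use std::io::{Error, ErrorKind};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

struct Gate {
	// Hypothetical stand-ins for the handler's fields.
	persistence_in_flight: AtomicUsize,
	fail_next_write: AtomicBool,
}

impl Gate {
	fn persist(&self) -> Result<bool, Error> {
		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
			// Another runner is active; our increment tells it to go around again.
			return Ok(false);
		}
		let mut did_persist = false;
		loop {
			match self.do_persist() {
				Ok(wrote) => did_persist |= wrote,
				Err(e) => {
					// The essence of the fix: clear the gate before propagating,
					// so the next persist() call actually retries the write.
					self.persistence_in_flight.store(0, Ordering::Release);
					return Err(e);
				},
			}
			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
				// Someone queued more work while we ran; go around once more.
				self.persistence_in_flight.store(1, Ordering::Release);
				continue;
			}
			break;
		}
		Ok(did_persist)
	}

	fn do_persist(&self) -> Result<bool, Error> {
		// Simulates one transient write failure, then success.
		if self.fail_next_write.swap(false, Ordering::SeqCst) {
			return Err(Error::new(ErrorKind::Other, "transient I/O failure"));
		}
		Ok(true)
	}
}

fn main() {
	let gate = Gate {
		persistence_in_flight: AtomicUsize::new(0),
		fail_next_write: AtomicBool::new(true),
	};
	assert!(gate.persist().is_err()); // the transient failure propagates...
	assert_eq!(gate.persist().unwrap(), true); // ...and the retry actually writes
}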
1 parent 1a26867 commit b3544de

2 files changed

Lines changed: 244 additions & 61 deletions

lightning-liquidity/src/lsps5/service.rs

Lines changed: 74 additions & 61 deletions
@@ -245,84 +245,97 @@ where
 		// introduce some batching to upper-bound the number of requests inflight at any given
 		// time.
 
-		let mut did_persist = false;
-
 		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
 			// If we're not the first event processor to get here, just return early, the increment
 			// we just did will be treated as "go around again" at the end.
-			return Ok(did_persist);
+			return Ok(false);
 		}
 
+		let mut did_persist = false;
+
 		loop {
-			let mut need_remove = Vec::new();
-			let mut need_persist = Vec::new();
+			match self.do_persist().await {
+				Ok(pass_did_persist) => did_persist |= pass_did_persist,
+				Err(e) => {
+					self.persistence_in_flight.store(0, Ordering::Release);
+					return Err(e);
+				},
+			}
 
-			self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
-			{
-				let outer_state_lock = self.per_peer_state.read().unwrap();
-
-				for (client_id, peer_state) in outer_state_lock.iter() {
-					let is_prunable = peer_state.is_prunable();
-					let has_open_channel = self.client_has_open_channel(client_id);
-					if is_prunable && !has_open_channel {
-						need_remove.push(*client_id);
-					} else if peer_state.needs_persist {
-						need_persist.push(*client_id);
-					}
-				}
+			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
+				// If another thread incremented the state while we were running we should go
+				// around again, but only once.
+				self.persistence_in_flight.store(1, Ordering::Release);
+				continue;
 			}
+			break;
+		}
 
-			for client_id in need_persist.into_iter() {
-				debug_assert!(!need_remove.contains(&client_id));
-				self.persist_peer_state(client_id).await?;
-				did_persist = true;
+		Ok(did_persist)
+	}
+
+	async fn do_persist(&self) -> Result<bool, lightning::io::Error> {
+		let mut did_persist = false;
+		let mut need_remove = Vec::new();
+		let mut need_persist = Vec::new();
+
+		self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
+		{
+			let outer_state_lock = self.per_peer_state.read().unwrap();
+
+			for (client_id, peer_state) in outer_state_lock.iter() {
+				let is_prunable = peer_state.is_prunable();
+				let has_open_channel = self.client_has_open_channel(client_id);
+				if is_prunable && !has_open_channel {
+					need_remove.push(*client_id);
+				} else if peer_state.needs_persist {
+					need_persist.push(*client_id);
+				}
 			}
+		}
 
-			for client_id in need_remove {
-				let mut future_opt = None;
-				{
-					// We need to take the `per_peer_state` write lock to remove an entry, but also
-					// have to hold it until after the `remove` call returns (but not through
-					// future completion) to ensure that writes for the peer's state are
-					// well-ordered with other `persist_peer_state` calls even across the removal
-					// itself.
-					let mut per_peer_state = self.per_peer_state.write().unwrap();
-					if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
-						let state = entry.get_mut();
-						if state.is_prunable() && !self.client_has_open_channel(&client_id) {
-							entry.remove();
-							let key = client_id.to_string();
-							future_opt = Some(self.kv_store.remove(
-								LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-								LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
-								&key,
-								true,
-							));
-						} else {
-							// If the peer was re-added, force a re-persist of the current state.
-							state.needs_persist = true;
-						}
+		for client_id in need_persist.into_iter() {
+			debug_assert!(!need_remove.contains(&client_id));
+			self.persist_peer_state(client_id).await?;
+			did_persist = true;
+		}
+
+		for client_id in need_remove {
+			let mut future_opt = None;
+			{
+				// We need to take the `per_peer_state` write lock to remove an entry, but also
+				// have to hold it until after the `remove` call returns (but not through
+				// future completion) to ensure that writes for the peer's state are
+				// well-ordered with other `persist_peer_state` calls even across the removal
+				// itself.
+				let mut per_peer_state = self.per_peer_state.write().unwrap();
+				if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
+					let state = entry.get_mut();
+					if state.is_prunable() && !self.client_has_open_channel(&client_id) {
+						entry.remove();
+						let key = client_id.to_string();
+						future_opt = Some(self.kv_store.remove(
+							LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+							LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+							&key,
+							true,
+						));
 					} else {
-						// This should never happen, we can only have one `persist` call
-						// in-progress at once and map entries are only removed by it.
-						debug_assert!(false);
+						// If the peer was re-added, force a re-persist of the current state.
+						state.needs_persist = true;
 					}
-				}
-				if let Some(future) = future_opt {
-					future.await?;
-					did_persist = true;
 				} else {
-					self.persist_peer_state(client_id).await?;
+					// This should never happen, we can only have one `persist` call
+					// in-progress at once and map entries are only removed by it.
+					debug_assert!(false);
 				}
 			}
-
-			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
-				// If another thread incremented the state while we were running we should go
-				// around again, but only once.
-				self.persistence_in_flight.store(1, Ordering::Release);
-				continue;
+			if let Some(future) = future_opt {
+				future.await?;
+				did_persist = true;
+			} else {
+				self.persist_peer_state(client_id).await?;
 			}
-			break;
 		}
 
 		Ok(did_persist)

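One subtlety in the new `persist` loop worth noting: the `Err` arm clears the counter outright with `store(0)` rather than decrementing, which also drops any "go around again" increments that concurrent callers queued while the failing pass ran. That appears safe because the error propagates and the peers' `needs_persist` flags remain dirty, so the caller's next `persist()` re-runs `do_persist` over the same state; on the `Ok` path the pre-existing `fetch_sub`/`store(1)` handling still coalesces queued requests into a single extra pass.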
lightning-liquidity/tests/lsps5_integration_tests.rs

Lines changed: 170 additions & 0 deletions
@@ -1633,3 +1633,173 @@ fn lsps5_service_handler_persistence_across_restarts() {
 		}
 	}
 }
+
+struct FailableKVStore {
+	inner: TestStore,
+	fail_lsps5: std::sync::atomic::AtomicBool,
+}
+
+impl FailableKVStore {
+	fn new() -> Self {
+		Self { inner: TestStore::new(false), fail_lsps5: std::sync::atomic::AtomicBool::new(false) }
+	}
+
+	fn set_fail_lsps5(&self, fail: bool) {
+		self.fail_lsps5.store(fail, std::sync::atomic::Ordering::SeqCst);
+	}
+}
+
+impl lightning::util::persist::KVStoreSync for FailableKVStore {
+	fn read(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+	) -> lightning::io::Result<Vec<u8>> {
+		<TestStore as lightning::util::persist::KVStoreSync>::read(
+			&self.inner,
+			primary_namespace,
+			secondary_namespace,
+			key,
+		)
+	}
+
+	fn write(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
+	) -> lightning::io::Result<()> {
+		if secondary_namespace == "lsps5_service"
+			&& self.fail_lsps5.load(std::sync::atomic::Ordering::SeqCst)
+		{
+			return Err(lightning::io::Error::new(
+				lightning::io::ErrorKind::Other,
+				"intentional failure for lsps5 namespace",
+			));
+		}
+		<TestStore as lightning::util::persist::KVStoreSync>::write(
+			&self.inner,
+			primary_namespace,
+			secondary_namespace,
+			key,
+			buf,
+		)
+	}
+
+	fn remove(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+	) -> lightning::io::Result<()> {
+		if secondary_namespace == "lsps5_service"
+			&& self.fail_lsps5.load(std::sync::atomic::Ordering::SeqCst)
+		{
+			return Err(lightning::io::Error::new(
+				lightning::io::ErrorKind::Other,
+				"intentional failure for lsps5 namespace",
+			));
+		}
+		<TestStore as lightning::util::persist::KVStoreSync>::remove(
+			&self.inner,
+			primary_namespace,
+			secondary_namespace,
+			key,
+			lazy,
+		)
+	}
+
+	fn list(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> lightning::io::Result<Vec<String>> {
+		<TestStore as lightning::util::persist::KVStoreSync>::list(
+			&self.inner,
+			primary_namespace,
+			secondary_namespace,
+		)
+	}
+}
+
+#[test]
+fn lsps5_service_persist_resets_in_flight_counter_on_io_error() {
+	use lightning::ln::peer_handler::CustomMessageHandler;
+
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+	let service_kv_store = Arc::new(FailableKVStore::new());
+	let client_kv_store = Arc::new(TestStore::new(false));
+
+	let service_config = LiquidityServiceConfig {
+		lsps1_service_config: None,
+		lsps2_service_config: None,
+		lsps5_service_config: Some(LSPS5ServiceConfig::default()),
+		advertise_service: true,
+	};
+	let client_config = LiquidityClientConfig {
+		lsps1_client_config: None,
+		lsps2_client_config: None,
+		lsps5_client_config: Some(LSPS5ClientConfig::default()),
+	};
+	let time_provider: Arc<dyn TimeProvider + Send + Sync> = Arc::new(DefaultTimeProvider);
+
+	let service_lm = LiquidityManagerSync::new_with_custom_time_provider(
+		nodes[0].keys_manager,
+		nodes[0].keys_manager,
+		nodes[0].node,
+		Arc::clone(&service_kv_store),
+		nodes[0].tx_broadcaster,
+		Some(service_config),
+		None,
+		Arc::clone(&time_provider),
+	)
+	.unwrap();
+
+	let client_lm = LiquidityManagerSync::new_with_custom_time_provider(
+		nodes[1].keys_manager,
+		nodes[1].keys_manager,
+		nodes[1].node,
+		client_kv_store,
+		nodes[1].tx_broadcaster,
+		None,
+		Some(client_config),
+		Arc::clone(&time_provider),
+	)
+	.unwrap();
+
+	let service_node_id = nodes[0].node.get_our_node_id();
+	let client_node_id = nodes[1].node.get_our_node_id();
+
+	create_chan_between_nodes(&nodes[0], &nodes[1]);
+
+	let client_handler = client_lm.lsps5_client_handler().unwrap();
+	client_handler
+		.set_webhook(service_node_id, "App".to_string(), "https://example.org/hook".to_string())
+		.unwrap();
+
+	let req_msgs = client_lm.get_and_clear_pending_msg();
+	assert_eq!(req_msgs.len(), 1);
+	let (_, request) = req_msgs.into_iter().next().unwrap();
+	service_lm.handle_custom_message(request, client_node_id).unwrap();
+
+	// Consume the SendWebhookNotification event so pending events queue is drained.
+	let _ = service_lm.next_event();
+	let _ = service_lm.get_and_clear_pending_msg();
+
+	// Initial persist should succeed and clear all needs_persist flags.
+	service_lm.persist().expect("initial persist should succeed");
+
+	// Now arrange for lsps5 writes to fail and dirty lsps5 state without dirtying
+	// pending_events (which lives in a different namespace).
+	service_kv_store.set_fail_lsps5(true);
+	service_lm.peer_disconnected(client_node_id);
+
+	// First persist attempt should error out due to the failing kv_store.
+	let res1 = service_lm.persist();
+	assert!(res1.is_err(), "persist should fail when lsps5 kv_store write fails");
+
+	// Second persist attempt must still attempt the write (and fail again). With the
+	// bug, the LSPS5 service handler's `persistence_in_flight` counter is left above
+	// zero on error so this returns Ok(false) immediately, silently dropping the
+	// pending state and breaking persistence forever.
+	let res2 = service_lm.persist();
+	assert!(
+		res2.is_err(),
+		"after a failed persist, subsequent persist calls must still attempt to persist; got {:?}",
+		res2,
+	);
+}

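To run just this regression test locally, something like the following should work (package and test-target names inferred from the file paths above):

	cargo test -p lightning-liquidity --test lsps5_integration_tests lsps5_service_persist_resets_in_flight_counter_on_io_error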