diff --git a/lightning-liquidity/src/lsps0/ser.rs b/lightning-liquidity/src/lsps0/ser.rs index 70649fe0f50..d28bba75e52 100644 --- a/lightning-liquidity/src/lsps0/ser.rs +++ b/lightning-liquidity/src/lsps0/ser.rs @@ -256,10 +256,14 @@ impl LSPSDateTime { now_seconds_since_epoch > datetime_seconds_since_epoch } - /// Returns the absolute difference between two datetimes as a `Duration`. + /// Returns the elapsed duration from `other` to `self`, or zero if `other` is later. pub fn duration_since(&self, other: &Self) -> Duration { - let diff_secs = self.0.timestamp().abs_diff(other.0.timestamp()); - Duration::from_secs(diff_secs) + let diff_secs = self.0.timestamp().saturating_sub(other.0.timestamp()); + if diff_secs <= 0 { + Duration::ZERO + } else { + Duration::from_secs(diff_secs as u64) + } } /// Returns the time in seconds since the unix epoch. @@ -971,6 +975,8 @@ pub(crate) mod u32_fee_rate { mod tests { use super::*; + use core::time::Duration; + use lightning::io::Cursor; #[test] @@ -981,4 +987,13 @@ mod tests { let decoded_datetime: LSPSDateTime = Readable::read(&mut Cursor::new(buf)).unwrap(); assert_eq!(expected_datetime, decoded_datetime); } + + #[test] + fn datetime_duration_since_is_directional() { + let earlier = LSPSDateTime::new_from_duration_since_epoch(Duration::from_secs(30)); + let later = LSPSDateTime::new_from_duration_since_epoch(Duration::from_secs(90)); + + assert_eq!(later.duration_since(&earlier), Duration::from_secs(60)); + assert_eq!(earlier.duration_since(&later), Duration::ZERO); + } } diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 7360131a9e9..cb3c62f9d1d 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -61,8 +61,8 @@ struct Webhook { // Timestamp used for tracking when the webhook was created / updated, or when the last notification was sent. // This is used to determine if the webhook is stale and should be pruned. last_used: LSPSDateTime, - // Timestamp when we last sent a notification to the client. This is used to enforce - // notification cooldowns. + // Timestamp when we last sent a notification to the client. This enforces the notification + // cooldown that protects the client from repeated spammy wake-ups. last_notification_sent: Option, } @@ -85,6 +85,12 @@ pub struct LSPS5ServiceConfig { pub const DEFAULT_MAX_WEBHOOKS_PER_CLIENT: u32 = 10; /// Default notification cooldown time in minutes. pub const NOTIFICATION_COOLDOWN_TIME: Duration = Duration::from_secs(60); // 1 minute +/// Minimum time between peer lifecycle events that are allowed to reset notification cooldowns. +/// +/// This is distinct from [`NOTIFICATION_COOLDOWN_TIME`]: that cooldown protects the client from +/// repeated spammy wake-ups, while this reset throttle protects registered notification URLs from +/// amplification via rapid peer connect/disconnect churn. +const NOTIFICATION_COOLDOWN_RESET_INTERVAL: Duration = Duration::from_secs(10); // Default configuration for LSPS5 service. impl Default for LSPS5ServiceConfig { @@ -689,7 +695,10 @@ where pub(crate) fn peer_connected(&self, counterparty_node_id: &PublicKey) { let mut outer_state_lock = self.per_peer_state.write().unwrap(); if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) { - peer_state.reset_notification_cooldown(); + let now = LSPSDateTime::new_from_duration_since_epoch( + self.time_provider.duration_since_epoch(), + ); + peer_state.reset_notification_cooldown(now); } self.check_prune_stale_webhooks(&mut outer_state_lock); } @@ -697,7 +706,10 @@ where pub(crate) fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { let mut outer_state_lock = self.per_peer_state.write().unwrap(); if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) { - peer_state.reset_notification_cooldown(); + let now = LSPSDateTime::new_from_duration_since_epoch( + self.time_provider.duration_since_epoch(), + ); + peer_state.reset_notification_cooldown(now); } self.check_prune_stale_webhooks(&mut outer_state_lock); } @@ -748,6 +760,11 @@ where #[derive(Debug)] pub(crate) struct PeerState { webhooks: Vec<(LSPS5AppName, Webhook)>, + // Timestamp of the last peer lifecycle event that was allowed to clear notification cooldowns. + // This is not the notification cooldown itself: `last_notification_sent` protects clients from + // repeated wake-ups, while this protects registered notification URLs from amplification via + // rapid connection churn. + last_notification_cooldown_reset: Option, needs_persist: bool, } @@ -803,10 +820,18 @@ impl PeerState { removed } - fn reset_notification_cooldown(&mut self) { + fn reset_notification_cooldown(&mut self, now: LSPSDateTime) { + let can_reset = self.last_notification_cooldown_reset.as_ref().map_or(true, |last_reset| { + now.duration_since(last_reset) >= NOTIFICATION_COOLDOWN_RESET_INTERVAL + }); + if !can_reset { + return; + } + for (_, h) in self.webhooks.iter_mut() { h.last_notification_sent = None; } + self.last_notification_cooldown_reset = Some(now); self.needs_persist |= true; } @@ -830,11 +855,77 @@ impl Default for PeerState { fn default() -> Self { let webhooks = Vec::new(); let needs_persist = true; - Self { webhooks, needs_persist } + let last_notification_cooldown_reset = None; + Self { webhooks, last_notification_cooldown_reset, needs_persist } } } impl_ser_tlv_based!(PeerState, { (0, webhooks, required_vec), + (_unused, last_notification_cooldown_reset, (static_value, None::)), (_unused, needs_persist, (static_value, false)), }); + +#[cfg(test)] +mod tests { + use super::*; + + use crate::alloc::string::ToString; + use crate::tests::utils::parse_pubkey; + + fn lsps_datetime(seconds: u64) -> LSPSDateTime { + LSPSDateTime::new_from_duration_since_epoch(Duration::from_secs(seconds)) + } + + fn test_webhook(last_notification_sent: Option) -> (LSPS5AppName, Webhook) { + let app_name = LSPS5AppName::new("test_app".to_string()).unwrap(); + let url = LSPS5WebhookUrl::new("https://example.com/webhook".to_string()).unwrap(); + let counterparty_node_id = + parse_pubkey("02c0ded160a4a70d71058509b647949a938924d3a6e109c6eb6aee8e2bb27dc79c") + .unwrap(); + let webhook = Webhook { + _app_name: app_name.clone(), + url, + _counterparty_node_id: counterparty_node_id, + last_used: lsps_datetime(1_000), + last_notification_sent, + }; + (app_name, webhook) + } + + fn test_peer_state(last_notification_sent: Option) -> PeerState { + PeerState { + webhooks: vec![test_webhook(last_notification_sent)], + last_notification_cooldown_reset: None, + needs_persist: false, + } + } + + #[test] + fn reset_notification_cooldown_is_throttled() { + let first_reset = lsps_datetime(2_000); + let mut peer_state = test_peer_state(Some(first_reset)); + + peer_state.reset_notification_cooldown(first_reset); + assert_eq!(peer_state.webhooks()[0].1.last_notification_sent, None); + assert_eq!(peer_state.last_notification_cooldown_reset, Some(first_reset)); + assert!(peer_state.needs_persist); + + peer_state.needs_persist = false; + let skipped_reset = lsps_datetime(2_009); + let recent_notification = lsps_datetime(2_009); + peer_state.webhooks_mut()[0].1.last_notification_sent = Some(recent_notification); + peer_state.needs_persist = false; + + peer_state.reset_notification_cooldown(skipped_reset); + assert_eq!(peer_state.webhooks()[0].1.last_notification_sent, Some(recent_notification)); + assert_eq!(peer_state.last_notification_cooldown_reset, Some(first_reset)); + assert!(!peer_state.needs_persist); + + let allowed_reset = lsps_datetime(2_010); + peer_state.reset_notification_cooldown(allowed_reset); + assert_eq!(peer_state.webhooks()[0].1.last_notification_sent, None); + assert_eq!(peer_state.last_notification_cooldown_reset, Some(allowed_reset)); + assert!(peer_state.needs_persist); + } +} diff --git a/lightning-liquidity/tests/lsps5_integration_tests.rs b/lightning-liquidity/tests/lsps5_integration_tests.rs index deed6b2f8b8..5e8c2b5bd28 100644 --- a/lightning-liquidity/tests/lsps5_integration_tests.rs +++ b/lightning-liquidity/tests/lsps5_integration_tests.rs @@ -1291,7 +1291,7 @@ fn test_notify_without_webhooks_does_nothing() { } #[test] -fn test_notifications_and_peer_connected_resets_cooldown() { +fn test_notifications_and_peer_connected_reset_is_throttled() { let mock_time_provider = Arc::new(MockTimeProvider::new(1000)); let time_provider = Arc::::clone(&mock_time_provider); let chanmon_cfgs = create_chanmon_cfgs(2); @@ -1369,7 +1369,7 @@ fn test_notifications_and_peer_connected_resets_cooldown() { "Should not emit event due to cooldown" ); - // 5. After peer_connected, notification should be sent again immediately + // 5. The first peer_connected reset should allow another notification immediately. let init_msg = Init { features: lightning_types::features::InitFeatures::empty(), remote_network_address: None, @@ -1387,6 +1387,31 @@ fn test_notifications_and_peer_connected_resets_cooldown() { }, _ => panic!("Expected SendWebhookNotification event after peer_connected"), } + + // 6. A rapid peer lifecycle update should not clear the cooldown again. + service_node.liquidity_manager.peer_disconnected(client_node_id); + let result = service_handler.notify_payment_incoming(client_node_id); + let error = result.unwrap_err(); + assert_eq!(error, LSPS5ProtocolError::SlowDownError); + assert!( + service_node.liquidity_manager.next_event().is_none(), + "Should not emit event after a rapid lifecycle reset" + ); + + // 7. Once the reset throttle has elapsed, peer_connected can reset the cooldown again. + mock_time_provider.advance_time(11); + service_node.liquidity_manager.peer_connected(client_node_id, &init_msg, false).unwrap(); + let _ = service_handler.notify_payment_incoming(client_node_id); + let event = service_node.liquidity_manager.next_event().unwrap(); + match event { + LiquidityEvent::LSPS5Service(LSPS5ServiceEvent::SendWebhookNotification { + notification, + .. + }) => { + assert_eq!(notification.method, WebhookNotificationMethod::LSPS5PaymentIncoming); + }, + _ => panic!("Expected SendWebhookNotification event after reset throttle elapsed"), + } } #[test]