Skip to content

Commit f881ab0

Browse files
feat: direct webhook delivery from LiquiditySource (#9)
Fire HTTP POST directly in handle_next_event() instead of routing through the event queue (S3 persistence), cutting webhook latency from 15-23s to <2s. LiquiditySource gets optional reqwest::Client + webhook_url fields. When configured, SendWebhook events bypass the event queue entirely. Falls back to event queue if not configured. Also bumps rust-lightning to include HTLC expiry threshold increase (10s -> 45s, moneydevkit/rust-lightning#5, 1432d063c).
1 parent 9dd57f4 commit f881ab0

File tree

3 files changed

+71
-17
lines changed

3 files changed

+71
-17
lines changed

Cargo.toml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,17 @@ default = []
4242
# lightning-macros = { version = "0.2.0" }
4343

4444
# Branch: https://github.com/moneydevkit/rust-lightning/commits/lsp-0.2.0/
45-
lightning = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c", features = ["std"] }
46-
lightning-types = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c" }
47-
lightning-invoice = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c", features = ["std"] }
48-
lightning-net-tokio = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c" }
49-
lightning-persister = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c", features = ["tokio"] }
50-
lightning-background-processor = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c" }
51-
lightning-rapid-gossip-sync = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c" }
52-
lightning-block-sync = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c", features = ["rest-client", "rpc-client", "tokio"] }
53-
lightning-transaction-sync = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
54-
lightning-liquidity = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c", features = ["std"] }
55-
lightning-macros = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c" }
45+
lightning = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85", features = ["std"] }
46+
lightning-types = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85" }
47+
lightning-invoice = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85", features = ["std"] }
48+
lightning-net-tokio = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85" }
49+
lightning-persister = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85", features = ["tokio"] }
50+
lightning-background-processor = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85" }
51+
lightning-rapid-gossip-sync = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85" }
52+
lightning-block-sync = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85", features = ["rest-client", "rpc-client", "tokio"] }
53+
lightning-transaction-sync = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
54+
lightning-liquidity = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85", features = ["std"] }
55+
lightning-macros = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85" }
5656

5757
#lightning = { path = "../rust-lightning/lightning", features = ["std"] }
5858
#lightning-types = { path = "../rust-lightning/lightning-types" }
@@ -101,7 +101,7 @@ winapi = { version = "0.3", features = ["winbase"] }
101101
[dev-dependencies]
102102
# lightning = { version = "0.2.0", features = ["std", "_test_utils"] }
103103
# Branch: https://github.com/moneydevkit/rust-lightning/commits/lsp-0.2.0/
104-
lightning = { git = "https://github.com/moneydevkit/rust-lightning", rev = "145438f9fa8e08cddcb2f7fe15c8613cfe464f8c", features = ["std", "_test_utils"] }
104+
lightning = { git = "https://github.com/moneydevkit/rust-lightning", rev = "1432d063c277f303266a6a5345789617f2e05e85", features = ["std", "_test_utils"] }
105105
#lightning = { path = "../rust-lightning/lightning", features = ["std", "_test_utils"] }
106106
proptest = "1.0.0"
107107
regex = "1.5.6"

src/builder.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ struct LiquiditySourceConfig {
136136
lsps4_client: Option<LSPS4ClientConfig>,
137137
// Act as an LSPS4 service.
138138
lsps4_service: Option<LSPS4ServiceConfig>,
139+
// URL for direct webhook delivery from LiquiditySource.
140+
webhook_url: Option<String>,
139141
}
140142

141143
#[derive(Clone)]
@@ -525,6 +527,15 @@ impl NodeBuilder {
525527
self
526528
}
527529

530+
/// Sets a webhook URL for direct HTTP delivery of payment notifications from the
531+
/// liquidity source, bypassing the event queue.
532+
pub fn set_webhook_url(&mut self, url: String) -> &mut Self {
533+
let liquidity_source_config =
534+
self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default());
535+
liquidity_source_config.webhook_url = Some(url);
536+
self
537+
}
538+
528539
/// Sets the used storage directory path.
529540
pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self {
530541
self.config.storage_dir_path = storage_dir_path;
@@ -1782,6 +1793,10 @@ fn build_with_store_internal(
17821793
.as_ref()
17831794
.map(|config| liquidity_source_builder.lsps4_service(config.clone()));
17841795

1796+
if let Some(ref url) = lsc.webhook_url {
1797+
liquidity_source_builder.webhook(url.clone());
1798+
}
1799+
17851800
let liquidity_source = runtime
17861801
.block_on(async move { liquidity_source_builder.build().await.map(Arc::new) })?;
17871802
let custom_message_handler =

src/liquidity.rs

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ where
208208
config: Arc<Config>,
209209
logger: L,
210210
event_queue: Arc<EventQueue<L>>,
211+
webhook_client: Option<reqwest::Client>,
212+
webhook_url: Option<String>,
211213
}
212214

213215
impl<L: Deref + Clone> LiquiditySourceBuilder<L>
@@ -239,9 +241,18 @@ where
239241
config,
240242
logger,
241243
event_queue,
244+
webhook_client: None,
245+
webhook_url: None,
242246
}
243247
}
244248

249+
/// Configures a webhook URL for direct HTTP delivery of payment notifications.
250+
pub(crate) fn webhook(&mut self, url: String) -> &mut Self {
251+
self.webhook_client = Some(reqwest::Client::new());
252+
self.webhook_url = Some(url);
253+
self
254+
}
255+
245256
pub(crate) fn lsps1_client(
246257
&mut self, lsp_node_id: PublicKey, lsp_address: SocketAddress, token: Option<String>,
247258
) -> &mut Self {
@@ -366,6 +377,8 @@ where
366377
config: self.config,
367378
logger: self.logger,
368379
event_queue: self.event_queue,
380+
webhook_client: self.webhook_client,
381+
webhook_url: self.webhook_url,
369382
})
370383
}
371384
}
@@ -387,6 +400,8 @@ where
387400
config: Arc<Config>,
388401
logger: L,
389402
event_queue: Arc<EventQueue<L>>,
403+
webhook_client: Option<reqwest::Client>,
404+
webhook_url: Option<String>,
390405
}
391406

392407
impl<L: Deref + Clone> LiquiditySource<L>
@@ -1203,11 +1218,35 @@ where
12031218
}
12041219
},
12051220
LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::SendWebhook { counterparty_node_id, payment_hash }) => {
1206-
if let Err(e) = self
1207-
.event_queue
1208-
.add_event(crate::event::Event::SendWebhook { node_id: counterparty_node_id, payment_hash }).await
1209-
{
1210-
log_error!(self.logger, "Failed to queue webhook event: {:?}", e);
1221+
if let (Some(client), Some(url)) = (&self.webhook_client, &self.webhook_url) {
1222+
let mut json_body = HashMap::new();
1223+
json_body.insert("nodeId", counterparty_node_id.to_string());
1224+
json_body.insert("paymentHash", payment_hash.to_string());
1225+
log_info!(
1226+
self.logger,
1227+
"Sending webhook directly for payment_hash={} node={}",
1228+
payment_hash,
1229+
counterparty_node_id
1230+
);
1231+
if let Err(e) = client.post(url).json(&json_body).send().await {
1232+
log_error!(self.logger, "Direct webhook POST failed: {:?}", e);
1233+
}
1234+
} else {
1235+
// Fallback: route through event queue (slower path via S3 persistence).
1236+
log_info!(
1237+
self.logger,
1238+
"No direct webhook configured, falling back to event queue"
1239+
);
1240+
if let Err(e) = self
1241+
.event_queue
1242+
.add_event(crate::event::Event::SendWebhook {
1243+
node_id: counterparty_node_id,
1244+
payment_hash,
1245+
})
1246+
.await
1247+
{
1248+
log_error!(self.logger, "Failed to queue webhook event: {:?}", e);
1249+
}
12111250
}
12121251
},
12131252
e => {

0 commit comments

Comments
 (0)