Skip to content

Commit 688e994

Browse files
refactor: lazy sync requests and batched payment store updates
Split wallet_sync_request into separate full_scan and incremental methods so only the needed request type is built. Introduce WalletSyncRequest enum to carry the chosen variant through the sync pipeline. Spawn primary and additional wallet syncs concurrently in a single JoinSet instead of running primary first. Batch update_payment_store_for_all_transactions and write_node_metrics into a single call after all updates are applied, and only when at least one update succeeded. Fix a pre-existing bug where additional wallet sync timestamps were set even when apply_update_for_address_type failed. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 6b4d1be commit 688e994

File tree

5 files changed

+367
-271
lines changed

5 files changed

+367
-271
lines changed

crates/bdk-wallet-aggregate/src/lib.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -496,15 +496,20 @@ where
496496

497497
// ─── Sync ───────────────────────────────────────────────────────────
498498

499-
/// Build sync requests for a specific wallet.
500-
#[allow(clippy::type_complexity)]
501-
pub fn wallet_sync_request(
499+
/// Build a full scan request for a specific wallet.
500+
pub fn wallet_full_scan_request(
502501
&self, key: &K,
503-
) -> Result<(FullScanRequest<KeychainKind>, SyncRequest<(KeychainKind, u32)>), Error> {
502+
) -> Result<FullScanRequest<KeychainKind>, Error> {
504503
let wallet = self.wallets.get(key).ok_or(Error::WalletNotFound)?;
505-
let full_scan = wallet.start_full_scan().build();
506-
let incremental_sync = wallet.start_sync_with_revealed_spks().build();
507-
Ok((full_scan, incremental_sync))
504+
Ok(wallet.start_full_scan().build())
505+
}
506+
507+
/// Build an incremental sync request for a specific wallet.
508+
pub fn wallet_incremental_sync_request(
509+
&self, key: &K,
510+
) -> Result<SyncRequest<(KeychainKind, u32)>, Error> {
511+
let wallet = self.wallets.get(key).ok_or(Error::WalletNotFound)?;
512+
Ok(wallet.start_sync_with_revealed_spks().build())
508513
}
509514

510515
/// Apply a chain update to the primary wallet.

src/chain/electrum.rs

Lines changed: 130 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -141,113 +141,163 @@ impl ElectrumChainSource {
141141
);
142142
return Err(Error::FeerateEstimationUpdateFailed);
143143
};
144-
// If this is our first sync, do a full scan with the configured gap limit.
145-
// Otherwise just do an incremental sync.
146-
let incremental_sync =
147-
self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some();
148-
149-
let apply_wallet_update =
150-
|update_res: Result<BdkUpdate, Error>, now: Instant| match update_res {
151-
Ok(update) => match onchain_wallet.apply_update(update) {
152-
Ok(wallet_events) => {
153-
log_info!(
154-
self.logger,
155-
"{} of on-chain wallet finished in {}ms.",
156-
if incremental_sync { "Incremental sync" } else { "Sync" },
157-
now.elapsed().as_millis()
158-
);
159-
let unix_time_secs_opt =
160-
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
161-
{
162-
let mut locked_node_metrics = self.node_metrics.write().unwrap();
163-
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
164-
unix_time_secs_opt;
165-
write_node_metrics(
166-
&*locked_node_metrics,
167-
Arc::clone(&self.kv_store),
168-
Arc::clone(&self.logger),
169-
)?;
170-
}
171-
Ok(wallet_events)
172-
},
173-
Err(e) => Err(e),
174-
},
175-
Err(e) => Err(e),
176-
};
177144

178-
let cached_txs = onchain_wallet.get_cached_txs();
179-
180-
let primary_result = if incremental_sync {
181-
let incremental_sync_request = onchain_wallet.get_incremental_sync_request();
182-
let incremental_sync_fut = electrum_client
183-
.get_incremental_sync_wallet_update(incremental_sync_request, cached_txs.clone());
184-
185-
let now = Instant::now();
186-
let update_res = incremental_sync_fut.await.map(|u| u.into());
187-
apply_wallet_update(update_res, now)
188-
} else {
189-
let full_scan_request = onchain_wallet.get_full_scan_request();
190-
let full_scan_fut =
191-
electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs.clone());
192-
let now = Instant::now();
193-
let update_res = full_scan_fut.await.map(|u| u.into());
194-
apply_wallet_update(update_res, now)
195-
};
196-
197-
let (mut all_events, primary_error) = match primary_result {
198-
Ok(events) => (events, None),
199-
Err(e) => (Vec::new(), Some(e)),
200-
};
145+
let primary_incremental =
146+
self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some();
201147

202148
let additional_types =
203149
self.address_type_runtime_config.read().unwrap().additional_address_types();
204-
let sync_requests = super::collect_additional_sync_requests(
150+
let additional_sync_requests = super::collect_additional_sync_requests(
205151
&additional_types,
206152
&onchain_wallet,
207153
&self.node_metrics,
208154
&self.logger,
209155
);
210156

211-
let mut join_set = tokio::task::JoinSet::new();
212-
for (address_type, full_scan_req, incremental_req, do_incremental) in sync_requests {
157+
let primary_request: super::WalletSyncRequest = if primary_incremental {
158+
super::WalletSyncRequest::Incremental(onchain_wallet.get_incremental_sync_request())
159+
} else {
160+
super::WalletSyncRequest::FullScan(onchain_wallet.get_full_scan_request())
161+
};
162+
163+
// Collect cached transactions once and share via Arc to avoid cloning
164+
// the entire Vec for each spawned task.
165+
let cached_txs = Arc::new(onchain_wallet.get_cached_txs());
166+
167+
// Primary wallet is identified by address_type = None in the JoinSet results.
168+
let now = Instant::now();
169+
let mut join_set: tokio::task::JoinSet<(
170+
Option<crate::config::AddressType>,
171+
Result<BdkUpdate, Error>,
172+
)> = tokio::task::JoinSet::new();
173+
174+
{
213175
let client = Arc::clone(&electrum_client);
214-
let txs = cached_txs.clone();
215-
join_set.spawn(async move {
216-
let result: Result<BdkUpdate, Error> = if do_incremental {
217-
client
218-
.get_incremental_sync_wallet_update(incremental_req, txs)
219-
.await
220-
.map(|u| u.into())
221-
} else {
222-
client.get_full_scan_wallet_update(full_scan_req, txs).await.map(|u| u.into())
223-
};
224-
(address_type, result)
225-
});
176+
let txs = Arc::clone(&cached_txs);
177+
match primary_request {
178+
super::WalletSyncRequest::Incremental(req) => {
179+
join_set.spawn(async move {
180+
let result: Result<BdkUpdate, Error> = client
181+
.get_incremental_sync_wallet_update(req, txs.iter().cloned())
182+
.await
183+
.map(|u| u.into());
184+
(None, result)
185+
});
186+
},
187+
super::WalletSyncRequest::FullScan(req) => {
188+
join_set.spawn(async move {
189+
let result: Result<BdkUpdate, Error> = client
190+
.get_full_scan_wallet_update(req, txs.iter().cloned())
191+
.await
192+
.map(|u| u.into());
193+
(None, result)
194+
});
195+
},
196+
}
226197
}
227198

228-
let mut sync_results = Vec::new();
199+
for (address_type, sync_req) in additional_sync_requests {
200+
let client = Arc::clone(&electrum_client);
201+
let txs = Arc::clone(&cached_txs);
202+
match sync_req {
203+
super::WalletSyncRequest::Incremental(req) => {
204+
join_set.spawn(async move {
205+
let result: Result<BdkUpdate, Error> = client
206+
.get_incremental_sync_wallet_update(req, txs.iter().cloned())
207+
.await
208+
.map(|u| u.into());
209+
(Some(address_type), result)
210+
});
211+
},
212+
super::WalletSyncRequest::FullScan(req) => {
213+
join_set.spawn(async move {
214+
let result: Result<BdkUpdate, Error> = client
215+
.get_full_scan_wallet_update(req, txs.iter().cloned())
216+
.await
217+
.map(|u| u.into());
218+
(Some(address_type), result)
219+
});
220+
},
221+
}
222+
}
223+
224+
let mut primary_update: Option<BdkUpdate> = None;
225+
let mut primary_error: Option<Error> = None;
226+
let mut additional_results = Vec::new();
227+
229228
while let Some(join_result) = join_set.join_next().await {
230229
match join_result {
231-
Ok((address_type, Ok(update))) => {
232-
sync_results.push((address_type, Some(update)));
230+
Ok((None, Ok(update))) => {
231+
primary_update = Some(update);
232+
},
233+
Ok((None, Err(e))) => {
234+
primary_error = Some(e);
235+
},
236+
Ok((Some(address_type), Ok(update))) => {
237+
additional_results.push((address_type, Some(update)));
233238
},
234-
Ok((address_type, Err(e))) => {
239+
Ok((Some(address_type), Err(e))) => {
235240
log_warn!(self.logger, "Failed to sync wallet {:?}: {}", address_type, e);
236-
sync_results.push((address_type, None));
241+
additional_results.push((address_type, None));
237242
},
238243
Err(e) => {
239244
log_warn!(self.logger, "Wallet sync task panicked: {}", e);
240245
},
241246
};
242247
}
243248

244-
all_events.extend(super::apply_additional_sync_results(
245-
sync_results,
249+
let mut all_events = Vec::new();
250+
251+
if primary_update.is_none() && primary_error.is_none() {
252+
log_error!(self.logger, "Primary wallet sync task failed unexpectedly");
253+
primary_error = Some(Error::WalletOperationFailed);
254+
}
255+
256+
if let Some(update) = primary_update {
257+
match onchain_wallet.apply_update(update) {
258+
Ok(wallet_events) => {
259+
log_info!(
260+
self.logger,
261+
"{} of primary on-chain wallet finished in {}ms.",
262+
if primary_incremental { "Incremental sync" } else { "Full sync" },
263+
now.elapsed().as_millis()
264+
);
265+
let unix_time_secs_opt =
266+
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
267+
self.node_metrics.write().unwrap().latest_onchain_wallet_sync_timestamp =
268+
unix_time_secs_opt;
269+
all_events.extend(wallet_events);
270+
},
271+
Err(e) => {
272+
primary_error = Some(e);
273+
},
274+
}
275+
}
276+
277+
let (additional_events, any_additional_applied) = super::apply_additional_sync_results(
278+
additional_results,
246279
&onchain_wallet,
247280
&self.node_metrics,
248-
&self.kv_store,
249281
&self.logger,
250-
));
282+
);
283+
all_events.extend(additional_events);
284+
285+
let any_updates_applied = primary_error.is_none() || any_additional_applied;
286+
287+
if any_updates_applied {
288+
if let Err(e) = onchain_wallet.update_payment_store_for_all_transactions() {
289+
log_error!(self.logger, "Failed to update payment store after wallet syncs: {}", e);
290+
}
291+
292+
let locked_node_metrics = self.node_metrics.read().unwrap();
293+
if let Err(e) = write_node_metrics(
294+
&*locked_node_metrics,
295+
Arc::clone(&self.kv_store),
296+
Arc::clone(&self.logger),
297+
) {
298+
log_error!(self.logger, "Failed to persist node metrics: {}", e);
299+
}
300+
}
251301

252302
if let Some(e) = primary_error {
253303
return Err(e);

0 commit comments

Comments
 (0)