fix(fanout): replace nack-on-full with accept_pdata() backpressure to prevent silent data loss#2225
Conversation
Codecov Report ❌ -- additional details and impacted files:

```diff
@@            Coverage Diff             @@
##             main    #2225      +/-   ##
==========================================
- Coverage   87.39%   87.39%   -0.01%
==========================================
  Files         568      568
  Lines      190730   190784      +54
==========================================
+ Hits       166695   166741      +46
- Misses      23509    23517       +8
  Partials      526      526
```
This is ready for review now.
Thanks for the detailed explanation @lalitb - the suggested change makes sense to me, but I think we should still get broader consensus before merging since this is core processor/engine code.
You mention that the batch processor isn't affected in the same way because there is an expectation of a retry processor ahead of it.
Could the same pattern not be followed between a receiver and fanout processor rather than changing the core engine loop?
receiver -> retry -> fanout
A retry processor upstream doesn't fix this. It masks it differently:
Both fail because NACKing doesn't create backpressure.
Agreed with the above. Just want to add a couple more points: even if retry doesn't OOM, it eventually gives up once its retry limit is exhausted. The other motivation for fixing this at the engine level: batch has the same class of problem; today it crashes.
@lalitb @jmacd @albertlockett @drewrelmas @utpilla Please do not merge this PR before I have had a chance to review it. Sorry for the delay, I have too many things going on in parallel right now.
lquerel left a comment:
Great fix!
I made a few suggestions that should be integrated before merging.
I think there are still some comments and documentation mentioning that overflow messages are nacked.
```rust
debug_assert!(
    self.accept_pdata(),
    "process() called with PData while at max_inflight ({}) — \
     engine must check accept_pdata() before delivering",
    self.config.max_inflight
);
```
The comment for the `recv_when` method says:

```rust
/// During shutdown draining the guard is ignored — pdata is
/// always drained until the deadline, regardless of
/// `accept_pdata`.
```
So in debug mode, this `debug_assert!` can panic during shutdown, especially when fanout is already in a saturated state.
I'm wondering why, once shutdown draining starts, we couldn't continue honoring `accept_pdata` for pdata, while still delivering cleanup control messages (Ack/Nack) so that stateful processors can reduce their in-flight state and reopen capacity.
Good catch. This is fixed now.
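The shutdown-drain behavior discussed above can be modeled synchronously. This is a hypothetical sketch, not the engine's actual async API (the `Msg` type and queue-based `recv_when` signature are invented for illustration): control messages (Ack/Nack) always flow so stateful processors can release in-flight state, while pdata is only taken when `accept_pdata` allows it.

```rust
use std::collections::VecDeque;

/// Hypothetical message types standing in for the engine's two channels.
#[derive(Debug, PartialEq)]
enum Msg {
    Control(&'static str), // Ack/Nack
    PData(u32),
}

/// Synchronous model of the guarded receive: control messages are always
/// delivered (acks reopen capacity), pdata only when `accept_pdata` is true.
/// When saturated, pdata stays queued, producing upstream backpressure.
fn recv_when(
    accept_pdata: bool,
    control: &mut VecDeque<Msg>,
    pdata: &mut VecDeque<Msg>,
) -> Option<Msg> {
    if let Some(m) = control.pop_front() {
        return Some(m); // control always flows, even while saturated
    }
    if accept_pdata {
        return pdata.pop_front();
    }
    None // saturated: leave pdata in its channel
}
```

With this rule, a saturated processor still drains acks during shutdown, and pdata resumes as soon as capacity reopens.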
```rust
/// Messages rejected due to max_inflight limit (backpressure).
#[metric(unit = "{item}")]
pub rejected_max_inflight: Counter<u64>,
```
For operability, I think we should add a few saturation metrics:

- `in_flight` (gauge): current tracked request count.
- `max_inflight_config` (gauge): configured `max_inflight` value, with 0 meaning unlimited.
- `throttled` (gauge): 1 when fanout is currently refusing new pdata via `accept_pdata()`, else 0.
- `throttle_episodes` (counter): increments only on the transition from not-throttled to throttled.

Those metrics could be updated after state mutation (not inside `accept_pdata()`) to avoid hot-path impact.
Done. The `rejected_max_inflight` counter has been replaced with the four metrics you suggested. All four are updated via `update_saturation_metrics()`, which is called at the end of `process()` after any state mutation, not inside `accept_pdata()` itself, to keep the hot path clean. The transition detection uses a `was_throttled` flag to ensure `throttle_episodes` only increments on the edge, not while staying saturated.
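A minimal sketch of the edge-detection logic described here, with plain integers standing in for the real gauge/counter types (the struct and field names are illustrative, not the actual metrics code):

```rust
/// Stand-in for the four saturation metrics; real code uses the
/// project's gauge/counter types.
#[derive(Default)]
struct SaturationMetrics {
    in_flight: u64,
    max_inflight_config: u64,
    throttled: u64,         // gauge: 1 while refusing pdata, else 0
    throttle_episodes: u64, // counter: not-throttled -> throttled edges
}

struct FanoutLike {
    inflight: usize,
    max_inflight: usize, // 0 = unlimited
    was_throttled: bool,
    metrics: SaturationMetrics,
}

impl FanoutLike {
    fn accept_pdata(&self) -> bool {
        self.max_inflight == 0 || self.inflight < self.max_inflight
    }

    /// Called at the end of process(), after any state mutation,
    /// so accept_pdata() itself stays off the hot path.
    fn update_saturation_metrics(&mut self) {
        let throttled = !self.accept_pdata();
        self.metrics.in_flight = self.inflight as u64;
        self.metrics.max_inflight_config = self.max_inflight as u64;
        self.metrics.throttled = if throttled { 1 } else { 0 };
        if throttled && !self.was_throttled {
            self.metrics.throttle_episodes += 1; // count the edge only
        }
        self.was_throttled = throttled;
    }
}
```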
Just for future reviewers who might be confused by the PR description (as I was), I'd like to provide an alternative description of the problem:
So the issue is not that the engine may call
In any case, the solution developed in this PR seems appropriate to me.
Some thoughts on this related to batch processor - #2198 (comment). @cijothomas @lalitb @lquerel Any thoughts on merging the ack/nack based approach for now vs adding a ref to the actual pdata in `accept_pdata`?
@JakeDern Just to make sure we're on the same page. What you're proposing is to merge the Nack approach for the batch processor and keep the `accept_pdata` approach for fanout?
Yes exactly! We're playing comment tag, I just replied on the other PR 😁
Thanks for the review. All the comments are resolved now.
Not merging yet to give @jmacd an opportunity to comment.
jmacd left a comment:
I just want to be sure I understand what "silent data loss" means here. I do not see returning a Nack as silent in the sense that your data is Nacked and if the receiver is waiting for responses, the Nack itself is loud enough. If the receiver is in a fire-and-forget mode, i.e. with no subscriber, then the data is lost when batch or fanout nacks. That's what you're fixing here, thanks!
ff7ab7b
Issue reported in PR: #2223
Problem

When fanout hits its `max_inflight` limit, it nacks incoming messages back upstream instead of applying backpressure. This causes silent data loss.

The correct behavior is for fanout to stop consuming from its input channel when `max_inflight` is full. This causes the pdata channel to fill up, which blocks the upstream `send_message().await`, naturally slowing the receiver -- no data lost, no retry needed. Instead, fanout today nacks on overflow.

Receivers are ingress nodes and are not expected to handle nacks from downstream processors. There is no retry contract in the engine. The nack carries the original pdata in `nack.refused`, but it goes out of scope and is dropped silently -- no log, no metric, no recovery.

The workaround is `max_inflight: 0` (unlimited inflight), which prevents nacking and lets channel capacity provide natural backpressure, but removes the memory bound on in-flight state.

Background: Stateless vs. Stateful Processors
The engine's processor model was originally designed around stateless transforms: each call to `process()` is independent, and the engine feeds messages as fast as the channel delivers them.

Fanout and batch are stateful processors -- they maintain in-flight tracking across multiple `process()` calls and must wait for acks before they can accept more work. This creates a throttling requirement that the engine's push model does not support: the engine calls `process()` regardless of the processor's capacity.

The fix closes this gap by letting processors signal their readiness back to the engine via `accept_pdata()`.

Batch Processor
Batch has similar stateful limits (`inbound_request_limit`, `outbound_request_limit`) but does not have the same silent data loss problem in practice. When batch nacks due to capacity, it expects a retry processor to be present upstream to catch and re-deliver the nacked message. This is the standard recommended pipeline topology:

receiver -> retry -> batch -> exporter

Without a retry processor upstream, batch nacks would also cause silent data loss -- the same root cause. The `accept_pdata()` mechanism introduced here is the correct long-term fix for batch as well, but that is left as a follow-up since the immediate data loss issue only manifests in fanout (receivers do not retry; retry processors do).

Fix
Introduces `accept_pdata()` on the `Processor` trait and wires it into the engine run loop.

`Processor` trait (`local` and `shared`) -- new default method. All existing stateless processors inherit `true` with no change; only stateful processors that need throttling override it.
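A minimal sketch of the trait change just described (names follow the PR text, but the real trait has more methods and an async context; `PassthroughProcessor` and `FanoutLike` are invented for illustration):

```rust
/// Sketch of the Processor trait with the new default method.
trait Processor {
    /// The engine checks this before delivering the next pdata message.
    /// Default: always ready, which is what stateless processors want.
    fn accept_pdata(&self) -> bool {
        true
    }
}

/// A stateless transform: inherits the default, never throttles.
struct PassthroughProcessor;
impl Processor for PassthroughProcessor {}

/// A stateful processor that tracks in-flight requests and overrides
/// the default with a capacity check.
struct FanoutLike {
    inflight: usize,
    max_inflight: usize, // 0 means unlimited
}

impl Processor for FanoutLike {
    fn accept_pdata(&self) -> bool {
        self.max_inflight == 0 || self.inflight < self.max_inflight
    }
}
```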
Engine run loop -- changed from an unconditional `recv()` to a guarded `recv_when()`. `recv_when(false)` only reads from the control channel (acks/nacks), leaving pdata untouched in its channel. This lets acks drain the inflight map without deadlocking, and causes natural backpressure upstream once the pdata channel fills.

`FanoutProcessor` -- implements `accept_pdata()`. The nack-on-full blocks are removed from `process_slim_primary()` and the full path. No pdata is ever dropped due to `max_inflight` pressure.

Coverage across all fanout modes
`accept_pdata()` handles all three paths correctly:

| Mode | `accept_pdata()` behavior |
|---|---|
| Fire-and-forget | `true` -- no inflight tracking, no throttling needed |
| Slim primary | `slim_inflight.len() < max_inflight` |
| Full | `inflight.len() < max_inflight` |

Fire-and-forget is unaffected by design: it sends and acks upstream immediately, so inflight never accumulates.
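The per-mode checks above can be sketched as one function. This is a minimal model: the `FanoutMode` enum and `FanoutState` fields are hypothetical reconstructions of the table, not the actual `FanoutProcessor` internals.

```rust
use std::collections::HashMap;

/// Hypothetical mode enum mirroring the three paths in the table.
enum FanoutMode {
    FireAndForget,
    SlimPrimary,
    Full,
}

/// Hypothetical state; the real processor's fields differ.
struct FanoutState {
    mode: FanoutMode,
    max_inflight: usize,             // 0 = unlimited
    slim_inflight: HashMap<u64, ()>, // slim-path inflight tracking
    inflight: HashMap<u64, ()>,      // full-path inflight tracking
}

impl FanoutState {
    fn accept_pdata(&self) -> bool {
        if self.max_inflight == 0 {
            return true; // unlimited: channel capacity is the only bound
        }
        match self.mode {
            // sends and acks upstream immediately, nothing accumulates
            FanoutMode::FireAndForget => true,
            FanoutMode::SlimPrimary => self.slim_inflight.len() < self.max_inflight,
            FanoutMode::Full => self.inflight.len() < self.max_inflight,
        }
    }
}
```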
Performance

Hot path (`accept_pdata()` = `true`, the common case):

- `accept_pdata()` is a handful of integer comparisons -- effectively free
- `recv_when(true)` is identical to `recv()` -- polls both channels, no added overhead

When throttling kicks in (`accept_pdata()` = `false`):

- The engine awaits only `control_rx` -- proper async await, no busy loop, no CPU spin
- Each ack shrinks `inflight`, then `accept_pdata()` returns `true` again on the next iteration

Heap: `inflight` and `slim_inflight` are still bounded by `max_inflight` exactly as before.
Previously when
max_inflightwas hit, fanout nacked immediately and continued processing new messages (at the cost of data loss). Now, if the downstream exporter is permanently stuck and never acks, the pipeline stalls end-to-end rather than silently dropping. This is the correct behavior -- a stuck exporter should surface as backpressure, not silent loss -- but it is a visible behavioral change ifmax_inflightis configured and the downstream is unhealthy.