
fix(fanout): replace nack-on-full with accept_pdata() backpressure to prevent silent data loss#2225

Merged
jmacd merged 16 commits into open-telemetry:main from lalitb:fanout-processor-maxflight
Mar 12, 2026

Conversation

@lalitb
Member

@lalitb lalitb commented Mar 6, 2026

Issue reported in PR: #2223

Problem

When fanout hits its max_inflight limit, it nacks incoming messages back upstream instead of applying backpressure. This causes silent data loss.

The correct behavior is for fanout to stop consuming from its input channel when max_inflight is full. This causes the pdata channel to fill up, which blocks the upstream send_message().await, naturally slowing the receiver -- no data lost, no retry needed.

Instead, fanout today does this:

if self.slim_inflight.len() >= self.config.max_inflight {
    // nacks the message -- data is lost, as the receiver is not expected to retry.
    effect_handler.notify_nack(...).await?;
}

Receivers are ingress nodes and are not expected to handle nacks from downstream processors. There is no retry contract in the engine. The nack carries the original pdata in nack.refused but it goes out of scope and is dropped silently -- no log, no metric, no recovery.

The workaround is max_inflight: 0 (unlimited inflight), which prevents nacking and lets channel capacity provide natural backpressure, but removes the memory bound on in-flight state.
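The natural-backpressure behavior described above can be illustrated with a plain bounded channel. This is a minimal sketch using `std::sync::mpsc::sync_channel` as a stand-in for the engine's bounded pdata channel (the real channel is async); `run_pipeline` is a hypothetical helper, not engine code:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

/// Send `items` values through a bounded channel of size `capacity`
/// with a deliberately slow consumer; returns everything received.
fn run_pipeline(capacity: usize, items: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);

    let producer = thread::spawn(move || {
        for i in 0..items {
            // Once the channel is full, this send blocks until the
            // consumer makes room -- backpressure, not data loss.
            tx.send(i).unwrap();
        }
    });

    let mut received = Vec::new();
    for _ in 0..items {
        thread::sleep(Duration::from_millis(5)); // slow consumer
        received.push(rx.recv().unwrap());
    }
    producer.join().unwrap();
    received
}

fn main() {
    // The producer is throttled by the channel, yet nothing is dropped.
    assert_eq!(run_pipeline(2, 4), vec![0, 1, 2, 3]);
    println!("ok");
}
```

The producer never sees a nack; it simply waits, which is exactly the slowdown the fix aims to propagate to receivers.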

Background: Stateless vs. Stateful Processors

The engine's processor model was originally designed around stateless transforms: each call to process() is independent, and the engine feeds messages as fast as the channel delivers them.

Fanout and batch are stateful processors -- they maintain in-flight tracking across multiple process() calls and must wait for acks before they can accept more work. This creates a throttling requirement that the engine's push model does not support:

  • The processor knows when it is at capacity (inflight map full)
  • But it cannot signal the engine to pause -- the engine keeps calling process() regardless
  • The only escape valve was to nack the message, which causes data loss when receivers do not retry

The fix closes this gap by letting processors signal their readiness back to the engine via accept_pdata().

Batch Processor

Batch has similar stateful limits (inbound_request_limit, outbound_request_limit) but does not have the same silent data loss problem in practice. When batch nacks due to capacity, it expects a retry processor to be present upstream to catch and re-deliver the nacked message. This is the standard recommended pipeline topology:

receiver -> retry -> batch -> exporter

Without a retry processor upstream, batch nacks would also cause silent data loss -- the same root cause. The accept_pdata() mechanism introduced here is the correct long-term fix for batch as well, but that is left as a follow-up since the immediate data loss issue only manifests in fanout (receivers do not retry; retry processors do).

Fix

Introduces accept_pdata() on the Processor trait and wires it into the engine run loop.

Processor trait (local and shared) -- new default method:

 fn accept_pdata(&self) -> bool { true }

All existing stateless processors inherit true with no change. Only stateful processors that need throttling override it.
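To make the default-method mechanics concrete, here is a simplified sketch (hypothetical `Processor`, `PassThrough`, and `Throttled` types, not the engine's actual trait, which is async and takes an effect handler):

```rust
/// Simplified model of the trait change: a default accept_pdata()
/// means stateless processors need no code changes at all.
trait Processor {
    fn process(&mut self, msg: u64);
    fn accept_pdata(&self) -> bool {
        true // stateless processors inherit this default
    }
}

struct PassThrough; // stateless: no override needed
impl Processor for PassThrough {
    fn process(&mut self, _msg: u64) {}
}

struct Throttled {
    inflight: usize,
    max_inflight: usize,
}
impl Processor for Throttled {
    fn process(&mut self, _msg: u64) {
        self.inflight += 1;
    }
    fn accept_pdata(&self) -> bool {
        // 0 means unlimited, mirroring the fanout config convention
        self.max_inflight == 0 || self.inflight < self.max_inflight
    }
}

fn main() {
    let p = PassThrough;
    assert!(p.accept_pdata()); // default: always ready
    let mut t = Throttled { inflight: 0, max_inflight: 2 };
    t.process(1);
    t.process(2);
    assert!(!t.accept_pdata()); // at capacity: signal the engine to pause
    println!("ok");
}
```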

Engine run loop -- changed from unconditional recv() to:

  while let Ok(msg) = message_channel.recv_when(processor.accept_pdata()).await {
      processor.process(msg, &mut effect_handler).await?;
  }

recv_when(false) only reads from the control channel (acks/nacks), leaving pdata untouched in its channel. This lets acks drain the inflight map without deadlocking, and causes natural backpressure upstream once the pdata channel fills.
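A synchronous toy model of that two-channel behavior (hypothetical `Msg` and `Channels` types; the real `recv_when` is async and selects over both channels rather than returning `None`):

```rust
use std::collections::VecDeque;

enum Msg {
    Control(&'static str),
    PData(u64),
}

/// Toy stand-in for the engine's message channel pair.
struct Channels {
    control: VecDeque<&'static str>,
    pdata: VecDeque<u64>,
}

impl Channels {
    /// With accept_pdata == false, only control messages are returned;
    /// pdata stays queued, so the bounded pdata channel fills and the
    /// upstream sender eventually blocks.
    fn recv_when(&mut self, accept_pdata: bool) -> Option<Msg> {
        if let Some(c) = self.control.pop_front() {
            return Some(Msg::Control(c));
        }
        if accept_pdata {
            return self.pdata.pop_front().map(Msg::PData);
        }
        None // the async version would await further control messages here
    }
}

fn main() {
    let mut ch = Channels {
        control: VecDeque::from(["ack"]),
        pdata: VecDeque::from([1, 2]),
    };
    // Throttled: the ack still drains, pdata is left untouched.
    assert!(matches!(ch.recv_when(false), Some(Msg::Control("ack"))));
    assert!(matches!(ch.recv_when(false), None));
    assert_eq!(ch.pdata.len(), 2);
    // Capacity reopened: pdata flows again.
    assert!(matches!(ch.recv_when(true), Some(Msg::PData(1))));
    println!("ok");
}
```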

FanoutProcessor -- implements accept_pdata():

  fn accept_pdata(&self) -> bool {
      if self.config.max_inflight == 0 { return true; }
      if self.config.use_fire_and_forget { return true; }
      if self.config.use_slim_primary {
          self.slim_inflight.len() < self.config.max_inflight
      } else {
          self.inflight.len() < self.config.max_inflight
      }
  }

The nack-on-full blocks are removed from process_slim_primary() and the full path. No pdata is ever dropped due to max_inflight pressure.

Coverage across all fanout modes

accept_pdata() handles all three paths correctly:

| Mode | accept_pdata() behavior | Was nacking before? |
|------|-------------------------|---------------------|
| Fire-and-forget | Always true -- no inflight tracking, no throttling needed | No |
| Slim primary | slim_inflight.len() < max_inflight | Yes -- fixed |
| Full path (sequential / await_all / fallback / timeout) | inflight.len() < max_inflight | Yes -- fixed |

Fire-and-forget is unaffected by design: it sends and acks upstream immediately, so inflight never accumulates.

Performance

Hot path (accept_pdata() = true, the common case):

  • accept_pdata() is a handful of integer comparisons -- effectively free
  • recv_when(true) is identical to recv() -- polls both channels, no added overhead
  • Zero latency impact on the steady-state hot path

When throttling kicks in (accept_pdata() = false):

  • Engine loop only reads from control_rx -- proper async await, no busy loop, no CPU spin
  • Acks drain inflight, then accept_pdata() returns true again on the next iteration
  • Control messages (acks, nacks, shutdown, timeouts) are still delivered even while pdata is paused -- shutdown and timeout handling are unaffected

Heap:

  • No new allocations. inflight and slim_inflight are still bounded by max_inflight exactly as before
  • Removing the nack path actually slightly shortens the code path

One behavioral change worth noting:
Previously when max_inflight was hit, fanout nacked immediately and continued processing new messages (at the cost of data loss). Now, if the downstream exporter is permanently stuck and never acks, the pipeline stalls end-to-end rather than silently dropping. This is the correct behavior -- a stuck exporter should surface as backpressure, not silent loss -- but it is a visible behavioral change if max_inflight is configured and the downstream is unhealthy.

@lalitb lalitb requested a review from a team as a code owner March 6, 2026 19:43
@github-actions github-actions bot added the rust Pull requests that update Rust code label Mar 6, 2026
@lalitb lalitb changed the title test: add failing test to reproduce silent data loss in fanout when max_inflight exceeded [Not for Merge] test: add failing test to reproduce silent data loss in fanout when max_inflight exceeded Mar 6, 2026
@codecov

codecov bot commented Mar 6, 2026

Codecov Report

❌ Patch coverage is 94.06780% with 7 lines in your changes missing coverage. Please review.
✅ Project coverage is 87.39%. Comparing base (7ec0f53) to head (74bff98).
⚠️ Report is 11 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2225      +/-   ##
==========================================
- Coverage   87.39%   87.39%   -0.01%     
==========================================
  Files         568      568              
  Lines      190730   190784      +54     
==========================================
+ Hits       166695   166741      +46     
- Misses      23509    23517       +8     
  Partials      526      526              
Components Coverage Δ
otap-dataflow 89.50% <94.06%> (-0.01%) ⬇️
query_abstraction 80.61% <ø> (ø)
query_engine 90.30% <ø> (ø)
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 52.44% <ø> (ø)
quiver 91.91% <ø> (ø)

@lalitb
Member Author

lalitb commented Mar 6, 2026

This is ready for review now.

@lalitb lalitb changed the title [Not for Merge] test: add failing test to reproduce silent data loss in fanout when max_inflight exceeded fix(fanout): replace nack-on-full with accept_pdata() backpressure to prevent silent data loss Mar 6, 2026
Contributor

@drewrelmas drewrelmas left a comment


Thanks for the detailed explanation @lalitb - the suggested change makes sense to me, but I think we should still get larger consensus before merging since this is core processor/engine code.

You mention that batch processor isn't affected in same way because there is an expectation for a retry processor ahead of it.

Could the same pattern not be followed between a receiver and fanout processor rather than changing the core engine loop?

receiver -> retry -> fanout

@utpilla
Contributor

utpilla commented Mar 9, 2026

You mention that batch processor isn't affected in same way because there is an expectation for a retry processor ahead of it.

Could the same pattern not be followed between a receiver and fanout processor rather than changing the core engine loop?

receiver -> retry -> fanout

A retry processor upstream doesn't fix this. It masks it differently:

  • Without retry: NACK → receiver ignores → data lost.
  • With retry: NACK → retry catches and re-queues, but fanout keeps accepting and rejecting at full speed, so retry accumulates unbounded delayed data → OOM under sustained pressure.

Both fail because NACKing doesn't create backpressure, process() returns immediately and the engine delivers the next message right away. accept_pdata(false) stops delivery at the source.

@lalitb
Member Author

lalitb commented Mar 9, 2026

You mention that batch processor isn't affected in same way because there is an expectation for a retry processor ahead of it.
Could the same pattern not be followed between a receiver and fanout processor rather than changing the core engine loop?

receiver -> retry -> fanout

A retry processor upstream doesn't fix this. It masks it differently:

  • Without retry: NACK → receiver ignores → data lost.
  • With retry: NACK → retry catches and re-queues, but fanout keeps accepting and rejecting at full speed, so retry accumulates unbounded delayed data → OOM under sustained pressure.

Both fail because NACKing doesn't create backpressure, process() returns immediately and the engine delivers the next message right away. accept_pdata(false) stops delivery at the source.

Agreed with the above. Just want to add a couple more points:

Even if retry doesn't OOM, it eventually gives up - once max_elapsed_time (5 min default) is reached, it nacks upstream and the receiver drops it. So retry just delays the loss under sustained pressure rather than preventing it.

The other motivation for fixing this at the engine level: batch has the same class of problem. Today it crashes with ProcessorError when slots run out, and there's already a PR to replace that with a nack. If we go the retry-in-front route, we'd need retry before every stateful processor. accept_pdata() handles it once - and the recv_when guard already exists in the engine, we're just exposing it to processors via a trait method with a safe default.

Contributor

@utpilla utpilla left a comment


The fix looks good. Left some suggestions.

@lquerel
Contributor

lquerel commented Mar 10, 2026

@lalitb @jmacd @albertlockett @drewrelmas @utpilla

Please do not merge this PR before I have had a chance to review it. Sorry for the delay, I have too many things going on in parallel right now.

Contributor

@lquerel lquerel left a comment


Great fix!

I made a few suggestions that should be integrated before merging.

I think there are still some comments and documentation mentioning that overflow messages are nacked.

Comment on lines +1078 to +1083
debug_assert!(
self.accept_pdata(),
"process() called with PData while at max_inflight ({}) — \
engine must check accept_pdata() before delivering",
self.config.max_inflight
);
Contributor


The comment for the recv_when method says:

    /// During shutdown draining the guard is ignored — pdata is
    /// always drained until the deadline, regardless of
    /// `accept_pdata`.

So in debug mode, this debug_assert! can panic during shutdown, especially when fanout is already in a saturated state.

I’m wondering why, once shutdown draining starts, we couldn’t continue honoring accept_pdata for pdata, while still delivering cleanup control messages (Ack/Nack) so that stateful processors can reduce their in-flight state and reopen capacity.

Member Author


Good catch. This is fixed now.

Comment on lines -379 to -381
/// Messages rejected due to max_inflight limit (backpressure).
#[metric(unit = "{item}")]
pub rejected_max_inflight: Counter<u64>,
Contributor


For operability, I think we should add few saturation gauge metrics:

  • in_flight: current tracked request count.
  • max_inflight_config: configured max_inflight value, with 0 meaning unlimited.
  • throttled: 1 when fanout is currently refusing new pdata via accept_pdata(), else 0.
  • throttle_episodes (a counter): increments only on transition from not-throttled to throttled.

Those metrics could be updated after state mutation (not inside accept_pdata to avoid hot path impact).

Member Author


Done. The rejected_max_inflight counter has been replaced with the four metrics you suggested. All four are updated via update_saturation_metrics(), which is called at the end of process() after any state mutation -- not inside accept_pdata() itself -- to keep the hot path clean. The transition detection uses a was_throttled flag to ensure throttle_episodes only increments on the edge, not while staying saturated.
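A minimal sketch of that edge-triggered accounting, based only on the description above (the `SaturationMetrics` struct and `update` signature are illustrative, not the PR's actual API):

```rust
/// Hypothetical saturation bookkeeping for a throttling processor.
#[derive(Default)]
struct SaturationMetrics {
    in_flight: u64,
    throttled: u64,         // gauge: 1 while refusing pdata, else 0
    throttle_episodes: u64, // counter: increments on the transition only
    was_throttled: bool,
}

impl SaturationMetrics {
    /// Called after state mutation, never inside accept_pdata(),
    /// so the hot path stays free of metric updates.
    fn update(&mut self, inflight: usize, max_inflight: usize, accepting: bool) {
        self.in_flight = inflight as u64;
        let _ = max_inflight; // would feed the max_inflight_config gauge
        let now_throttled = !accepting;
        self.throttled = now_throttled as u64;
        if now_throttled && !self.was_throttled {
            self.throttle_episodes += 1; // edge-triggered, not level-triggered
        }
        self.was_throttled = now_throttled;
    }
}

fn main() {
    let mut m = SaturationMetrics::default();
    m.update(2, 2, false); // saturated: first episode
    m.update(2, 2, false); // still saturated: no new episode
    m.update(1, 2, true);  // capacity reopened
    m.update(2, 2, false); // saturated again: second episode
    assert_eq!(m.throttle_episodes, 2);
    assert_eq!(m.throttled, 1);
    println!("episodes = {}", m.throttle_episodes);
}
```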

@lquerel
Contributor

lquerel commented Mar 11, 2026

Just for future reviewers who might be confused by the PR description (as I was), I'd like to provide an alternative description of the problem:

  • by the time process() is called, the pdata item has already been dequeued from MessageChannel
  • acks/nacks are delivered later as control messages through the engine
  • if fanout tries to “wait inside process()” for an ack/nack that would free capacity, the engine stops polling the message channel while that call is blocked
  • that means the ack/nack it is waiting for cannot be delivered to fanout, because it remains queued on the control channel

So the issue is not that the engine may call process() concurrently. The issue is that the current engine API has no way to say “stop consuming pdata, but keep draining control messages” before the next pdata is removed from the channel.

In any case, the solution developed in this PR seems appropriate to me.

@JakeDern
Contributor

Some thoughts on this related to batch processor - #2198 (comment)

@cijothomas @lalitb @lquerel Any thoughts on merging the ack/nack based approach for now vs adding a ref to the actual pdata in accept_pdata?

@lquerel
Contributor

lquerel commented Mar 11, 2026

@JakeDern Just to make sure we're on the same page. What you're proposing is to merge the Nack approach for the batch processor and keep the accept_pdata approach for the current PR. If so, I agree.
In the medium term, however, I think we will need to revisit the batch processor on this aspect.

@JakeDern
Contributor

@JakeDern Just to make sure we're on the same page. What you're proposing is to merge the Nack approach for the batch processor and keep the accept_pdata approach for the current PR. If so, I agree. In the medium term, however, I think we will need to revisit the batch processor on this aspect.

Yes exactly! We're playing comment tag, I just replied on the other PR 😁

@lalitb
Member Author

lalitb commented Mar 11, 2026

Thanks for the review. All the comments are resolved now.

@drewrelmas drewrelmas added this pull request to the merge queue Mar 12, 2026
@drewrelmas drewrelmas removed this pull request from the merge queue due to a manual request Mar 12, 2026
@drewrelmas
Contributor

Not merging yet to give @jmacd opportunity to comment.

Contributor

@jmacd jmacd left a comment


I just want to be sure I understand what "silent data loss" means here. I do not see returning a Nack as silent in the sense that your data is Nacked and if the receiver is waiting for responses, the Nack itself is loud enough. If the receiver is in a fire-and-forget mode, i.e. with no subscriber, then the data is lost when batch or fanout nacks. That's what you're fixing here, thanks!

@jmacd jmacd added this pull request to the merge queue Mar 12, 2026
Merged via the queue into open-telemetry:main with commit ff7ab7b Mar 12, 2026
114 of 116 checks passed

Labels

rust Pull requests that update Rust code

Projects

Status: Done


7 participants