Defer MetricKey construction to the aggregator thread#11381
Conversation
Replace the producer-side conflation pipeline with a thin per-span SpanSnapshot
posted to the existing aggregator thread. The aggregator now builds the
MetricKey, does the SERVICE_NAMES / SPAN_KINDS / PEER_TAGS_CACHE lookups, and
updates the AggregateMetric directly -- all off the producer's hot path.
What the producer does now, per span:
- filter (shouldComputeMetric, resource-ignored, longRunning)
- collect tag values into a SpanSnapshot (1 allocation per span)
- inbox.offer(snapshot) + return error flag for forceKeep
What moved off the producer:
- MetricKey construction and its hash computation
- SERVICE_NAMES.computeIfAbsent (UTF8 encoding of service name)
- SPAN_KINDS.computeIfAbsent (UTF8 encoding of span.kind)
- PEER_TAGS_CACHE lookups (peer-tag name+value UTF8 encoding)
- pending/keys ConcurrentHashMap operations
- Batch pooling, batch atomic ops, batch contributeTo
Removed entirely:
- Batch.java -- the conflation primitive is no longer needed; the
aggregator's existing LRUCache<MetricKey, AggregateMetric> IS the
conflation point now.
- pending ConcurrentHashMap<MetricKey, Batch>
- keys ConcurrentHashMap<MetricKey, MetricKey> (canonical dedup)
- batchPool MessagePassingQueue<Batch>
- The CommonKeyCleaner role of tracking keys.keySet() on LRU eviction --
AggregateExpiry now just reports drops to healthMetrics.
Added:
- SpanSnapshot: immutable value carrying the raw MetricKey inputs + a
tagAndDuration long (duration | ERROR_TAG | TOP_LEVEL_TAG).
- AggregateMetric.recordOneDuration(long tagAndDuration) -- the single-hit
equivalent of the existing recordDurations(int, AtomicLongArray).
- Peer-tag values flow through the snapshot as a flattened String[] of
[name0, value0, name1, value1, ...]; the aggregator encodes them through
PEER_TAGS_CACHE on its own thread.
Benchmark results (2 forks x 5 iter x 15s):
ConflatingMetricsAggregatorDDSpanBenchmark
prior commit 6.343 +- 0.115 us/op
this commit 2.506 +- 0.044 us/op (~60% faster)
ConflatingMetricsAggregatorBenchmark (SimpleSpan)
prior commit 6.585 +- 0.049 us/op
this commit 3.116 +- 0.032 us/op (~53% faster)
Caveat on the benchmark: without conflation, the producer pushes 1 inbox
item per span instead of ~1 per 64. At the benchmark's synthetic rate the
consumer can't keep up and inbox.offer silently drops. The numbers measure
producer publish() latency only; consumer throughput at realistic span rates
is a follow-up to validate. Tuning maxPending matters more in this design.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
With the per-span SpanSnapshot inbox path, the producer can lose snapshots when the bounded MPSC queue is full -- silently, since inbox.offer() returns a boolean we previously ignored. The conflating-Batch design used to absorb ~64x more producer pressure per inbox slot, so this is a new failure mode worth surfacing. Wire it through the existing HealthMetrics path: - HealthMetrics.onStatsInboxFull() (no-op default). - TracerHealthMetrics gets a statsInboxFull LongAdder and a new reason tag reason:inbox_full reported under the same stats.dropped_aggregates metric used for LRU evictions. Two LongAdders, two tagged time series. - ConflatingMetricsAggregator.publish increments the counter when inbox.offer(snapshot) returns false. This doesn't fix the drop -- tuning maxPending and/or building producer-side batching are the actual fixes. But it makes the failure visible in the same place ops already watches. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nflating-metrics-background-work
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 950499c767
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| reportIfChanged( | ||
| target.statsd, | ||
| "stats.dropped_aggregates", | ||
| target.statsInboxFull, | ||
| REASON_INBOX_FULL_TAG); |
There was a problem hiding this comment.
Resize health metric history for inbox-full counter
When statsInboxFull is nonzero this added 52nd reportIfChanged call indexes previousCounts[++countIndex], but previousCounts is still sized for the previous 51 counters. As a result the new reason:inbox_full metric is never emitted and every flush that reaches this call logs the resize warning instead; increase the array size alongside the new counter.
Useful? React with 👍 / 👎.
What Does This Do
Moves the per-span MetricKey construction, cache lookups, and aggregation off the producer thread into the existing aggregator thread, replacing the Batch-based conflation pipeline with a thin per-span
SpanSnapshotposted to the inbox.Motivation
Incremental step towards using a lighter weight structure for metrics.
In the subsequent PR, I intend to switch to a simplified hash table that isn't thread-safe.
The simplified hashtable uses custom entries that that will allow us to avoid the MetricKey construction on look-up,
but given that the simple hashtable isn't thread-safe we need to move the work to the consumer thread first.
Additional Notes
Stacked on top of #11380 -- review that first; the merge base of this PR is
dougqh/conflating-metrics-producer-wins, notmaster. The diff shown here is only the work that's new beyond that PR.What the producer does now (per span)
shouldComputeMetric, resource-ignored, longRunning)SpanSnapshot(one allocation per span)inbox.offer(snapshot)+ return error flag forforceKeepWhat moved off the producer
MetricKeyconstruction and its hash computationSERVICE_NAMES.computeIfAbsent(UTF8 encoding of service name)SPAN_KINDS.computeIfAbsent(UTF8 encoding ofspan.kind)PEER_TAGS_CACHElookups (peer-tag name+value UTF8 encoding)pending/keysConcurrentHashMap operationscontributeToRemoved entirely
Batch.java-- the aggregator's existingLRUCache<MetricKey, AggregateMetric>IS the conflation point nowpendingConcurrentHashMap<MetricKey, Batch>keysConcurrentHashMap<MetricKey, MetricKey>(canonical dedup)batchPoolMessagePassingQueue<Batch>CommonKeyCleaner'skeys.keySet()tracking;AggregateExpirynow just reports LRU drops to health metricsAdded
SpanSnapshot: immutable value carrying the rawMetricKeyinputs + atagAndDurationlong (duration OR-ed withERROR_TAG/TOP_LEVEL_TAG).AggregateMetric.recordOneDuration(long)-- single-hit equivalent of the existingrecordDurations(int, AtomicLongArray).String[]of[name0, value0, name1, value1, ...]; the aggregator encodes them throughPEER_TAGS_CACHEon its own thread.HealthMetrics.onStatsInboxFull()+ aTracerHealthMetricscounter reported asstats.dropped_aggregates{reason:inbox_full}-- parallel to the existingreason:lru_eviction. Without conflation the producer can lose snapshots when the bounded MPSC queue is full; this makes that visible without silencing it.Benchmark results (2 forks × 5 iter × 15s)
ConflatingMetricsAggregatorDDSpanBenchmark:~60% faster on the production DDSpan path. The SimpleSpan bench shows ~53% faster as well.
Caveat on the bench numbers
Without conflation, the producer pushes 1 inbox item per span instead of ~1 per 64. At the JMH bench's synthetic rate (effectively ~20M snapshots/sec from the producer) the consumer can't keep up and
inbox.offersilently drops -- the newonStatsInboxFullcounter would fire constantly. The headline numbers measure producerpublish()latency only; consumer throughput at realistic span rates is a follow-up to validate. TuningmaxPendingmatters more in this design.Real fixes for capacity (out of scope for this PR):
maxPendingdefault; the conflating design used 2048 slots × ~64 conflation = ~131K effective capacity, the new design has 2048 slots flat.Test plan
./gradlew :dd-trace-core:test --tests 'datadog.trace.common.metrics.*'passes./gradlew :dd-trace-core:test --tests 'datadog.trace.core.monitor.*'passes./gradlew :dd-trace-core:compileJava :dd-trace-core:compileTestGroovy :dd-trace-core:compileJmhJava :dd-trace-core:compileTraceAgentTestGroovyall green./gradlew spotlessCheckcleanstats.dropped_aggregates{reason:inbox_full}reports as expected under a synthetic high-load run (not in the JMH bench)🤖 Generated with Claude Code