diff --git a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java
index 67d279f51b9..514ab59ec3a 100644
--- a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java
+++ b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java
@@ -138,7 +138,8 @@ protected long getFeaturesDiscoveryMinDelayMillis() {
private synchronized void discoverIfOutdated(final long maxElapsedMs) {
final long now = System.currentTimeMillis();
- final long elapsed = now - discoveryState.lastTimeDiscovered;
+ final State previous = discoveryState;
+ final long elapsed = now - previous.lastTimeDiscovered;
if (elapsed > maxElapsedMs) {
final State newState = new State();
doDiscovery(newState);
diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java
similarity index 95%
rename from dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
rename to dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java
index b9a2f7f8c54..b9d72eaf3ab 100644
--- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java
@@ -34,12 +34,12 @@
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(MICROSECONDS)
@Fork(value = 1)
-public class ConflatingMetricsAggregatorBenchmark {
+public class ClientStatsAggregatorBenchmark {
private final DDAgentFeaturesDiscovery featuresDiscovery =
new FixedAgentFeaturesDiscovery(
Collections.singleton("peer.hostname"), Collections.emptySet());
- private final ConflatingMetricsAggregator aggregator =
- new ConflatingMetricsAggregator(
+ private final ClientStatsAggregator aggregator =
+ new ClientStatsAggregator(
new WellKnownTags("", "", "", "", "", ""),
Collections.emptySet(),
featuresDiscovery,
diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java
similarity index 87%
rename from dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java
rename to dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java
index 89059857d9c..0453b8888db 100644
--- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java
@@ -28,8 +28,8 @@
import org.openjdk.jmh.infra.Blackhole;
/**
- * Parallels {@link ConflatingMetricsAggregatorBenchmark} but uses real {@link DDSpan} instances
- * instead of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link
+ * Parallels {@link ClientStatsAggregatorBenchmark} but uses real {@link DDSpan} instances instead
+ * of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link
* CoreSpan#isKind} path (cached span.kind ordinal + bit-test) rather than the groovy mock's
* dispatch.
*
@@ -50,21 +50,21 @@
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(MICROSECONDS)
@Fork(value = 1)
-public class ConflatingMetricsAggregatorDDSpanBenchmark {
+public class ClientStatsAggregatorDDSpanBenchmark {
private static final CoreTracer TRACER =
CoreTracer.builder().writer(new NoopWriter()).strictTraceWrites(false).build();
private final DDAgentFeaturesDiscovery featuresDiscovery =
- new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
+ new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
Collections.singleton("peer.hostname"), Collections.emptySet());
- private final ConflatingMetricsAggregator aggregator =
- new ConflatingMetricsAggregator(
+ private final ClientStatsAggregator aggregator =
+ new ClientStatsAggregator(
new WellKnownTags("", "", "", "", "", ""),
Collections.emptySet(),
featuresDiscovery,
HealthMetrics.NO_OP,
- new ConflatingMetricsAggregatorBenchmark.NullSink(),
+ new ClientStatsAggregatorBenchmark.NullSink(),
2048,
2048,
false);
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java
index 90d41ff7bdc..8f2ae1cc6b3 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java
@@ -1,40 +1,40 @@
package datadog.trace.common.metrics;
-import static datadog.trace.api.Functions.UTF8_ENCODE;
-import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY;
-
import datadog.metrics.api.Histogram;
-import datadog.trace.api.Pair;
-import datadog.trace.api.cache.DDCache;
-import datadog.trace.api.cache.DDCaches;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.util.Hashtable;
import datadog.trace.util.LongHashingUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.function.Function;
-import javax.annotation.Nullable;
/**
- * Hashtable entry for the consumer-side aggregator. Holds the UTF8-encoded label fields that {@link
- * SerializingMetricWriter} writes to the wire plus the mutable counter/histogram state for the key.
+ * Hashtable entry for the consumer-side aggregator. Holds the UTF8-encoded label fields (the data
+ * {@link SerializingMetricWriter} writes to the wire) plus the mutable counter / histogram state
+ * for the key.
+ *
+ *
UTF8 canonicalization runs through per-field {@link PropertyCardinalityHandler}s (and {@link
+ * TagCardinalityHandler}s for peer tags), so cardinality is capped per reporting interval. The
+ * critical property: hashing and matching happen after canonicalization, so when a field's
+ * cardinality budget is exhausted and overflow values collapse to a {@code blocked_by_tracer}
+ * sentinel, those values land in the same bucket and merge into a single entry rather than
+ * fragmenting.
*
- *
{@link #matches(SpanSnapshot)} compares the entry's stored UTF8 forms against the snapshot's
- * raw {@code CharSequence}/{@code String}/{@code String[]} fields via content-equality, so {@code
- * String} vs {@code UTF8BytesString} mixing on the same logical key collapses into one entry
- * instead of splitting.
+ *
The aggregator thread is the sole writer. {@link AggregateTable} holds a reusable {@link
+ * Canonical} scratch buffer so the canonicalization itself doesn't allocate per lookup; on a miss
+ * the buffer's references are copied into a fresh entry. On a hit nothing is allocated.
*
- *
The static UTF8 caches that used to live on {@code MetricKey} and {@code
- * ConflatingMetricsAggregator} are consolidated here.
+ *
The handlers are reset on the aggregator thread every reporting cycle via {@link
+ * #resetCardinalityHandlers()}.
*
- *
Not thread-safe. Counter and histogram updates are performed by the single aggregator
- * thread; producer threads tag durations via {@link #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits and
- * hand them off through the snapshot inbox.
+ *
Thread-safety: not thread-safe. Counter and histogram updates, cardinality-handler
+ * registration, and {@link Canonical} use all run on the aggregator thread. Producer threads tag
+ * durations via {@link #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits and hand them off through the
+ * snapshot inbox. Test code uses {@link #of} which constructs entries without touching the
+ * cardinality handlers.
*/
@SuppressFBWarnings(
value = {"AT_NONATOMIC_OPERATIONS_ON_SHARED_VARIABLE", "AT_STALE_THREAD_WRITE_OF_PRIMITIVE"},
@@ -44,63 +44,45 @@ final class AggregateEntry extends Hashtable.Entry {
public static final long ERROR_TAG = 0x8000000000000000L;
public static final long TOP_LEVEL_TAG = 0x4000000000000000L;
- // UTF8 caches consolidated from the previous MetricKey + ConflatingMetricsAggregator split.
- private static final DDCache RESOURCE_CACHE =
- DDCaches.newFixedSizeCache(32);
- private static final DDCache SERVICE_CACHE =
- DDCaches.newFixedSizeCache(32);
- private static final DDCache OPERATION_CACHE =
- DDCaches.newFixedSizeCache(64);
- private static final DDCache SERVICE_SOURCE_CACHE =
- DDCaches.newFixedSizeCache(16);
- private static final DDCache TYPE_CACHE = DDCaches.newFixedSizeCache(8);
- private static final DDCache SPAN_KIND_CACHE =
- DDCaches.newFixedSizeCache(16);
- private static final DDCache HTTP_METHOD_CACHE =
- DDCaches.newFixedSizeCache(8);
- private static final DDCache HTTP_ENDPOINT_CACHE =
- DDCaches.newFixedSizeCache(32);
- private static final DDCache GRPC_STATUS_CODE_CACHE =
- DDCaches.newFixedSizeCache(32);
-
- /**
- * Outer cache keyed by peer-tag name, with an inner per-name cache keyed by value. The inner
- * cache produces the "name:value" encoded form the serializer writes.
- */
- private static final DDCache<
- String, Pair, Function>>
- PEER_TAGS_CACHE = DDCaches.newFixedSizeCache(64);
-
- private static final Function<
- String, Pair, Function>>
- PEER_TAGS_CACHE_ADDER =
- key ->
- Pair.of(
- DDCaches.newFixedSizeCache(512),
- value -> UTF8BytesString.create(key + ":" + value));
-
- private final UTF8BytesString resource;
- private final UTF8BytesString service;
- private final UTF8BytesString operationName;
- @Nullable private final UTF8BytesString serviceSource;
- private final UTF8BytesString type;
- private final UTF8BytesString spanKind;
- @Nullable private final UTF8BytesString httpMethod;
- @Nullable private final UTF8BytesString httpEndpoint;
- @Nullable private final UTF8BytesString grpcStatusCode;
- private final short httpStatusCode;
- private final boolean synthetic;
- private final boolean traceRoot;
-
- // Peer tags carried in two forms: parallel String[] arrays mirroring the snapshot's (schema +
- // values) shape for matches(), and pre-encoded List ("name:value") for the
- // serializer. peerTagNames is the schema's names array (shared by-reference when the schema
- // hasn't been replaced); peerTagValues is the per-span String[] parallel to it.
- @Nullable private final String[] peerTagNames;
- @Nullable private final String[] peerTagValues;
- private final List peerTags;
-
- // Mutable aggregate state -- single-thread (consumer/aggregator) writer.
+ // Per-field cardinality handlers. Limits live on MetricCardinalityLimits -- see that class for
+ // per-field rationale.
+ static final PropertyCardinalityHandler RESOURCE_HANDLER =
+ new PropertyCardinalityHandler(MetricCardinalityLimits.RESOURCE);
+ static final PropertyCardinalityHandler SERVICE_HANDLER =
+ new PropertyCardinalityHandler(MetricCardinalityLimits.SERVICE);
+ static final PropertyCardinalityHandler OPERATION_HANDLER =
+ new PropertyCardinalityHandler(MetricCardinalityLimits.OPERATION);
+ static final PropertyCardinalityHandler SERVICE_SOURCE_HANDLER =
+ new PropertyCardinalityHandler(MetricCardinalityLimits.SERVICE_SOURCE);
+ static final PropertyCardinalityHandler TYPE_HANDLER =
+ new PropertyCardinalityHandler(MetricCardinalityLimits.TYPE);
+ static final PropertyCardinalityHandler SPAN_KIND_HANDLER =
+ new PropertyCardinalityHandler(MetricCardinalityLimits.SPAN_KIND);
+ static final PropertyCardinalityHandler HTTP_METHOD_HANDLER =
+ new PropertyCardinalityHandler(MetricCardinalityLimits.HTTP_METHOD);
+ static final PropertyCardinalityHandler HTTP_ENDPOINT_HANDLER =
+ new PropertyCardinalityHandler(MetricCardinalityLimits.HTTP_ENDPOINT);
+ static final PropertyCardinalityHandler GRPC_STATUS_CODE_HANDLER =
+ new PropertyCardinalityHandler(MetricCardinalityLimits.GRPC_STATUS_CODE);
+
+ final UTF8BytesString resource;
+ final UTF8BytesString service;
+ final UTF8BytesString operationName;
+ // Optional fields use UTF8BytesString.EMPTY as the "absent" sentinel rather than null. The
+ // cardinality handlers map null inputs to EMPTY, and createUtf8 does the same for the of(...)
+ // factory, so callers don't need to special-case absence.
+ final UTF8BytesString serviceSource;
+ final UTF8BytesString type;
+ final UTF8BytesString spanKind;
+ final UTF8BytesString httpMethod;
+ final UTF8BytesString httpEndpoint;
+ final UTF8BytesString grpcStatusCode;
+ final short httpStatusCode;
+ final boolean synthetic;
+ final boolean traceRoot;
+ final List peerTags;
+
+ // Mutable aggregate state -- single-thread (aggregator) writer.
private final Histogram okLatencies = Histogram.newHistogram();
private final Histogram errorLatencies = Histogram.newHistogram();
private int errorCount;
@@ -108,95 +90,36 @@ final class AggregateEntry extends Hashtable.Entry {
private int topLevelCount;
private long duration;
- /** Hot-path constructor for the producer/consumer flow. Builds UTF8 fields via the caches. */
- private AggregateEntry(SpanSnapshot s, long keyHash) {
- super(keyHash);
- this.resource = canonicalize(RESOURCE_CACHE, s.resourceName);
- this.service = SERVICE_CACHE.computeIfAbsent(s.serviceName, UTF8_ENCODE);
- this.operationName = canonicalize(OPERATION_CACHE, s.operationName);
- this.serviceSource =
- s.serviceNameSource == null
- ? null
- : canonicalize(SERVICE_SOURCE_CACHE, s.serviceNameSource);
- this.type = canonicalize(TYPE_CACHE, s.spanType);
- this.spanKind = SPAN_KIND_CACHE.computeIfAbsent(s.spanKind, UTF8BytesString::create);
- this.httpMethod =
- s.httpMethod == null
- ? null
- : HTTP_METHOD_CACHE.computeIfAbsent(s.httpMethod, UTF8BytesString::create);
- this.httpEndpoint =
- s.httpEndpoint == null
- ? null
- : HTTP_ENDPOINT_CACHE.computeIfAbsent(s.httpEndpoint, UTF8BytesString::create);
- this.grpcStatusCode =
- s.grpcStatusCode == null
- ? null
- : GRPC_STATUS_CODE_CACHE.computeIfAbsent(s.grpcStatusCode, UTF8BytesString::create);
- this.httpStatusCode = s.httpStatusCode;
- this.synthetic = s.synthetic;
- this.traceRoot = s.traceRoot;
- this.peerTagNames = s.peerTagSchema == null ? null : s.peerTagSchema.names;
- this.peerTagValues = s.peerTagValues;
- this.peerTags = materializePeerTags(this.peerTagNames, this.peerTagValues);
- }
-
- /**
- * Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. Accepts a
- * pre-encoded {@code List} of {@code "name:value"} peer tags and recovers the
- * parallel-array {@code (names, values)} form by splitting on the {@code ':'} delimiter.
- */
- static AggregateEntry of(
- CharSequence resource,
- CharSequence service,
- CharSequence operationName,
- @Nullable CharSequence serviceSource,
- CharSequence type,
- int httpStatusCode,
+ /** Field-bearing constructor used by both the hot path and the test factory. */
+ private AggregateEntry(
+ long keyHash,
+ UTF8BytesString resource,
+ UTF8BytesString service,
+ UTF8BytesString operationName,
+ UTF8BytesString serviceSource,
+ UTF8BytesString type,
+ UTF8BytesString spanKind,
+ UTF8BytesString httpMethod,
+ UTF8BytesString httpEndpoint,
+ UTF8BytesString grpcStatusCode,
+ short httpStatusCode,
boolean synthetic,
boolean traceRoot,
- CharSequence spanKind,
- @Nullable List peerTags,
- @Nullable CharSequence httpMethod,
- @Nullable CharSequence httpEndpoint,
- @Nullable CharSequence grpcStatusCode) {
- PeerTagSchema schema = null;
- String[] values = null;
- if (peerTags != null && !peerTags.isEmpty()) {
- String[] names = new String[peerTags.size()];
- values = new String[peerTags.size()];
- int i = 0;
- for (UTF8BytesString t : peerTags) {
- String s = t.toString();
- int colon = s.indexOf(':');
- names[i] = colon < 0 ? s : s.substring(0, colon);
- values[i] = colon < 0 ? "" : s.substring(colon + 1);
- i++;
- }
- schema = PeerTagSchema.testSchema(names);
- }
- SpanSnapshot synthetic_snapshot =
- new SpanSnapshot(
- resource,
- service == null ? null : service.toString(),
- operationName,
- serviceSource,
- type,
- (short) httpStatusCode,
- synthetic,
- traceRoot,
- spanKind == null ? null : spanKind.toString(),
- schema,
- values,
- httpMethod == null ? null : httpMethod.toString(),
- httpEndpoint == null ? null : httpEndpoint.toString(),
- grpcStatusCode == null ? null : grpcStatusCode.toString(),
- 0L);
- return new AggregateEntry(synthetic_snapshot, hashOf(synthetic_snapshot));
- }
-
- /** Construct from a snapshot at consumer-thread miss time. */
- static AggregateEntry forSnapshot(SpanSnapshot s) {
- return new AggregateEntry(s, hashOf(s));
+ List peerTags) {
+ super(keyHash);
+ this.resource = resource;
+ this.service = service;
+ this.operationName = operationName;
+ this.serviceSource = serviceSource;
+ this.type = type;
+ this.spanKind = spanKind;
+ this.httpMethod = httpMethod;
+ this.httpEndpoint = httpEndpoint;
+ this.grpcStatusCode = grpcStatusCode;
+ this.httpStatusCode = httpStatusCode;
+ this.synthetic = synthetic;
+ this.traceRoot = traceRoot;
+ this.peerTags = peerTags;
}
AggregateEntry recordDurations(int count, AtomicLongArray durations) {
@@ -274,66 +197,125 @@ void clear() {
this.errorLatencies.clear();
}
- boolean matches(SpanSnapshot s) {
- String[] snapshotNames = s.peerTagSchema == null ? null : s.peerTagSchema.names;
- return httpStatusCode == s.httpStatusCode
- && synthetic == s.synthetic
- && traceRoot == s.traceRoot
- && contentEquals(resource, s.resourceName)
- && stringContentEquals(service, s.serviceName)
- && contentEquals(operationName, s.operationName)
- && contentEquals(serviceSource, s.serviceNameSource)
- && contentEquals(type, s.spanType)
- && stringContentEquals(spanKind, s.spanKind)
- && Arrays.equals(peerTagNames, snapshotNames)
- && Arrays.equals(peerTagValues, s.peerTagValues)
- && stringContentEquals(httpMethod, s.httpMethod)
- && stringContentEquals(httpEndpoint, s.httpEndpoint)
- && stringContentEquals(grpcStatusCode, s.grpcStatusCode);
+ /**
+ * Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. Bypasses
+ * the cardinality handlers so tests don't pollute their state -- {@link UTF8BytesString}s are
+ * created directly. Content-equal entries from {@link Canonical#toEntry} still {@link #equals} an
+ * entry built via {@code of(...)}.
+ */
+ static AggregateEntry of(
+ CharSequence resource,
+ CharSequence service,
+ CharSequence operationName,
+ CharSequence serviceSource,
+ CharSequence type,
+ int httpStatusCode,
+ boolean synthetic,
+ boolean traceRoot,
+ CharSequence spanKind,
+ List peerTags,
+ CharSequence httpMethod,
+ CharSequence httpEndpoint,
+ CharSequence grpcStatusCode) {
+ UTF8BytesString resourceUtf = createUtf8(resource);
+ UTF8BytesString serviceUtf = createUtf8(service);
+ UTF8BytesString operationNameUtf = createUtf8(operationName);
+ UTF8BytesString serviceSourceUtf = createUtf8(serviceSource);
+ UTF8BytesString typeUtf = createUtf8(type);
+ UTF8BytesString spanKindUtf = createUtf8(spanKind);
+ UTF8BytesString httpMethodUtf = createUtf8(httpMethod);
+ UTF8BytesString httpEndpointUtf = createUtf8(httpEndpoint);
+ UTF8BytesString grpcUtf = createUtf8(grpcStatusCode);
+ List peerTagsList = peerTags == null ? Collections.emptyList() : peerTags;
+ long keyHash =
+ hashOf(
+ resourceUtf,
+ serviceUtf,
+ operationNameUtf,
+ serviceSourceUtf,
+ typeUtf,
+ spanKindUtf,
+ httpMethodUtf,
+ httpEndpointUtf,
+ grpcUtf,
+ (short) httpStatusCode,
+ synthetic,
+ traceRoot,
+ peerTagsList);
+ return new AggregateEntry(
+ keyHash,
+ resourceUtf,
+ serviceUtf,
+ operationNameUtf,
+ serviceSourceUtf,
+ typeUtf,
+ spanKindUtf,
+ httpMethodUtf,
+ httpEndpointUtf,
+ grpcUtf,
+ (short) httpStatusCode,
+ synthetic,
+ traceRoot,
+ peerTagsList);
}
/**
- * Pre-checks {@link #keyHash} against {@code keyHash} before delegating to {@link
- * #matches(SpanSnapshot)}. The hash check is cheap and rules out most mismatches without touching
- * the field-by-field comparison.
+ * Resets every cardinality handler's working set. Must be called on the aggregator thread.
+ * Existing entries continue to hold their previously-issued {@link UTF8BytesString} references;
+ * matches via content-equality so snapshots delivered after a reset still resolve to the existing
+ * entries.
*/
- boolean matches(long keyHash, SpanSnapshot s) {
- return this.keyHash == keyHash && matches(s);
+ static void resetCardinalityHandlers() {
+ RESOURCE_HANDLER.reset();
+ SERVICE_HANDLER.reset();
+ OPERATION_HANDLER.reset();
+ SERVICE_SOURCE_HANDLER.reset();
+ TYPE_HANDLER.reset();
+ SPAN_KIND_HANDLER.reset();
+ HTTP_METHOD_HANDLER.reset();
+ HTTP_ENDPOINT_HANDLER.reset();
+ GRPC_STATUS_CODE_HANDLER.reset();
+ PeerTagSchema.INTERNAL.resetCardinalityHandlers();
}
/**
- * Computes the 64-bit lookup hash for a {@link SpanSnapshot}. Chained per-field calls -- no
- * varargs / Object[] allocation, no autoboxing on primitive overloads. The constructor's
- * super({@code hashOf(s)}) call uses the same function so an entry built from a snapshot hashes
- * to the same bucket the snapshot itself looks up.
- *
- * Hashes are content-stable across {@code String} / {@code UTF8BytesString}: {@link
- * UTF8BytesString#hashCode()} returns the underlying {@code String}'s hash.
+ * 64-bit lookup hash, computed over UTF8-encoded fields so that cardinality-blocked values (which
+ * all canonicalize to the same sentinel {@link UTF8BytesString}) collide in the same bucket.
+ * {@link UTF8BytesString#hashCode()} returns the underlying String hash, so entries built via
+ * {@link #of} produce the same hash as entries built from a snapshot with matching content.
*/
- static long hashOf(SpanSnapshot s) {
+ static long hashOf(
+ UTF8BytesString resource,
+ UTF8BytesString service,
+ UTF8BytesString operationName,
+ UTF8BytesString serviceSource,
+ UTF8BytesString type,
+ UTF8BytesString spanKind,
+ UTF8BytesString httpMethod,
+ UTF8BytesString httpEndpoint,
+ UTF8BytesString grpcStatusCode,
+ short httpStatusCode,
+ boolean synthetic,
+ boolean traceRoot,
+ List peerTags) {
long h = 0;
- h = LongHashingUtils.addToHash(h, s.resourceName);
- h = LongHashingUtils.addToHash(h, s.serviceName);
- h = LongHashingUtils.addToHash(h, s.operationName);
- h = LongHashingUtils.addToHash(h, s.serviceNameSource);
- h = LongHashingUtils.addToHash(h, s.spanType);
- h = LongHashingUtils.addToHash(h, s.httpStatusCode);
- h = LongHashingUtils.addToHash(h, s.synthetic);
- h = LongHashingUtils.addToHash(h, s.traceRoot);
- h = LongHashingUtils.addToHash(h, s.spanKind);
- if (s.peerTagSchema != null && s.peerTagValues != null) {
- String[] names = s.peerTagSchema.names;
- String[] values = s.peerTagValues;
- for (int i = 0; i < names.length; i++) {
- if (values[i] != null) {
- h = LongHashingUtils.addToHash(h, names[i]);
- h = LongHashingUtils.addToHash(h, values[i]);
- }
- }
+ h = LongHashingUtils.addToHash(h, resource);
+ h = LongHashingUtils.addToHash(h, service);
+ h = LongHashingUtils.addToHash(h, operationName);
+ h = LongHashingUtils.addToHash(h, serviceSource);
+ h = LongHashingUtils.addToHash(h, type);
+ h = LongHashingUtils.addToHash(h, httpStatusCode);
+ h = LongHashingUtils.addToHash(h, synthetic);
+ h = LongHashingUtils.addToHash(h, traceRoot);
+ h = LongHashingUtils.addToHash(h, spanKind);
+ // indexed iteration -- avoids the iterator allocation a for-each over a List would do
+ int peerTagCount = peerTags.size();
+ for (int i = 0; i < peerTagCount; i++) {
+ h = LongHashingUtils.addToHash(h, peerTags.get(i));
}
- h = LongHashingUtils.addToHash(h, s.httpMethod);
- h = LongHashingUtils.addToHash(h, s.httpEndpoint);
- h = LongHashingUtils.addToHash(h, s.grpcStatusCode);
+ h = LongHashingUtils.addToHash(h, httpMethod);
+ h = LongHashingUtils.addToHash(h, httpEndpoint);
+ h = LongHashingUtils.addToHash(h, grpcStatusCode);
return h;
}
@@ -350,7 +332,6 @@ UTF8BytesString getOperationName() {
return operationName;
}
- @Nullable
UTF8BytesString getServiceSource() {
return serviceSource;
}
@@ -363,17 +344,14 @@ UTF8BytesString getSpanKind() {
return spanKind;
}
- @Nullable
UTF8BytesString getHttpMethod() {
return httpMethod;
}
- @Nullable
UTF8BytesString getHttpEndpoint() {
return httpEndpoint;
}
- @Nullable
UTF8BytesString getGrpcStatusCode() {
return grpcStatusCode;
}
@@ -396,8 +374,8 @@ List getPeerTags() {
/**
* Equality on the 13 label fields (not on the aggregate). Used only by test mock matchers; the
- * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link #matches(SpanSnapshot)}
- * and never calls {@code equals}.
+ * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link Canonical#matches} and
+ * never calls {@code equals}.
*/
@Override
public boolean equals(Object o) {
@@ -424,84 +402,168 @@ public int hashCode() {
return (int) keyHash;
}
- // ----- helpers -----
-
- private static UTF8BytesString canonicalize(
- DDCache cache, CharSequence charSeq) {
- if (charSeq == null) {
- return EMPTY;
- }
- if (charSeq instanceof UTF8BytesString) {
- return (UTF8BytesString) charSeq;
+ /**
+ * Reusable scratch buffer for canonicalizing a {@link SpanSnapshot} into UTF8 fields, computing
+ * its lookup hash, comparing against existing entries, and building a fresh entry on miss.
+ *
+ * One instance is held by an {@link AggregateTable} and reused on every {@code findOrInsert}
+ * call. Single-threaded use only. Fields are deliberately mutable -- this is a hot-path scratch
+ * area, not a value class.
+ */
+ static final class Canonical {
+ UTF8BytesString resource;
+ UTF8BytesString service;
+ UTF8BytesString operationName;
+ UTF8BytesString serviceSource;
+ UTF8BytesString type;
+ UTF8BytesString spanKind;
+ UTF8BytesString httpMethod;
+ UTF8BytesString httpEndpoint;
+ UTF8BytesString grpcStatusCode;
+ short httpStatusCode;
+ boolean synthetic;
+ boolean traceRoot;
+
+ /**
+ * Reusable buffer of canonicalized peer-tag UTF8 forms. Cleared and refilled in {@link
+ * #populate}; on miss, {@link #toEntry} copies it into an immutable list for the entry to own.
+ * Zero allocation on the hit path.
+ */
+ final ArrayList peerTagsBuffer = new ArrayList<>(4);
+
+ long keyHash;
+
+ /** Canonicalize all fields from {@code s} through the handlers into this buffer. */
+ void populate(SpanSnapshot s) {
+ this.resource = RESOURCE_HANDLER.register(s.resourceName);
+ this.service = SERVICE_HANDLER.register(s.serviceName);
+ this.operationName = OPERATION_HANDLER.register(s.operationName);
+ this.serviceSource = SERVICE_SOURCE_HANDLER.register(s.serviceNameSource);
+ this.type = TYPE_HANDLER.register(s.spanType);
+ this.spanKind = SPAN_KIND_HANDLER.register(s.spanKind);
+ this.httpMethod = HTTP_METHOD_HANDLER.register(s.httpMethod);
+ this.httpEndpoint = HTTP_ENDPOINT_HANDLER.register(s.httpEndpoint);
+ this.grpcStatusCode = GRPC_STATUS_CODE_HANDLER.register(s.grpcStatusCode);
+ this.httpStatusCode = s.httpStatusCode;
+ this.synthetic = s.synthetic;
+ this.traceRoot = s.traceRoot;
+ populatePeerTags(s.peerTagSchema, s.peerTagValues);
+ this.keyHash =
+ hashOf(
+ resource,
+ service,
+ operationName,
+ serviceSource,
+ type,
+ spanKind,
+ httpMethod,
+ httpEndpoint,
+ grpcStatusCode,
+ httpStatusCode,
+ synthetic,
+ traceRoot,
+ peerTagsBuffer);
}
- return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create);
- }
- /** UTF8 vs raw CharSequence content-equality, no allocation in the common (String) case. */
- private static boolean contentEquals(UTF8BytesString a, CharSequence b) {
- if (a == null) {
- return b == null;
- }
- if (b == null) {
- return false;
- }
- // UTF8BytesString.toString() returns the underlying String -- O(1), no allocation.
- String aStr = a.toString();
- if (b instanceof String) {
- return aStr.equals(b);
- }
- if (b instanceof UTF8BytesString) {
- return aStr.equals(b.toString());
+ /**
+ * Fills {@link #peerTagsBuffer} with canonical UTF8 forms, applying the schema's per-tag
+ * handler + warn-once notification at the same index. Returns {@code EMPTY} for null inputs;
+ * we elide those from the buffer so the wire-format list-of-pairs only contains present peer
+ * tags. No allocation when the schema/values are absent or all values are null (buffer is just
+ * cleared).
+ */
+ private void populatePeerTags(PeerTagSchema schema, String[] values) {
+ peerTagsBuffer.clear();
+ if (schema == null || values == null) {
+ return;
+ }
+ int n = schema.size();
+ for (int i = 0; i < n; i++) {
+ UTF8BytesString utf8 = schema.register(i, values[i]);
+ if (utf8 != UTF8BytesString.EMPTY) {
+ peerTagsBuffer.add(utf8);
+ }
+ }
}
- return aStr.contentEquals(b);
- }
- private static boolean stringContentEquals(UTF8BytesString a, String b) {
- if (a == null) {
- return b == null;
+ /**
+ * Whether this canonicalized snapshot matches the given entry. Compares UTF8 fields via
+ * content-equality (so an entry surviving a handler reset still matches a freshly-canonicalized
+ * snapshot of the same content).
+ */
+ boolean matches(AggregateEntry e) {
+ return httpStatusCode == e.httpStatusCode
+ && synthetic == e.synthetic
+ && traceRoot == e.traceRoot
+ && Objects.equals(resource, e.resource)
+ && Objects.equals(service, e.service)
+ && Objects.equals(operationName, e.operationName)
+ && Objects.equals(serviceSource, e.serviceSource)
+ && Objects.equals(type, e.type)
+ && Objects.equals(spanKind, e.spanKind)
+ && peerTagsEqual(peerTagsBuffer, e.peerTags)
+ && Objects.equals(httpMethod, e.httpMethod)
+ && Objects.equals(httpEndpoint, e.httpEndpoint)
+ && Objects.equals(grpcStatusCode, e.grpcStatusCode);
}
- return b != null && a.toString().equals(b);
- }
- /**
- * Encodes the per-span peer-tag values into the {@code List} the serializer
- * consumes. Reads name/value pairs at the same index from the schema's names and the snapshot's
- * values; null value slots are skipped (the span didn't set that peer tag). Counts hits once for
- * exact-size allocation and preserves the singletonList fast path for the common one-entry case
- * (e.g. internal-kind base.service).
- */
- private static List materializePeerTags(
- @Nullable String[] names, @Nullable String[] values) {
- if (names == null || values == null) {
- return Collections.emptyList();
- }
- int n = names.length;
- int firstHit = -1;
- int hitCount = 0;
- for (int i = 0; i < n; i++) {
- if (values[i] != null) {
- if (hitCount == 0) firstHit = i;
- hitCount++;
+ /** Indexed list comparison -- avoids the iterator a {@code List.equals} would allocate. */
+ private static boolean peerTagsEqual(List a, List b) {
+ int n = a.size();
+ if (n != b.size()) {
+ return false;
}
+ for (int i = 0; i < n; i++) {
+ if (!a.get(i).equals(b.get(i))) {
+ return false;
+ }
+ }
+ return true;
}
- if (hitCount == 0) {
- return Collections.emptyList();
- }
- if (hitCount == 1) {
- return Collections.singletonList(encodePeerTag(names[firstHit], values[firstHit]));
- }
- List tags = new ArrayList<>(hitCount);
- for (int i = firstHit; i < n; i++) {
- if (values[i] != null) {
- tags.add(encodePeerTag(names[i], values[i]));
+
+ /**
+ * Build a new entry from the currently-populated canonical fields. The peer-tag buffer is
+ * copied into an immutable list so the entry's reference stays stable across subsequent {@link
+ * #populate} calls.
+ */
+ AggregateEntry toEntry() {
+ List snapshottedPeerTags;
+ int n = peerTagsBuffer.size();
+ if (n == 0) {
+ snapshottedPeerTags = Collections.emptyList();
+ } else if (n == 1) {
+ snapshottedPeerTags = Collections.singletonList(peerTagsBuffer.get(0));
+ } else {
+ snapshottedPeerTags = new ArrayList<>(peerTagsBuffer);
}
+ return new AggregateEntry(
+ keyHash,
+ resource,
+ service,
+ operationName,
+ serviceSource,
+ type,
+ spanKind,
+ httpMethod,
+ httpEndpoint,
+ grpcStatusCode,
+ httpStatusCode,
+ synthetic,
+ traceRoot,
+ snapshottedPeerTags);
}
- return tags;
}
- private static UTF8BytesString encodePeerTag(String name, String value) {
- final Pair, Function>
- cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER);
- return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight());
+ // ----- helpers -----
+
+ /** Direct {@link UTF8BytesString} creation that bypasses the cardinality handlers. */
+ private static UTF8BytesString createUtf8(CharSequence cs) {
+ if (cs == null) {
+ return UTF8BytesString.EMPTY;
+ }
+ if (cs instanceof UTF8BytesString) {
+ return (UTF8BytesString) cs;
+ }
+ return UTF8BytesString.create(cs.toString());
}
}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java
index 2255ca1cdf8..5a2934c71d2 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java
@@ -7,14 +7,14 @@
import java.util.function.Consumer;
/**
- * Consumer-side {@link AggregateEntry} store, keyed on the raw fields of a {@link SpanSnapshot}.
+ * Consumer-side {@link AggregateEntry} store, keyed on the canonical UTF8-encoded labels of a
+ * {@link SpanSnapshot}.
*
- * Replaces the prior {@code LRUCache}. The win is on the
- * steady-state hit path: a snapshot lookup is a 64-bit hash compute + bucket walk + field-wise
- * {@code matches}, with no per-snapshot {@link AggregateEntry} allocation and no UTF8 cache
- * lookups. The UTF8-encoded forms (formerly held on {@code MetricKey}) and the mutable counters
- * (formerly held on {@code AggregateMetric}) both live on the {@link AggregateEntry} now, built
- * once per unique key at insert time.
+ * {@link #findOrInsert} canonicalizes the snapshot's fields through the cardinality handlers (so
+ * cardinality-blocked values share a sentinel and collapse into one entry) and then computes the
+ * lookup hash from that canonical form. Canonicalization runs into a reusable {@link
+ * AggregateEntry.Canonical} scratch buffer; on a hit nothing is allocated, on a miss the buffer's
+ * references are copied into a fresh entry and the buffer is overwritten on the next call.
*
*
Not thread-safe. The aggregator thread is the sole writer; {@link #clear()} must be
* routed through the inbox rather than called from arbitrary threads.
@@ -23,6 +23,7 @@ final class AggregateTable {
private final Hashtable.Entry[] buckets;
private final int maxAggregates;
+ private final AggregateEntry.Canonical canonical = new AggregateEntry.Canonical();
private int size;
AggregateTable(int maxAggregates) {
@@ -44,18 +45,19 @@ boolean isEmpty() {
* caller should drop the data point in that case.
*/
AggregateEntry findOrInsert(SpanSnapshot snapshot) {
- long keyHash = AggregateEntry.hashOf(snapshot);
+ canonical.populate(snapshot);
+ long keyHash = canonical.keyHash;
for (AggregateEntry candidate = Support.bucket(buckets, keyHash);
candidate != null;
candidate = candidate.next()) {
- if (candidate.matches(keyHash, snapshot)) {
+ if (candidate.keyHash == keyHash && canonical.matches(candidate)) {
return candidate;
}
}
if (size >= maxAggregates && !evictOneStale()) {
return null;
}
- AggregateEntry entry = AggregateEntry.forSnapshot(snapshot);
+ AggregateEntry entry = canonical.toEntry();
Support.insertHeadEntry(buckets, keyHash, entry);
size++;
return entry;
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java
similarity index 73%
rename from dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
rename to dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java
index 14a0593f556..393181b5936 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java
@@ -4,7 +4,6 @@
import static datadog.trace.api.DDSpanTypes.RPC;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD;
-import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.common.metrics.AggregateEntry.ERROR_TAG;
import static datadog.trace.common.metrics.AggregateEntry.TOP_LEVEL_TAG;
import static datadog.trace.common.metrics.SignalItem.ClearSignal.CLEAR;
@@ -39,14 +38,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class ConflatingMetricsAggregator implements MetricsAggregator, EventListener {
+public final class ClientStatsAggregator implements MetricsAggregator, EventListener {
- private static final Logger log = LoggerFactory.getLogger(ConflatingMetricsAggregator.class);
+ private static final Logger log = LoggerFactory.getLogger(ClientStatsAggregator.class);
private static final Map DEFAULT_HEADERS =
Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION);
- private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";
+ private static final String SYNTHETICS_ORIGIN = "synthetics";
private static final SpanKindFilter METRICS_ELIGIBLE_KINDS =
SpanKindFilter.builder()
@@ -74,25 +73,26 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private final boolean includeEndpointInMetrics;
/**
- * Cached peer-aggregation schema. Producers read this reference once per trace and pass it
- * through to the consumer in {@link SpanSnapshot}; they never inspect the schema's timestamp or
- * rebuild it. Reconciliation is the aggregator thread's job: {@link #reconcilePeerTagSchema()}
- * compares the schema's {@link PeerTagSchema#lastTimeDiscovered} against {@link
- * DDAgentFeaturesDiscovery#getLastTimeDiscovered()} once per reporting cycle and either bumps the
- * timestamp in place (when the tag set is unchanged) or swaps in a freshly-built schema.
+ * Cached peer-tag schema. Producers read this reference once per trace and pass it through to the
+ * consumer in {@link SpanSnapshot}; they never inspect the schema's timestamp or rebuild it.
+ * Reconciliation is the aggregator thread's job: {@link #resetCardinalityHandlers()} compares the
+ * schema's {@link PeerTagSchema#lastTimeDiscovered} against {@link
+ * DDAgentFeaturesDiscovery#getLastTimeDiscovered()} once per reporting cycle and either updates
+ * the timestamp in place (when the tag set is unchanged, preserving the schema's warm cardinality
+ * handlers) or swaps in a freshly-built schema.
*
- * {@code null} only on the bootstrap window before {@link #bootstrapPeerTagSchema()} runs on
- * the first publish.
+ *
An empty schema (size 0) represents the "peer tags unconfigured" state; {@code null} only on
+ * the bootstrap window before {@link #bootstrapPeerTagSchema()} runs on the first publish.
*
*
{@code volatile} so the consumer's reconcile-time replacement is visible to producer
- * threads; the schema's own internal mutable state ({@link PeerTagSchema#lastTimeDiscovered}) is
+ * threads; the schema's own internal mutable state (handlers, block counters, timestamp) is
* exercised only on the aggregator thread.
*/
private volatile PeerTagSchema cachedPeerTagSchema;
private volatile AgentTaskScheduler.Scheduled> cancellation;
- public ConflatingMetricsAggregator(
+ public ClientStatsAggregator(
Config config,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetrics) {
@@ -113,7 +113,7 @@ public ConflatingMetricsAggregator(
config.isTraceResourceRenamingEnabled());
}
- ConflatingMetricsAggregator(
+ ClientStatsAggregator(
WellKnownTags wellKnownTags,
Set ignoredResources,
DDAgentFeaturesDiscovery features,
@@ -135,7 +135,7 @@ public ConflatingMetricsAggregator(
includeEndpointInMetrics);
}
- ConflatingMetricsAggregator(
+ ClientStatsAggregator(
WellKnownTags wellKnownTags,
Set ignoredResources,
DDAgentFeaturesDiscovery features,
@@ -159,7 +159,7 @@ public ConflatingMetricsAggregator(
includeEndpointInMetrics);
}
- ConflatingMetricsAggregator(
+ ClientStatsAggregator(
Set ignoredResources,
DDAgentFeaturesDiscovery features,
HealthMetrics healthMetric,
@@ -184,7 +184,7 @@ public ConflatingMetricsAggregator(
reportingInterval,
timeUnit,
healthMetric,
- this::reconcilePeerTagSchema);
+ this::resetCardinalityHandlers);
this.thread = newAgentThread(METRICS_AGGREGATOR, aggregator);
this.reportingInterval = reportingInterval;
this.reportingIntervalTimeUnit = timeUnit;
@@ -266,6 +266,14 @@ public boolean publish(List extends CoreSpan>> trace) {
boolean forceKeep = false;
int counted = 0;
if (features.supportsMetrics()) {
+ // Producer-side fast path: one volatile read and use whatever schema is currently cached.
+ // The aggregator thread keeps this schema in sync with feature discovery in
+ // resetCardinalityHandlers(). The only producer-side rebuild is the one-time bootstrap on
+ // the first publish.
+ PeerTagSchema peerTagSchema = cachedPeerTagSchema;
+ if (peerTagSchema == null) {
+ peerTagSchema = bootstrapPeerTagSchema();
+ }
for (CoreSpan> span : trace) {
boolean isTopLevel = span.isTopLevel();
if (shouldComputeMetric(span, isTopLevel)) {
@@ -276,7 +284,7 @@ public boolean publish(List extends CoreSpan>> trace) {
break;
}
counted++;
- forceKeep |= publish(span, isTopLevel);
+ forceKeep |= publish(span, isTopLevel, peerTagSchema);
}
}
healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep);
@@ -291,20 +299,7 @@ private boolean shouldComputeMetric(CoreSpan> span, boolean isTopLevel) {
&& span.getDurationNano() > 0;
}
- private boolean publish(CoreSpan> span, boolean isTopLevel) {
- // Error decision drives force-keep sampling regardless of whether the snapshot gets queued.
- boolean error = span.getError() > 0;
-
- // Fast-path the inbox-full case before any tag extraction or snapshot allocation. size() is
- // approximate on jctools' MPSC queue but that's fine: if we under-estimate, we fall through
- // and let inbox.offer be the source of truth (existing behavior); if we over-estimate, we
- // drop a snapshot that would have fit -- acceptable, onStatsInboxFull was going to fire
- // imminently anyway.
- if (inbox.size() >= inbox.capacity()) {
- healthMetrics.onStatsInboxFull();
- return error;
- }
-
+ private boolean publish(CoreSpan> span, boolean isTopLevel, PeerTagSchema peerTagSchema) {
// Extract HTTP method and endpoint only if the feature is enabled
String httpMethod = null;
String httpEndpoint = null;
@@ -321,20 +316,24 @@ private boolean publish(CoreSpan> span, boolean isTopLevel) {
Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE);
grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null;
}
- // CharSequence default keeps unsafeGetTag's generic at CharSequence so UTF8BytesString
- // tag values don't trigger a ClassCastException on the String assignment.
- final String spanKind = span.unsafeGetTag(SPAN_KIND, (CharSequence) "").toString();
+ // DDSpan resolves this from a cached span.kind ordinal via a small lookup array, skipping a
+ // tag-map lookup. Other CoreSpan impls fall back to the tag map by default.
+ String spanKind = span.getSpanKindString();
+ if (spanKind == null) {
+ spanKind = "";
+ }
+ boolean error = span.getError() > 0;
long tagAndDuration =
span.getDurationNano() | (error ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L);
- PeerTagSchema peerTagSchema = peerTagSchemaFor(span);
+ PeerTagSchema spanPeerTagSchema = peerTagSchemaFor(span, peerTagSchema);
String[] peerTagValues =
- peerTagSchema == null ? null : capturePeerTagValues(span, peerTagSchema);
+ spanPeerTagSchema == null ? null : capturePeerTagValues(span, spanPeerTagSchema);
if (peerTagValues == null) {
- // No tags fired -- drop the schema reference so the consumer doesn't bother iterating an
- // all-null array.
- peerTagSchema = null;
+ // capture returned no non-null values -- drop the schema reference so the consumer doesn't
+ // bother iterating an all-null array.
+ spanPeerTagSchema = null;
}
SpanSnapshot snapshot =
@@ -348,7 +347,7 @@ private boolean publish(CoreSpan> span, boolean isTopLevel) {
isSynthetic(span),
span.getParentId() == 0,
spanKind,
- peerTagSchema,
+ spanPeerTagSchema,
peerTagValues,
httpMethod,
httpEndpoint,
@@ -361,26 +360,6 @@ private boolean publish(CoreSpan> span, boolean isTopLevel) {
return error;
}
- /**
- * Picks the peer-tag schema for a span. For internal-kind spans we always use the static {@link
- * PeerTagSchema#INTERNAL} singleton (one entry for {@code base.service}); for {@code
- * client}/{@code producer}/{@code consumer} kinds we use the cached peer-aggregation schema
- * synced from {@link DDAgentFeaturesDiscovery#peerTags()}. Other kinds get {@code null}.
- */
- private PeerTagSchema peerTagSchemaFor(CoreSpan> span) {
- if (span.isKind(PEER_AGGREGATION_KINDS)) {
- PeerTagSchema schema = cachedPeerTagSchema;
- if (schema == null) {
- schema = bootstrapPeerTagSchema();
- }
- return schema.size() > 0 ? schema : null;
- }
- if (span.isKind(INTERNAL_KIND)) {
- return PeerTagSchema.INTERNAL;
- }
- return null;
- }
-
/**
* One-time producer-side bootstrap of {@link #cachedPeerTagSchema}. Synchronized double-check
* guards against two producers racing on the very first publish; after this returns, {@code
@@ -401,16 +380,35 @@ private synchronized PeerTagSchema bootstrapPeerTagSchema() {
private PeerTagSchema buildPeerTagSchema() {
Set names = features.peerTags();
return PeerTagSchema.of(
- names == null ? Collections.emptySet() : names, features.getLastTimeDiscovered());
+ names == null ? Collections.emptySet() : names,
+ features.getLastTimeDiscovered(),
+ healthMetrics);
+ }
+
+ /**
+ * Single reset hook invoked on the aggregator thread at the end of each report cycle. Reconciles
+ * the cached peer-tag schema against the latest feature discovery, then resets all cardinality
+ * state in lockstep: the static property handlers + {@code PeerTagSchema.INTERNAL} (via {@link
+ * AggregateEntry#resetCardinalityHandlers()}) and the cached peer-tag schema (with whatever
+ * reconciliation just produced). New handlers added anywhere in this pipeline should be reset
+ * from here.
+ */
+ private void resetCardinalityHandlers() {
+ reconcilePeerTagSchema();
+ AggregateEntry.resetCardinalityHandlers();
+ PeerTagSchema schema = cachedPeerTagSchema;
+ if (schema != null) {
+ schema.resetCardinalityHandlers();
+ }
}
/**
* Reconciles {@link #cachedPeerTagSchema} with the latest feature discovery. Runs on the
- * aggregator thread once per reporting cycle via the reset hook passed to {@link Aggregator}.
- * Cheap fast path: a long compare against the cached schema's embedded timestamp short-circuits
- * when discovery hasn't refreshed since the schema was built. On mismatch, a set compare
- * distinguishes "discovery refreshed but tags unchanged" (just bump the timestamp in place) from
- * "tags actually changed" (build a new schema and swap the volatile reference).
+ * aggregator thread once per reporting cycle. Cheap fast path: a long compare against the cached
+ * schema's embedded timestamp short-circuits when discovery hasn't refreshed since the schema was
+ * built. On mismatch, a set compare distinguishes "discovery refreshed but tags unchanged" (just
+ * bump the timestamp in place to preserve the warm cardinality handlers) from "tags actually
+ * changed" (build a new schema and swap the volatile reference).
*/
private void reconcilePeerTagSchema() {
PeerTagSchema cached = cachedPeerTagSchema;
@@ -423,19 +421,34 @@ private void reconcilePeerTagSchema() {
return;
}
Set latestNames = features.peerTags();
- Set normalized = latestNames == null ? Collections.emptySet() : latestNames;
+ Set normalized = latestNames == null ? Collections.emptySet() : latestNames;
if (cached.hasSameTagsAs(normalized)) {
cached.lastTimeDiscovered = latestDiscoveredAt;
} else {
- cachedPeerTagSchema = PeerTagSchema.of(normalized, latestDiscoveredAt);
+ cachedPeerTagSchema = PeerTagSchema.of(normalized, latestDiscoveredAt, healthMetrics);
+ }
+ }
+
+ /**
+ * Picks the peer-tag schema for a span. The {@code peerTagSchema} argument is the per-trace
+ * cached schema (read once in {@link #publish(List)} via the volatile {@link
+ * #cachedPeerTagSchema}, with {@link #bootstrapPeerTagSchema()} taking care of the first-publish
+ * window) -- always non-null but possibly empty when peer tags are unconfigured. For
+ * internal-kind spans the static {@link PeerTagSchema#INTERNAL} schema is used regardless.
+ */
+ private static PeerTagSchema peerTagSchemaFor(CoreSpan> span, PeerTagSchema peerTagSchema) {
+ if (peerTagSchema.size() > 0 && span.isKind(PEER_AGGREGATION_KINDS)) {
+ return peerTagSchema;
}
+ if (span.isKind(INTERNAL_KIND)) {
+ return PeerTagSchema.INTERNAL;
+ }
+ return null;
}
/**
- * Captures the span's peer-tag values into a {@code String[]} parallel to {@code schema.names}.
- * Slots remain {@code null} for tags the span didn't set; the array itself is lazily allocated on
- * the first hit so spans that fire no peer tags pay zero allocation. Returns {@code null} when
- * none of the configured peer tags are set on the span.
+ * Captures the span's peer tag values into a {@code String[]} parallel to {@code schema.names}.
+ * Returns {@code null} when none of the configured peer tags are set on the span.
*/
private static String[] capturePeerTagValues(CoreSpan> span, PeerTagSchema schema) {
String[] names = schema.names;
@@ -454,7 +467,8 @@ private static String[] capturePeerTagValues(CoreSpan> span, PeerTagSchema sch
}
private static boolean isSynthetic(CoreSpan> span) {
- return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString());
+ CharSequence origin = span.getOrigin();
+ return origin != null && SYNTHETICS_ORIGIN.contentEquals(origin);
}
public void stop() {
@@ -500,24 +514,16 @@ private void disable() {
if (!features.supportsMetrics()) {
log.debug("Disabling metric reporting because an agent downgrade was detected");
// Route the clear through the inbox so the aggregator thread is the only writer.
- // AggregateTable is not thread-safe; calling clearAggregates() directly from this thread
- // would race with Drainer.accept on the aggregator thread.
- //
- // Best-effort single offer rather than the retry-loop pattern in report(). If the inbox is
- // full at downgrade time the clear is dropped, but the system self-heals: features.discover()
- // already flipped supportsMetrics() false, so producer publish() calls now skip the inbox;
- // the aggregator drains existing snapshots and ships them on the next report cycle; the
- // sink rejects that payload and fires DOWNGRADED again, which retries disable() against a
- // now-empty inbox. Worst case: one extra reporting cycle of stale data.
+ // AggregateTable is not thread-safe; clearing it directly from this thread would race
+ // with Drainer.accept on the aggregator thread.
inbox.offer(CLEAR);
}
}
- private static final class ReportTask
- implements AgentTaskScheduler.Task {
+ private static final class ReportTask implements AgentTaskScheduler.Task {
@Override
- public void run(ConflatingMetricsAggregator target) {
+ public void run(ClientStatsAggregator target) {
target.report();
}
}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricCardinalityLimits.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricCardinalityLimits.java
new file mode 100644
index 00000000000..f7d91343d4b
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricCardinalityLimits.java
@@ -0,0 +1,73 @@
+package datadog.trace.common.metrics;
+
+/**
+ * Per-field caps on the number of distinct values canonicalized per reporting cycle. Overflow
+ * values collapse to a {@code blocked_by_tracer} sentinel so they merge into one aggregate row
+ * instead of fragmenting the table.
+ *
+ * Values are sized to the typical-service workload with headroom; "typical" estimates are noted
+ * inline. Raise if a workload routinely hits the sentinel; lower carries proportional memory
+ * savings but risks suppressing legitimate distinctions.
+ */
+final class MetricCardinalityLimits {
+ private MetricCardinalityLimits() {}
+
+ /**
+ * Distinct {@code resource.name} values per cycle. Highest-cardinality field by far: DB-query
+ * obfuscations, HTTP route templates, custom resources. Typical service: 30-200 unique.
+ */
+ static final int RESOURCE = 128;
+
+ /**
+ * Distinct {@code service.name} values per cycle. Local service plus downstream peer-service
+ * names. Microservice meshes typically reference 10-50 distinct services.
+ */
+ static final int SERVICE = 32;
+
+ /**
+ * Distinct {@code operation.name} values per cycle. Names like {@code http.request}, {@code
+ * db.query}, etc. Typical service: 10-30 across integrations.
+ */
+ static final int OPERATION = 64;
+
+ /**
+ * Distinct {@code _dd.base_service} override values per cycle. Used rarely; usually empty or one
+ * of a handful per service.
+ */
+ static final int SERVICE_SOURCE = 16;
+
+ /**
+ * Distinct {@code span.type} values per cycle. {@code DDSpanTypes} catalog is ~30; a single
+ * service usually spans 5-10 integration types.
+ */
+ static final int TYPE = 16;
+
+ /**
+ * Distinct {@code span.kind} values per cycle. OTel defines exactly 5 (server/client/producer/
+ * consumer/internal); 8 still leaves 60% headroom in case a producer invents new kinds.
+ */
+ static final int SPAN_KIND = 8;
+
+ /**
+ * Distinct HTTP method values per cycle. Standard verbs are 7-9; WebDAV/custom adds a few more.
+ */
+ static final int HTTP_METHOD = 16;
+
+ /**
+ * Distinct {@code http.endpoint} values per cycle. Path templates -- same shape as {@code
+ * RESOURCE} for HTTP-heavy services. Only used when {@code includeEndpointInMetrics} is enabled.
+ */
+ static final int HTTP_ENDPOINT = 64;
+
+ /**
+ * Distinct gRPC status code values per cycle. gRPC spec defines exactly 17 codes (0-16); 24
+ * leaves headroom for unknown-code edge cases without wasting space.
+ */
+ static final int GRPC_STATUS_CODE = 24;
+
+ /**
+ * Distinct values per peer-tag name (e.g. distinct {@code peer.hostname} values). Each configured
+ * peer tag gets its own handler at this limit.
+ */
+ static final int PEER_TAG_VALUE = 512;
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java
index 09464310113..b9530871763 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java
@@ -15,7 +15,7 @@ public static MetricsAggregator createMetricsAggregator(
HealthMetrics healthMetrics) {
if (config.isTracerMetricsEnabled()) {
log.debug("tracer metrics enabled");
- return new ConflatingMetricsAggregator(config, sharedCommunicationObjects, healthMetrics);
+ return new ClientStatsAggregator(config, sharedCommunicationObjects, healthMetrics);
}
log.debug("tracer metrics disabled");
return NoOpMetricsAggregator.INSTANCE;
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java
index 87a0b955f5f..295ab27117c 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java
@@ -2,48 +2,59 @@
import static datadog.trace.api.DDTags.BASE_SERVICE;
-import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import datadog.trace.core.monitor.HealthMetrics;
+import java.util.HashSet;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Names of the peer-tags eligible for client-stats aggregation, packed into a flat {@code String[]}
- * for parallel-array access by producers and the aggregator thread.
+ * Parallel arrays of peer-tag names and their {@link TagCardinalityHandler}s, indexed in lockstep.
*
- *
This is the minimal carrier shape used by {@link SpanSnapshot}: the producer captures per-span
- * values into a {@code String[]} parallel to {@link #names}, and the aggregator reconstructs the
- * encoded {@code tag:value} pairs from the same name index. It replaces the prior "flat pairs"
- * {@code [name0, value0, name1, value1, ...]} layout, which forced a worst-case allocation +
- * trim-and-copy on every span.
+ *
Replaces the previous {@code Map} lookup with positional array
+ * access: the producer captures span tag values into a {@code String[]} parallel to {@link #names},
+ * and the consumer calls {@link #register(int, String)} at the same index to canonicalize the value
+ * through the per-tag cardinality handler.
*
* Two schemas exist:
*
*
* - {@link #INTERNAL} -- a singleton with one entry for {@code base.service}, used for
* internal-kind spans where only the base service is aggregated.
- *
- A peer-aggregation schema built via {@link #of(Set, long)} for {@code client}/{@code
- * producer}/{@code consumer} spans. {@link ConflatingMetricsAggregator} caches the most
- * recently built schema and reconciles it on the aggregator thread once per reporting cycle
- * by comparing {@link #lastTimeDiscovered} against {@link
- * DDAgentFeaturesDiscovery#getLastTimeDiscovered()}.
+ *
- A peer-aggregation schema built via {@link #of(Set, long, HealthMetrics)} for {@code
+ * client}/{@code producer}/{@code consumer} spans. {@link ClientStatsAggregator} caches the
+ * most recently built schema and reconciles it on the aggregator thread once per reporting
+ * cycle by comparing {@link #lastTimeDiscovered} against {@code
+ * DDAgentFeaturesDiscovery.getLastTimeDiscovered()}.
*
*
- * This class deliberately has no cardinality limiters or per-cycle state -- callers that need
- * those layer them on top.
+ *
Cardinality blocks emit a one-shot warn log per reporting cycle per tag (tracked via {@link
+ * #warnedCardinality}) and accumulate a per-tag block counter (tracked via {@link #blockedCounts})
+ * that is flushed to {@link HealthMetrics#onTagCardinalityBlocked(String, long)} once per affected
+ * tag at cycle reset. All per-cycle state resets in {@link #resetCardinalityHandlers()}.
*
- *
Thread-safety: {@link #names} is final and safe to read from any thread. {@link
- * #lastTimeDiscovered} is exercised only on the aggregator thread (read and updated in
- * reconciliation); producer threads access the schema only through the volatile {@code
- * cachedPeerTagSchema} reference in {@link ConflatingMetricsAggregator}.
+ *
Each {@link SpanSnapshot} captures its own schema reference so producer and consumer agree on
+ * the indexing even if the current schema is replaced between capture and consumption.
+ *
+ *
Thread-safety: all mutable state ({@link TagCardinalityHandler}s, the warn-once set,
+ * {@link #blockedCounts}, and {@link #lastTimeDiscovered}) is exercised only on the aggregator
+ * thread. {@link #names} and {@link #handlers} are final and safe to read from any thread; producer
+ * threads access them through the volatile {@code cachedPeerTagSchema} reference in {@link
+ * ClientStatsAggregator}.
*/
final class PeerTagSchema {
+ private static final Logger log = LoggerFactory.getLogger(PeerTagSchema.class);
+
/** Singleton schema for internal-kind spans -- only {@code base.service}. */
static final PeerTagSchema INTERNAL =
// -1L sentinel; INTERNAL is never reconciled, so the value just has to be distinct from any
// real System.currentTimeMillis() that the aggregator might observe.
- new PeerTagSchema(new String[] {BASE_SERVICE}, -1L);
+ new PeerTagSchema(new String[] {BASE_SERVICE}, -1L, HealthMetrics.NO_OP);
final String[] names;
+ final TagCardinalityHandler[] handlers;
/**
* The {@code DDAgentFeaturesDiscovery.getLastTimeDiscovered()} value this schema was built from.
@@ -53,30 +64,56 @@ final class PeerTagSchema {
*/
long lastTimeDiscovered;
- private PeerTagSchema(String[] names, long lastTimeDiscovered) {
- this.names = names;
- this.lastTimeDiscovered = lastTimeDiscovered;
- }
+ private final HealthMetrics healthMetrics;
+
+ /**
+ * Per-cycle warn-once gating. {@code Set.add(name)} returns true exactly the first time a tag
+ * gets blocked this cycle, which is the only time we want to emit the warn log. Cleared by {@link
+ * #resetCardinalityHandlers()}.
+ */
+ private final Set warnedCardinality = new HashSet<>();
+
+ /**
+ * Per-tag block counter, indexed in lockstep with {@link #names}. Incremented on every blocked
+ * value during the cycle; flushed to {@link HealthMetrics#onTagCardinalityBlocked(String, long)}
+ * and zeroed in {@link #resetCardinalityHandlers()}. Single statsd call per affected tag per
+ * cycle keeps a misconfigured high-cardinality tag from flooding the metrics pipe.
+ */
+ private final long[] blockedCounts;
/** Builds a schema for the given peer-tag names. Order is determined by the {@link Set}. */
- static PeerTagSchema of(Set tags, long lastTimeDiscovered) {
- return new PeerTagSchema(tags.toArray(new String[0]), lastTimeDiscovered);
+ static PeerTagSchema of(Set names, long lastTimeDiscovered, HealthMetrics healthMetrics) {
+ return new PeerTagSchema(names.toArray(new String[0]), lastTimeDiscovered, healthMetrics);
}
/**
- * Test-only factory that takes the names array directly so tests can build a schema in a specific
- * order without going through a {@link Set}.
+ * Test-only factory that takes the names array directly so tests can build a schema in a
+ * specific order without going through a {@link Set}. Uses {@link HealthMetrics#NO_OP} and a
+ * sentinel discovery timestamp; tests exercising the cardinality-handler reset path should use
+ * {@link #of(Set, long, HealthMetrics)} instead.
*/
static PeerTagSchema testSchema(String[] names) {
- return new PeerTagSchema(names, 0L);
+ return new PeerTagSchema(names, 0L, HealthMetrics.NO_OP);
+ }
+
+ private PeerTagSchema(String[] names, long lastTimeDiscovered, HealthMetrics healthMetrics) {
+ this.names = names;
+ this.lastTimeDiscovered = lastTimeDiscovered;
+ this.healthMetrics = healthMetrics;
+ this.handlers = new TagCardinalityHandler[names.length];
+ this.blockedCounts = new long[names.length];
+ for (int i = 0; i < names.length; i++) {
+ this.handlers[i] =
+ new TagCardinalityHandler(names[i], MetricCardinalityLimits.PEER_TAG_VALUE);
+ }
}
/**
* Whether this schema's tag names exactly match {@code other}. Used by the aggregator's reconcile
* path: when a feature discovery refresh bumps {@link
* DDAgentFeaturesDiscovery#getLastTimeDiscovered()} but the resulting set is unchanged, the
- * aggregator can keep this schema and just bump {@link #lastTimeDiscovered} instead of
- * rebuilding.
+ * aggregator can keep this schema (and its warm cardinality handlers) and just bump {@link
+ * #lastTimeDiscovered} instead of rebuilding.
*/
boolean hasSameTagsAs(Set other) {
if (this.names.length != other.size()) {
@@ -90,7 +127,50 @@ boolean hasSameTagsAs(Set other) {
return true;
}
+ /**
+ * Canonicalizes the peer-tag value at slot {@code i}. Returns {@link UTF8BytesString#EMPTY} for
+ * null inputs and the handler's {@code ":blocked_by_tracer"} sentinel when the per-tag
+ * cardinality budget is exhausted. Increments the per-tag block counter on every block and emits
+ * a one-shot warn log per cycle per tag; the counter is flushed to {@link HealthMetrics} in
+ * {@link #resetCardinalityHandlers()}.
+ */
+ UTF8BytesString register(int i, String value) {
+ TagCardinalityHandler handler = handlers[i];
+ UTF8BytesString result = handler.register(value);
+ if (handler.isBlockedResult(result)) {
+ blockedCounts[i]++;
+ String name = names[i];
+ if (warnedCardinality.add(name)) {
+ log.warn(
+ "Cardinality limit reached for peer tag '{}'; further values are reported as"
+ + " 'blocked_by_tracer' until the next reporting cycle",
+ name);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Resets every {@link TagCardinalityHandler}'s working set, flushes accumulated per-tag block
+ * counts to {@link HealthMetrics}, and clears the per-cycle warn-once tracking. Must be called on
+ * the aggregator thread; handlers are not thread-safe.
+ */
+ void resetCardinalityHandlers() {
+ for (int i = 0; i < handlers.length; i++) {
+ handlers[i].reset();
+ if (blockedCounts[i] > 0) {
+ healthMetrics.onTagCardinalityBlocked(names[i], blockedCounts[i]);
+ blockedCounts[i] = 0;
+ }
+ }
+ warnedCardinality.clear();
+ }
+
int size() {
return names.length;
}
+
+ String name(int i) {
+ return names[i];
+ }
}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java
new file mode 100644
index 00000000000..14af0bd0b27
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java
@@ -0,0 +1,140 @@
+package datadog.trace.common.metrics;
+
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import java.util.Arrays;
+
+/**
+ * Cardinality-capped UTF8 canonicalizer for one property field.
+ *
+ * Dual role -- limiter and cache. Prior versions ran a per-field {@code DDCache} for UTF8
+ * reuse with a separate global cardinality cap on top. Under high load that wasn't enough to stave
+ * off long GC cycles: every miss still concatenated / UTF8-encoded the value before the cache could
+ * store it. A cardinality limiter and a recent-value cache are both sets of recently used
+ * values, so this class collapses them into one structure. Cardinality limiting happens first,
+ * which lets the blocked path skip the concatenation and encoding entirely.
+ *
+ *
A pure limiter would fully reset each reporting cycle and destroy the cache. To preserve UTF8
+ * reuse across resets, the handler keeps the previous cycle's entries verbatim in a parallel table
+ * and reuses any matching {@link UTF8BytesString} when a value first appears in the new cycle.
+ *
+ *
Accepts any {@link CharSequence} input -- mixed {@code String}/{@code UTF8BytesString} of the
+ * same content collapse to one slot because {@link UTF8BytesString#hashCode()} delegates to the
+ * underlying String's hash and probe equality is the content-based {@code
+ * stored.toString().contentEquals(value)} (which fast-paths to {@code String.equals} when the input
+ * is a String).
+ *
+ *
Storage: open-addressed flat arrays with linear probing. Two parallel {@code
+ * UTF8BytesString[]} tables -- "current cycle" and "prior cycle". Capacity is the next power of two
+ * {@code >= 2 * cardinalityLimit} so probes stay short even at the full budget. The stored
+ * UTF8BytesString carries the slot's identity directly; no parallel keys array needed.
+ *
+ *
+ * - The current table tracks which values have consumed a slot of the cardinality budget this
+ * reporting cycle. Once {@link #cardinalityLimit} distinct values are present, further
+ * first-time values get the {@code blocked_by_tracer} sentinel.
+ *
- The prior table holds the previous cycle's entries verbatim. A first-time-this-cycle value
+ * that hits in the prior table reuses its {@link UTF8BytesString} instance -- no
+ * re-allocation -- and stores that reference in the current table.
+ *
+ *
+ * Reset: swap the current and prior pointers, then null the (now) current. One
+ * O(capacity) pass; half the work of a copy-then-null. Workloads with a stable value set across
+ * cycles pay zero UTF8 allocations after the first cycle, and the reused instances also
+ * short-circuit downstream equality to identity comparisons.
+ */
+final class PropertyCardinalityHandler {
+ private final int cardinalityLimit;
+ private final int capacityMask;
+
+ // Single open-addressed table per cycle. The stored UTF8BytesString IS the slot identity --
+ // equality is checked by comparing its underlying String against the incoming CharSequence.
+ private UTF8BytesString[] curValues;
+ private UTF8BytesString[] priorValues;
+ private int curSize;
+
+ private UTF8BytesString cacheBlocked = null;
+
+ PropertyCardinalityHandler(int cardinalityLimit) {
+ if (cardinalityLimit <= 0) {
+ throw new IllegalArgumentException("cardinalityLimit must be positive: " + cardinalityLimit);
+ }
+ // Upper bound prevents overflow in the (cardinalityLimit * 2 - 1) capacity calc below.
+ // Practical limits are 8..512; this cap is well beyond any realistic configuration.
+ if (cardinalityLimit > (1 << 29)) {
+ throw new IllegalArgumentException(
+ "cardinalityLimit must be at most 2^29: " + cardinalityLimit);
+ }
+ this.cardinalityLimit = cardinalityLimit;
+ // Capacity = next power of two >= 2 * cardinalityLimit. Linear-probing load factor stays
+ // <= 0.5 even when the budget is full, which keeps probe chains short.
+ final int capacity = Integer.highestOneBit(cardinalityLimit * 2 - 1) << 1;
+ this.capacityMask = capacity - 1;
+ this.curValues = new UTF8BytesString[capacity];
+ this.priorValues = new UTF8BytesString[capacity];
+ }
+
+ /**
+ * Canonicalizes {@code value} through the cardinality budget and per-cycle reuse cache. Null
+ * inputs map to {@link UTF8BytesString#EMPTY} -- callers don't need to pre-check.
+ */
+ UTF8BytesString register(CharSequence value) {
+ if (value == null) {
+ return UTF8BytesString.EMPTY;
+ }
+ final int slot = probe(this.curValues, value);
+ final UTF8BytesString existing = this.curValues[slot];
+ if (existing != null) {
+ // Already seen this cycle -- consumed a budget slot earlier; reuse the cached UTF8.
+ return existing;
+ }
+ if (this.curSize >= this.cardinalityLimit) {
+ return this.blockedByTracer();
+ }
+ // First-time-this-cycle value. Reuse from the prior cycle if possible to avoid re-allocation.
+ UTF8BytesString utf8;
+ final int priorSlot = probe(this.priorValues, value);
+ final UTF8BytesString priorMatch = this.priorValues[priorSlot];
+ if (priorMatch != null) {
+ utf8 = priorMatch;
+ } else {
+ utf8 = UTF8BytesString.create(value);
+ }
+ this.curValues[slot] = utf8;
+ this.curSize += 1;
+ return utf8;
+ }
+
+ /**
+ * Linear-probe to find {@code value}'s slot: either the slot occupied by a content-equal
+ * UTF8BytesString, or the first empty slot in the probe chain. {@link UTF8BytesString#hashCode}
+ * is content-stable with the underlying String, so the same content hashes to the same slot
+ * regardless of whether the input is a String or UTF8BytesString.
+ */
+ private int probe(UTF8BytesString[] values, CharSequence value) {
+ int idx = value.hashCode() & this.capacityMask;
+ while (values[idx] != null && !values[idx].toString().contentEquals(value)) {
+ idx = (idx + 1) & this.capacityMask;
+ }
+ return idx;
+ }
+
+ private UTF8BytesString blockedByTracer() {
+ UTF8BytesString cacheBlocked = this.cacheBlocked;
+ if (cacheBlocked != null) return cacheBlocked;
+
+ this.cacheBlocked = cacheBlocked = UTF8BytesString.create("blocked_by_tracer");
+ return cacheBlocked;
+ }
+
+ void reset() {
+ // Flip pointers: the just-completed cycle becomes prior; what was prior (2 cycles ago) is
+ // recycled into the new (empty) current.
+ final UTF8BytesString[] tmp = this.priorValues;
+ this.priorValues = this.curValues;
+ this.curValues = tmp;
+ // Null the new current. The values pulled out of prior are still reachable through any
+ // AggregateEntry rows they ended up populating; this just drops the handler's references.
+ Arrays.fill(this.curValues, null);
+ this.curSize = 0;
+ }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
index 7644ebaf044..f592dfe26f6 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
@@ -143,11 +143,14 @@ public void startBucket(int metricCount, long start, long duration) {
@Override
public void add(AggregateEntry entry) {
- // Calculate dynamic map size based on optional fields
- final boolean hasHttpMethod = entry.getHttpMethod() != null;
- final boolean hasHttpEndpoint = entry.getHttpEndpoint() != null;
- final boolean hasServiceSource = entry.getServiceSource() != null;
- final boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != null;
+ // Calculate dynamic map size based on optional fields. AggregateEntry uses
+ // UTF8BytesString.EMPTY
+ // as the "absent" sentinel for these optional fields (see AggregateEntry); identity comparison
+ // against the singleton.
+ final boolean hasHttpMethod = entry.getHttpMethod() != EMPTY;
+ final boolean hasHttpEndpoint = entry.getHttpEndpoint() != EMPTY;
+ final boolean hasServiceSource = entry.getServiceSource() != EMPTY;
+ final boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != EMPTY;
final int mapSize =
15
+ (hasServiceSource ? 1 : 0)
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java
index 152ac42bb55..7b44029cfcd 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java
@@ -1,5 +1,7 @@
package datadog.trace.common.metrics;
+import javax.annotation.Nullable;
+
/**
* Immutable per-span value posted from the producer to the aggregator thread. Carries the raw
* inputs the aggregator needs to look up or build an {@link AggregateEntry} and update its
@@ -22,17 +24,17 @@ final class SpanSnapshot implements InboxItem {
/**
* Schema for {@link #peerTagValues}. {@code null} when the span has no peer tags. The schema
- * carries the names in parallel-array form; {@code peerTagValues} holds the per-span tag values
- * at the same indices.
+ * carries the names + {@link TagCardinalityHandler}s in parallel array form; {@code
+ * peerTagValues} holds the per-span tag values at the same indices.
*/
- final PeerTagSchema peerTagSchema;
+ @Nullable final PeerTagSchema peerTagSchema;
/**
* Peer tag values captured from the span, parallel to {@code peerTagSchema.names}. A {@code null}
* entry means the span didn't have that peer tag set. {@code null} (the whole array) when {@link
* #peerTagSchema} is {@code null}.
*/
- final String[] peerTagValues;
+ @Nullable final String[] peerTagValues;
final String httpMethod;
final String httpEndpoint;
@@ -51,8 +53,8 @@ final class SpanSnapshot implements InboxItem {
boolean synthetic,
boolean traceRoot,
String spanKind,
- PeerTagSchema peerTagSchema,
- String[] peerTagValues,
+ @Nullable PeerTagSchema peerTagSchema,
+ @Nullable String[] peerTagValues,
String httpMethod,
String httpEndpoint,
String grpcStatusCode,
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java
new file mode 100644
index 00000000000..7cb6076dabc
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java
@@ -0,0 +1,115 @@
+package datadog.trace.common.metrics;
+
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import java.util.Arrays;
+
+/**
+ * Cardinality-capped UTF8 canonicalizer for one peer-tag name. Output is the pre-encoded {@code
+ * "tag:value"} form the serializer writes.
+ *
+ *
Like {@link PropertyCardinalityHandler}, this serves a dual role -- cardinality limiter and
+ * UTF8 cache fused into one set of recently used values, with the prior cycle's entries retained so
+ * UTF8 reuse survives the per-cycle reset. See {@link PropertyCardinalityHandler} for the full
+ * rationale and storage layout.
+ *
+ *
The structural difference here is that the cached {@link UTF8BytesString} holds the {@code
+ * "tag:value"} concatenation rather than the bare value, so a parallel {@code String[]} keys table
+ * is needed to probe by the raw value.
+ */
+final class TagCardinalityHandler {
+ private final String tag;
+ private final int cardinalityLimit;
+ private final int capacityMask;
+
+ private String[] curKeys;
+ private UTF8BytesString[] curValues;
+ private String[] priorKeys;
+ private UTF8BytesString[] priorValues;
+ private int curSize;
+
+ private UTF8BytesString cacheBlocked = null;
+
+ TagCardinalityHandler(String tag, int cardinalityLimit) {
+ if (cardinalityLimit <= 0) {
+ throw new IllegalArgumentException("cardinalityLimit must be positive: " + cardinalityLimit);
+ }
+ // Upper bound prevents overflow in the (cardinalityLimit * 2 - 1) capacity calc below.
+ if (cardinalityLimit > (1 << 29)) {
+ throw new IllegalArgumentException(
+ "cardinalityLimit must be at most 2^29: " + cardinalityLimit);
+ }
+ this.tag = tag;
+ this.cardinalityLimit = cardinalityLimit;
+ final int capacity = Integer.highestOneBit(cardinalityLimit * 2 - 1) << 1;
+ this.capacityMask = capacity - 1;
+ this.curKeys = new String[capacity];
+ this.curValues = new UTF8BytesString[capacity];
+ this.priorKeys = new String[capacity];
+ this.priorValues = new UTF8BytesString[capacity];
+ }
+
+ /**
+ * Canonicalizes {@code value} through the cardinality budget and per-cycle reuse cache. Null
+ * inputs map to {@link UTF8BytesString#EMPTY} -- callers don't need to pre-check.
+ */
+ UTF8BytesString register(String value) {
+ if (value == null) {
+ return UTF8BytesString.EMPTY;
+ }
+ final int slot = probe(this.curKeys, value);
+ if (this.curKeys[slot] != null) {
+ return this.curValues[slot];
+ }
+ if (this.curSize >= this.cardinalityLimit) {
+ return this.blockedByTracer();
+ }
+ UTF8BytesString utf8;
+ final int priorSlot = probe(this.priorKeys, value);
+ if (this.priorKeys[priorSlot] != null) {
+ utf8 = this.priorValues[priorSlot];
+ } else {
+ utf8 = UTF8BytesString.create(this.tag + ":" + value);
+ }
+ this.curKeys[slot] = value;
+ this.curValues[slot] = utf8;
+ this.curSize += 1;
+ return utf8;
+ }
+
+ private int probe(String[] keys, String value) {
+ int idx = value.hashCode() & this.capacityMask;
+ while (keys[idx] != null && !keys[idx].equals(value)) {
+ idx = (idx + 1) & this.capacityMask;
+ }
+ return idx;
+ }
+
+ /**
+ * Whether {@code result} (returned from a prior {@link #register} call) is this handler's blocked
+ * sentinel. The size check short-circuits the hot path so the sentinel is never materialized
+ * before any value has actually been blocked this cycle.
+ */
+ boolean isBlockedResult(UTF8BytesString result) {
+ return this.curSize >= this.cardinalityLimit && result == blockedByTracer();
+ }
+
+ private UTF8BytesString blockedByTracer() {
+ UTF8BytesString cacheBlocked = this.cacheBlocked;
+ if (cacheBlocked != null) return cacheBlocked;
+
+ this.cacheBlocked = cacheBlocked = UTF8BytesString.create(this.tag + ":blocked_by_tracer");
+ return cacheBlocked;
+ }
+
+ void reset() {
+ final String[] tmpKeys = this.priorKeys;
+ final UTF8BytesString[] tmpValues = this.priorValues;
+ this.priorKeys = this.curKeys;
+ this.priorValues = this.curValues;
+ this.curKeys = tmpKeys;
+ this.curValues = tmpValues;
+ Arrays.fill(this.curKeys, null);
+ Arrays.fill(this.curValues, null);
+ this.curSize = 0;
+ }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java
index a6ced35967c..0cf5b898378 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java
@@ -86,6 +86,16 @@ default boolean isKind(SpanKindFilter filter) {
return filter.matches(kind == null ? null : kind.toString());
}
+ /**
+ * Returns the {@code span.kind} tag value as a String, or {@code null} if not set. Default
+ * implementation reads the tag map; {@link DDSpan} overrides to use a cached ordinal that
+ * resolves via a small lookup array, skipping the tag-map lookup on the hot path.
+ */
+ default String getSpanKindString() {
+ Object v = unsafeGetTag(datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND);
+ return v == null ? null : v.toString();
+ }
+
CharSequence getType();
/**
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java
index f539ff84e8c..c3f89f8051d 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java
@@ -964,6 +964,11 @@ public boolean isKind(SpanKindFilter filter) {
return filter.matches(context.getSpanKindOrdinal());
}
+ @Override
+ public String getSpanKindString() {
+ return context.getSpanKindString();
+ }
+
@Override
public void copyPropagationAndBaggage(final AgentSpan source) {
if (source instanceof DDSpan) {
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java
index d1c7fe126b4..6f9a263f593 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java
@@ -98,6 +98,15 @@ public void onStatsAggregateDropped() {}
*/
public void onStatsInboxFull() {}
+ /**
+ * Reports a batch of {@code count} tag values collapsed into the {@code blocked_by_tracer}
+ * sentinel for {@code tag} during the just-completed reporting cycle (per-tag cardinality budget
+ * exhausted, or per-value length cap exceeded). Called from the aggregator thread once per
+ * affected tag at cycle reset, so the implementation can do a single counter update rather than
+ * one per blocked value.
+ */
+ public void onTagCardinalityBlocked(String tag, long count) {}
+
/**
* @return Human-readable summary of the current health metrics.
*/
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java
index db384a7e42e..c00ef708abf 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java
@@ -363,6 +363,11 @@ public void onStatsInboxFull() {
statsInboxFull.increment();
}
+ @Override
+ public void onTagCardinalityBlocked(String tag, long count) {
+ statsd.count("stats.tag_cardinality_blocked", count, new String[] {"tag:" + tag});
+ }
+
@Override
public void close() {
if (null != cancellation) {
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy
similarity index 92%
rename from dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
rename to dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy
index 3d75e43a88e..d8620e370f0 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy
@@ -18,7 +18,7 @@ import java.util.concurrent.TimeoutException
import java.util.function.Supplier
import spock.lang.Shared
-class ConflatingMetricAggregatorTest extends DDSpecification {
+class ClientStatsAggregatorTest extends DDSpecification {
static Set empty = new HashSet<>()
@@ -35,7 +35,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(
wellKnownTags,
empty,
features,
@@ -65,7 +65,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(
wellKnownTags,
[ignoredResourceName].toSet(),
features,
@@ -103,7 +103,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -149,7 +149,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -195,7 +195,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()
@@ -253,46 +253,34 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"client" | "GET" | "/external/api" | true
}
- def "should create bucket for each set of peer tags"() {
+ def "should create separate buckets for distinct peer tag values"() {
+ // Peer-tag NAMES are configured per-tracer and stable for the duration of a trace publish;
+ // peer-tag VALUES vary per-span. Two spans with the same names but different values should
+ // produce two distinct aggregate buckets.
setup:
- // Peer-tag schema is reconciled with feature discovery once per reporting cycle (on the
- // aggregator thread, in the post-report hook), not per-span on the producer. Drive two
- // reporting cycles with different peerTags() configurations to verify the aggregator buckets
- // each cycle by the schema that was current at publish time.
MetricWriter writer = Mock(MetricWriter)
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
- features.peerTags() >>> [["country"], ["country", "georegion"]]
- // Bump the discovered-at timestamp so reconcile during report cycle 1 sees a mismatch and
- // rebuilds the schema for span 2. Three calls: bootstrap (span1's publish), reconcile-during-
- // report-1 (mismatch -> rebuild + 2nd peerTags() call), reconcile-during-report-2 (no change).
- features.getLastTimeDiscovered() >>> [1L, 2L, 2L]
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ features.peerTags() >> ["country", "georegion"]
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
- when: "cycle 1 -- peerTags=[country]"
- CountDownLatch latch1 = new CountDownLatch(1)
+ when:
+ CountDownLatch latch = new CountDownLatch(1)
aggregator.publish([
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
- .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe")
- ])
- aggregator.report()
- def cycle1Triggered = latch1.await(2, SECONDS)
-
- and: "cycle 2 -- reconcile picks up peerTags=[country, georegion]"
- CountDownLatch latch2 = new CountDownLatch(1)
- aggregator.publish([
+ .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe"),
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
- .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe")
+ .setTag(SPAN_KIND, "client").setTag("country", "germany").setTag("georegion", "europe")
])
aggregator.report()
- def cycle2Triggered = latch2.await(2, SECONDS)
+ def latchTriggered = latch.await(2, SECONDS)
then:
- cycle1Triggered
- cycle2Triggered
+ latchTriggered
+ 1 * writer.startBucket(2, _, _)
1 * writer.add(
AggregateEntry.of(
"resource",
@@ -304,7 +292,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
false,
false,
"client",
- [UTF8BytesString.create("country:france")],
+ [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")],
null,
null,
null
@@ -322,14 +310,14 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
false,
false,
"client",
- [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")],
+ [UTF8BytesString.create("country:germany"), UTF8BytesString.create("georegion:europe")],
null,
null,
null
)) >> { AggregateEntry e ->
e.getHitCount() == 1 && e.getTopLevelCount() == 0 && e.getDuration() == 100
}
- 2 * writer.finishBucket() >> { latch1.countDown(); latch2.countDown() }
+ 1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
aggregator.close()
@@ -342,7 +330,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> ["peer.hostname", "_dd.base_service"]
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -395,7 +383,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP,
sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -447,7 +435,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
long duration = 100
List trace = [
@@ -519,7 +507,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()
@@ -646,7 +634,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()
@@ -761,7 +749,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()
@@ -831,7 +819,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -903,7 +891,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
long duration = 100
aggregator.start()
@@ -971,7 +959,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
HealthMetrics healthMetrics = Mock(HealthMetrics)
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
long duration = 100
aggregator.start()
@@ -1005,7 +993,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
HealthMetrics healthMetrics = Mock(HealthMetrics)
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -1050,7 +1038,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
long duration = 100
aggregator.start()
@@ -1152,7 +1140,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
long duration = 100
aggregator.start()
@@ -1212,7 +1200,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false)
long duration = 100
aggregator.start()
@@ -1263,7 +1251,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false)
long duration = 100
aggregator.start()
@@ -1294,7 +1282,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
MetricWriter writer = Mock(MetricWriter)
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false)
aggregator.start()
@@ -1316,7 +1304,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> false
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS, false)
final spans = [
new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK)
@@ -1348,7 +1336,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false)
when:
@@ -1381,7 +1369,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -1428,7 +1416,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -1483,7 +1471,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()
@@ -1574,7 +1562,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -1647,14 +1635,14 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
aggregator.close()
}
- def reportAndWaitUntilEmpty(ConflatingMetricsAggregator aggregator) {
+ def reportAndWaitUntilEmpty(ClientStatsAggregator aggregator) {
waitUntilEmpty(aggregator)
aggregator.report()
waitUntilEmpty(aggregator)
}
- def waitUntilEmpty(ConflatingMetricsAggregator aggregator) {
+ def waitUntilEmpty(ClientStatsAggregator aggregator) {
int i = 0
while (!aggregator.inbox.isEmpty() && i++ < 100) {
Thread.sleep(10)
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
index eceedeb1935..86a91c23b3f 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
@@ -37,7 +37,7 @@ class FootprintForkedTest extends DDSpecification {
it.supportsMetrics() >> true
it.peerTags() >> []
}
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(
new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"),
[].toSet() as Set,
features,
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy
index 07f246bf9a9..dc9eb86fde3 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy
@@ -28,6 +28,6 @@ class MetricsAggregatorFactoryTest extends DDSpecification {
expect:
def aggregator = MetricsAggregatorFactory.createMetricsAggregator(config, sco, HealthMetrics.NO_OP,
)
- assert aggregator instanceof ConflatingMetricsAggregator
+ assert aggregator instanceof ClientStatsAggregator
}
}
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy
index 5e85c66557d..1e5f21e13e0 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy
@@ -1,6 +1,7 @@
package datadog.trace.common.metrics
import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED
+import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY
import static java.util.concurrent.TimeUnit.MILLISECONDS
import static java.util.concurrent.TimeUnit.SECONDS
@@ -284,12 +285,13 @@ class SerializingMetricWriterTest extends DDSpecification {
int statCount = unpacker.unpackArrayHeader()
assert statCount == content.size()
for (AggregateEntry entry : content) {
+ // counters now live on AggregateEntry
int metricMapSize = unpacker.unpackMapHeader()
// Calculate expected map size based on optional fields
- boolean hasHttpMethod = entry.getHttpMethod() != null
- boolean hasHttpEndpoint = entry.getHttpEndpoint() != null
- boolean hasServiceSource = entry.getServiceSource() != null
- boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != null
+ boolean hasHttpMethod = entry.getHttpMethod() != EMPTY
+ boolean hasHttpEndpoint = entry.getHttpEndpoint() != EMPTY
+ boolean hasServiceSource = entry.getServiceSource() != EMPTY
+ boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != EMPTY
int expectedMapSize = 15 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) + (hasGrpcStatusCode ? 1 : 0)
assert metricMapSize == expectedMapSize
int elementCount = 0
diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java
index 7b3a8a1f398..057478d46a4 100644
--- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java
+++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java
@@ -1,8 +1,11 @@
package datadog.trace.common.metrics;
+import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY;
import static datadog.trace.common.metrics.AggregateEntry.ERROR_TAG;
import static datadog.trace.common.metrics.AggregateEntry.TOP_LEVEL_TAG;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import datadog.metrics.agent.AgentMeter;
@@ -86,24 +89,47 @@ void okAndErrorLatenciesTrackedSeparately() {
assertTrue(entry.getOkLatencies().getMaxValue() <= 5);
}
- private static AggregateEntry newEntry() {
- SpanSnapshot snapshot =
- new SpanSnapshot(
+ @Test
+ void absentOptionalFieldsResolveToEmptySentinel() {
+ // serviceSource / httpMethod / httpEndpoint / grpcStatusCode = null on input -> EMPTY on the
+ // entry. EMPTY is the universal "absent" sentinel; SerializingMetricWriter and equality use
+ // identity comparison against it.
+ AggregateEntry entry = newEntry();
+ assertSame(EMPTY, entry.getServiceSource());
+ assertSame(EMPTY, entry.getHttpMethod());
+ assertSame(EMPTY, entry.getHttpEndpoint());
+ assertSame(EMPTY, entry.getGrpcStatusCode());
+ }
+
+ @Test
+ void presentOptionalFieldsCarryTheirValue() {
+ AggregateEntry entry =
+ AggregateEntry.of(
"resource",
"svc",
"op",
- null,
+ "src",
"type",
- (short) 200,
+ 200,
false,
true,
"client",
null,
- null,
- null,
- null,
- null,
- 0L);
- return AggregateEntry.forSnapshot(snapshot);
+ "GET",
+ "/api/v1/foo",
+ "0");
+ assertNotSame(EMPTY, entry.getServiceSource());
+ assertNotSame(EMPTY, entry.getHttpMethod());
+ assertNotSame(EMPTY, entry.getHttpEndpoint());
+ assertNotSame(EMPTY, entry.getGrpcStatusCode());
+ assertEquals("src", entry.getServiceSource().toString());
+ assertEquals("GET", entry.getHttpMethod().toString());
+ assertEquals("/api/v1/foo", entry.getHttpEndpoint().toString());
+ assertEquals("0", entry.getGrpcStatusCode().toString());
+ }
+
+ private static AggregateEntry newEntry() {
+ return AggregateEntry.of(
+ "resource", "svc", "op", null, "type", 200, false, true, "client", null, null, null, null);
}
}
diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java
new file mode 100644
index 00000000000..08ecbdef628
--- /dev/null
+++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java
@@ -0,0 +1,168 @@
+package datadog.trace.common.metrics;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import org.junit.jupiter.api.Test;
+
+class CardinalityHandlerTest {
+
+ @Test
+ void propertyReturnsSameInstanceForRepeatedValueUntilLimit() {
+ PropertyCardinalityHandler h = new PropertyCardinalityHandler(3);
+ UTF8BytesString a1 = h.register("a");
+ UTF8BytesString a2 = h.register("a");
+ assertSame(a1, a2);
+ assertEquals("a", a1.toString());
+ }
+
+ @Test
+ void propertyOverLimitReturnsBlockedSentinel() {
+ PropertyCardinalityHandler h = new PropertyCardinalityHandler(2);
+ UTF8BytesString a = h.register("a");
+ UTF8BytesString b = h.register("b");
+ UTF8BytesString blocked1 = h.register("c");
+ UTF8BytesString blocked2 = h.register("d");
+
+ assertEquals("blocked_by_tracer", blocked1.toString());
+ assertSame(blocked1, blocked2); // same sentinel for all overflow values
+ assertNotSame(blocked1, a);
+ assertNotSame(blocked1, b);
+ }
+
+ @Test
+ void propertyResetRefreshesBudget() {
+ PropertyCardinalityHandler h = new PropertyCardinalityHandler(2);
+ h.register("a");
+ h.register("b");
+ UTF8BytesString blocked = h.register("c");
+ assertEquals("blocked_by_tracer", blocked.toString());
+
+ h.reset();
+
+ // After reset, three distinct values fit again. Prior-cycle instances are reused
+ // (see propertyPriorCycleInstancesAreReusedAcrossReset for the dedicated check); here
+ // we just confirm that the budget refreshed so values previously blocked now have
+ // a slot.
+ UTF8BytesString afterReset = h.register("a");
+ assertEquals("a", afterReset.toString());
+ UTF8BytesString c = h.register("c");
+ assertEquals("c", c.toString());
+ UTF8BytesString blockedAgain = h.register("d");
+ UTF8BytesString blockedYetAgain = h.register("e");
+ assertEquals("blocked_by_tracer", blockedAgain.toString());
+ assertSame(blockedAgain, blockedYetAgain);
+ }
+
+ @Test
+ void propertyPriorCycleInstancesAreReusedAcrossReset() {
+ // Dual role: the handler is also a UTF8 cache. Values held in the prior cycle are
+ // reused on the first registration in the new cycle, so aggregate entries that hold a
+ // reference to a UTF8BytesString still match on identity after the per-cycle reset.
+ // This is the cache-survives-reset property the canonical-key lookup depends on.
+ PropertyCardinalityHandler h = new PropertyCardinalityHandler(4);
+ UTF8BytesString aBefore = h.register("a");
+ UTF8BytesString bBefore = h.register("b");
+
+ h.reset();
+
+ assertSame(aBefore, h.register("a"));
+ assertSame(bBefore, h.register("b"));
+ // Same-cycle subsequent registration continues to return the reused instance.
+ assertSame(aBefore, h.register("a"));
+ }
+
+ @Test
+ void propertyPriorCycleReuseSurvivesOneResetButNotTwo() {
+ // Reuse window is one cycle deep -- the handler swaps current/prior on reset, so a
+ // value last seen two cycles ago is no longer cached and will be re-allocated.
+ PropertyCardinalityHandler h = new PropertyCardinalityHandler(4);
+ UTF8BytesString first = h.register("a");
+
+ h.reset();
+ h.reset();
+
+ UTF8BytesString afterTwoResets = h.register("a");
+ assertNotSame(first, afterTwoResets);
+ assertEquals("a", afterTwoResets.toString());
+ }
+
+ @Test
+ void tagPrefixesValuesAndReusesUnderLimit() {
+ TagCardinalityHandler h = new TagCardinalityHandler("peer.hostname", 4);
+ UTF8BytesString first = h.register("host-a");
+ UTF8BytesString second = h.register("host-a");
+ UTF8BytesString other = h.register("host-b");
+
+ assertSame(first, second);
+ assertNotSame(first, other);
+ assertEquals("peer.hostname:host-a", first.toString());
+ assertEquals("peer.hostname:host-b", other.toString());
+ }
+
+ @Test
+ void tagOverLimitReturnsTaggedSentinel() {
+ TagCardinalityHandler h = new TagCardinalityHandler("peer.service", 1);
+ h.register("svc-1");
+ UTF8BytesString blocked = h.register("svc-2");
+ assertEquals("peer.service:blocked_by_tracer", blocked.toString());
+ }
+
+ @Test
+ void tagResetRefreshesBudgetAndSentinelStaysStable() {
+ TagCardinalityHandler h = new TagCardinalityHandler("x", 1);
+ h.register("v1");
+ UTF8BytesString blockedBefore = h.register("v2");
+ h.reset();
+ h.register("v1");
+ UTF8BytesString blockedAfter = h.register("v2");
+ // Both are the same sentinel instance (cacheBlocked is not cleared on reset).
+ assertSame(blockedBefore, blockedAfter);
+ }
+
+ @Test
+ void tagPriorCycleInstancesAreReusedAcrossReset() {
+ // Mirrors propertyPriorCycleInstancesAreReusedAcrossReset: the pre-built "tag:value"
+ // UTF8BytesString from the prior cycle is reused on the first registration in the new
+ // cycle -- no re-concatenation, no re-encoding.
+ TagCardinalityHandler h = new TagCardinalityHandler("peer.hostname", 4);
+ UTF8BytesString hostABefore = h.register("host-a");
+ UTF8BytesString hostBBefore = h.register("host-b");
+
+ h.reset();
+
+ assertSame(hostABefore, h.register("host-a"));
+ assertSame(hostBBefore, h.register("host-b"));
+ }
+
+ @Test
+ void propertyRegisterOfNullReturnsEmpty() {
+ PropertyCardinalityHandler h = new PropertyCardinalityHandler(4);
+ // Null input short-circuits to UTF8BytesString.EMPTY -- the universal "absent" sentinel that
+ // AggregateEntry's optional UTF8 fields use in place of null.
+ assertSame(UTF8BytesString.EMPTY, h.register(null));
+ }
+
+ @Test
+ void propertyRegisterOfNullDoesNotConsumeBudget() {
+ PropertyCardinalityHandler h = new PropertyCardinalityHandler(2);
+ h.register(null);
+ h.register(null);
+ h.register(null);
+ // Three null registrations didn't consume the budget; two real values still fit.
+ assertEquals("a", h.register("a").toString());
+ assertEquals("b", h.register("b").toString());
+ // Third real value spills to the blocked sentinel (limit = 2).
+ assertEquals("blocked_by_tracer", h.register("c").toString());
+ }
+
+ @Test
+ void tagRegisterOfNullReturnsEmpty() {
+ TagCardinalityHandler h = new TagCardinalityHandler("peer.hostname", 4);
+ // Null returns EMPTY (no "tag:" prefix applied -- the sentinel is the same EMPTY singleton
+ // every handler returns for null input).
+ assertSame(UTF8BytesString.EMPTY, h.register(null));
+ }
+}
diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorInboxFullTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ClientStatsAggregatorInboxFullTest.java
similarity index 93%
rename from dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorInboxFullTest.java
rename to dd-trace-core/src/test/java/datadog/trace/common/metrics/ClientStatsAggregatorInboxFullTest.java
index f4e4c2da253..348245ec47c 100644
--- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorInboxFullTest.java
+++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ClientStatsAggregatorInboxFullTest.java
@@ -17,12 +17,12 @@
import org.junit.jupiter.api.Test;
/**
- * Coverage for the inbox-full fast-path in {@code ConflatingMetricsAggregator.publish}: when the
+ * Coverage for the inbox-full fast-path in {@code ClientStatsAggregator.publish}: when the
* producer-side inbox is at capacity, the next {@code publish} call short-circuits before any tag
* extraction or {@code SpanSnapshot} allocation and reports {@code onStatsInboxFull()} to health
* metrics.
*/
-class ConflatingMetricsAggregatorInboxFullTest {
+class ClientStatsAggregatorInboxFullTest {
@Test
void publishFiresOnStatsInboxFullOnceInboxIsAtCapacity() {
@@ -38,8 +38,8 @@ void publishFiresOnStatsInboxFullOnceInboxIsAtCapacity() {
// never drains -- snapshots accumulate in the inbox until capacity, then the next publish hits
// the size-vs-capacity fast path.
int queueSize = 8;
- ConflatingMetricsAggregator aggregator =
- new ConflatingMetricsAggregator(
+ ClientStatsAggregator aggregator =
+ new ClientStatsAggregator(
Collections.emptySet(),
features,
healthMetrics,
diff --git a/docs/client_metrics_design.md b/docs/client_metrics_design.md
new file mode 100644
index 00000000000..ca5f200c97f
--- /dev/null
+++ b/docs/client_metrics_design.md
@@ -0,0 +1,315 @@
+# Client-side metrics (stats aggregator) design
+
+This document describes the design of the **client-side metrics pipeline** that
+lives under `dd-trace-core/.../common/metrics/`. The pipeline aggregates per-span
+duration / count / error statistics on the tracer and sends rolled-up "client
+stats" payloads to the Datadog Agent on a fixed reporting interval, so the agent
+does not have to sample every span to know request rates and latencies.
+
+Code lives in package `datadog.trace.common.metrics`.
+
+## High-level shape
+
+```
+ producer thread(s) aggregator thread
+ inbox
+ trace ─▶ ClientStatsAggregator.publish(trace) ──MPSC──▶ Aggregator.run
+ │ │
+ │ per metrics-eligible span │ Drainer.accept
+ │ │
+ │ allocates one SpanSnapshot ▼
+ │ (immutable, ~15 refs) AggregateTable.findOrInsert
+ │ │
+ │ inbox.offer(snapshot) │ canonicalize → hash
+ └────────────────────────────────────▶ │ → lookup or insert
+ │
+ scheduled REPORT signal ──▶│
+ │ Aggregator.report
+ │ → MetricWriter.add(entry)
+ │ → OkHttpSink (HTTP POST)
+ │ → reset cardinality handlers
+```
+
+Three rules govern the design:
+
+1. **The producer never touches shared state.** The hot path on the application
+ thread builds an immutable `SpanSnapshot` and offers it to a bounded MPSC
+ queue. No locks, no maps, no hashing of the metric key.
+2. **The aggregator thread is the sole writer of every shared structure.** The
+ aggregate table, the cardinality handlers, the metric writer state — all of
+ them are accessed only from that thread. Control operations (clear, report,
+ stop) are themselves enqueued as `SignalItem`s so they serialize with data.
+3. **Cardinality is bounded.** Per-field handlers cap the unique values; once a
+ field's budget is exhausted, overflow values collapse into a single
+ `blocked_by_tracer` sentinel so the aggregate table can't blow up.
+
+## Component map
+
+| Component | File | Role |
+|---|---|---|
+| `ClientStatsAggregator` | `ClientStatsAggregator.java` | Producer facade. Decides which spans are eligible, builds `SpanSnapshot`s, offers them to the inbox. Also owns the agent-feature check, the scheduled report timer, and the agent-downgrade handler. |
+| `SpanSnapshot` | `SpanSnapshot.java` | Immutable, allocation-pooled-by-GC value posted from producer → aggregator. Carries raw label fields plus a duration word with `TOP_LEVEL` / `ERROR` bits OR-ed in. |
+| `PeerTagSchema` | `PeerTagSchema.java` | Parallel `String[] names` + `TagCardinalityHandler[] handlers` describing the peer-aggregation tags in effect. One singleton for internal-kind spans; one volatile "current" schema for client/producer/consumer spans, refreshed from `DDAgentFeaturesDiscovery.peerTags()`. |
+| `Aggregator` | `Aggregator.java` | Consumer thread `Runnable`. Drains the inbox; dispatches `SpanSnapshot`s into `AggregateTable`; processes signals (`REPORT`, `CLEAR`, `STOP`); calls the writer on report. |
+| `AggregateTable` | `AggregateTable.java` | Hashtable-backed store keyed on the canonicalized labels. Owns a single reusable `Canonical` scratch buffer. Handles cap-overflow by evicting one stale entry or rejecting new ones. |
+| `AggregateEntry` | `AggregateEntry.java` | `Hashtable.Entry` holding the 13 UTF8 label fields + the mutable `AggregateMetric`. Owns the static `PropertyCardinalityHandler`s for the fixed label fields, and `Canonical` for hot-path canonicalization. |
+| `AggregateMetric` | `AggregateMetric.java` | Per-bucket accumulator: hit count, error count, top-level count, duration sum, ok/error latency histograms. Single-threaded; cleared each report. |
+| `PropertyCardinalityHandler` | `PropertyCardinalityHandler.java` | Per-field UTF8 interner with a max-unique-values cap. Returns a `blocked_by_tracer` sentinel `UTF8BytesString` once the cap is hit. Reset by the aggregator each cycle. |
+| `TagCardinalityHandler` | `TagCardinalityHandler.java` | Same pattern as the property handler, but the cached UTF8 form is the full `tag:value` pair (peer tags are wire-encoded as `tag:value`, not just the value). |
+| `SerializingMetricWriter` / `OkHttpSink` | `SerializingMetricWriter.java`, `OkHttpSink.java` | Wire serialization (MessagePack) + HTTP POST to the agent's `/v0.6/stats` endpoint. |
+| `MetricsAggregatorFactory` / `NoOpMetricsAggregator` | factory + no-op | Picks the real implementation when client stats are enabled and the agent supports the endpoint, no-op otherwise. |
+
+## Producer-side flow (`ClientStatsAggregator.publish`)
+
+The producer holds **no shared state**. Per trace it:
+
+1. Snapshots the current peer-aggregation schema **once per trace** (not per
+ span):
+ ```java
+ PeerTagSchema peerAggSchema = peerAggSchema(features.peerTagsRevision());
+ ```
+ `peerAggSchema(...)` reads a `volatile long` revision held on the
+ aggregator and compares it to the value the cached `PeerTagSchema` was
+ built from. Match → return the cached schema (the common case, since
+ `peerTagsRevision()` only bumps when `DDAgentFeaturesDiscovery` observes a
+ peer-tag set that doesn't equal the previous one). Mismatch → take a
+ monitor on the aggregator, rebuild via `PeerTagSchema.of(names)`, and
+ publish the new schema + revision. The steady-state cost is one volatile
+ read + one long compare.
+
+2. Iterates the trace; for each metrics-eligible span:
+
+ - **Eligibility** (`shouldComputeMetric`):
+ ```java
+ (measured || isTopLevel || isKind(SERVER|CLIENT|PRODUCER|CONSUMER))
+ && longRunningVersion <= 0
+ && durationNano > 0
+ ```
+ `isMeasured` / `isTopLevel` are flag reads on `DDSpanContext`; `isKind`
+ reads the **cached `byte` span-kind ordinal** through a `SpanKindFilter`
+ bitmask test — no tag-map lookup.
+
+ - **Resource-name ignore-list** breaks out of the trace early; the entire
+ trace is dropped on a match.
+
+ - **Picks the peer-tag schema** (`peerTagSchemaFor`): for client/producer/
+ consumer kinds → `peerAggSchema` (already synced for this trace); for
+ internal-kind spans → `PeerTagSchema.INTERNAL` (single `base.service`
+ entry); otherwise `null`.
+
+ - **Captures peer-tag *values***, not pairs: walks `schema.names` and pulls
+ `unsafeGetTag(name)` for each, into a parallel `String[]`. Names + handlers
+ are the schema's job; the producer only carries raw values. Returns `null`
+ when no peer tags are set, in which case the schema reference is dropped
+ too so the consumer doesn't loop over an all-null array.
+
+ - **Builds and offers** a `SpanSnapshot` to the MPSC inbox. The span-kind
+ string is taken from `CoreSpan.getSpanKindString()`, which DDSpan
+ overrides to resolve via the cached byte ordinal through a small lookup
+ array — **no tag-map lookup**. Origin equality uses `contentEquals`.
+ `httpMethod` / `httpEndpoint` are only fetched when
+ `traceClientStatsEndpoints=true`; `grpcStatusCode` only when span type is
+ `rpc`.
+
+ - On inbox-full: the snapshot is dropped and `healthMetrics.onStatsInboxFull()`
+ fires. The producer never blocks.
+
+3. Reports `healthMetrics.onClientStatTraceComputed(counted, total, dropped)`.
+
+ `forceKeep` is the only signal returned upward — `true` if any of the
+ trace's metrics-eligible spans had errors, so the trace writer keeps the
+ raw trace too.
+
+### Why the producer is lean
+
+The cumulative cost of running these checks on every finished span is the
+single biggest concern. The producer deliberately avoids:
+
+- locking or synchronization of any kind on the hot path,
+- hashing the metric key (deferred to the aggregator thread),
+- map / cache lookups for label canonicalization (deferred),
+- tag-map lookups when a span carries the relevant information on the context
+ itself (`span.kind` via the cached byte ordinal; `isMeasured`, `isTopLevel`
+ via flag reads),
+- allocation beyond the `SpanSnapshot` itself and a single `String[]` for peer
+ tag values when any are present.
+
+## Aggregator-side flow (`Aggregator.run`)
+
+A single agent thread runs the `Aggregator.run` loop. The thread drains the
+inbox via `inbox.drain(drainer)`; when the queue is empty it sleeps
+`DEFAULT_SLEEP_MILLIS` (10 ms) and retries. The Drainer dispatches by item
+type:
+
+- `SpanSnapshot` → `AggregateTable.findOrInsert(snapshot)` returns either an
+ existing or freshly-inserted `AggregateMetric`, then the snapshot's
+ `tagAndDuration` is recorded. If the table is at capacity and no stale entry
+ can be evicted, `healthMetrics.onStatsAggregateDropped()` fires.
+
+- `ReportSignal` → on the scheduled cadence (the default report interval is
+ 10 s; configurable via `tracerMetricsMaxAggregates` / reporting interval),
+ `Aggregator.report`:
+ 1. Expunges entries with `hitCount == 0` (stale).
+ 2. If anything remains, opens a bucket via `MetricWriter.startBucket(...)`,
+ walks `AggregateTable.forEach`, writes each entry, clears its metric.
+ 3. Calls `MetricWriter.finishBucket()` (which may do I/O and block).
+ 4. **Resets all cardinality handlers** so the next interval starts with a
+ fresh budget. Existing entries keep their previously-issued UTF8
+ references, and matching is by content-equality, so canonicalizing a
+ post-reset snapshot against an existing entry still resolves to the
+ same bucket.
+
+- `ClearSignal` → drops the aggregate state. The downgrade handler
+ (`onEvent(DOWNGRADED, ...)`) offers `CLEAR` to the inbox rather than calling
+ `clearAggregates()` directly, so the aggregator thread remains the sole
+ writer of the table.
+
+- `StopSignal` → final report + thread exit.
+
+## The canonical-key trick (cardinality-safe deduplication)
+
+The lookup hash is computed from the **canonicalized** label fields, not the
+raw `SpanSnapshot` fields. This is the property that makes
+cardinality-blocking actually save space:
+
+```java
+// AggregateTable.findOrInsert
+canonical.populate(snapshot); // runs every field through its handler
+long keyHash = canonical.keyHash;
+int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash);
+for (Hashtable.Entry e = buckets[bucketIndex]; e != null; e = e.next()) {
+ if (e.keyHash == keyHash) {
+ AggregateEntry candidate = (AggregateEntry) e;
+ if (canonical.matches(candidate)) {
+ return candidate.aggregate;
+ }
+ }
+}
+// miss → toEntry, splice into bucket head
+```
+
+`Canonical.populate` runs each label field through its
+`PropertyCardinalityHandler` (or `TagCardinalityHandler` for peer tags). Once a
+handler's working set is full, **every subsequent unique value resolves to the
+same `UTF8BytesString` sentinel** — so the hash computed from the canonical
+form is identical for all blocked values. They land in the same bucket and
+merge into one `AggregateEntry` rather than fragmenting into N entries.
+
+The `Canonical` scratch buffer is reused per `findOrInsert` call. On a hit,
+nothing is allocated. On a miss, `toEntry` snapshots the buffer's references
+into a fresh entry; the buffer is overwritten on the next call.
+
+### Hash chain (no varargs)
+
+`AggregateEntry.hashOf` uses chained primitive calls into
+`LongHashingUtils.addToHash(long, T)` rather than a varargs `addToHash(long,
+Object...)`. This avoids the `Object[]` allocation and boxing of the primitive
+fields (`httpStatusCode`, `synthetic`, `traceRoot`) that varargs would force.
+
+## Reporting cadence and cardinality reset
+
+Two distinct cadences:
+
+- **Reporting interval** (default 10 s): when the report timer fires,
+ `ReportTask` calls `report()` which `inbox.offer(REPORT)`. The aggregator
+ drains up to that signal, then writes the bucket and resets the cardinality
+ handlers. The handlers reset *every reporting cycle*, so the per-field
+ budgets refresh.
+
+- **Schema sync**: `ClientStatsAggregator.peerAggSchema(long)` runs on the
+ producer thread per trace, keyed on `DDAgentFeaturesDiscovery.peerTagsRevision()`.
+ The cached schema is replaced when remote-config reconfigures the peer-tag
+ set (i.e., when the revision bumps). The schema's
+ `TagCardinalityHandler`s are reset on the aggregator thread each report
+ cycle via a hook passed into `Aggregator`.
+
+## Memory and lifetime
+
+- `AggregateMetric` is **not thread-safe**. It is mutated only by the
+ aggregator thread.
+- `AggregateTable` is **not thread-safe**. All paths (producer-side `CLEAR`,
+ schedule-driven `REPORT`, drainer-driven inserts) route through the inbox.
+- `Canonical` and the cardinality handlers are aggregator-thread-only.
+- The cached `PeerTagSchema` lives on `ClientStatsAggregator` as a `volatile`
+ field paired with the `peerTagsRevision` it was built from; rebuild is
+ guarded by a monitor on the aggregator instance. The schema's
+ `TagCardinalityHandler`s themselves are aggregator-thread-only and are
+ reset alongside the property handlers each cycle.
+- Entries retain their `UTF8BytesString` references across handler resets;
+ matches via content-equality so post-reset snapshots still resolve.
+- Cap: `tracerMetricsMaxAggregates` bounds table size. Cap-overrun policy:
+ evict one stale entry (`hitCount == 0`) or drop the new data point.
+
+## Health metrics
+
+The producer reports per-trace stats via `HealthMetrics`:
+
+- `onClientStatTraceComputed(counted, totalSpans, dropped)` — per `publish`.
+- `onStatsInboxFull()` — when the MPSC queue rejects an offer.
+- `onClientStatPayloadSent()` / `onClientStatDowngraded()` /
+ `onClientStatErrorReceived()` — on agent-side outcomes.
+- `onStatsAggregateDropped()` — when the aggregator thread can't fit a new
+ entry.
+
+## Failure modes
+
+| Failure | Effect |
+|---|---|
+| Inbox full | Snapshot dropped, `onStatsInboxFull` increments, producer continues. |
+| Agent unavailable / errors | `OkHttpSink` reports `BAD_PAYLOAD` / `ERROR`; metric reporting continues. |
+| Agent downgrade (no /v0.6/stats) | `disable()` offers `CLEAR` to the inbox; the aggregator wipes its table. Producer's `features.supportsMetrics()` returns false on subsequent calls, so new snapshots are not built. |
+| Aggregate table full, no stale entry | New snapshot dropped, `onStatsAggregateDropped` increments. Existing entries continue to accumulate. |
+| Cardinality budget exhausted | Overflow values canonicalize to a `blocked_by_tracer` sentinel and merge into one bucket. Total entry count stays bounded by `maxAggregates`. |
+| Producer throws mid-trace | Caught by the writer's normal error path; `onClientStatTraceComputed` is not called for that trace. |
+
+## Why the redesign (history)
+
+The pipeline was previously `ConflatingMetricsAggregator` with:
+
+- producer-side `MetricKey` construction (string-canonicalization on the hot
+ path),
+- a `LRUCache` of `MetricKey → AggregateMetric`,
+- per-tag `DDCache` instances for canonicalization (one per label field),
+- early computation of `tag:value` peer pairs on the producer thread.
+
+The current `ClientStatsAggregator` shape was motivated by JMH benchmarks that
+showed the producer dominating CPU time. The major shifts:
+
+1. **Move all canonicalization off the producer.** Producer just shuffles
+ references into a `SpanSnapshot`.
+2. **Replace `MetricKey` with inlined fields on `AggregateEntry`.** Removes a
+ per-snapshot allocation; lets us own the hash code on the entry itself.
+3. **Replace the `LRUCache` with a `Hashtable`** keyed on canonicalized labels.
+ Hash is computed once per insert/lookup; chained primitive hashing avoids
+ boxing.
+4. **Replace per-tag `DDCache`s with per-field `PropertyCardinalityHandler`s**
+ that share a `blocked_by_tracer` sentinel for cardinality overflow. Reset
+ each reporting cycle.
+5. **Capture peer-tag values, not pairs.** Tag-name + handler live on
+ `PeerTagSchema`; the producer carries values in a parallel `String[]`. The
+ aggregator does the `tag:value` interning via `TagCardinalityHandler` on
+ its own thread.
+6. **Sync peer-tag schema once per trace.** The producer reads
+ `features.peerTagsRevision()` and compares it to the revision the cached
+ `PeerTagSchema` was built from; the steady-state cost is one volatile read
+ and one long compare. The cache lives on `ClientStatsAggregator`, not as
+ static state on `PeerTagSchema`.
+7. **Single owner of all shared state.** `disable()` routes through `CLEAR`
+ rather than mutating the aggregate table directly.
+
+### Benchmark summary
+
+`ClientStatsAggregatorDDSpanBenchmark` (64 client-kind DDSpans per op, single
+trace, real `CoreTracer` with a no-op writer):
+
+| Variant | µs/op |
+|---|---|
+| master (`ConflatingMetricsAggregator`, baseline) | 6.428 |
+| with `SpanSnapshot` + background aggregation | 2.454 |
+| with peer-tag schema hoist | 2.410 |
+| with cached span-kind ordinal + isSynthetic fix | 1.995 |
+
+The remaining producer-thread hotspots (from JFR sampling) are tag-map
+lookups for `peer.hostname` / other peer-tag values inside
+`capturePeerTagValues`. A bulk peer-tag accessor on `DDSpan` would crack that
+chunk further, but is a structural change beyond the current package.