diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java index 971ee5cf6e4..b9a2f7f8c54 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java @@ -1,6 +1,8 @@ package datadog.trace.common.metrics; import static datadog.trace.api.ProtocolVersion.V0_4; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -52,6 +54,7 @@ static List> generateTrace(int len) { final List> trace = new ArrayList<>(); for (int i = 0; i < len; i++) { SimpleSpan span = new SimpleSpan("", "", "", "", true, true, false, 0, 10, -1); + span.setTag(SPAN_KIND, SPAN_KIND_CLIENT); span.setTag("peer.hostname", Strings.random(10)); trace.add(span); } diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java new file mode 100644 index 00000000000..02c6aaffc1a --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java @@ -0,0 +1,98 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.trace.api.WellKnownTags; +import datadog.trace.common.writer.Writer; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.CoreTracer; +import datadog.trace.core.DDSpan; +import datadog.trace.core.monitor.HealthMetrics; +import datadog.trace.util.Strings; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Parallels {@link ConflatingMetricsAggregatorBenchmark} but uses real {@link DDSpan} instances + * instead of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link + * CoreSpan#isKind} path (cached span.kind ordinal + bit-test) rather than the groovy mock's + * dispatch. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 1, time = 30, timeUnit = SECONDS) +@Measurement(iterations = 3, time = 30, timeUnit = SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(MICROSECONDS) +@Fork(value = 1) +public class ConflatingMetricsAggregatorDDSpanBenchmark { + + private static final CoreTracer TRACER = + CoreTracer.builder().writer(new NoopWriter()).strictTraceWrites(false).build(); + + private final DDAgentFeaturesDiscovery featuresDiscovery = + new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet()); + private final ConflatingMetricsAggregator aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + featuresDiscovery, + HealthMetrics.NO_OP, + new ConflatingMetricsAggregatorBenchmark.NullSink(), + 2048, + 2048, + false); + private final List> spans = generateTrace(64); + + static List> generateTrace(int len) { + final List> trace = new ArrayList<>(); + for (int i = 0; i < len; i++) { + DDSpan span = (DDSpan) TRACER.startSpan("benchmark", "op"); + span.setTag(SPAN_KIND, SPAN_KIND_CLIENT); + span.setTag("peer.hostname", Strings.random(10)); + // Fix duration; bypasses the wall clock and avoids per-fork drift. + span.finishWithDuration(10); + trace.add(span); + } + return trace; + } + + static class NoopWriter implements Writer { + @Override + public void write(List trace) {} + + @Override + public void start() {} + + @Override + public boolean flush() { + return true; + } + + @Override + public void close() {} + + @Override + public void incrementDropCounts(int spanCount) {} + } + + @Benchmark + public void benchmark(Blackhole blackhole) { + blackhole.consume(aggregator.publish(spans)); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java index 478ff520a37..dba66a5ab9c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java @@ -46,6 +46,27 @@ public AggregateMetric recordDurations(int count, AtomicLongArray durations) { return this; } + /** + * Records a single hit. {@code tagAndDuration} carries the duration nanos with optional {@link + * #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits OR-ed in. + */ + public AggregateMetric recordOneDuration(long tagAndDuration) { + ++hitCount; + if ((tagAndDuration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { + tagAndDuration ^= TOP_LEVEL_TAG; + ++topLevelCount; + } + if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) { + tagAndDuration ^= ERROR_TAG; + errorLatencies.accept(tagAndDuration); + ++errorCount; + } else { + okLatencies.accept(tagAndDuration); + } + duration += tagAndDuration; + return this; + } + public int getErrorCount() { return errorCount; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index 8a69dbc6e56..e632555cc21 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -1,16 +1,26 @@ package datadog.trace.common.metrics; +import static datadog.trace.api.Functions.UTF8_ENCODE; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE_ADDER; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SERVICE_NAMES; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SPAN_KINDS; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import datadog.trace.api.Pair; +import datadog.trace.api.cache.DDCache; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.SignalItem.StopSignal; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.util.LRUCache; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,11 +31,8 @@ final class Aggregator implements Runnable { private static final Logger log = LoggerFactory.getLogger(Aggregator.class); - private final MessagePassingQueue batchPool; private final MessagePassingQueue inbox; private final LRUCache aggregates; - private final ConcurrentMap pending; - private final Set commonKeys; private final MetricWriter writer; // the reporting interval controls how much history will be buffered // when the agent is unresponsive (only 10 pending requests will be @@ -41,20 +48,14 @@ final class Aggregator implements Runnable { Aggregator( MetricWriter writer, - MessagePassingQueue batchPool, MessagePassingQueue inbox, - ConcurrentMap pending, - final Set commonKeys, int maxAggregates, long reportingInterval, TimeUnit reportingIntervalTimeUnit, HealthMetrics healthMetrics) { this( writer, - batchPool, inbox, - pending, - commonKeys, maxAggregates, reportingInterval, reportingIntervalTimeUnit, @@ -64,30 +65,37 @@ final class Aggregator implements Runnable { Aggregator( MetricWriter writer, - MessagePassingQueue batchPool, MessagePassingQueue inbox, - ConcurrentMap pending, - final Set commonKeys, int maxAggregates, long reportingInterval, TimeUnit reportingIntervalTimeUnit, long sleepMillis, HealthMetrics healthMetrics) { this.writer = writer; - this.batchPool = batchPool; this.inbox = inbox; - this.commonKeys = commonKeys; this.aggregates = new LRUCache<>( - new CommonKeyCleaner(commonKeys, healthMetrics), - maxAggregates * 4 / 3, - 0.75f, - maxAggregates); - this.pending = pending; + new AggregateExpiry(healthMetrics), maxAggregates * 4 / 3, 0.75f, maxAggregates); this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval); this.sleepMillis = sleepMillis; } + private static final class AggregateExpiry + implements LRUCache.ExpiryListener { + private final HealthMetrics healthMetrics; + + AggregateExpiry(HealthMetrics healthMetrics) { + this.healthMetrics = healthMetrics; + } + + @Override + public void accept(Map.Entry expired) { + if (expired.getValue().getHitCount() > 0) { + healthMetrics.onStatsAggregateDropped(); + } + } + } + public void clearAggregates() { this.aggregates.clear(); } @@ -129,20 +137,54 @@ public void accept(InboxItem item) { } else { signal.ignore(); } - } else if (item instanceof Batch && !stopped) { - Batch batch = (Batch) item; - MetricKey key = batch.getKey(); - // important that it is still *this* batch pending, must not remove otherwise - pending.remove(key, batch); + } else if (item instanceof SpanSnapshot && !stopped) { + SpanSnapshot snapshot = (SpanSnapshot) item; + MetricKey key = buildMetricKey(snapshot); AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric()); - batch.contributeTo(aggregate); + aggregate.recordOneDuration(snapshot.tagAndDuration); dirty = true; - // return the batch for reuse - batchPool.offer(batch); } } } + private static MetricKey buildMetricKey(SpanSnapshot s) { + return new MetricKey( + s.resourceName, + SERVICE_NAMES.computeIfAbsent(s.serviceName, UTF8_ENCODE), + s.operationName, + s.serviceNameSource, + s.spanType, + s.httpStatusCode, + s.synthetic, + s.traceRoot, + SPAN_KINDS.computeIfAbsent(s.spanKind, UTF8BytesString::create), + materializePeerTags(s.peerTagPairs), + s.httpMethod, + s.httpEndpoint, + s.grpcStatusCode); + } + + private static List materializePeerTags(String[] pairs) { + if (pairs == null || pairs.length == 0) { + return Collections.emptyList(); + } + if (pairs.length == 2) { + // single-entry fast path (matches the original singletonList shape for INTERNAL spans) + return Collections.singletonList(encodePeerTag(pairs[0], pairs[1])); + } + List tags = new ArrayList<>(pairs.length / 2); + for (int i = 0; i < pairs.length; i += 2) { + tags.add(encodePeerTag(pairs[i], pairs[i + 1])); + } + return tags; + } + + private static UTF8BytesString encodePeerTag(String name, String value) { + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); + return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); + } + private void report(long when, SignalItem signal) { boolean skipped = true; if (dirty) { @@ -177,7 +219,6 @@ private void expungeStaleAggregates() { AggregateMetric metric = pair.getValue(); if (metric.getHitCount() == 0) { it.remove(); - commonKeys.remove(pair.getKey()); } } } @@ -185,24 +226,4 @@ private void expungeStaleAggregates() { private long wallClockTime() { return MILLISECONDS.toNanos(System.currentTimeMillis()); } - - private static final class CommonKeyCleaner - implements LRUCache.ExpiryListener { - - private final Set commonKeys; - private final HealthMetrics healthMetrics; - - private CommonKeyCleaner(Set commonKeys, HealthMetrics healthMetrics) { - this.commonKeys = commonKeys; - this.healthMetrics = healthMetrics; - } - - @Override - public void accept(Map.Entry expired) { - commonKeys.remove(expired.getKey()); - if (expired.getValue().getHitCount() > 0) { - healthMetrics.onStatsAggregateDropped(); - } - } - } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java deleted file mode 100644 index 5f103805e98..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java +++ /dev/null @@ -1,90 +0,0 @@ -package datadog.trace.common.metrics; - -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongArray; - -/** - * This is a thread-safe container for partial conflating and accumulating partial aggregates on the - * same key. - * - *

Updates to an already consumed batch are rejected. - * - *

A batch can currently take at most 64 values. Attempts to add the 65th update will be - * rejected. - */ -public final class Batch implements InboxItem { - - private static final int MAX_BATCH_SIZE = 64; - private static final AtomicIntegerFieldUpdater COUNT = - AtomicIntegerFieldUpdater.newUpdater(Batch.class, "count"); - private static final AtomicIntegerFieldUpdater COMMITTED = - AtomicIntegerFieldUpdater.newUpdater(Batch.class, "committed"); - - /** - * This counter has two states: - * - *

    - *
  1. negative: the batch has been used, must not add values - *
  2. otherwise: the number of values added to the batch - *
- */ - private volatile int count = 0; - - /** incremented when a duration has been added. */ - private volatile int committed = 0; - - private MetricKey key; - private final AtomicLongArray durations; - - Batch(MetricKey key) { - this(new AtomicLongArray(MAX_BATCH_SIZE)); - this.key = key; - } - - Batch() { - this(new AtomicLongArray(MAX_BATCH_SIZE)); - } - - private Batch(AtomicLongArray durations) { - this.durations = durations; - } - - public MetricKey getKey() { - return key; - } - - public Batch reset(MetricKey key) { - this.key = key; - COUNT.lazySet(this, 0); - return this; - } - - public boolean isUsed() { - return count < 0; - } - - public boolean add(long tag, long durationNanos) { - // technically this would be wrong if there were 2^31 unsuccessful - // attempts to add a value, but this an acceptable risk - int position = COUNT.getAndIncrement(this); - if (position >= 0 && position < durations.length()) { - durations.set(position, tag | durationNanos); - COMMITTED.getAndIncrement(this); - return true; - } - return false; - } - - public void contributeTo(AggregateMetric aggregate) { - int count = Math.min(COUNT.getAndSet(this, Integer.MIN_VALUE), MAX_BATCH_SIZE); - if (count >= 0) { - // wait for the duration to have been set. - // note this mechanism only supports a single reader - while (committed != count) { - Thread.yield(); - } - COMMITTED.lazySet(this, 0); - aggregate.recordDurations(count, durations); - } - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index f60edf1d700..525dc802e3c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -3,15 +3,9 @@ import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT; import static datadog.trace.api.DDSpanTypes.RPC; import static datadog.trace.api.DDTags.BASE_SERVICE; -import static datadog.trace.api.Functions.UTF8_ENCODE; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; -import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; -import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; -import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL; -import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER; -import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER; import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT; @@ -19,7 +13,6 @@ import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR; import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS; import static datadog.trace.util.AgentThreadFactory.newAgentThread; -import static java.util.Collections.unmodifiableSet; import static java.util.concurrent.TimeUnit.SECONDS; import datadog.common.queue.Queues; @@ -36,21 +29,17 @@ import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.core.CoreSpan; import datadog.trace.core.DDTraceCoreInfo; +import datadog.trace.core.SpanKindFilter; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.AgentTaskScheduler; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import javax.annotation.Nonnull; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,18 +51,16 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final Map DEFAULT_HEADERS = Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); - private static final DDCache SERVICE_NAMES = - DDCaches.newFixedSizeCache(32); + static final DDCache SERVICE_NAMES = DDCaches.newFixedSizeCache(32); - private static final DDCache SPAN_KINDS = - DDCaches.newFixedSizeCache(16); - private static final DDCache< + static final DDCache SPAN_KINDS = DDCaches.newFixedSizeCache(16); + static final DDCache< String, Pair, Function>> PEER_TAGS_CACHE = DDCaches.newFixedSizeCache( 64); // it can be unbounded since those values are returned by the agent and should be // under control. 64 entries is enough in this case to contain all the peer tags. - private static final Function< + static final Function< String, Pair, Function>> PEER_TAGS_CACHE_ADDER = key -> @@ -82,20 +69,21 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; - private static final Set ELIGIBLE_SPAN_KINDS_FOR_METRICS = - unmodifiableSet( - new HashSet<>( - Arrays.asList( - SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER))); + private static final SpanKindFilter METRICS_ELIGIBLE_KINDS = + SpanKindFilter.builder() + .includeServer() + .includeClient() + .includeProducer() + .includeConsumer() + .build(); - private static final Set ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION = - unmodifiableSet( - new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER))); + private static final SpanKindFilter PEER_AGGREGATION_KINDS = + SpanKindFilter.builder().includeClient().includeProducer().includeConsumer().build(); + + private static final SpanKindFilter INTERNAL_KIND = + SpanKindFilter.builder().includeInternal().build(); private final Set ignoredResources; - private final MessagePassingQueue batchPool; - private final ConcurrentHashMap pending; - private final ConcurrentHashMap keys; private final Thread thread; private final MessagePassingQueue inbox; private final Sink sink; @@ -189,23 +177,12 @@ public ConflatingMetricsAggregator( this.ignoredResources = ignoredResources; this.includeEndpointInMetrics = includeEndpointInMetrics; this.inbox = Queues.mpscArrayQueue(queueSize); - this.batchPool = Queues.spmcArrayQueue(maxAggregates); - this.pending = new ConcurrentHashMap<>(maxAggregates * 4 / 3); - this.keys = new ConcurrentHashMap<>(); this.features = features; this.healthMetrics = healthMetric; this.sink = sink; this.aggregator = new Aggregator( - metricWriter, - batchPool, - inbox, - pending, - keys.keySet(), - maxAggregates, - reportingInterval, - timeUnit, - healthMetric); + metricWriter, inbox, maxAggregates, reportingInterval, timeUnit, healthMetric); this.thread = newAgentThread(METRICS_AGGREGATOR, aggregator); this.reportingInterval = reportingInterval; this.reportingIntervalTimeUnit = timeUnit; @@ -289,8 +266,7 @@ public boolean publish(List> trace) { if (features.supportsMetrics()) { for (CoreSpan span : trace) { boolean isTopLevel = span.isTopLevel(); - final CharSequence spanKind = span.unsafeGetTag(SPAN_KIND, ""); - if (shouldComputeMetric(span, spanKind)) { + if (shouldComputeMetric(span, isTopLevel)) { final CharSequence resourceName = span.getResourceName(); if (resourceName != null && ignoredResources.contains(resourceName.toString())) { // skip publishing all children @@ -298,7 +274,7 @@ public boolean publish(List> trace) { break; } counted++; - forceKeep |= publish(span, isTopLevel, spanKind); + forceKeep |= publish(span, isTopLevel); } } healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep); @@ -306,19 +282,27 @@ public boolean publish(List> trace) { return forceKeep; } - private boolean shouldComputeMetric(CoreSpan span, @Nonnull CharSequence spanKind) { - return (span.isMeasured() || span.isTopLevel() || spanKindEligible(spanKind)) + private boolean shouldComputeMetric(CoreSpan span, boolean isTopLevel) { + return (span.isMeasured() || isTopLevel || span.isKind(METRICS_ELIGIBLE_KINDS)) && span.getLongRunningVersion() <= 0 // either not long-running or unpublished long-running span && span.getDurationNano() > 0; } - private boolean spanKindEligible(@Nonnull CharSequence spanKind) { - // use toString since it could be a CharSequence... - return ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString()); - } + private boolean publish(CoreSpan span, boolean isTopLevel) { + // Error decision drives force-keep sampling regardless of whether the snapshot gets queued. + boolean error = span.getError() > 0; + + // Fast-path the inbox-full case before any tag extraction or snapshot allocation. size() is + // approximate on jctools' MPSC queue but that's fine: if we under-estimate, we fall through + // and let inbox.offer be the source of truth (existing behavior); if we over-estimate, we + // drop a snapshot that would have fit -- acceptable, onStatsInboxFull was going to fire + // imminently anyway. + if (inbox.size() >= inbox.capacity()) { + healthMetrics.onStatsInboxFull(); + return error; + } - private boolean publish(CoreSpan span, boolean isTopLevel, CharSequence spanKind) { // Extract HTTP method and endpoint only if the feature is enabled String httpMethod = null; String httpEndpoint = null; @@ -335,96 +319,75 @@ private boolean publish(CoreSpan span, boolean isTopLevel, CharSequence spanK Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE); grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null; } - MetricKey newKey = - new MetricKey( + // CharSequence default keeps unsafeGetTag's generic at CharSequence so UTF8BytesString + // tag values don't trigger a ClassCastException on the String assignment. + final String spanKind = span.unsafeGetTag(SPAN_KIND, (CharSequence) "").toString(); + + long tagAndDuration = + span.getDurationNano() | (error ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L); + + SpanSnapshot snapshot = + new SpanSnapshot( span.getResourceName(), - SERVICE_NAMES.computeIfAbsent(span.getServiceName(), UTF8_ENCODE), + span.getServiceName(), span.getOperationName(), span.getServiceNameSource(), spanType, span.getHttpStatusCode(), isSynthetic(span), span.getParentId() == 0, - SPAN_KINDS.computeIfAbsent( - spanKind, UTF8BytesString::create), // save repeated utf8 conversions - getPeerTags(span, spanKind.toString()), + spanKind, + extractPeerTagPairs(span), httpMethod, httpEndpoint, - grpcStatusCode); - MetricKey key = keys.putIfAbsent(newKey, newKey); - if (null == key) { - key = newKey; - } - long tag = (span.getError() > 0 ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L); - long durationNanos = span.getDurationNano(); - Batch batch = pending.get(key); - if (null != batch) { - // there is a pending batch, try to win the race to add to it - // returning false means that either the batch can't take any - // more data, or it has already been consumed - if (batch.add(tag, durationNanos)) { - // added to a pending batch prior to consumption, - // so skip publishing to the queue (we also know - // the key isn't rare enough to override the sampler) - return false; - } - // recycle the older key - key = batch.getKey(); + grpcStatusCode, + tagAndDuration); + if (!inbox.offer(snapshot)) { + healthMetrics.onStatsInboxFull(); } - batch = newBatch(key); - batch.add(tag, durationNanos); - // overwrite the last one if present, it was already full - // or had been consumed by the time we tried to add to it - pending.put(key, batch); - // must offer to the queue after adding to pending - inbox.offer(batch); // force keep keys if there are errors - return span.getError() > 0; + return error; } - private List getPeerTags(CoreSpan span, String spanKind) { - if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { + private String[] extractPeerTagPairs(CoreSpan span) { + if (span.isKind(PEER_AGGREGATION_KINDS)) { final Set eligiblePeerTags = features.peerTags(); - List peerTags = new ArrayList<>(eligiblePeerTags.size()); + String[] pairs = null; + int count = 0; for (String peerTag : eligiblePeerTags) { Object value = span.unsafeGetTag(peerTag); if (value != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); - peerTags.add( - cacheAndCreator - .getLeft() - .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); + if (pairs == null) { + // pairs are flattened [name, value, ...]; size for worst case + pairs = new String[eligiblePeerTags.size() * 2]; + } + pairs[count++] = peerTag; + pairs[count++] = value.toString(); } } - return peerTags; - } else if (SPAN_KIND_INTERNAL.equals(spanKind)) { + if (pairs == null) { + return null; + } + if (count < pairs.length) { + String[] trimmed = new String[count]; + System.arraycopy(pairs, 0, trimmed, 0, count); + return trimmed; + } + return pairs; + } else if (span.isKind(INTERNAL_KIND)) { // in this case only the base service should be aggregated if present final Object baseService = span.unsafeGetTag(BASE_SERVICE); if (baseService != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER); - return Collections.singletonList( - cacheAndCreator - .getLeft() - .computeIfAbsent(baseService.toString(), cacheAndCreator.getRight())); + return new String[] {BASE_SERVICE, baseService.toString()}; } } - return Collections.emptyList(); + return null; } private static boolean isSynthetic(CoreSpan span) { return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); } - private Batch newBatch(MetricKey key) { - Batch batch = batchPool.poll(); - if (null == batch) { - return new Batch(key); - } - return batch.reset(key); - } - public void stop() { if (null != cancellation) { cancellation.cancel(); @@ -467,8 +430,6 @@ private void disable() { features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); - this.pending.clear(); - this.batchPool.clear(); this.inbox.clear(); this.aggregator.clearAggregates(); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java new file mode 100644 index 00000000000..2816fad0411 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java @@ -0,0 +1,65 @@ +package datadog.trace.common.metrics; + +/** + * Immutable per-span value posted from the producer to the aggregator thread. Carries the raw + * inputs the aggregator needs to build a {@link MetricKey} and update an {@link AggregateMetric}. + * + *

All cache-canonicalization (service-name, span-kind, peer-tag string interning) happens on the + * aggregator thread; the producer just shuffles references. + */ +final class SpanSnapshot implements InboxItem { + + final CharSequence resourceName; + final String serviceName; + final CharSequence operationName; + final CharSequence serviceNameSource; + final CharSequence spanType; + final short httpStatusCode; + final boolean synthetic; + final boolean traceRoot; + final String spanKind; + + /** + * Flattened name/value pairs of peer-tag matches: {@code [name0, value0, name1, value1, ...]}. + * {@code null} when there are no matches (the common case). + */ + final String[] peerTagPairs; + + final String httpMethod; + final String httpEndpoint; + final String grpcStatusCode; + + /** Duration in nanoseconds, OR-ed with {@code ERROR_TAG} / {@code TOP_LEVEL_TAG} as needed. */ + final long tagAndDuration; + + SpanSnapshot( + CharSequence resourceName, + String serviceName, + CharSequence operationName, + CharSequence serviceNameSource, + CharSequence spanType, + short httpStatusCode, + boolean synthetic, + boolean traceRoot, + String spanKind, + String[] peerTagPairs, + String httpMethod, + String httpEndpoint, + String grpcStatusCode, + long tagAndDuration) { + this.resourceName = resourceName; + this.serviceName = serviceName; + this.operationName = operationName; + this.serviceNameSource = serviceNameSource; + this.spanType = spanType; + this.httpStatusCode = httpStatusCode; + this.synthetic = synthetic; + this.traceRoot = traceRoot; + this.spanKind = spanKind; + this.peerTagPairs = peerTagPairs; + this.httpMethod = httpMethod; + this.httpEndpoint = httpEndpoint; + this.grpcStatusCode = grpcStatusCode; + this.tagAndDuration = tagAndDuration; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java index 8c98cbbc58a..7d183670883 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java @@ -80,6 +80,8 @@ default U unsafeGetTag(CharSequence name) { boolean isForceKeep(); + boolean isKind(SpanKindFilter filter); + CharSequence getType(); /** diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java index 2c62819e97a..4c438e1c915 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java @@ -959,6 +959,10 @@ public boolean isOutbound() { return ordinal == DDSpanContext.SPAN_KIND_CLIENT || ordinal == DDSpanContext.SPAN_KIND_PRODUCER; } + public boolean isKind(SpanKindFilter filter) { + return filter.matches(context.getSpanKindOrdinal()); + } + @Override public void copyPropagationAndBaggage(final AgentSpan source) { if (source instanceof DDSpan) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java index f2eb17fe8a2..e403efd543b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java @@ -771,22 +771,26 @@ static boolean tagEquals(String tagValue, String tagLiteral) { * span.kind is set. */ public void setSpanKindOrdinal(String kind) { + spanKindOrdinal = spanKindOrdinalOf(kind); + } + + public static byte spanKindOrdinalOf(String kind) { if (kind == null) { - spanKindOrdinal = SPAN_KIND_UNSET; + return SPAN_KIND_UNSET; } else if (tagEquals(kind, Tags.SPAN_KIND_SERVER)) { - spanKindOrdinal = SPAN_KIND_SERVER; + return SPAN_KIND_SERVER; } else if (tagEquals(kind, Tags.SPAN_KIND_CLIENT)) { - spanKindOrdinal = SPAN_KIND_CLIENT; + return SPAN_KIND_CLIENT; } else if (tagEquals(kind, Tags.SPAN_KIND_PRODUCER)) { - spanKindOrdinal = SPAN_KIND_PRODUCER; + return SPAN_KIND_PRODUCER; } else if (tagEquals(kind, Tags.SPAN_KIND_CONSUMER)) { - spanKindOrdinal = SPAN_KIND_CONSUMER; + return SPAN_KIND_CONSUMER; } else if (tagEquals(kind, Tags.SPAN_KIND_INTERNAL)) { - spanKindOrdinal = SPAN_KIND_INTERNAL; + return SPAN_KIND_INTERNAL; } else if (tagEquals(kind, Tags.SPAN_KIND_BROKER)) { - spanKindOrdinal = SPAN_KIND_BROKER; + return SPAN_KIND_BROKER; } else { - spanKindOrdinal = SPAN_KIND_CUSTOM; + return SPAN_KIND_CUSTOM; } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/SpanKindFilter.java b/dd-trace-core/src/main/java/datadog/trace/core/SpanKindFilter.java new file mode 100644 index 00000000000..9ac3fa9dc06 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/SpanKindFilter.java @@ -0,0 +1,60 @@ +package datadog.trace.core; + +public final class SpanKindFilter { + public static final class Builder { + private int kindMask; + + public Builder includeServer() { + return this.include(DDSpanContext.SPAN_KIND_SERVER); + } + + public Builder includeClient() { + return this.include(DDSpanContext.SPAN_KIND_CLIENT); + } + + public Builder includeProducer() { + return this.include(DDSpanContext.SPAN_KIND_PRODUCER); + } + + public Builder includeConsumer() { + return this.include(DDSpanContext.SPAN_KIND_CONSUMER); + } + + public Builder includeInternal() { + return this.include(DDSpanContext.SPAN_KIND_INTERNAL); + } + + public Builder includeBroker() { + return this.include(DDSpanContext.SPAN_KIND_BROKER); + } + + public final SpanKindFilter build() { + return new SpanKindFilter(this.kindMask); + } + + private Builder include(int spanKindConstant) { + this.kindMask |= (1 << spanKindConstant); + return this; + } + } + + public static final Builder builder() { + return new Builder(); + } + + private final int kindMask; + + private SpanKindFilter(int kindMask) { + this.kindMask = kindMask; + } + + /** Test whether a span with the given span.kind string passes this filter. */ + public boolean matches(String spanKind) { + return matches(DDSpanContext.spanKindOrdinalOf(spanKind)); + } + + /** Fast-path test for callers that already hold the span's cached kind ordinal. */ + public boolean matches(byte spanKindOrdinal) { + return (kindMask & (1 << spanKindOrdinal)) != 0; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index 257d887029b..d1c7fe126b4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -93,6 +93,11 @@ public void onClientStatDowngraded() {} public void onStatsAggregateDropped() {} + /** + * Reports a single span whose stats snapshot was dropped because the aggregator inbox was full. + */ + public void onStatsInboxFull() {} + /** * @return Human-readable summary of the current health metrics. */ diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index 2df54241e56..db384a7e42e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -98,6 +98,7 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable private final LongAdder clientStatsDowngrades = new LongAdder(); private final LongAdder statsAggregateDropped = new LongAdder(); + private final LongAdder statsInboxFull = new LongAdder(); private final StatsDClient statsd; private final long interval; @@ -357,6 +358,11 @@ public void onStatsAggregateDropped() { statsAggregateDropped.increment(); } + @Override + public void onStatsInboxFull() { + statsInboxFull.increment(); + } + @Override public void close() { if (null != cancellation) { @@ -374,8 +380,9 @@ private static class Flush implements AgentTaskScheduler.Task= 99 okLatencies.getMaxValue() <= 5 } - - def "consistent under concurrent attempts to read and write"() { - given: - AggregateMetric aggregate = new AggregateMetric() - MetricKey key = new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null, null) - BlockingDeque queue = new LinkedBlockingDeque<>(1000) - ExecutorService reader = Executors.newSingleThreadExecutor() - int writerCount = 10 - ExecutorService writers = Executors.newFixedThreadPool(writerCount) - CountDownLatch readerLatch = new CountDownLatch(1) - CountDownLatch writerLatch = new CountDownLatch(writerCount) - CountDownLatch queueEmptyLatch = new CountDownLatch(1) - - AtomicInteger written = new AtomicInteger(0) - - when: - for (int i = 0; i < writerCount; ++i) { - writers.submit({ - readerLatch.await() - for (int j = 0; j < 10_000; ++j) { - Batch batch = queue.peekLast() - if (batch?.add(0L, 1)) { - written.incrementAndGet() - } else { - queue.offer(new Batch().reset(key)) - } - } - writerLatch.countDown() - }) - } - def future = reader.submit({ - readerLatch.countDown() - while (!Thread.currentThread().isInterrupted()) { - Batch batch = queue.poll(100, TimeUnit.MILLISECONDS) - if (null == batch && writerLatch.count == 0) { - queueEmptyLatch.countDown() - } else if (null != batch) { - batch.contributeTo(aggregate) - } - } - }) - assert writerLatch.await(10, TimeUnit.SECONDS) - // Wait here until we know that the queue is empty - assert queueEmptyLatch.await(10, TimeUnit.SECONDS) - future.cancel(true) - - then: - aggregate.getHitCount() == written.get() - } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy index bfc1ee2f4e7..2fd8554d499 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy @@ -2,8 +2,11 @@ package datadog.trace.common.metrics import datadog.trace.api.DDSpanId import datadog.trace.api.DDTraceId +import datadog.trace.bootstrap.instrumentation.api.Tags import datadog.trace.core.CoreSpan +import datadog.trace.core.DDSpanContext import datadog.trace.core.MetadataConsumer +import datadog.trace.core.SpanKindFilter class SimpleSpan implements CoreSpan { @@ -24,6 +27,8 @@ class SimpleSpan implements CoreSpan { private final Map tags = [:] + private byte spanKindOrdinal = 0 // SPAN_KIND_UNSET + SimpleSpan( String serviceName, String operationName, @@ -171,6 +176,9 @@ class SimpleSpan implements CoreSpan { @Override SimpleSpan setTag(String tag, Object value) { tags.put(tag, value) + if (Tags.SPAN_KIND == tag) { + spanKindOrdinal = DDSpanContext.spanKindOrdinalOf(value == null ? null : value.toString()) + } return this } @@ -211,6 +219,11 @@ class SimpleSpan implements CoreSpan { return false } + @Override + boolean isKind(SpanKindFilter filter) { + return filter.matches(spanKindOrdinal) + } + @Override CharSequence getType() { return type diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceGenerator.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceGenerator.groovy index 66bdbab137b..49e13472249 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceGenerator.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceGenerator.groovy @@ -11,10 +11,12 @@ import datadog.trace.api.ProcessTags import datadog.trace.api.TagMap import datadog.trace.api.sampling.PrioritySampling import datadog.trace.bootstrap.instrumentation.api.AgentSpanLink +import datadog.trace.bootstrap.instrumentation.api.Tags import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.core.CoreSpan import datadog.trace.core.Metadata import datadog.trace.core.MetadataConsumer +import datadog.trace.core.SpanKindFilter import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit @@ -321,6 +323,12 @@ class TraceGenerator { return false } + @Override + boolean isKind(SpanKindFilter filter) { + def kind = metadata.getTags().get(Tags.SPAN_KIND) + return filter.matches(kind == null ? null : kind.toString()) + } + @Override short getHttpStatusCode() { return httpStatusCode diff --git a/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java b/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java index 670c4cda113..2f9ac1ea7da 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java @@ -398,6 +398,19 @@ void testOnStatsAggregateDropped() throws InterruptedException { verifyNoMoreInteractions(statsD); } + @Test + void testOnStatsInboxFull() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + try (TracerHealthMetrics healthMetrics = + new TracerHealthMetrics(new Latched(statsD, latch), 100, TimeUnit.MILLISECONDS)) { + healthMetrics.start(); + healthMetrics.onStatsInboxFull(); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + verify(statsD).count("stats.dropped_aggregates", 1L, "reason:inbox_full"); + verifyNoMoreInteractions(statsD); + } + private static class Latched implements StatsDClient { private final StatsDClient delegate; private final CountDownLatch latch; diff --git a/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy b/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy index e668d0112a6..2b2bca79406 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy @@ -9,10 +9,12 @@ import datadog.trace.api.DDTags import datadog.trace.api.DDTraceId import datadog.trace.api.IdGenerationStrategy import datadog.trace.api.TagMap +import datadog.trace.bootstrap.instrumentation.api.Tags import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.core.CoreSpan import datadog.trace.core.Metadata import datadog.trace.core.MetadataConsumer +import datadog.trace.core.SpanKindFilter import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit @@ -298,6 +300,12 @@ class TraceGenerator { return false } + @Override + boolean isKind(SpanKindFilter filter) { + def kind = metadata.getTags().get(Tags.SPAN_KIND) + return filter.matches(kind == null ? null : kind.toString()) + } + Map getBaggage() { return metadata.getBaggage() }