From 875f094fb683343cdb2db8c3e987c8588eaee7a3 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Fri, 8 May 2026 13:18:14 -0400 Subject: [PATCH] debugger/symdb: add upload metadata fields to upload event message Add the following fields to the SymDB upload event message that accompanies each multipart upload: - "version" (top-level): the service version - "language" (top-level): "java" - "uploadId" (top-level): a UUID generated once per SymbolSink instance, shared by all batches uploaded by the sink - "batchNum" (top-level): 1-indexed counter incremented per upload - "final" (top-level): always false; the Java tracer continuously uploads new code as classes get loaded, so there is no defined end-of-upload point - "attachmentSize" (top-level): size in bytes of the (possibly compressed) attachment payload The attachment body carries the snake_case equivalents ("upload_id", "batch_num", "final") via Moshi @Json field names. Some of these fields are new, to be used by the backend in the future. Others duplicate info that was already included in the attachment; by duplicating some metadata out of the SymDB attachment body into the EvP event body, the backend can populate per-attachment bookkeeping without downloading the attachment. 
--- .../com/datadog/debugger/sink/SymbolSink.java | 62 +++++++++++++++---- .../debugger/symbol/ServiceVersion.java | 22 ++++++- .../datadog/debugger/sink/SymbolSinkTest.java | 26 ++++++-- 3 files changed, 93 insertions(+), 17 deletions(-) diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java index a18cd2d3df5..fd233f41893 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java @@ -10,6 +10,7 @@ import com.datadog.debugger.util.MoshiHelper; import com.squareup.moshi.JsonAdapter; import datadog.trace.api.Config; +import datadog.trace.util.RandomUtils; import datadog.trace.util.TagsHelper; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -19,8 +20,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import java.util.zip.GZIPOutputStream; import okhttp3.HttpUrl; import okhttp3.MediaType; @@ -36,21 +39,34 @@ public class SymbolSink { public static final BatchUploader.RetryPolicy RETRY_POLICY = new BatchUploader.RetryPolicy(10); private static final JsonAdapter SERVICE_VERSION_ADAPTER = MoshiHelper.createMoshiSymbol().adapter(ServiceVersion.class); + // The upload event message JSON. The "final" field is hard-coded to false: + // the Java tracer continuously uploads new code as classes get loaded, so + // there is no defined end-of-upload point. 
private static final String EVENT_FORMAT = "{%n" + "\"ddsource\": \"dd_debugger\",%n" + "\"service\": \"%s\",%n" + + "\"version\": \"%s\",%n" + + "\"language\": \"java\",%n" + "\"runtimeId\": \"%s\",%n" - + "\"type\": \"symdb\"%n" + + "\"type\": \"symdb\",%n" + + "\"uploadId\": \"%s\",%n" + + "\"batchNum\": %d,%n" + + "\"final\": false,%n" + + "\"attachmentSize\": %d%n" + "}"; static final int MAX_SYMDB_UPLOAD_SIZE = 50 * 1024 * 1024; private final String serviceName; private final String env; private final String version; + private final String runtimeId; private final BatchUploader symbolUploader; private final int maxPayloadSize; - private final BatchUploader.MultiPartContent event; + // uploadId is shared by all batches uploaded by this sink. The backend uses + // (runtimeId, uploadId) to group batches belonging to the same logical upload. + private final UUID uploadId = RandomUtils.randomUUID(); + private final AtomicLong batchNum = new AtomicLong(0); private final BlockingQueue scopes = new ArrayBlockingQueue<>(CAPACITY); private final Stats stats = new Stats(); private final boolean isCompressed; @@ -66,15 +82,10 @@ public SymbolSink(Config config) { this.serviceName = TagsHelper.sanitize(config.getServiceName()); this.env = TagsHelper.sanitize(config.getEnv()); this.version = TagsHelper.sanitize(config.getVersion()); + this.runtimeId = config.getRuntimeId(); this.symbolUploader = symbolUploader; this.maxPayloadSize = maxPayloadSize; this.isCompressed = config.isSymbolDatabaseCompressed(); - byte[] eventContent = - String.format( - EVENT_FORMAT, TagsHelper.sanitize(config.getServiceName()), config.getRuntimeId()) - .getBytes(StandardCharsets.UTF_8); - this.event = - new BatchUploader.MultiPartContent(eventContent, "event", "event.json", APPLICATION_JSON); } public void stop() { @@ -111,22 +122,35 @@ public void flush() { } private void serializeAndUpload(List scopesToSerialize) { + // Determine the batch number once so the attachment body and the EvP event + 
// message agree on it. + long currentBatch = batchNum.incrementAndGet(); try { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(2 * 1024 * 1024); try (OutputStream outputStream = isCompressed ? new GZIPOutputStream(byteArrayOutputStream) : byteArrayOutputStream) { BufferedSink sink = Okio.buffer(Okio.sink(outputStream)); SERVICE_VERSION_ADAPTER.toJson( - sink, new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize)); + sink, + new ServiceVersion( + serviceName, + env, + version, + "JAVA", + scopesToSerialize, + uploadId.toString(), + currentBatch, + false /* isFinal */)); sink.flush(); } - doUpload(scopesToSerialize, byteArrayOutputStream.toByteArray(), isCompressed); + doUpload(scopesToSerialize, byteArrayOutputStream.toByteArray(), isCompressed, currentBatch); } catch (IOException e) { LOGGER.debug("Error serializing scopes", e); } } - private void doUpload(List scopesToSerialize, byte[] payload, boolean isCompressed) { + private void doUpload( + List scopesToSerialize, byte[] payload, boolean isCompressed, long currentBatch) { if (payload.length > maxPayloadSize) { LOGGER.warn( "Payload is too big: {}/{} isCompressed={}", @@ -148,10 +172,26 @@ private void doUpload(List scopesToSerialize, byte[] payload, boolean isC fileName = "file.gz"; mediaType = APPLICATION_GZIP; } + BatchUploader.MultiPartContent event = buildEvent(currentBatch, payload.length); symbolUploader.uploadAsMultipart( "", event, new BatchUploader.MultiPartContent(payload, "file", fileName, mediaType)); } + private BatchUploader.MultiPartContent buildEvent(long currentBatch, int attachmentSize) { + byte[] eventContent = + String.format( + EVENT_FORMAT, + serviceName, + version, + runtimeId, + uploadId.toString(), + currentBatch, + attachmentSize) + .getBytes(StandardCharsets.UTF_8); + return new BatchUploader.MultiPartContent( + eventContent, "event", "event.json", APPLICATION_JSON); + } + private static byte[] compressPayload(byte[] jsonBytes) { // usual 
compression factor 40:1 for those json payload, so we are preallocating 1/40 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(jsonBytes.length / 40); diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/symbol/ServiceVersion.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/symbol/ServiceVersion.java index a88fc25da63..fd1efaaac04 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/symbol/ServiceVersion.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/symbol/ServiceVersion.java @@ -1,5 +1,6 @@ package com.datadog.debugger.symbol; +import com.squareup.moshi.Json; import java.util.List; public class ServiceVersion { @@ -10,13 +11,32 @@ public class ServiceVersion { private final String language; private final List scopes; + @Json(name = "upload_id") + private final String uploadId; + + @Json(name = "batch_num") + private final long batchNum; + + @Json(name = "final") + private final boolean isFinal; + public ServiceVersion( - String service, String env, String version, String language, List scopes) { + String service, + String env, + String version, + String language, + List scopes, + String uploadId, + long batchNum, + boolean isFinal) { this.service = service; this.env = env; this.version = version; this.language = language; this.scopes = scopes; + this.uploadId = uploadId; + this.batchNum = batchNum; + this.isFinal = isFinal; } public List getScopes() { diff --git a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java index 91a5743d2a0..9a04fbae6ba 100644 --- a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java +++ b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/sink/SymbolSinkTest.java @@ -24,6 +24,8 @@ public void testSimpleFlush() { SymbolUploaderMock 
symbolUploaderMock = new SymbolUploaderMock(); Config config = mock(Config.class); when(config.getServiceName()).thenReturn("service1"); + when(config.getVersion()).thenReturn("1.0.0"); + when(config.getRuntimeId()).thenReturn("test-runtime"); when(config.isSymbolDatabaseCompressed()).thenReturn(false); SymbolSink symbolSink = new SymbolSink(config, symbolUploaderMock, MAX_SYMDB_UPLOAD_SIZE); symbolSink.addScope(Scope.builder(ScopeType.JAR, null, 0, 0).build()); @@ -35,13 +37,25 @@ public void testSimpleFlush() { String strEventContent = new String(eventContent.getContent()); assertTrue(strEventContent.contains("\"ddsource\": \"dd_debugger\"")); assertTrue(strEventContent.contains("\"service\": \"service1\"")); + assertTrue(strEventContent.contains("\"version\": \"1.0.0\"")); + assertTrue(strEventContent.contains("\"language\": \"java\"")); + assertTrue(strEventContent.contains("\"runtimeId\": \"test-runtime\"")); assertTrue(strEventContent.contains("\"type\": \"symdb\"")); + assertTrue(strEventContent.contains("\"uploadId\":")); + assertTrue(strEventContent.contains("\"batchNum\": 1")); + assertTrue(strEventContent.contains("\"final\": false")); + assertTrue(strEventContent.contains("\"attachmentSize\":")); BatchUploader.MultiPartContent symbolContent = symbolUploaderMock.multiPartContents.get(1); assertEquals("file", symbolContent.getPartName()); assertEquals("file.json", symbolContent.getFileName()); - assertEquals( - "{\"language\":\"JAVA\",\"scopes\":[{\"end_line\":0,\"has_injectible_lines\":false,\"scope_type\":\"JAR\",\"start_line\":0}],\"service\":\"service1\"}", - new String(symbolContent.getContent())); + String fileContent = new String(symbolContent.getContent()); + assertTrue(fileContent.contains("\"language\":\"JAVA\"")); + assertTrue(fileContent.contains("\"scopes\":[")); + assertTrue(fileContent.contains("\"service\":\"service1\"")); + assertTrue(fileContent.contains("\"version\":\"1.0.0\"")); + assertTrue(fileContent.contains("\"upload_id\":")); + 
assertTrue(fileContent.contains("\"batch_num\":1")); + assertTrue(fileContent.contains("\"final\":false")); } @Test @@ -219,8 +233,10 @@ public void maxCompressedAndSplit() { .build()); } symbolSink.flush(); - assertEquals(4, symbolUploaderMock.multiPartContents.size()); - for (int i = 0; i < 4; i += 2) { + int total = symbolUploaderMock.multiPartContents.size(); + assertTrue(total >= 4, "expected at least 4 multipart entries (2+ event/file pairs), got " + total); + assertTrue(total % 2 == 0, "expected an even number of multipart entries (event/file pairs), got " + total); + for (int i = 0; i < total; i += 2) { BatchUploader.MultiPartContent eventContent = symbolUploaderMock.multiPartContents.get(i); assertEquals("event", eventContent.getPartName()); BatchUploader.MultiPartContent symbolContent =