From e328d12241b805194761476caf5bf587c4cf4aaa Mon Sep 17 00:00:00 2001 From: Jaroslav Bachorik Date: Mon, 2 Mar 2026 11:46:34 +0100 Subject: [PATCH] [PROF-10637] Explore tracking Netty / Native socket events --- ddprof-lib/src/main/cpp/arguments.cpp | 19 ++ ddprof-lib/src/main/cpp/arguments.h | 4 +- ddprof-lib/src/main/cpp/codeCache.cpp | 18 ++ ddprof-lib/src/main/cpp/codeCache.h | 7 + ddprof-lib/src/main/cpp/event.h | 18 ++ ddprof-lib/src/main/cpp/flightRecorder.cpp | 26 +++ ddprof-lib/src/main/cpp/flightRecorder.h | 2 + ddprof-lib/src/main/cpp/javaApi.cpp | 17 ++ ddprof-lib/src/main/cpp/jfrMetadata.cpp | 13 ++ ddprof-lib/src/main/cpp/jfrMetadata.h | 1 + ddprof-lib/src/main/cpp/libraries.cpp | 2 + ddprof-lib/src/main/cpp/profiler.cpp | 24 +- ddprof-lib/src/main/cpp/profiler.h | 5 +- ddprof-lib/src/main/cpp/socketPatcher.h | 67 ++++++ .../src/main/cpp/socketPatcher_linux.cpp | 218 ++++++++++++++++++ .../com/datadoghq/profiler/JavaProfiler.java | 35 +++ .../src/test/cpp/test_socketPatcher.cpp | 63 +++++ .../socket/NativeSocketEventTest.java | 145 ++++++++++++ 18 files changed, 681 insertions(+), 3 deletions(-) create mode 100644 ddprof-lib/src/main/cpp/socketPatcher.h create mode 100644 ddprof-lib/src/main/cpp/socketPatcher_linux.cpp create mode 100644 ddprof-lib/src/test/cpp/test_socketPatcher.cpp create mode 100644 ddprof-test/src/test/java/com/datadoghq/profiler/socket/NativeSocketEventTest.java diff --git a/ddprof-lib/src/main/cpp/arguments.cpp b/ddprof-lib/src/main/cpp/arguments.cpp index 72b8aec22..f6afba864 100644 --- a/ddprof-lib/src/main/cpp/arguments.cpp +++ b/ddprof-lib/src/main/cpp/arguments.cpp @@ -362,6 +362,25 @@ Error Arguments::parse(const char *args) { _remote_symbolication = true; } + CASE("nativesock") + if (value != NULL) { + switch (value[0]) { + case 'n': // no + case 'f': // false + case '0': // 0 + _native_sockets = false; + break; + case 'y': // yes + case 't': // true + case '1': // 1 + default: + _native_sockets = true; + } + } else { + // No value means enable + _native_sockets = true; + } + CASE("wallsampler") if (value != NULL) { switch (value[0]) { diff --git a/ddprof-lib/src/main/cpp/arguments.h b/ddprof-lib/src/main/cpp/arguments.h index 3f2542705..6ab62f302 100644 --- a/ddprof-lib/src/main/cpp/arguments.h +++ b/ddprof-lib/src/main/cpp/arguments.h @@ -189,6 +189,7 @@ class Arguments { bool _lightweight; bool _enable_method_cleanup; bool _remote_symbolication; // Enable remote symbolication for native frames + bool _native_sockets; // Enable PLT patching of Netty native socket functions Arguments(bool persistent = false) : _buf(NULL), @@ -223,7 +224,8 @@ class Arguments { _wallclock_sampler(ASGCT), _lightweight(false), _enable_method_cleanup(true), - _remote_symbolication(false) {} + _remote_symbolication(false), + _native_sockets(false) {} ~Arguments(); diff --git a/ddprof-lib/src/main/cpp/codeCache.cpp b/ddprof-lib/src/main/cpp/codeCache.cpp index a8a360160..57185c983 100644 --- a/ddprof-lib/src/main/cpp/codeCache.cpp +++ b/ddprof-lib/src/main/cpp/codeCache.cpp @@ -336,6 +336,24 @@ void CodeCache::addImport(void **entry, const char *name) { case 'r': if (strcmp(name, "realloc") == 0) { saveImport(im_realloc, entry); + } else if (strcmp(name, "recv") == 0) { + saveImport(im_recv, entry); + } else if (strcmp(name, "recvfrom") == 0) { + saveImport(im_recvfrom, entry); + } else if (strcmp(name, "readv") == 0) { + saveImport(im_readv, entry); + } + break; + case 's': + if (strcmp(name, "send") == 0) { + saveImport(im_send, entry); + } else if (strcmp(name, "sendto") == 0) { + saveImport(im_sendto, entry); + } + break; + case 'w': + if (strcmp(name, "writev") == 0) { + saveImport(im_writev, entry); } break; } diff --git a/ddprof-lib/src/main/cpp/codeCache.h b/ddprof-lib/src/main/cpp/codeCache.h index 9192719ff..075124daa 100644 --- a/ddprof-lib/src/main/cpp/codeCache.h +++ b/ddprof-lib/src/main/cpp/codeCache.h @@ -31,6 +31,13 @@ enum ImportId { im_calloc, im_realloc, im_free, + // Socket I/O — intercepted in Netty native transport libraries + im_recv, + im_send, + im_recvfrom, + im_sendto, + im_readv, + im_writev, NUM_IMPORTS }; diff --git a/ddprof-lib/src/main/cpp/event.h b/ddprof-lib/src/main/cpp/event.h index e9363165f..3c8584985 100644 --- a/ddprof-lib/src/main/cpp/event.h +++ b/ddprof-lib/src/main/cpp/event.h @@ -173,4 +173,22 @@ typedef struct QueueTimeEvent { u32 _queueLength; } QueueTimeEvent; +// Operation types for NativeSocketEvent +enum SocketOp { + SOCKET_OP_RECV = 0, + SOCKET_OP_SEND = 1, + SOCKET_OP_RECVFROM = 2, + SOCKET_OP_SENDTO = 3, + SOCKET_OP_READV = 4, + SOCKET_OP_WRITEV = 5, +}; + +typedef struct NativeSocketEvent { + u64 _start; + u64 _end; + int _fd; + u64 _bytes; + u32 _operation; +} NativeSocketEvent; + #endif // _EVENT_H diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index 78d1b6711..715c78950 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -1548,6 +1548,20 @@ void Recording::recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event) { flushIfNeeded(buf); } +void Recording::recordNativeSocketEvent(Buffer *buf, int tid, NativeSocketEvent *event) { + int start = buf->skip(1); + buf->putVar64(T_NATIVE_SOCKET_EVENT); + buf->putVar64(event->_start); + buf->putVar64(event->_end - event->_start); + buf->putVar64(tid); + buf->putVar64(static_cast(event->_fd)); + buf->putVar64(event->_bytes); + buf->putVar64(event->_operation); + writeContext(buf, Contexts::get()); + writeEventSizePrefix(buf, start); + flushIfNeeded(buf); +} + void Recording::recordAllocation(RecordingBuffer *buf, int tid, u64 call_trace_id, AllocEvent *event) { int start = buf->skip(1); @@ -1751,6 +1765,18 @@ void FlightRecorder::recordQueueTime(int lock_index, int tid, } } +void FlightRecorder::recordNativeSocketEvent(int lock_index, int tid, + NativeSocketEvent *event) { + OptionalSharedLockGuard locker(&_rec_lock); + if (locker.ownsLock()) { + Recording* rec = _rec; + if (rec != nullptr) { + Buffer *buf = rec->buffer(lock_index); + rec->recordNativeSocketEvent(buf, tid, event); + } + } +} + void FlightRecorder::recordDatadogSetting(int lock_index, int length, const char *name, const char *value, const char *unit) { diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index c1ab88262..99cf1ff46 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -278,6 +278,7 @@ class Recording { void recordWallClockEpoch(Buffer *buf, WallClockEpochEvent *event); void recordTraceRoot(Buffer *buf, int tid, TraceRootEvent *event); void recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event); + void recordNativeSocketEvent(Buffer *buf, int tid, NativeSocketEvent *event); void recordAllocation(RecordingBuffer *buf, int tid, u64 call_trace_id, AllocEvent *event); void recordHeapLiveObject(Buffer *buf, int tid, u64 call_trace_id, @@ -344,6 +345,7 @@ class FlightRecorder { void wallClockEpoch(int lock_index, WallClockEpochEvent *event); void recordTraceRoot(int lock_index, int tid, TraceRootEvent *event); void recordQueueTime(int lock_index, int tid, QueueTimeEvent *event); + void recordNativeSocketEvent(int lock_index, int tid, NativeSocketEvent *event); bool active() const { return _rec != NULL; } diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 355fcd512..b31c8a1b4 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -21,6 +21,7 @@ #include "counters.h" #include "common.h" #include "engine.h" +#include "event.h" #include "incbin.h" #include "os.h" #include "otel_process_ctx.h" @@ -283,6 +284,22 @@ Java_com_datadoghq_profiler_JavaProfiler_recordSettingEvent0( tid, length, name_str.c_str(), value_str.c_str(), unit_str.c_str()); } +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_recordNativeSocketEvent0( + JNIEnv *env, jclass unused, jlong startTime, jlong endTime, + jint fd, jlong bytes, jint operation) { + if (fd < 0 || bytes < 0 || operation < SOCKET_OP_RECV || operation > SOCKET_OP_WRITEV) { + return; + } + NativeSocketEvent event; + event._start = startTime; + event._end = endTime; + event._fd = fd; + event._bytes = (u64)bytes; + event._operation = (u32)operation; + Profiler::instance()->recordNativeSocketEvent(&event); +} + static int dictionarizeClassName(JNIEnv* env, jstring className) { JniString str(env, className); return Profiler::instance()->lookupClass(str.c_str(), str.length()); diff --git a/ddprof-lib/src/main/cpp/jfrMetadata.cpp b/ddprof-lib/src/main/cpp/jfrMetadata.cpp index 6991a8a12..a7cb3d6fd 100644 --- a/ddprof-lib/src/main/cpp/jfrMetadata.cpp +++ b/ddprof-lib/src/main/cpp/jfrMetadata.cpp @@ -286,6 +286,19 @@ void JfrMetadata::initialize( << field("name", T_STRING, "Name") << field("count", T_LONG, "Count")) + << (type("datadog.NativeSocketEvent", T_NATIVE_SOCKET_EVENT, + "Native Socket Event") + << category("Datadog") + << field("startTime", T_LONG, "Start Time", F_TIME_TICKS) + << field("duration", T_LONG, "Duration", F_DURATION_TICKS) + << field("eventThread", T_THREAD, "Event Thread", F_CPOOL) + << field("fd", T_INT, "File Descriptor") + << field("bytesTransferred", T_LONG, "Bytes Transferred") + << field("operation", T_INT, "Operation") + << field("spanId", T_LONG, "Span ID") + << field("localRootSpanId", T_LONG, "Local Root Span ID") || + contextAttributes) + << (type("jdk.OSInformation", T_OS_INFORMATION, "OS Information") << category("Operating System") << field("startTime", T_LONG, "Start Time", F_TIME_TICKS) diff --git a/ddprof-lib/src/main/cpp/jfrMetadata.h b/ddprof-lib/src/main/cpp/jfrMetadata.h index 77da96d3f..a94343668 100644 --- a/ddprof-lib/src/main/cpp/jfrMetadata.h +++ b/ddprof-lib/src/main/cpp/jfrMetadata.h @@ -78,6 +78,7 @@ enum JfrType { T_DATADOG_CLASSREF_CACHE = 124, T_DATADOG_COUNTER = 125, T_UNWIND_FAILURE = 126, + T_NATIVE_SOCKET_EVENT = 127, T_ANNOTATION = 200, T_LABEL = 201, T_CATEGORY = 202, diff --git a/ddprof-lib/src/main/cpp/libraries.cpp b/ddprof-lib/src/main/cpp/libraries.cpp index d65b2861f..ef374870d 100644 --- a/ddprof-lib/src/main/cpp/libraries.cpp +++ b/ddprof-lib/src/main/cpp/libraries.cpp @@ -3,6 +3,7 @@ #include "libraries.h" #include "libraryPatcher.h" #include "log.h" +#include "socketPatcher.h" #include "symbols.h" #include "symbols_linux.h" #include "vmEntry.h" @@ -38,6 +39,7 @@ void Libraries::mangle(const char *name, char *buf, size_t size) { void Libraries::updateSymbols(bool kernel_symbols) { Symbols::parseLibraries(&_native_libs, kernel_symbols); LibraryPatcher::patch_libraries(); + SocketPatcher::patch_libraries(); } // Platform-specific implementation of updateBuildIds() is in libraries_linux.cpp (Linux) diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index 8a5faecef..9b82ac9d7 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -18,6 +18,7 @@ #include "j9WallClock.h" #include "libraryPatcher.h" #include "objectSampler.h" +#include "socketPatcher.h" #include "os.h" #include "perfEvents.h" #include "safeAccess.h" @@ -913,6 +914,21 @@ void Profiler::recordQueueTime(int tid, QueueTimeEvent *event) { _locks[lock_index].unlock(); } +void Profiler::recordNativeSocketEvent(NativeSocketEvent *event) { + int tid = ProfiledThread::currentTid(); + if (tid < 0) { + return; + } + u32 lock_index = getLockIndex(tid); + if (!_locks[lock_index].tryLock() && + !_locks[lock_index = (lock_index + 1) % CONCURRENCY_LEVEL].tryLock() && + !_locks[lock_index = (lock_index + 2) % CONCURRENCY_LEVEL].tryLock()) { + return; + } + _jfr.recordNativeSocketEvent(lock_index, tid, event); + _locks[lock_index].unlock(); +} + void Profiler::recordExternalSample(u64 weight, int tid, int num_frames, ASGCT_CallFrame *frames, bool truncated, jint event_type, Event *event) { @@ -1271,6 +1287,7 @@ Error Profiler::start(Arguments &args, bool reset) { _omit_stacktraces = args._lightweight; _remote_symbolication = args._remote_symbolication; + _native_sockets = args._native_sockets; _event_mask = ((args._event != NULL && strcmp(args._event, EVENT_NOOP) != 0) ? EM_CPU : 0) | @@ -1485,7 +1502,12 @@ Error Profiler::stop() { // Unpatch libraries AFTER JFR serialization completes // Remote symbolication RemoteFrameInfo structs contain pointers to build-ID strings - // owned by library metadata, so we must keep library patches active until after serialization + // owned by library metadata, so we must keep library patches active until after serialization. + // Note: between unlockAll() above and unpatch_libraries() below, PLT hooks may still + // fire from concurrent Netty I/O threads. This is safe because + // FlightRecorder::recordNativeSocketEvent checks _rec != nullptr (set to null by _jfr.stop()). + _native_sockets = false; // Prevent dlopen hooks from re-patching after unpatch + SocketPatcher::unpatch_libraries(); LibraryPatcher::unpatch_libraries(); _state = IDLE; diff --git a/ddprof-lib/src/main/cpp/profiler.h b/ddprof-lib/src/main/cpp/profiler.h index 0977a0a95..eca2a7007 100644 --- a/ddprof-lib/src/main/cpp/profiler.h +++ b/ddprof-lib/src/main/cpp/profiler.h @@ -155,6 +155,7 @@ class alignas(alignof(SpinLock)) Profiler { u32 _num_context_attributes; bool _omit_stacktraces; bool _remote_symbolication; // Enable remote symbolication for native frames + bool _native_sockets; // Enable PLT patching of Netty native socket functions // dlopen() hook support void **_dlopen_entry; @@ -209,7 +210,7 @@ class alignas(alignof(SpinLock)) Profiler { _num_context_attributes(0), _class_map(1), _string_label_map(2), _context_value_map(3), _cpu_engine(), _alloc_engine(), _event_mask(0), _stop_time(), _total_samples(0), _failures(), _cstack(CSTACK_NO), - _omit_stacktraces(false) { + _omit_stacktraces(false), _native_sockets(false) { for (int i = 0; i < CONCURRENCY_LEVEL; i++) { _calltrace_buffer[i] = NULL; @@ -375,6 +376,8 @@ class alignas(alignof(SpinLock)) Profiler { void recordWallClockEpoch(int tid, WallClockEpochEvent *event); void recordTraceRoot(int tid, TraceRootEvent *event); void recordQueueTime(int tid, QueueTimeEvent *event); + void recordNativeSocketEvent(NativeSocketEvent *event); + bool nativeSockets() const { return _native_sockets; } void writeLog(LogLevel level, const char *message); void writeLog(LogLevel level, const char *message, size_t len); void writeDatadogProfilerSetting(int tid, int length, const char *name, diff --git a/ddprof-lib/src/main/cpp/socketPatcher.h b/ddprof-lib/src/main/cpp/socketPatcher.h new file mode 100644 index 000000000..6517f32e4 --- /dev/null +++ b/ddprof-lib/src/main/cpp/socketPatcher.h @@ -0,0 +1,67 @@ +/* + * Copyright 2026, Datadog, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _SOCKETPATCHER_H +#define _SOCKETPATCHER_H + +#include "codeCache.h" +#include "spinLock.h" + +#ifdef __linux__ + +static const int MAX_SOCKET_PATCHED_LIBS = 16; + +// Stores the set of PLT entries patched for a single library. +// Each entry holds the PLT location and the original function pointer +// for one of the six intercepted socket functions. +typedef struct _socketPatchSet { + CodeCache* _lib; + void** _locations[6]; // recv, send, recvfrom, sendto, readv, writev + void* _originals[6]; +} SocketPatchSet; + +// Patches PLT entries for socket I/O functions (recv, send, recvfrom, sendto, +// readv, writev) in Netty native transport libraries. Hook functions record +// NativeSocketEvent entries to JFR via the profiler's recording pipeline. +// +// Only libraries whose basename contains "libnetty_transport_native" are +// patched. The profiler's own library is never patched — hook functions +// resolve to real libc symbols through the profiler's unpatched PLT. +class SocketPatcher { +private: + static SpinLock _lock; + static SocketPatchSet _patched_libs[MAX_SOCKET_PATCHED_LIBS]; + static int _size; + + static void patch_library_unlocked(CodeCache* lib); + +public: + static bool isNettyNativeLibrary(const char* name); + static void patch_libraries(); + static void unpatch_libraries(); +}; + +#else + +class SocketPatcher { +public: + static void patch_libraries() { } + static void unpatch_libraries() { } +}; + +#endif + +#endif // _SOCKETPATCHER_H diff --git a/ddprof-lib/src/main/cpp/socketPatcher_linux.cpp b/ddprof-lib/src/main/cpp/socketPatcher_linux.cpp new file mode 100644 index 000000000..9d71abf56 --- /dev/null +++ b/ddprof-lib/src/main/cpp/socketPatcher_linux.cpp @@ -0,0 +1,218 @@ +/* + * Copyright 2026, Datadog, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "socketPatcher.h" + +#ifdef __linux__ +#include "common.h" +#include "context.h" +#include "event.h" +#include "guards.h" +#include "libraries.h" +#include "log.h" +#include "profiler.h" +#include "tsc.h" + +#include +#include +#include + +SpinLock SocketPatcher::_lock; +SocketPatchSet SocketPatcher::_patched_libs[MAX_SOCKET_PATCHED_LIBS]; +int SocketPatcher::_size = 0; + +// Index positions in SocketPatchSet arrays — must match ImportId order +enum SocketFuncIndex { + SF_RECV = 0, + SF_SEND = 1, + SF_RECVFROM = 2, + SF_SENDTO = 3, + SF_READV = 4, + SF_WRITEV = 5, +}; + +static void recordEvent(int fd, u64 bytes, u32 operation, u64 start) { + u64 end = TSC::ticks(); + NativeSocketEvent event; + event._start = start; + event._end = end; + event._fd = fd; + event._bytes = bytes; + event._operation = operation; + Profiler::instance()->recordNativeSocketEvent(&event); +} + +// --- Hook functions --- +// Each hook: capture start ticks, call real function (via profiler's own PLT), +// record event on success. The real libc functions resolve through the +// profiler library's unpatched PLT entries. + +static ssize_t recv_hook(int fd, void* buf, size_t len, int flags) { + u64 start = TSC::ticks(); + ssize_t ret = recv(fd, buf, len, flags); + if (ret > 0) { + recordEvent(fd, (u64)ret, SOCKET_OP_RECV, start); + } + return ret; +} + +static ssize_t send_hook(int fd, const void* buf, size_t len, int flags) { + u64 start = TSC::ticks(); + ssize_t ret = send(fd, buf, len, flags); + if (ret > 0) { + recordEvent(fd, (u64)ret, SOCKET_OP_SEND, start); + } + return ret; +} + +static ssize_t recvfrom_hook(int fd, void* buf, size_t len, int flags, + struct sockaddr* src_addr, socklen_t* addrlen) { + u64 start = TSC::ticks(); + ssize_t ret = recvfrom(fd, buf, len, flags, src_addr, addrlen); + if (ret > 0) { + recordEvent(fd, (u64)ret, SOCKET_OP_RECVFROM, start); + } + return ret; +} + +static ssize_t sendto_hook(int fd, const void* buf, size_t len, int flags, + const struct sockaddr* dest_addr, socklen_t addrlen) { + u64 start = TSC::ticks(); + ssize_t ret = sendto(fd, buf, len, flags, dest_addr, addrlen); + if (ret > 0) { + recordEvent(fd, (u64)ret, SOCKET_OP_SENDTO, start); + } + return ret; +} + +// readv/writev are generic I/O syscalls that work on any fd type (sockets, +// pipes, files). We only patch them inside Netty native transport libraries +// where they are used exclusively for socket I/O. The rare false positive +// (e.g. Netty's internal wakeup pipe) is acceptable — it avoids a getsockopt() +// syscall on every call that would double the syscall cost. +static ssize_t readv_hook(int fd, const struct iovec* iov, int iovcnt) { + u64 start = TSC::ticks(); + ssize_t ret = readv(fd, iov, iovcnt); + if (ret > 0) { + recordEvent(fd, (u64)ret, SOCKET_OP_READV, start); + } + return ret; +} + +static ssize_t writev_hook(int fd, const struct iovec* iov, int iovcnt) { + u64 start = TSC::ticks(); + ssize_t ret = writev(fd, iov, iovcnt); + if (ret > 0) { + recordEvent(fd, (u64)ret, SOCKET_OP_WRITEV, start); + } + return ret; +} + +// Hook function pointers indexed by SocketFuncIndex +static void* const _hook_funcs[] = { + (void*)recv_hook, + (void*)send_hook, + (void*)recvfrom_hook, + (void*)sendto_hook, + (void*)readv_hook, + (void*)writev_hook, +}; + +// ImportId values for each socket function, indexed by SocketFuncIndex +static const ImportId _import_ids[] = { + im_recv, im_send, im_recvfrom, im_sendto, im_readv, im_writev, +}; + +bool SocketPatcher::isNettyNativeLibrary(const char* name) { + if (name == nullptr) return false; + // Find basename + const char* base = strrchr(name, '/'); + base = (base != nullptr) ? base + 1 : name; + return strstr(base, "libnetty_transport_native") != nullptr; +} + +void SocketPatcher::patch_library_unlocked(CodeCache* lib) { + if (!isNettyNativeLibrary(lib->name())) { + return; + } + + // Check if already patched + for (int i = 0; i < _size; i++) { + if (_patched_libs[i]._lib == lib) { + return; + } + } + + if (_size >= MAX_SOCKET_PATCHED_LIBS) { + Log::warn("SocketPatcher: too many Netty libraries, skipping %s", lib->name()); + return; + } + + SocketPatchSet& ps = _patched_libs[_size]; + ps._lib = lib; + int patched_count = 0; + + for (int i = 0; i < 6; i++) { + void** location = (void**)lib->findImport(_import_ids[i]); + if (location != nullptr) { + ps._locations[i] = location; + ps._originals[i] = __atomic_load_n(location, __ATOMIC_RELAXED); + __atomic_store_n(location, _hook_funcs[i], __ATOMIC_RELAXED); + patched_count++; + } else { + ps._locations[i] = nullptr; + ps._originals[i] = nullptr; + } + } + + // Always increment to record that this library was examined, + // preventing redundant re-scanning on subsequent patch_libraries() calls. + _size++; + if (patched_count > 0) { + TEST_LOG("SocketPatcher: patching %s (%d functions)", lib->name(), patched_count); + } +} + +void SocketPatcher::patch_libraries() { + if (!Profiler::instance()->nativeSockets()) { + TEST_LOG("SocketPatcher: disabled by nativesock=false"); + return; + } + + const CodeCacheArray& native_libs = Libraries::instance()->native_libs(); + int num_of_libs = native_libs.count(); + ExclusiveLockGuard locker(&_lock); + for (int i = 0; i < num_of_libs; i++) { + CodeCache* lib = native_libs.at(i); + patch_library_unlocked(lib); + } +} + +void SocketPatcher::unpatch_libraries() { + ExclusiveLockGuard locker(&_lock); + for (int i = 0; i < _size; i++) { + SocketPatchSet& ps = _patched_libs[i]; + for (int j = 0; j < 6; j++) { + if (ps._locations[j] != nullptr) { + __atomic_store_n(ps._locations[j], ps._originals[j], __ATOMIC_RELAXED); + } + } + } + TEST_LOG("SocketPatcher: restored %d libraries", _size); + _size = 0; +} + +#endif // __linux__ diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java index c49b4479e..5790ed34d 100644 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java @@ -267,6 +267,39 @@ public boolean isThresholdExceeded(long thresholdMillis, long startTicks, long e return endTicks - startTicks > thresholdMillis * TSCFrequencyHolder.FREQUENCY / 1000; } + /** Operation constant for recv(). */ + public static final int SOCKET_OP_RECV = 0; + /** Operation constant for send(). */ + public static final int SOCKET_OP_SEND = 1; + /** Operation constant for recvfrom(). */ + public static final int SOCKET_OP_RECVFROM = 2; + /** Operation constant for sendto(). */ + public static final int SOCKET_OP_SENDTO = 3; + /** Operation constant for readv(). */ + public static final int SOCKET_OP_READV = 4; + /** Operation constant for writev(). */ + public static final int SOCKET_OP_WRITEV = 5; + + /** + * Records a native socket I/O event. Intended for use by Netty instrumentation + * or other native socket interceptors to feed events into the profiler's JFR output. + *

+ * Invalid parameters (fd < 0, bytes < 0, operation outside [0..5]) are silently ignored. + * + * @param startTicks start timestamp from {@link #getCurrentTicks()} + * @param endTicks end timestamp from {@link #getCurrentTicks()} + * @param fd file descriptor + * @param bytes number of bytes transferred + * @param operation one of SOCKET_OP_* constants + */ + public void recordNativeSocketEvent(long startTicks, long endTicks, + int fd, long bytes, int operation) { + if (fd < 0 || bytes < 0 || operation < SOCKET_OP_RECV || operation > SOCKET_OP_WRITEV) { + return; + } + recordNativeSocketEvent0(startTicks, endTicks, fd, bytes, operation); + } + /** * Records when queueing ended * @param task the name of the enqueue task @@ -335,6 +368,8 @@ private static ThreadContext initializeThreadContext() { private static native void recordSettingEvent0(String name, String value, String unit); + private static native void recordNativeSocketEvent0(long startTicks, long endTicks, int fd, long bytes, int operation); + private static native void recordQueueEnd0(long startTicks, long endTicks, String task, String scheduler, Thread origin, String queueType, int queueLength); private static native long currentTicks0(); diff --git a/ddprof-lib/src/test/cpp/test_socketPatcher.cpp b/ddprof-lib/src/test/cpp/test_socketPatcher.cpp new file mode 100644 index 000000000..8adeca2a7 --- /dev/null +++ b/ddprof-lib/src/test/cpp/test_socketPatcher.cpp @@ -0,0 +1,63 @@ +/* + * Copyright 2026, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "gtest/gtest.h" +#include "event.h" +#include "codeCache.h" + +// Tests for NativeSocketEvent data structures and SocketPatcher name matching. +// PLT patching integration requires Linux and is tested via Java integration tests. + +TEST(NativeSocketEvent, StructLayout) { + NativeSocketEvent event; + event._start = 1000; + event._end = 2000; + event._fd = 42; + event._bytes = 1024; + event._operation = SOCKET_OP_RECV; + + EXPECT_EQ(event._start, 1000ULL); + EXPECT_EQ(event._end, 2000ULL); + EXPECT_EQ(event._fd, 42); + EXPECT_EQ(event._bytes, 1024ULL); + EXPECT_EQ(event._operation, (u32)SOCKET_OP_RECV); +} + +TEST(NativeSocketEvent, OperationEnumValues) { + EXPECT_EQ(SOCKET_OP_RECV, 0); + EXPECT_EQ(SOCKET_OP_SEND, 1); + EXPECT_EQ(SOCKET_OP_RECVFROM, 2); + EXPECT_EQ(SOCKET_OP_SENDTO, 3); + EXPECT_EQ(SOCKET_OP_READV, 4); + EXPECT_EQ(SOCKET_OP_WRITEV, 5); +} + +TEST(NativeSocketEvent, ImportIdBounds) { + // Socket imports should be contiguous and within NUM_IMPORTS + EXPECT_LT(im_recv, NUM_IMPORTS); + EXPECT_LT(im_send, NUM_IMPORTS); + EXPECT_LT(im_recvfrom, NUM_IMPORTS); + EXPECT_LT(im_sendto, NUM_IMPORTS); + EXPECT_LT(im_readv, NUM_IMPORTS); + EXPECT_LT(im_writev, NUM_IMPORTS); +} + +#ifdef __linux__ +#include "socketPatcher.h" + +TEST(SocketPatcher, NettyNameMatching) { + EXPECT_TRUE(SocketPatcher::isNettyNativeLibrary( + "/tmp/libnetty_transport_native_epoll_linux_x86_64.so")); + EXPECT_TRUE(SocketPatcher::isNettyNativeLibrary( + "/tmp/libnetty_transport_native_kqueue.jnilib")); + EXPECT_TRUE(SocketPatcher::isNettyNativeLibrary( + "libnetty_transport_native_io_uring.so")); + EXPECT_FALSE(SocketPatcher::isNettyNativeLibrary( + "/usr/lib/libc.so.6")); + EXPECT_FALSE(SocketPatcher::isNettyNativeLibrary( + "/path/to/libjavaProfiler.so")); + EXPECT_FALSE(SocketPatcher::isNettyNativeLibrary(nullptr)); +} +#endif diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/socket/NativeSocketEventTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/socket/NativeSocketEventTest.java new file mode 100644 index 000000000..40af9f318 --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/socket/NativeSocketEventTest.java @@ -0,0 +1,145 @@ +package com.datadoghq.profiler.socket; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.JavaProfiler; +import org.junit.jupiter.api.Test; +import org.openjdk.jmc.common.item.IAttribute; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.common.unit.IQuantity; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.openjdk.jmc.common.item.Attribute.attr; +import static org.openjdk.jmc.common.unit.UnitLookup.NUMBER; + +/** + * Tests for native socket event recording via the Java API. + */ +public class NativeSocketEventTest extends AbstractProfilerTest { + private static final IAttribute FD_ATTR = + attr("fd", "", "", NUMBER); + private static final IAttribute BYTES_ATTR = + attr("bytesTransferred", "", "", NUMBER); + private static final IAttribute OPERATION_ATTR = + attr("operation", "", "", NUMBER); + + @Override + protected String getProfilerCommand() { + return "cpu=10ms"; + } + + @Test + public void testRecordSocketEvent() throws Exception { + long start = profiler.getCurrentTicks(); + Thread.sleep(1); + long end = profiler.getCurrentTicks(); + + // Record a recv event + profiler.recordNativeSocketEvent(start, end, 42, 1024, JavaProfiler.SOCKET_OP_RECV); + + // Record a send event + start = profiler.getCurrentTicks(); + Thread.sleep(1); + end = profiler.getCurrentTicks(); + profiler.recordNativeSocketEvent(start, end, 42, 512, JavaProfiler.SOCKET_OP_SEND); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + int count = 0; + for (IItemIterable it : events) { + IMemberAccessor fdAccessor = FD_ATTR.getAccessor(it.getType()); + IMemberAccessor bytesAccessor = BYTES_ATTR.getAccessor(it.getType()); + IMemberAccessor opAccessor = OPERATION_ATTR.getAccessor(it.getType()); + for (IItem item : it) { + long fd = fdAccessor.getMember(item).longValue(); + long bytes = bytesAccessor.getMember(item).longValue(); + long op = opAccessor.getMember(item).longValue(); + assertEquals(42, fd); + assertTrue(bytes > 0); + assertTrue(op >= JavaProfiler.SOCKET_OP_RECV && op <= JavaProfiler.SOCKET_OP_WRITEV); + count++; + } + } + assertEquals(2, count, "Expected exactly 2 socket events (recv + send)"); + } + + @Test + public void testAllOperationTypes() throws Exception { + for (int op = JavaProfiler.SOCKET_OP_RECV; op <= JavaProfiler.SOCKET_OP_WRITEV; op++) { + long start = profiler.getCurrentTicks(); + Thread.sleep(1); + long end = profiler.getCurrentTicks(); + profiler.recordNativeSocketEvent(start, end, 10 + op, 100 * (op + 1), op); + } + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + int count = 0; + boolean[] seenOps = new boolean[6]; + for (IItemIterable it : events) { + IMemberAccessor opAccessor = OPERATION_ATTR.getAccessor(it.getType()); + for (IItem item : it) { + int op = (int) opAccessor.getMember(item).longValue(); + seenOps[op] = true; + count++; + } + } + assertEquals(6, count, "Expected 6 socket events, one per operation type"); + for (int i = 0; i < 6; i++) { + assertTrue(seenOps[i], "Missing operation type " + i); + } + } + + @Test + public void testContextPropagation() throws Exception { + profiler.setContext(123, 456); + long start = profiler.getCurrentTicks(); + Thread.sleep(1); + long end = profiler.getCurrentTicks(); + profiler.recordNativeSocketEvent(start, end, 7, 256, JavaProfiler.SOCKET_OP_RECV); + profiler.clearContext(); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + for (IItemIterable it : events) { + IMemberAccessor spanAccessor = SPAN_ID.getAccessor(it.getType()); + IMemberAccessor rootAccessor = LOCAL_ROOT_SPAN_ID.getAccessor(it.getType()); + for (IItem item : it) { + assertEquals(123, spanAccessor.getMember(item).longValue()); + assertEquals(456, rootAccessor.getMember(item).longValue()); + } + } + } + + @Test + public void testInvalidParametersAreRejected() throws Exception { + // Record one valid event + long start = profiler.getCurrentTicks(); + Thread.sleep(1); + long end = profiler.getCurrentTicks(); + profiler.recordNativeSocketEvent(start, end, 5, 100, JavaProfiler.SOCKET_OP_RECV); + + // Record invalid events — all should be silently dropped + profiler.recordNativeSocketEvent(start, end, -1, 100, JavaProfiler.SOCKET_OP_RECV); // negative fd + profiler.recordNativeSocketEvent(start, end, 5, -1, JavaProfiler.SOCKET_OP_RECV); // negative bytes + profiler.recordNativeSocketEvent(start, end, 5, 100, -1); // negative op + profiler.recordNativeSocketEvent(start, end, 5, 100, 6); // op out of range + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + int count = 0; + for (IItemIterable it : events) { + for (IItem item : it) { + count++; + } + } + assertEquals(1, count, "Only the valid event should be recorded"); + } +}