Skip to content

Commit dbb9146

Browse files
smflorentinoclaude
andcommitted
fix: isolate per-execution log buffers for parallel eval runs
When UIPATH_JOB_KEY is set and eval runs execute in parallel, two bugs caused jumbled and truncated log output in the execution log file: 1. **Interleaved output**: Each child interceptor replaced the global sys.stdout/sys.stderr with a new LoggerWriter, stomping the previous one. All concurrent async tasks wrote to the last-installed writer's single shared buffer, merging partial lines from different executions. 2. **Truncated output**: LoggerWriter.buffer was never flushed before teardown. Partial lines (without a trailing newline) were silently discarded when the handler was closed. Fix: - LoggerWriter now maintains per-context buffers keyed by current_execution_id, so concurrent tasks never share a buffer. - Child interceptors no longer replace sys.stdout/sys.stderr — only the master interceptor owns the global streams. Children register their handler on the stdout/stderr loggers; the existing filter system routes records to the correct handler. - Teardown flushes buffers before clearing context and removing handlers, so partial lines are never lost. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0d3231a commit dbb9146

4 files changed

Lines changed: 355 additions & 37 deletions

File tree

src/uipath/runtime/logging/_interceptor.py

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,16 @@ def setup(self) -> None:
148148
# logger.propagate remains True (default)
149149
self.patched_loggers.add(logger_name)
150150

151-
# Child executions should redirect stdout/stderr to their own handler
152-
# This ensures print statements are captured per execution
153-
self._redirect_stdout_stderr()
151+
# Register our handler on stdout/stderr loggers so that
152+
# print() output routed through the master's LoggerWriter
153+
# is captured per-execution via filters.
154+
# We do NOT replace sys.stdout/sys.stderr — the master owns those.
155+
if not isinstance(sys.stdout, LoggerWriter):
156+
self.logger.warning(
157+
"Child interceptor set up without a master LoggerWriter on sys.stdout. "
158+
"print() output will not be captured for this execution context."
159+
)
160+
self._register_stdout_stderr_handlers()
154161
else:
155162
# Master execution mode: remove all handlers and add only ours
156163
self._clean_all_handlers(self.root_logger)
@@ -165,28 +172,33 @@ def setup(self) -> None:
165172
# Master redirects stdout/stderr
166173
self._redirect_stdout_stderr()
167174

175+
def _register_stdout_stderr_handlers(self) -> None:
176+
"""Register our handler on stdout/stderr loggers without replacing the streams."""
177+
stdout_logger = logging.getLogger("stdout")
178+
stderr_logger = logging.getLogger("stderr")
179+
180+
stdout_logger.propagate = False
181+
stderr_logger.propagate = False
182+
183+
if self.log_handler not in stdout_logger.handlers:
184+
stdout_logger.addHandler(self.log_handler)
185+
if self.log_handler not in stderr_logger.handlers:
186+
stderr_logger.addHandler(self.log_handler)
187+
168188
def _redirect_stdout_stderr(self) -> None:
169-
"""Redirect stdout and stderr to the logging system."""
170-
# Set up stdout and stderr loggers
189+
"""Redirect stdout and stderr to the logging system.
190+
191+
Only called by master execution mode. Replaces sys.stdout/sys.stderr
192+
with LoggerWriter instances that route output through the logging system.
193+
"""
171194
stdout_logger = logging.getLogger("stdout")
172195
stderr_logger = logging.getLogger("stderr")
173196

174-
if self.execution_id:
175-
# Child execution: add our handler to stdout/stderr loggers
176-
stdout_logger.propagate = False
177-
stderr_logger.propagate = False
178-
179-
if self.log_handler not in stdout_logger.handlers:
180-
stdout_logger.addHandler(self.log_handler)
181-
if self.log_handler not in stderr_logger.handlers:
182-
stderr_logger.addHandler(self.log_handler)
183-
else:
184-
# Master execution: clean and set up handlers
185-
stdout_logger.propagate = False
186-
stderr_logger.propagate = False
197+
stdout_logger.propagate = False
198+
stderr_logger.propagate = False
187199

188-
self._clean_all_handlers(stdout_logger)
189-
self._clean_all_handlers(stderr_logger)
200+
self._clean_all_handlers(stdout_logger)
201+
self._clean_all_handlers(stderr_logger)
190202

191203
# Use the min_level in the LoggerWriter to filter messages
192204
sys.stdout = LoggerWriter(
@@ -198,15 +210,29 @@ def _redirect_stdout_stderr(self) -> None:
198210

199211
def teardown(self) -> None:
200212
"""Restore original logging configuration."""
201-
# Clear the context variable
213+
# Step 1: Flush LoggerWriter buffers while context and handlers are still active.
214+
# Child mode: flush only this context's buffer from the shared LoggerWriter.
215+
# Master mode: flush ALL remaining buffers before restoring streams.
216+
if self.execution_id:
217+
if isinstance(sys.stdout, LoggerWriter):
218+
sys.stdout.flush()
219+
if isinstance(sys.stderr, LoggerWriter):
220+
sys.stderr.flush()
221+
else:
222+
if isinstance(sys.stdout, LoggerWriter):
223+
sys.stdout.flush_all()
224+
if isinstance(sys.stderr, LoggerWriter):
225+
sys.stderr.flush_all()
226+
227+
# Step 2: Clear the context variable (after flush used it)
202228
if self.execution_id:
203229
current_execution_id.set(None)
204230

205-
# Restore the original disable level
231+
# Step 3: Restore the original disable level
206232
if not self.execution_id:
207233
logging.disable(self.original_disable_level)
208234

209-
# Remove our handler and filter
235+
# Step 4: Remove our handler and filter
210236
if self.execution_filter:
211237
self.log_handler.removeFilter(self.execution_filter)
212238

@@ -240,8 +266,8 @@ def teardown(self) -> None:
240266
if self._owns_handler:
241267
self.log_handler.close()
242268

243-
# Only restore streams if we redirected them
244-
if self.original_stdout and self.original_stderr:
269+
# Step 5: Only master restores streams (children never replaced them)
270+
if not self.execution_id and self.original_stdout and self.original_stderr:
245271
sys.stdout = self.original_stdout
246272
sys.stderr = self.original_stderr
247273

src/uipath/runtime/logging/_writers.py

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,15 @@
33
import logging
44
from typing import TextIO
55

6+
from uipath.runtime.logging._context import current_execution_id
7+
68

79
class LoggerWriter:
8-
"""Redirect stdout/stderr to logging system."""
10+
"""Redirect stdout/stderr to logging system.
11+
12+
Maintains per-execution-context buffers so that concurrent async tasks
13+
(e.g. parallel eval runs) do not interleave partial lines.
14+
"""
915

1016
def __init__(
1117
self,
@@ -18,7 +24,7 @@ def __init__(
1824
self.logger = logger
1925
self.level = level
2026
self.min_level = min_level
21-
self.buffer = ""
27+
self._buffers: dict[str | None, str] = {}
2228
self.sys_file = sys_file
2329
self._in_logging = False # Recursion guard
2430

@@ -35,17 +41,22 @@ def write(self, message: str) -> None:
3541

3642
try:
3743
self._in_logging = True
38-
self.buffer += message
39-
while "\n" in self.buffer:
40-
line, self.buffer = self.buffer.split("\n", 1)
44+
ctx = current_execution_id.get()
45+
buf = self._buffers.get(ctx, "") + message
46+
while "\n" in buf:
47+
line, buf = buf.split("\n", 1)
4148
# Only log if the message is not empty and the level is sufficient
4249
if line and self.level >= self.min_level:
4350
self.logger._log(self.level, line, ())
51+
if buf:
52+
self._buffers[ctx] = buf
53+
else:
54+
self._buffers.pop(ctx, None)
4455
finally:
4556
self._in_logging = False
4657

4758
def flush(self) -> None:
48-
"""Flush any remaining buffered messages to the logger."""
59+
"""Flush the current execution context's buffered messages to the logger."""
4960
if self._in_logging:
5061
if self.sys_file:
5162
try:
@@ -56,10 +67,24 @@ def flush(self) -> None:
5667

5768
try:
5869
self._in_logging = True
59-
# Log any remaining content in the buffer on flush
60-
if self.buffer and self.level >= self.min_level:
61-
self.logger._log(self.level, self.buffer, ())
62-
self.buffer = ""
70+
ctx = current_execution_id.get()
71+
buf = self._buffers.pop(ctx, "")
72+
if buf and self.level >= self.min_level:
73+
self.logger._log(self.level, buf, ())
74+
finally:
75+
self._in_logging = False
76+
77+
def flush_all(self) -> None:
78+
"""Flush all execution contexts' buffered messages. Called by master teardown."""
79+
if self._in_logging:
80+
return
81+
82+
try:
83+
self._in_logging = True
84+
for buf in self._buffers.values():
85+
if buf and self.level >= self.min_level:
86+
self.logger._log(self.level, buf, ())
87+
self._buffers.clear()
6388
finally:
6489
self._in_logging = False
6590

tests/test_executor.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Simple test for runtime factory and executor span capture."""
22

3+
import logging
4+
import sys
35
from typing import Any, AsyncGenerator, TypeVar
46

57
import pytest
@@ -13,6 +15,7 @@
1315
UiPathRuntimeProtocol,
1416
)
1517
from uipath.runtime.base import UiPathStreamOptions
18+
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
1619
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
1720
from uipath.runtime.schema import UiPathRuntimeSchema
1821

@@ -119,9 +122,32 @@ async def new_runtime(
119122
return self.runtime_class()
120123

121124

125+
@pytest.fixture(autouse=True)
126+
def _isolate_logging():
127+
"""Save and restore logging state so tests don't leak into each other."""
128+
root = logging.getLogger()
129+
original_level = root.level
130+
original_handlers = list(root.handlers)
131+
original_stdout = sys.stdout
132+
original_stderr = sys.stderr
133+
yield
134+
root.setLevel(original_level)
135+
root.handlers = original_handlers
136+
sys.stdout = original_stdout
137+
sys.stderr = original_stderr
138+
logging.disable(logging.NOTSET)
139+
140+
122141
@pytest.mark.asyncio
123-
async def test_multiple_factories_same_executor():
142+
async def test_multiple_factories_same_executor(tmp_path):
124143
"""Test factories using same trace manager, verify spans are captured correctly."""
144+
# Set up a master interceptor so that sys.stdout is a LoggerWriter,
145+
# matching real production usage where UiPathRuntimeContext provides one.
146+
master = UiPathRuntimeLogsInterceptor(
147+
job_id="test-job", dir=str(tmp_path), file="test.log"
148+
)
149+
master.setup()
150+
125151
trace_manager = UiPathTraceManager()
126152

127153
# Create factories for different runtimes
@@ -228,3 +254,5 @@ async def test_multiple_factories_same_executor():
228254
assert execution_runtime_c.log_handler
229255
assert len(execution_runtime_c.log_handler.buffer) > 0
230256
assert execution_runtime_c.log_handler.buffer[0].msg == "executing {'input': 'c'}"
257+
258+
master.teardown()

0 commit comments

Comments
 (0)