Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,14 @@ def _convert_workflow_event_to_agent_update(
return update
return None

case WorkflowOutputEvent(data=data, source_executor_id=source_executor_id):
case WorkflowOutputEvent(data=data, executor_id=executor_id):
# Convert workflow output to an agent response update.
# Handle different data types appropriately.

# Skip AgentResponse from AgentExecutor with output_response=True
# since streaming events already surfaced the content.
if isinstance(data, AgentResponse):
executor = self.workflow.executors.get(source_executor_id)
executor = self.workflow.executors.get(executor_id)
if isinstance(executor, AgentExecutor) and executor.output_response:
return None

Expand All @@ -332,7 +332,7 @@ def _convert_workflow_event_to_agent_update(
return AgentResponseUpdate(
contents=list(data.contents),
role=data.role,
author_name=data.author_name or source_executor_id,
author_name=data.author_name or executor_id,
response_id=response_id,
message_id=str(uuid.uuid4()),
created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
Expand All @@ -344,7 +344,7 @@ def _convert_workflow_event_to_agent_update(
return AgentResponseUpdate(
contents=contents,
role=Role.ASSISTANT,
author_name=source_executor_id,
author_name=executor_id,
response_id=response_id,
message_id=str(uuid.uuid4()),
created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
Expand Down
8 changes: 4 additions & 4 deletions python/packages/core/agent_framework/_workflows/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,20 +278,20 @@ class WorkflowOutputEvent(WorkflowEvent):
def __init__(
self,
data: Any,
source_executor_id: str,
executor_id: str,
):
"""Initialize the workflow output event.

Args:
data: The output yielded by the executor.
source_executor_id: ID of the executor that yielded the output.
executor_id: ID of the executor that yielded the output.
"""
super().__init__(data)
self.source_executor_id = source_executor_id
self.executor_id = executor_id

def __repr__(self) -> str:
"""Return a string representation of the workflow output event."""
return f"{self.__class__.__name__}(data={self.data}, source_executor_id={self.source_executor_id})"
return f"{self.__class__.__name__}(data={self.data}, executor_id={self.executor_id})"


class SuperStepEvent(WorkflowEvent):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ async def yield_output(self, output: T_W_Out) -> None:
self._yielded_outputs.append(copy.deepcopy(output))

with _framework_event_origin():
event = WorkflowOutputEvent(data=output, source_executor_id=self._executor_id)
event = WorkflowOutputEvent(data=output, executor_id=self._executor_id)
await self._runner_context.add_event(event)

async def add_event(self, event: WorkflowEvent) -> None:
Expand Down
4 changes: 2 additions & 2 deletions python/packages/devui/agent_framework_devui/_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) ->
# Handle WorkflowOutputEvent separately to preserve output data
if event_class == "WorkflowOutputEvent":
output_data = getattr(event, "data", None)
source_executor_id = getattr(event, "source_executor_id", "unknown")
executor_id = getattr(event, "executor_id", "unknown")

if output_data is not None:
# Import required types
Expand Down Expand Up @@ -942,7 +942,7 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) ->
# Emit output_item.added for each yield_output
logger.debug(
f"WorkflowOutputEvent converted to output_item.added "
f"(executor: {source_executor_id}, length: {len(text)})"
f"(executor: {executor_id}, length: {len(text)})"
)
return [
ResponseOutputItemAddedEvent(
Expand Down
5 changes: 2 additions & 3 deletions python/packages/devui/frontend/src/types/agent-framework.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,7 @@ export interface AgentThread {
export interface WorkflowEvent {
type?: string; // Event class name like "WorkflowOutputEvent", "WorkflowCompletedEvent", "ExecutorInvokedEvent", etc.
data?: unknown;
executor_id?: string; // Present for executor-related events
source_executor_id?: string; // Present for WorkflowOutputEvent
executor_id?: string; // Present for executor-related events and WorkflowOutputEvent
}

export interface WorkflowStartedEvent extends WorkflowEvent {
Expand All @@ -286,7 +285,7 @@ export interface WorkflowCompletedEvent extends WorkflowEvent {
export interface WorkflowOutputEvent extends WorkflowEvent {
// Event-specific data for workflow output (new)
readonly event_type: "workflow_output";
source_executor_id: string; // ID of executor that yielded the output
executor_id: string; // ID of executor that yielded the output
}

export interface WorkflowWarningEvent extends WorkflowEvent {
Expand Down
4 changes: 2 additions & 2 deletions python/packages/devui/tests/test_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ async def test_workflow_output_event(mapper: MessageMapper, test_request: AgentF
"""Test WorkflowOutputEvent is converted to output_item.added."""
from agent_framework._workflows._events import WorkflowOutputEvent

event = WorkflowOutputEvent(data="Final workflow output", source_executor_id="final_executor")
event = WorkflowOutputEvent(data="Final workflow output", executor_id="final_executor")
events = await mapper.convert_event(event, test_request)

# WorkflowOutputEvent should emit output_item.added
Expand All @@ -607,7 +607,7 @@ async def test_workflow_output_event_with_list_data(mapper: MessageMapper, test_
ChatMessage(role=Role.USER, contents=[TextContent(text="Hello")]),
ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="World")]),
]
event = WorkflowOutputEvent(data=messages, source_executor_id="complete")
event = WorkflowOutputEvent(data=messages, executor_id="complete")
events = await mapper.convert_event(event, test_request)

assert len(events) == 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def main():
Event: ExecutorCompletedEvent(executor_id=upper_case_executor)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor)
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor)
Event: WorkflowOutputEvent(data='DLROW OLLEH', source_executor_id=reverse_text_executor)
Event: WorkflowOutputEvent(data='DLROW OLLEH', executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH
"""

Expand Down
Loading