Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-runtime"
version = "0.9.3"
version = "0.10.0"
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
4 changes: 4 additions & 0 deletions src/uipath/runtime/chat/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ async def emit_exchange_end_event(self) -> None:
"""Send an exchange end event."""
...

async def emit_exchange_error_event(self, error: Exception) -> None:
"""Emit an exchange error event."""
...

async def wait_for_resume(self) -> dict[str, Any]:
"""Wait for the interrupt_end event to be received."""
...
132 changes: 81 additions & 51 deletions src/uipath/runtime/chat/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
UiPathStreamOptions,
)
from uipath.runtime.chat.protocol import UiPathChatProtocol
from uipath.runtime.errors import UiPathBaseRuntimeError
from uipath.runtime.errors.contract import UiPathErrorCategory
from uipath.runtime.events import (
UiPathRuntimeEvent,
UiPathRuntimeMessageEvent,
Expand Down Expand Up @@ -65,62 +67,90 @@ async def stream(
options: UiPathStreamOptions | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Stream execution events with chat support."""
await self.chat_bridge.connect()

execution_completed = False
current_input = input
current_options = UiPathStreamOptions(
resume=options.resume if options else False,
breakpoints=options.breakpoints if options else None,
)

while not execution_completed:
async for event in self.delegate.stream(
current_input, options=current_options
):
if isinstance(event, UiPathRuntimeMessageEvent):
if event.payload:
await self.chat_bridge.emit_message_event(event.payload)

if isinstance(event, UiPathRuntimeResult):
runtime_result = event

if (
runtime_result.status == UiPathRuntimeStatus.SUSPENDED
and runtime_result.triggers
):
api_triggers = [
t
for t in runtime_result.triggers
if t.trigger_type == UiPathResumeTriggerType.API
]

if api_triggers:
resume_map: dict[str, Any] = {}

for trigger in api_triggers:
await self.chat_bridge.emit_interrupt_event(trigger)

resume_data = await self.chat_bridge.wait_for_resume()

assert trigger.interrupt_id is not None, (
"Trigger interrupt_id cannot be None"
)
resume_map[trigger.interrupt_id] = resume_data

current_input = resume_map
current_options.resume = True
break
try:
await self.chat_bridge.connect()

execution_completed = False
current_input = input
current_options = UiPathStreamOptions(
resume=options.resume if options else False,
breakpoints=options.breakpoints if options else None,
)

while not execution_completed:
async for event in self.delegate.stream(
current_input, options=current_options
):
if isinstance(event, UiPathRuntimeMessageEvent):
if event.payload:
await self.chat_bridge.emit_message_event(event.payload)

if isinstance(event, UiPathRuntimeResult):
runtime_result = event

if (
runtime_result.status == UiPathRuntimeStatus.SUSPENDED
and runtime_result.triggers
):
api_triggers = [
t
for t in runtime_result.triggers
if t.trigger_type == UiPathResumeTriggerType.API
]

if api_triggers:
resume_map: dict[str, Any] = {}

for trigger in api_triggers:
await self.chat_bridge.emit_interrupt_event(trigger)

resume_data = (
await self.chat_bridge.wait_for_resume()
)

assert trigger.interrupt_id is not None, (
"Trigger interrupt_id cannot be None"
)
resume_map[trigger.interrupt_id] = resume_data

current_input = resume_map
current_options.resume = True
break
else:
# No API triggers - yield result and complete
yield event
execution_completed = True
elif runtime_result.status == UiPathRuntimeStatus.FAULTED:
error = runtime_result.error
faulted_error = UiPathBaseRuntimeError(
code=error.code if error else "UNKNOWN",
title=error.title if error else "Unknown Error",
detail=error.detail if error else "",
category=error.category
if error
else UiPathErrorCategory.UNKNOWN,
status=error.status if error else None,
)
await self._emit_error_event(faulted_error)
yield event
execution_completed = True
else:
# No API triggers - yield result and complete
yield event
execution_completed = True
await self.chat_bridge.emit_exchange_end_event()
else:
yield event
execution_completed = True
await self.chat_bridge.emit_exchange_end_event()
else:
yield event

except Exception as e:
await self._emit_error_event(e)
raise

async def _emit_error_event(self, error: Exception) -> None:
"""Emit an exchange error event to the chat bridge."""
try:
await self.chat_bridge.emit_exchange_error_event(error)
except Exception:
logger.warning("Failed to emit exchange error event", exc_info=True)

async def get_schema(self) -> UiPathRuntimeSchema:
"""Get schema from the delegate runtime."""
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading