Skip to content

Commit 12ce017

Browse files
authored
feat: Support AgentExectuor enqueue of a Task object. (#960)
Fixes Task object handling when using new DefaultRequestHandlerV2. Fixes #869🦕
1 parent 62e5e59 commit 12ce017

3 files changed

Lines changed: 101 additions & 7 deletions

File tree

src/a2a/server/agent_execution/active_task.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -391,11 +391,22 @@ async def _run_consumer(self) -> None: # noqa: PLR0915, PLR0912
391391
)
392392

393393
if isinstance(event, Task):
394-
new_task = event
395-
await self._task_manager.save_task_event(
396-
new_task
394+
existing_task = (
395+
await self._task_manager.get_task()
397396
)
398-
# TODO: Avoid duplicated messages
397+
if existing_task:
398+
logger.error(
399+
'Task %s already exists. Ignoring task replacement.',
400+
self._task_id,
401+
)
402+
else:
403+
await (
404+
self._task_manager.save_task_event(
405+
event
406+
)
407+
)
408+
# Initial task should already contain the message.
409+
message_to_save = None
399410
else:
400411
new_task = (
401412
await self._task_manager.ensure_task_id(

tests/integration/test_end_to_end.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from a2a.server.agent_execution import AgentExecutor, RequestContext
1414
from a2a.server.events import EventQueue
1515
from a2a.server.events.in_memory_queue_manager import InMemoryQueueManager
16-
from a2a.server.request_handlers import GrpcHandler, LegacyRequestHandler
16+
from a2a.server.request_handlers import GrpcHandler, DefaultRequestHandler
1717
from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes
1818
from a2a.server.routes.rest_routes import create_rest_routes
1919
from a2a.server.tasks import TaskUpdater
@@ -174,8 +174,7 @@ class ClientSetup(NamedTuple):
174174
@pytest.fixture
175175
def base_e2e_setup(agent_card):
176176
task_store = InMemoryTaskStore()
177-
# TODO(https://github.com/a2aproject/a2a-python/issues/869): Use DefaultRequestHandler once it's fixed
178-
handler = LegacyRequestHandler(
177+
handler = DefaultRequestHandler(
179178
agent_executor=MockAgentExecutor(),
180179
task_store=task_store,
181180
agent_card=agent_card,

tests/integration/test_scenarios.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1558,3 +1558,87 @@ async def cancel(
15581558
'Bug: Task should include the published artifact'
15591559
)
15601560
assert last_event.task.artifacts[0].artifact_id == 'art-1'
1561+
1562+
1563+
# Scenario: Enqueue Task twice
1564+
@pytest.mark.timeout(2.0)
1565+
@pytest.mark.asyncio
1566+
@pytest.mark.parametrize('use_legacy', [False, True], ids=['v2', 'legacy'])
1567+
@pytest.mark.parametrize(
1568+
'streaming', [False, True], ids=['blocking', 'streaming']
1569+
)
1570+
async def test_scenario_enqueue_task_twice(caplog, use_legacy, streaming):
1571+
class DoubleTaskAgent(AgentExecutor):
1572+
async def execute(
1573+
self, context: RequestContext, event_queue: EventQueue
1574+
):
1575+
task1 = Task(
1576+
id=context.task_id,
1577+
context_id=context.context_id,
1578+
status=TaskStatus(
1579+
state=TaskState.TASK_STATE_WORKING,
1580+
message=Message(parts=[Part(text='First task')]),
1581+
),
1582+
)
1583+
await event_queue.enqueue_event(task1)
1584+
1585+
# This is undefined behavior, but it should not crash or hang.
1586+
task2 = Task(
1587+
id=context.task_id,
1588+
context_id=context.context_id,
1589+
status=TaskStatus(
1590+
state=TaskState.TASK_STATE_WORKING,
1591+
message=Message(parts=[Part(text='Second task')]),
1592+
),
1593+
)
1594+
await event_queue.enqueue_event(task2)
1595+
1596+
await event_queue.enqueue_event(
1597+
TaskStatusUpdateEvent(
1598+
task_id=context.task_id,
1599+
context_id=context.context_id,
1600+
status=TaskStatus(state=TaskState.TASK_STATE_COMPLETED),
1601+
)
1602+
)
1603+
1604+
async def cancel(
1605+
self, context: RequestContext, event_queue: EventQueue
1606+
):
1607+
pass
1608+
1609+
handler = create_handler(DoubleTaskAgent(), use_legacy)
1610+
client = await create_client(
1611+
handler, agent_card=agent_card(), streaming=streaming
1612+
)
1613+
1614+
msg = Message(
1615+
message_id='test-msg', role=Role.ROLE_USER, parts=[Part(text='start')]
1616+
)
1617+
1618+
it = client.send_message(
1619+
SendMessageRequest(
1620+
message=msg,
1621+
configuration=SendMessageConfiguration(return_immediately=False),
1622+
)
1623+
)
1624+
events = [event async for event in it]
1625+
1626+
(final_task,) = (await client.list_tasks(ListTasksRequest())).tasks
1627+
1628+
if use_legacy:
1629+
assert [part.text for part in final_task.history[0].parts] == [
1630+
'Second task'
1631+
]
1632+
else:
1633+
assert [part.text for part in final_task.history[0].parts] == [
1634+
'First task'
1635+
]
1636+
1637+
# Validate that new version logs with error exactly once 'Ignoring task replacement'
1638+
error_logs = [
1639+
record.message
1640+
for record in caplog.records
1641+
if record.levelname == 'ERROR'
1642+
and 'Ignoring task replacement' in record.message
1643+
]
1644+
assert len(error_logs) == 1

0 commit comments

Comments
 (0)