feat(server): add aclose() to drain ActiveTask background tasks (#1101) #1105
astrogilda wants to merge 1 commit into
…roject#1101)

At shutdown the ActiveTask producer can stay pending and surface as "Task was destroyed but it is pending!". The producer's finally block closes the subscriber queue with immediate=False, which joins every subscriber sink; an abandoned subscriber leaves an undrained sink, so the join never returns and the producer hangs. No public API drains these background tasks today.

Add aclose() to ActiveTask, ActiveTaskRegistry, and DefaultRequestHandlerV2. It force-closes the event queues (immediate=True), which releases the wedged producer, then cancels and awaits the producer and consumer tasks. ActiveTask sets _is_finished under _lock so it is mutually exclusive with start(); the registry marks itself closed so get_or_create refuses new work during teardown, closing the orphan-task race.

Fixes a2aproject#1101
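The hang described in the commit message can be reproduced in miniature. The sketch below is a toy model, not the SDK code: `ToySink`, `producer`, and `demo` are hypothetical names, and the sink is assumed to behave like an `asyncio.Queue` whose graceful close awaits `join()`. It shows a producer stuck in its `finally` block and a force-close from outside releasing it.

```python
import asyncio


class ToySink:
    """Hypothetical stand-in for a subscriber sink; not the SDK's event queue."""

    def __init__(self) -> None:
        self._queue = asyncio.Queue()

    def put(self, event: str) -> None:
        self._queue.put_nowait(event)

    async def close(self, immediate: bool = False) -> None:
        if immediate:
            # Discard undelivered events; marking them done wakes any join().
            while not self._queue.empty():
                self._queue.get_nowait()
                self._queue.task_done()
            return
        # Graceful path: blocks until every event has been consumed.
        await self._queue.join()


async def producer(sink: ToySink) -> None:
    try:
        sink.put("event")  # no subscriber ever reads this event
    finally:
        await sink.close(immediate=False)  # waits forever on the undrained sink


async def demo() -> tuple:
    sink = ToySink()
    task = asyncio.create_task(producer(sink))
    await asyncio.sleep(0.01)
    wedged = not task.done()  # the producer is parked in its finally block
    await sink.close(immediate=True)  # force-close releases the pending join()
    await asyncio.wait_for(task, timeout=1)
    return wedged, task.done()
```

Running `asyncio.run(demo())` under these assumptions reports the producer as wedged before the force-close and finished after it.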
Code Review
This pull request introduces aclose() methods to ActiveTask, ActiveTaskRegistry, and DefaultRequestHandlerV2 to facilitate a bounded, force-closed teardown of background tasks and queues during server shutdown, preventing pending task warnings. It also adds comprehensive unit tests to verify the teardown behavior, idempotency, and error handling. The review feedback suggests explicitly shutting down self._request_queue in ActiveTask.aclose() to ensure proper cleanup of all queues if start() was never called or failed early.
    await self._event_queue_agent.close(immediate=True)
    await self._event_queue_subscribers.close(immediate=True)
If start() was never called or failed early, the background tasks are never spawned, meaning their finally blocks (which shut down self._request_queue) will not run. Explicitly shutting down self._request_queue in aclose() ensures that all queues are properly cleaned up and any pending operations on the request queue are unblocked.
Suggested change:

      await self._event_queue_agent.close(immediate=True)
      await self._event_queue_subscribers.close(immediate=True)
    + self._request_queue.shutdown(immediate=True)
🧪 Code Coverage (vs

| File | Base | PR | Delta |
|---|---|---|---|
| src/a2a/server/agent_execution/active_task.py | 95.92% | 95.37% | 🔴 -0.55% |
| src/a2a/server/agent_execution/active_task_registry.py | 93.75% | 96.61% | 🟢 +2.86% |
| src/a2a/server/events/event_queue_v2.py | 91.19% | 91.71% | 🟢 +0.52% |
| src/a2a/server/request_handlers/default_request_handler_v2.py | 94.12% | 94.17% | 🟢 +0.05% |
| src/a2a/utils/telemetry.py | 90.70% | 91.47% | 🟢 +0.78% |
| Total | 92.99% | 93.02% | 🟢 +0.03% |
Generated by coverage-comment.yml
Summary
Adds a public aclose() teardown to ActiveTask, ActiveTaskRegistry, and DefaultRequestHandlerV2 that force-drains the SDK-owned producer, consumer, and dispatcher asyncio.Tasks so none are left pending at event-loop shutdown.

- ActiveTask.aclose() force-closes both event queues and cancels the producer and consumer tasks, then awaits them. It sets _is_finished under _lock, so it is mutually exclusive with start() (which refuses to spawn once _is_finished is set).
- ActiveTaskRegistry.aclose() marks the registry closed so get_or_create refuses new work, drains every active task, then awaits the in-flight _remove_task cleanup tasks. The lock is released before awaiting because _remove_task re-acquires it.
- DefaultRequestHandlerV2.aclose() delegates to the registry drain, for wiring into an ASGI lifespan/on_shutdown hook.

Why
Fixes #1101. At shutdown the ActiveTask producer can stay pending and surface as "Task was destroyed but it is pending!". The producer's finally calls _event_queue_subscribers.close(immediate=False), which awaits join() on every subscriber sink; an abandoned subscriber leaves an undrained sink, so the join() never returns and the producer hangs. There is no public way to drain these background tasks today. aclose() closes the queues with immediate=True, which releases the wedged producer, and reaps the tasks.

The teardown always forces rather than exposing a graceful immediate=False option, because that path inherits the documented close(immediate=False) deadlock and a shutdown hook must be bounded.
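The mutual exclusion between start() and aclose() can be sketched as follows. This is a minimal illustration rather than the PR's implementation: `ToyActiveTask`, `_run_forever`, and `demo` are hypothetical, the queue handling is omitted, and cancelling outside the lock is an assumption made here for simplicity.

```python
import asyncio


class ToyActiveTask:
    """Hypothetical model of the teardown pattern; not the SDK class."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._is_finished = False
        self._tasks = []

    async def _run_forever(self) -> None:
        # Stands in for a producer or consumer loop that never exits by itself.
        await asyncio.Event().wait()

    async def start(self) -> bool:
        async with self._lock:
            if self._is_finished:
                return False  # refuse to spawn once aclose() has run
            self._tasks = [
                asyncio.create_task(self._run_forever()),
                asyncio.create_task(self._run_forever()),
            ]
            return True

    async def aclose(self) -> None:
        async with self._lock:
            # Setting the flag under the lock excludes a concurrent start().
            self._is_finished = True
            tasks, self._tasks = self._tasks, []
        for task in tasks:
            task.cancel()
        # return_exceptions=True keeps CancelledError from propagating.
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo() -> tuple:
    active = ToyActiveTask()
    started = await active.start()
    spawned = list(active._tasks)
    await active.aclose()
    await active.aclose()  # second call finds nothing left to reap
    restarted = await active.start()
    return started, all(t.cancelled() for t in spawned), restarted
```

In this toy, a second aclose() is a no-op and start() returns False after teardown, which mirrors the idempotency and refusal behavior described above.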
Test plan
- uv run pytest tests/server/agent_execution/ tests/server/request_handlers/ tests/server/events/: pass
- uv run pytest --cov=a2a --cov-fail-under=88: pass
- ./scripts/lint.sh: ruff, ruff-format, and ty clean

New tests cover the registry drain, idempotency, empty registry, new-work rejected after close, and an errored task being logged rather than propagated; ActiveTask reaping a running producer and force-closing past an undrained subscriber (the #1101 repro); and the handler drain.
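The registry behaviors named in the test plan (drain, idempotency, rejection after close, errors logged rather than propagated) can be illustrated with a toy model. Everything here is hypothetical: `ToyTask`, `ToyRegistry`, the logger name, and the choice of `RuntimeError` for a closed registry are assumptions, not the SDK's actual types or exceptions.

```python
import asyncio
import logging

logger = logging.getLogger("toy_registry")


class ToyTask:
    """Hypothetical task whose aclose() may fail."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.closed = False

    async def aclose(self) -> None:
        if self.fail:
            raise RuntimeError("teardown failed")
        self.closed = True


class ToyRegistry:
    """Hypothetical model of the registry drain; not the SDK class."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._closed = False
        self._tasks = {}

    async def get_or_create(self, task_id: str, fail: bool = False) -> ToyTask:
        async with self._lock:
            if self._closed:
                # Assumed exception type; the real registry may differ.
                raise RuntimeError("registry is closed")
            return self._tasks.setdefault(task_id, ToyTask(fail))

    async def aclose(self) -> None:
        async with self._lock:
            self._closed = True
            tasks = list(self._tasks.values())
            self._tasks.clear()
        # The lock is released here, so per-task cleanup could re-acquire it.
        results = await asyncio.gather(
            *(t.aclose() for t in tasks), return_exceptions=True
        )
        for result in results:
            if isinstance(result, Exception):
                logger.error("task teardown failed: %r", result)


async def demo() -> tuple:
    registry = ToyRegistry()
    good = await registry.get_or_create("a")
    await registry.get_or_create("b", fail=True)
    await registry.aclose()  # the failure from "b" is logged, not raised
    await registry.aclose()  # idempotent on an already-empty registry
    try:
        await registry.get_or_create("c")
    except RuntimeError:
        rejected = True
    else:
        rejected = False
    return good.closed, rejected
```

Collecting failures with return_exceptions=True is one way to keep a single bad task from aborting the drain of the others, which matters when the drain runs inside a shutdown hook.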
Fixes #1101 🦕