Skip to content

Commit f68b22f

Browse files
feat(server): add async context manager support to EventQueue (#743)
# Description Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [x] Follow the [`CONTRIBUTING` Guide](https://github.com/a2aproject/a2a-python/blob/main/CONTRIBUTING.md). - [x] Make your Pull Request title in the <https://www.conventionalcommits.org/> specification. - Important Prefixes for [release-please](https://github.com/googleapis/release-please): - `fix:` which represents bug fixes, and correlates to a [SemVer](https://semver.org/) patch. - `feat:` represents a new feature, and correlates to a SemVer minor. - `feat!:`, or `fix!:`, `refactor!:`, etc., which represent a breaking change (indicated by the `!`) and will result in a SemVer major. - [x] Ensure the tests and linter pass (Run `bash scripts/format.sh` from the repository root to format) - [x] Appropriate docs were updated (if necessary) Fixes #720 🦕 ## Problem `EventQueue` has a sophisticated `close()` with graceful/immediate modes, child propagation, and cross-version handling — but doesn't support `async with`. Server-side code must use explicit `try/finally` or risk leaking resources on exceptions: ```python queue = EventQueue() try: await queue.enqueue_event(event) ... finally: await queue.close() ``` ## Fix Add `__aenter__` and `__aexit__` as concrete methods on `EventQueue`: * `__aenter__` returns `Self` (via `typing_extensions`). * `__aexit__` delegates to `close()` with default `immediate=False` (graceful). Code needing immediate shutdown can still call `await queue.close(immediate=True)` explicitly. This enables the idiomatic pattern: ```python async with EventQueue() as queue: await queue.enqueue_event(event) ... # close() called automatically, even on exceptions ``` Unlike the client-side hierarchy where `__aenter__`/`__aexit__` were lifted to the abstract `Client` (#719), `EventQueue` is a concrete class with no abstract base above it — `QueueManager` manages queue lifecycles by task ID but does not wrap or extend `EventQueue`. This is the correct and only place for these methods. Non-breaking, additive change. Manual `close()` and `try/finally` continue to work as before. Follows the pattern established in `ClientTransport` (#682), `BaseClient` (#688), and `Client` (#719). ## Tests Two tests added to `tests/server/events/test_event_queue.py`, following the same approach as `ClientTransport` and `BaseClient`.
1 parent fd0a1bd commit f68b22f

2 files changed

Lines changed: 39 additions & 0 deletions

File tree

src/a2a/server/events/event_queue.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
import logging
33
import sys
44

5+
from types import TracebackType
6+
7+
from typing_extensions import Self
8+
59
from a2a.types.a2a_pb2 import (
610
Message,
711
Task,
@@ -43,6 +47,19 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
4347
self._lock = asyncio.Lock()
4448
logger.debug('EventQueue initialized.')
4549

50+
async def __aenter__(self) -> Self:
51+
"""Enters the async context manager, returning the queue itself."""
52+
return self
53+
54+
async def __aexit__(
55+
self,
56+
exc_type: type[BaseException] | None,
57+
exc_val: BaseException | None,
58+
exc_tb: TracebackType | None,
59+
) -> None:
60+
"""Exits the async context manager, ensuring close() is called."""
61+
await self.close()
62+
4663
async def enqueue_event(self, event: Event) -> None:
4764
"""Enqueues an event to this queue and all its children.
4865

tests/server/events/test_event_queue.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,28 @@ def test_constructor_invalid_max_queue_size() -> None:
7878
EventQueue(max_queue_size=-10)
7979

8080

81+
@pytest.mark.asyncio
82+
async def test_event_queue_async_context_manager(
83+
event_queue: EventQueue,
84+
) -> None:
85+
"""Test that EventQueue can be used as an async context manager."""
86+
async with event_queue as q:
87+
assert q is event_queue
88+
assert event_queue.is_closed() is False
89+
assert event_queue.is_closed() is True
90+
91+
92+
@pytest.mark.asyncio
93+
async def test_event_queue_async_context_manager_on_exception(
94+
event_queue: EventQueue,
95+
) -> None:
96+
"""Test that close() is called even when an exception occurs inside the context."""
97+
with pytest.raises(RuntimeError, match='boom'):
98+
async with event_queue:
99+
raise RuntimeError('boom')
100+
assert event_queue.is_closed() is True
101+
102+
81103
@pytest.mark.asyncio
82104
async def test_enqueue_and_dequeue_event(event_queue: EventQueue) -> None:
83105
"""Test that an event can be enqueued and dequeued."""

0 commit comments

Comments
 (0)