Skip to content

Commit 849a908

Browse files
authored
refactor: cleanup @Validate decorators. (#853)
Use validate_async_generator and add validators to compat grpc handler.
1 parent fce163c commit 849a908

6 files changed

Lines changed: 37 additions & 10 deletions

File tree

src/a2a/compat/v0_3/grpc_handler.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from a2a.server.request_handlers.request_handler import RequestHandler
3030
from a2a.types.a2a_pb2 import AgentCard
3131
from a2a.utils.errors import A2AError, InvalidParamsError
32-
from a2a.utils.helpers import maybe_await
32+
from a2a.utils.helpers import maybe_await, validate, validate_async_generator
3333

3434

3535
logger = logging.getLogger(__name__)
@@ -170,6 +170,10 @@ async def _handler(
170170
context, _handler, a2a_v0_3_pb2.SendMessageResponse()
171171
)
172172

173+
@validate_async_generator(
174+
lambda self: self.agent_card.capabilities.streaming,
175+
'Streaming is not supported by the agent',
176+
)
173177
async def SendStreamingMessage(
174178
self,
175179
request: a2a_v0_3_pb2.SendMessageRequest,
@@ -229,6 +233,10 @@ async def _handler(
229233

230234
return await self._handle_unary(context, _handler, a2a_v0_3_pb2.Task())
231235

236+
@validate_async_generator(
237+
lambda self: self.agent_card.capabilities.streaming,
238+
'Streaming is not supported by the agent',
239+
)
232240
async def TaskSubscription(
233241
self,
234242
request: a2a_v0_3_pb2.TaskSubscriptionRequest,
@@ -252,6 +260,10 @@ async def _handler(
252260
async for item in self._handle_stream(context, _handler):
253261
yield item
254262

263+
@validate(
264+
lambda self: self.agent_card.capabilities.push_notifications,
265+
'Push notifications are not supported by the agent',
266+
)
255267
async def CreateTaskPushNotificationConfig(
256268
self,
257269
request: a2a_v0_3_pb2.CreateTaskPushNotificationConfigRequest,

src/a2a/compat/v0_3/rest_handler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from a2a.compat.v0_3 import types as types_v03
2929
from a2a.compat.v0_3.request_handler import RequestHandler03
3030
from a2a.server.context import ServerCallContext
31-
from a2a.utils.helpers import validate
31+
from a2a.utils.helpers import validate, validate_async_generator
3232
from a2a.utils.telemetry import SpanKind, trace_class
3333

3434

@@ -78,7 +78,7 @@ async def on_message_send(
7878
pb2_v03_resp = proto_utils.ToProto.task_or_message(v03_resp)
7979
return MessageToDict(pb2_v03_resp)
8080

81-
@validate(
81+
@validate_async_generator(
8282
lambda self: self.agent_card.capabilities.streaming,
8383
'Streaming is not supported by the agent',
8484
)
@@ -134,7 +134,7 @@ async def on_cancel_task(
134134
pb2_v03_task = proto_utils.ToProto.task(v03_resp)
135135
return MessageToDict(pb2_v03_task)
136136

137-
@validate(
137+
@validate_async_generator(
138138
lambda self: self.agent_card.capabilities.streaming,
139139
'Streaming is not supported by the agent',
140140
)

src/a2a/server/request_handlers/jsonrpc_handler.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
UnsupportedOperationError,
5050
VersionNotSupportedError,
5151
)
52-
from a2a.utils.helpers import maybe_await, validate
52+
from a2a.utils.helpers import maybe_await, validate, validate_async_generator
5353
from a2a.utils.telemetry import SpanKind, trace_class
5454

5555

@@ -171,7 +171,7 @@ async def on_message_send(
171171
except A2AError as e:
172172
return _build_error_response(request_id, e)
173173

174-
@validate(
174+
@validate_async_generator(
175175
lambda self: self.agent_card.capabilities.streaming,
176176
'Streaming is not supported by the agent',
177177
)
@@ -235,6 +235,10 @@ async def on_cancel_task(
235235

236236
return _build_error_response(request_id, TaskNotFoundError())
237237

238+
@validate_async_generator(
239+
lambda self: self.agent_card.capabilities.streaming,
240+
'Streaming is not supported by the agent',
241+
)
238242
async def on_subscribe_to_task(
239243
self,
240244
request: SubscribeToTaskRequest,

src/a2a/server/request_handlers/rest_handler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
)
3030
from a2a.utils import proto_utils
3131
from a2a.utils.errors import TaskNotFoundError
32-
from a2a.utils.helpers import validate
32+
from a2a.utils.helpers import validate, validate_async_generator
3333
from a2a.utils.telemetry import SpanKind, trace_class
3434

3535

@@ -87,7 +87,7 @@ async def on_message_send(
8787
response = a2a_pb2.SendMessageResponse(message=task_or_message)
8888
return MessageToDict(response)
8989

90-
@validate(
90+
@validate_async_generator(
9191
lambda self: self.agent_card.capabilities.streaming,
9292
'Streaming is not supported by the agent',
9393
)
@@ -139,7 +139,7 @@ async def on_cancel_task(
139139
return MessageToDict(task)
140140
raise TaskNotFoundError
141141

142-
@validate(
142+
@validate_async_generator(
143143
lambda self: self.agent_card.capabilities.streaming,
144144
'Streaming is not supported by the agent',
145145
)

tests/compat/v0_3/test_grpc_handler.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ def sample_agent_card() -> a2a_pb2.AgentCard:
3434
name='Test Agent',
3535
description='A test agent',
3636
version='1.0.0',
37+
capabilities=a2a_pb2.AgentCapabilities(
38+
streaming=True,
39+
push_notifications=True,
40+
),
3741
supported_interfaces=[
3842
a2a_pb2.AgentInterface(
3943
url='http://jsonrpc.v03.com',
@@ -445,7 +449,10 @@ async def test_get_agent_card_success(
445449
version='1.0.0',
446450
protocol_version='0.3',
447451
preferred_transport='JSONRPC',
448-
capabilities=a2a_v0_3_pb2.AgentCapabilities(),
452+
capabilities=a2a_v0_3_pb2.AgentCapabilities(
453+
streaming=True,
454+
push_notifications=True,
455+
),
449456
)
450457
assert response == expected_res
451458

tests/server/request_handlers/test_jsonrpc_handler.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ def init_fixtures(self) -> None:
123123
)
124124
self.mock_agent_card.capabilities = MagicMock(spec=AgentCapabilities)
125125
self.mock_agent_card.capabilities.extended_agent_card = True
126+
self.mock_agent_card.capabilities.streaming = True
127+
self.mock_agent_card.capabilities.push_notifications = True
126128

127129
# Mock supported_interfaces list
128130
interface = MagicMock(spec=AgentInterface)
@@ -710,6 +712,8 @@ async def test_on_resubscribe_existing_task_success(
710712
mock_agent_executor, mock_task_store, mock_queue_manager
711713
)
712714
self.mock_agent_card = MagicMock(spec=AgentCard)
715+
self.mock_agent_card.capabilities = MagicMock(spec=AgentCapabilities)
716+
self.mock_agent_card.capabilities.streaming = True
713717
handler = JSONRPCHandler(self.mock_agent_card, request_handler)
714718
mock_task = create_task()
715719
events: list[Any] = [

0 commit comments

Comments
 (0)