Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/a2a/client/transports/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
TaskPushNotificationConfig,
)
from a2a.utils.constants import PROTOCOL_VERSION_CURRENT, VERSION_HEADER
from a2a.utils.errors import A2A_REASON_TO_ERROR
from a2a.utils.errors import A2A_REASON_TO_ERROR, A2AError
from a2a.utils.proto_utils import bad_request_to_validation_errors
from a2a.utils.telemetry import SpanKind, trace_class


Expand All @@ -61,17 +62,23 @@ def _map_grpc_error(e: grpc.aio.AioRpcError) -> NoReturn:

# Use grpc_status to cleanly extract the rich Status from the call
status = rpc_status.from_call(cast('grpc.Call', e))
data = None

if status is not None:
exception_cls: type[A2AError] | None = None
for detail in status.details:
if detail.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
error_info = error_details_pb2.ErrorInfo()
detail.Unpack(error_info)

if error_info.domain == 'a2a-protocol.org':
exception_cls = A2A_REASON_TO_ERROR.get(error_info.reason)
if exception_cls:
raise exception_cls(status.message) from e
elif detail.Is(error_details_pb2.BadRequest.DESCRIPTOR):
bad_request = error_details_pb2.BadRequest()
detail.Unpack(bad_request)
data = {'errors': bad_request_to_validation_errors(bad_request)}

if exception_cls:
raise exception_cls(status.message, data=data) from e

raise A2AClientError(f'gRPC Error {e.code().name}: {e.details()}') from e

Expand Down
3 changes: 2 additions & 1 deletion src/a2a/client/transports/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,10 @@ def _create_jsonrpc_error(self, error_dict: dict[str, Any]) -> Exception:
"""Creates the appropriate A2AError from a JSON-RPC error dictionary."""
code = error_dict.get('code')
message = error_dict.get('message', str(error_dict))
data = error_dict.get('data')

if isinstance(code, int) and code in _JSON_RPC_ERROR_CODE_TO_A2A_ERROR:
return _JSON_RPC_ERROR_CODE_TO_A2A_ERROR[code](message)
return _JSON_RPC_ERROR_CODE_TO_A2A_ERROR[code](message, data=data)

# Fallback to general A2AClientError
return A2AClientError(f'JSON-RPC Error {code}: {message}')
Expand Down
6 changes: 5 additions & 1 deletion src/a2a/server/request_handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
DefaultRequestHandler,
)
from a2a.server.request_handlers.jsonrpc_handler import JSONRPCHandler
from a2a.server.request_handlers.request_handler import RequestHandler
from a2a.server.request_handlers.request_handler import (
RequestHandler,
validate_request_params,
)
from a2a.server.request_handlers.response_helpers import (
build_error_response,
prepare_response_object,
Expand Down Expand Up @@ -45,4 +48,5 @@ def __init__(self, *args, **kwargs):
'RequestHandler',
'build_error_response',
'prepare_response_object',
'validate_request_params',
]
15 changes: 14 additions & 1 deletion src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
InMemoryQueueManager,
QueueManager,
)
from a2a.server.request_handlers.request_handler import RequestHandler
from a2a.server.request_handlers.request_handler import (
RequestHandler,
validate_request_params,
)
from a2a.server.tasks import (
PushNotificationConfigStore,
PushNotificationEvent,
Expand Down Expand Up @@ -118,6 +121,7 @@ def __init__( # noqa: PLR0913
# asyncio tasks and to surface unexpected exceptions.
self._background_tasks = set()

@validate_request_params
async def on_get_task(
self,
params: GetTaskRequest,
Expand All @@ -133,6 +137,7 @@ async def on_get_task(

return apply_history_length(task, params)

@validate_request_params
async def on_list_tasks(
self,
params: ListTasksRequest,
Expand All @@ -154,6 +159,7 @@ async def on_list_tasks(

return page

@validate_request_params
async def on_cancel_task(
self,
params: CancelTaskRequest,
Expand Down Expand Up @@ -317,6 +323,7 @@ async def _send_push_notification_if_needed(
):
await self._push_sender.send_notification(task_id, event)

@validate_request_params
async def on_message_send(
self,
params: SendMessageRequest,
Expand Down Expand Up @@ -386,6 +393,7 @@ async def push_notification_callback(event: Event) -> None:

return result

@validate_request_params
async def on_message_send_stream(
self,
params: SendMessageRequest,
Expand Down Expand Up @@ -474,6 +482,7 @@ async def _cleanup_producer(
async with self._running_agents_lock:
self._running_agents.pop(task_id, None)

@validate_request_params
async def on_create_task_push_notification_config(
self,
params: TaskPushNotificationConfig,
Expand All @@ -499,6 +508,7 @@ async def on_create_task_push_notification_config(

return params

@validate_request_params
async def on_get_task_push_notification_config(
self,
params: GetTaskPushNotificationConfigRequest,
Expand Down Expand Up @@ -530,6 +540,7 @@ async def on_get_task_push_notification_config(

raise InternalError(message='Push notification config not found')

@validate_request_params
async def on_subscribe_to_task(
self,
params: SubscribeToTaskRequest,
Expand Down Expand Up @@ -572,6 +583,7 @@ async def on_subscribe_to_task(
async for event in result_aggregator.consume_and_emit(consumer):
yield event

@validate_request_params
async def on_list_task_push_notification_configs(
self,
params: ListTaskPushNotificationConfigsRequest,
Expand All @@ -597,6 +609,7 @@ async def on_list_task_push_notification_configs(
configs=push_notification_config_list
)

@validate_request_params
async def on_delete_task_push_notification_config(
self,
params: DeleteTaskPushNotificationConfigRequest,
Expand Down
27 changes: 18 additions & 9 deletions src/a2a/server/request_handlers/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@
from a2a.types import a2a_pb2
from a2a.types.a2a_pb2 import AgentCard
from a2a.utils import proto_utils
from a2a.utils.errors import (
A2A_ERROR_REASONS,
A2AError,
TaskNotFoundError,
)
from a2a.utils.errors import A2A_ERROR_REASONS, A2AError, TaskNotFoundError
from a2a.utils.helpers import maybe_await, validate
from a2a.utils.proto_utils import validation_errors_to_bad_request


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -403,11 +400,23 @@ async def abort_context(
error.message if hasattr(error, 'message') else str(error)
)

# Create standard Status and pack the ErrorInfo
# Create standard Status with ErrorInfo for all A2A errors
status = status_pb2.Status(code=status_code, message=error_msg)
detail = any_pb2.Any()
detail.Pack(error_info)
status.details.append(detail)
error_info_detail = any_pb2.Any()
error_info_detail.Pack(error_info)
status.details.append(error_info_detail)

# Append structured field violations for validation errors
if (
isinstance(error, types.InvalidParamsError)
and error.data
and error.data.get('errors')
):
bad_request_detail = any_pb2.Any()
bad_request_detail.Pack(
validation_errors_to_bad_request(error.data['errors'])
)
status.details.append(bad_request_detail)

# Use grpc_status to safely generate standard trailing metadata
rich_status = rpc_status.to_status(status)
Expand Down
1 change: 1 addition & 0 deletions src/a2a/server/request_handlers/jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def _build_error_response(
jsonrpc_error = model_class(
code=code,
message=str(error),
data=error.data,
)
else:
jsonrpc_error = JSONRPCInternalError(message=str(error))
Expand Down
52 changes: 51 additions & 1 deletion src/a2a/server/request_handlers/request_handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import functools
import inspect

from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Callable
from typing import Any

from google.protobuf.message import Message as ProtoMessage

from a2a.server.context import ServerCallContext
from a2a.server.events.event_queue import Event
Expand All @@ -19,6 +25,7 @@
TaskPushNotificationConfig,
)
from a2a.utils.errors import UnsupportedOperationError
from a2a.utils.proto_utils import validate_proto_required_fields


class RequestHandler(ABC):
Expand Down Expand Up @@ -218,3 +225,46 @@ async def on_delete_task_push_notification_config(
Returns:
None
"""


def validate_request_params(method: Callable) -> Callable:
"""Decorator for RequestHandler methods to validate required fields on incoming requests."""
if inspect.isasyncgenfunction(method):

@functools.wraps(method)
async def async_gen_wrapper(
self: RequestHandler,
params: ProtoMessage,
context: ServerCallContext,
*args: Any,
**kwargs: Any,
) -> Any:
if params is not None:
validate_proto_required_fields(params)
# Ensure the inner async generator is closed explicitly;
# bare async-for does not call aclose() on GeneratorExit,
# which on Python 3.12+ prevents the except/finally blocks
# in on_message_send_stream from running on client disconnect
# (background_consume and cleanup_producer tasks are never created).
inner = method(self, params, context, *args, **kwargs)
try:
async for item in inner:
yield item
finally:
await inner.aclose()

return async_gen_wrapper

@functools.wraps(method)
async def async_wrapper(
self: RequestHandler,
params: ProtoMessage,
context: ServerCallContext,
*args: Any,
**kwargs: Any,
) -> Any:
if params is not None:
validate_proto_required_fields(params)
return await method(self, params, context, *args, **kwargs)

return async_wrapper
3 changes: 2 additions & 1 deletion src/a2a/utils/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ class A2AError(Exception):
message: str = 'A2A Error'
data: dict | None = None

def __init__(self, message: str | None = None):
def __init__(self, message: str | None = None, data: dict | None = None):
if message:
self.message = message
self.data = data
super().__init__(self.message)


Expand Down
Loading
Loading