-
Notifications
You must be signed in to change notification settings - Fork 422
Expand file tree
/
Copy pathrequest_handler.py
More file actions
270 lines (216 loc) · 8.24 KB
/
request_handler.py
File metadata and controls
270 lines (216 loc) · 8.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
import functools
import inspect
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator, Callable
from typing import Any
from google.protobuf.message import Message as ProtoMessage
from a2a.server.context import ServerCallContext
from a2a.server.events.event_queue import Event
from a2a.types.a2a_pb2 import (
CancelTaskRequest,
DeleteTaskPushNotificationConfigRequest,
GetTaskPushNotificationConfigRequest,
GetTaskRequest,
ListTaskPushNotificationConfigsRequest,
ListTaskPushNotificationConfigsResponse,
ListTasksRequest,
ListTasksResponse,
Message,
SendMessageRequest,
SubscribeToTaskRequest,
Task,
TaskPushNotificationConfig,
)
from a2a.utils.errors import UnsupportedOperationError
from a2a.utils.proto_utils import validate_proto_required_fields
class RequestHandler(ABC):
    """Abstract contract for handling A2A requests on the server side.

    An A2A server implementation supplies a concrete subclass that provides
    every method declared here in order to handle incoming JSON-RPC requests.
    """

    @abstractmethod
    async def on_get_task(
        self, params: GetTaskRequest, context: ServerCallContext
    ) -> Task | None:
        """Serve a 'tasks/get' request.

        Fetches the state and history of one specific task.

        Args:
            params: Identifies the task and, optionally, the history length.
            context: Call context supplied by the server.

        Returns:
            The matching `Task`, or `None` when it was not found.
        """

    @abstractmethod
    async def on_list_tasks(
        self,
        params: ListTasksRequest,
        context: ServerCallContext,
    ) -> ListTasksResponse:
        """Serve a tasks/list request.

        Retrieves the tasks for an agent. Filtering, pagination, ordering,
        limiting the history length, excluding artifacts, etc. are supported.

        Args:
            params: The filtering criteria to apply.
            context: Call context supplied by the server.

        Returns:
            A `ListTasksResponse` holding the tasks.
        """

    @abstractmethod
    async def on_cancel_task(
        self, params: CancelTaskRequest, context: ServerCallContext
    ) -> Task | None:
        """Serve a 'tasks/cancel' request.

        Asks the agent to cancel a task that is still in progress.

        Args:
            params: Identifies the task to cancel.
            context: Call context supplied by the server.

        Returns:
            The `Task` with its status updated to canceled, or `None` when
            the task was not found.
        """

    @abstractmethod
    async def on_message_send(
        self, params: SendMessageRequest, context: ServerCallContext
    ) -> Task | Message:
        """Serve a 'message/send' request (non-streaming).

        Delivers a message to the agent in order to create, continue, or
        restart a task, then waits for the final result.

        Args:
            params: The message together with its configuration.
            context: Call context supplied by the server.

        Returns:
            The final `Task` or the final `Message`.
        """

    @abstractmethod
    async def on_message_send_stream(
        self, params: SendMessageRequest, context: ServerCallContext
    ) -> AsyncGenerator[Event]:
        """Serve a 'message/stream' request (streaming).

        Delivers a message to the agent and yields stream events (Task
        updates, Message chunks, Artifact updates) as they are produced.

        Args:
            params: The message together with its configuration.
            context: Call context supplied by the server.

        Yields:
            `Event` objects coming from the agent's execution.

        Raises:
            UnsupportedOperationError: Raised by this base implementation.
        """
        raise UnsupportedOperationError
        # Never reached; the bare yield makes this an async generator function.
        yield

    @abstractmethod
    async def on_create_task_push_notification_config(
        self,
        params: TaskPushNotificationConfig,
        context: ServerCallContext,
    ) -> TaskPushNotificationConfig:
        """Serve a 'tasks/pushNotificationConfig/create' request.

        Sets or updates the push notification configuration of a task.

        Args:
            params: The task ID together with the push notification
                configuration.
            context: Call context supplied by the server.

        Returns:
            The supplied `TaskPushNotificationConfig` on success.
        """

    @abstractmethod
    async def on_get_task_push_notification_config(
        self,
        params: GetTaskPushNotificationConfigRequest,
        context: ServerCallContext,
    ) -> TaskPushNotificationConfig:
        """Serve a 'tasks/pushNotificationConfig/get' request.

        Fetches the push notification configuration currently set for a task.

        Args:
            params: Identifies the task.
            context: Call context supplied by the server.

        Returns:
            The task's `TaskPushNotificationConfig`.
        """

    @abstractmethod
    async def on_subscribe_to_task(
        self, params: SubscribeToTaskRequest, context: ServerCallContext
    ) -> AsyncGenerator[Event]:
        """Serve a 'SubscribeToTask' request.

        Lets a client subscribe to the event stream of a running streaming
        task.

        Args:
            params: Identifies the task.
            context: Call context supplied by the server.

        Yields:
            `Event` objects from the ongoing execution of the given task.

        Raises:
            UnsupportedOperationError: Raised by this base implementation.
        """
        raise UnsupportedOperationError
        # Never reached; the bare yield makes this an async generator function.
        yield

    @abstractmethod
    async def on_list_task_push_notification_configs(
        self,
        params: ListTaskPushNotificationConfigsRequest,
        context: ServerCallContext,
    ) -> ListTaskPushNotificationConfigsResponse:
        """Serve a 'ListTaskPushNotificationConfigs' request.

        Fetches the push notification configurations currently set for a task.

        Args:
            params: Identifies the task.
            context: Call context supplied by the server.

        Returns:
            A `ListTaskPushNotificationConfigsResponse` for the task.
        """

    @abstractmethod
    async def on_delete_task_push_notification_config(
        self,
        params: DeleteTaskPushNotificationConfigRequest,
        context: ServerCallContext,
    ) -> None:
        """Serve a 'tasks/pushNotificationConfig/delete' request.

        Removes a push notification configuration associated with a task.

        Args:
            params: Identifies the task.
            context: Call context supplied by the server.

        Returns:
            None
        """
def validate_request_params(method: Callable) -> Callable:
    """Decorate a RequestHandler method with required-field validation.

    The returned wrapper hands ``params`` to ``validate_proto_required_fields``
    (skipped when ``params`` is ``None``) before delegating to ``method``.
    Async generator functions receive an async generator wrapper; every other
    method receives a coroutine wrapper that awaits and returns its result.

    Args:
        method: The async handler method to wrap.

    Returns:
        The wrapper, carrying the metadata of ``method`` via
        ``functools.wraps``.
    """
    if not inspect.isasyncgenfunction(method):

        @functools.wraps(method)
        async def coroutine_wrapper(
            self: RequestHandler,
            params: ProtoMessage,
            context: ServerCallContext,
            *args: Any,
            **kwargs: Any,
        ) -> Any:
            if params is not None:
                validate_proto_required_fields(params)
            result = await method(self, params, context, *args, **kwargs)
            return result

        return coroutine_wrapper

    @functools.wraps(method)
    async def stream_wrapper(
        self: RequestHandler,
        params: ProtoMessage,
        context: ServerCallContext,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        if params is not None:
            validate_proto_required_fields(params)
        # Keep a handle on the wrapped generator so it can be closed
        # explicitly. Leaving an `async for` early does not by itself call
        # aclose() on the generator being iterated, so without the finally
        # below the wrapped method's own except/finally cleanup would not be
        # triggered when this wrapper is closed (e.g. on client disconnect).
        stream = method(self, params, context, *args, **kwargs)
        try:
            async for event in stream:
                yield event
        finally:
            await stream.aclose()

    return stream_wrapper