-
Notifications
You must be signed in to change notification settings - Fork 421
Expand file tree
/
Copy patherrors.py
More file actions
198 lines (140 loc) · 5.28 KB
/
errors.py
File metadata and controls
198 lines (140 loc) · 5.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
"""Custom exceptions and error types for A2A server-side errors.
This module contains A2A-specific error codes,
as well as server exception classes.
"""
from typing import NamedTuple
class RestErrorMap(NamedTuple):
    """Immutable record tying one error to its REST/gRPC representation.

    Attributes:
        http_code: HTTP status code used for the error.
        grpc_status: gRPC status name, e.g. ``'NOT_FOUND'``.
        reason: Upper-snake-case reason string, e.g. ``'TASK_NOT_FOUND'``.
    """

    http_code: int
    grpc_status: str
    reason: str
class A2AError(Exception):
"""Base exception for A2A errors."""
message: str = 'A2A Error'
data: dict | None = None
def __init__(self, message: str | None = None, data: dict | None = None):
if message:
self.message = message
self.data = data
super().__init__(self.message)
class TaskNotFoundError(A2AError):
    """Signals that a task could not be found.

    Listed in this module's lookup tables as JSON-RPC code -32001 and as
    REST 404 / ``NOT_FOUND`` / ``TASK_NOT_FOUND``.
    """

    message = 'Task not found'
class TaskNotCancelableError(A2AError):
    """Signals that a task cannot be canceled.

    Listed in this module's lookup tables as JSON-RPC code -32002 and as
    REST 409 / ``FAILED_PRECONDITION`` / ``TASK_NOT_CANCELABLE``.
    """

    message = 'Task cannot be canceled'
class PushNotificationNotSupportedError(A2AError):
    """Signals that push notifications are not supported.

    Listed in this module's lookup tables as JSON-RPC code -32003 and as
    REST 400 / ``UNIMPLEMENTED`` / ``PUSH_NOTIFICATION_NOT_SUPPORTED``.
    """

    message = 'Push Notification is not supported'
class UnsupportedOperationError(A2AError):
    """Signals that the requested operation is not supported.

    Listed in this module's lookup tables as JSON-RPC code -32004 and as
    REST 400 / ``UNIMPLEMENTED`` / ``UNSUPPORTED_OPERATION``.
    """

    message = 'This operation is not supported'
class ContentTypeNotSupportedError(A2AError):
    """Signals that the content types involved are incompatible.

    Listed in this module's lookup tables as JSON-RPC code -32005 and as
    REST 415 / ``INVALID_ARGUMENT`` / ``CONTENT_TYPE_NOT_SUPPORTED``.
    """

    message = 'Incompatible content types'
class InternalError(A2AError):
    """Signals an internal server error.

    Listed in this module's lookup tables as JSON-RPC code -32603 and as
    REST 500 / ``INTERNAL`` / ``INTERNAL_ERROR``.
    """

    message = 'Internal error'
class InvalidAgentResponseError(A2AError):
    """Signals that the agent produced an invalid response.

    Listed in this module's lookup tables as JSON-RPC code -32006 and as
    REST 502 / ``INTERNAL`` / ``INVALID_AGENT_RESPONSE``.
    """

    message = 'Invalid agent response'
class ExtendedAgentCardNotConfiguredError(A2AError):
    """Signals that the authenticated extended card is not configured.

    Listed in this module's lookup tables as JSON-RPC code -32007 and as
    REST 400 / ``FAILED_PRECONDITION`` /
    ``EXTENDED_AGENT_CARD_NOT_CONFIGURED``.
    """

    message = 'Authenticated Extended Card is not configured'
class InvalidParamsError(A2AError):
    """Signals that the supplied parameters are invalid.

    Listed in this module's lookup tables as JSON-RPC code -32602 and as
    REST 400 / ``INVALID_ARGUMENT`` / ``INVALID_PARAMS``.
    """

    message = 'Invalid params'
class InvalidRequestError(A2AError):
    """Signals that the request itself is invalid.

    Listed in this module's lookup tables as JSON-RPC code -32600 and as
    REST 400 / ``INVALID_ARGUMENT`` / ``INVALID_REQUEST``.
    """

    message = 'Invalid Request'
class MethodNotFoundError(A2AError):
    """Signals that the requested method was not found.

    Listed in this module's lookup tables as JSON-RPC code -32601 and as
    REST 404 / ``NOT_FOUND`` / ``METHOD_NOT_FOUND``.
    """

    message = 'Method not found'
class ExtensionSupportRequiredError(A2AError):
    """Signals that required extension support is not present.

    Listed in this module's lookup tables as JSON-RPC code -32008 and as
    REST 400 / ``FAILED_PRECONDITION`` / ``EXTENSION_SUPPORT_REQUIRED``.
    """

    message = 'Extension support required'
class VersionNotSupportedError(A2AError):
    """Signals that the requested version is not supported.

    Listed in this module's lookup tables as JSON-RPC code -32009 and as
    REST 400 / ``UNIMPLEMENTED`` / ``VERSION_NOT_SUPPORTED``.
    """

    message = 'Version not supported'
# The Pydantic error models formerly defined in this module have been removed.
# Backward-compatibility aliases for the old names, if needed, belong here.
# Public API of this module. The list covers the `A2AError` base class and
# every exception class used as a key in the lookup tables, so that a star
# import exposes the complete hierarchy rather than a subset of it.
__all__ = [
    'A2A_ERROR_REASONS',
    'A2A_REASON_TO_ERROR',
    'A2A_REST_ERROR_MAPPING',
    'JSON_RPC_ERROR_CODE_MAP',
    'A2AError',
    'ContentTypeNotSupportedError',
    'ExtendedAgentCardNotConfiguredError',
    'ExtensionSupportRequiredError',
    'InternalError',
    'InvalidAgentResponseError',
    'InvalidParamsError',
    'InvalidRequestError',
    'MethodNotFoundError',
    'PushNotificationNotSupportedError',
    'RestErrorMap',
    'TaskNotCancelableError',
    'TaskNotFoundError',
    'UnsupportedOperationError',
    'VersionNotSupportedError',
]
# Lookup table: exception class -> JSON-RPC error code.
JSON_RPC_ERROR_CODE_MAP: dict[type[A2AError], int] = {
    # A2A-specific codes (JSON-RPC implementation-defined server-error range).
    TaskNotFoundError: -32001,
    TaskNotCancelableError: -32002,
    PushNotificationNotSupportedError: -32003,
    UnsupportedOperationError: -32004,
    ContentTypeNotSupportedError: -32005,
    InvalidAgentResponseError: -32006,
    ExtendedAgentCardNotConfiguredError: -32007,
    ExtensionSupportRequiredError: -32008,
    VersionNotSupportedError: -32009,
    # Codes predefined by the JSON-RPC 2.0 specification.
    InvalidParamsError: -32602,
    InvalidRequestError: -32600,
    MethodNotFoundError: -32601,
    InternalError: -32603,
}
# Lookup table: exception class -> (HTTP status, gRPC status, reason string).
# Fields are spelled out by keyword so each entry is self-describing.
A2A_REST_ERROR_MAPPING: dict[type[A2AError], RestErrorMap] = {
    TaskNotFoundError: RestErrorMap(
        http_code=404,
        grpc_status='NOT_FOUND',
        reason='TASK_NOT_FOUND',
    ),
    TaskNotCancelableError: RestErrorMap(
        http_code=409,
        grpc_status='FAILED_PRECONDITION',
        reason='TASK_NOT_CANCELABLE',
    ),
    PushNotificationNotSupportedError: RestErrorMap(
        http_code=400,
        grpc_status='UNIMPLEMENTED',
        reason='PUSH_NOTIFICATION_NOT_SUPPORTED',
    ),
    UnsupportedOperationError: RestErrorMap(
        http_code=400,
        grpc_status='UNIMPLEMENTED',
        reason='UNSUPPORTED_OPERATION',
    ),
    ContentTypeNotSupportedError: RestErrorMap(
        http_code=415,
        grpc_status='INVALID_ARGUMENT',
        reason='CONTENT_TYPE_NOT_SUPPORTED',
    ),
    InvalidAgentResponseError: RestErrorMap(
        http_code=502,
        grpc_status='INTERNAL',
        reason='INVALID_AGENT_RESPONSE',
    ),
    ExtendedAgentCardNotConfiguredError: RestErrorMap(
        http_code=400,
        grpc_status='FAILED_PRECONDITION',
        reason='EXTENDED_AGENT_CARD_NOT_CONFIGURED',
    ),
    ExtensionSupportRequiredError: RestErrorMap(
        http_code=400,
        grpc_status='FAILED_PRECONDITION',
        reason='EXTENSION_SUPPORT_REQUIRED',
    ),
    VersionNotSupportedError: RestErrorMap(
        http_code=400,
        grpc_status='UNIMPLEMENTED',
        reason='VERSION_NOT_SUPPORTED',
    ),
    InvalidParamsError: RestErrorMap(
        http_code=400,
        grpc_status='INVALID_ARGUMENT',
        reason='INVALID_PARAMS',
    ),
    InvalidRequestError: RestErrorMap(
        http_code=400,
        grpc_status='INVALID_ARGUMENT',
        reason='INVALID_REQUEST',
    ),
    MethodNotFoundError: RestErrorMap(
        http_code=404,
        grpc_status='NOT_FOUND',
        reason='METHOD_NOT_FOUND',
    ),
    InternalError: RestErrorMap(
        http_code=500,
        grpc_status='INTERNAL',
        reason='INTERNAL_ERROR',
    ),
}
# Forward lookup derived from A2A_REST_ERROR_MAPPING:
# exception class -> its reason string.
A2A_ERROR_REASONS = {
    error_type: rest_entry.reason
    for error_type, rest_entry in A2A_REST_ERROR_MAPPING.items()
}
# Reverse lookup derived from A2A_REST_ERROR_MAPPING:
# reason string -> exception class. If two classes ever shared a reason
# string, only the one appearing last in the mapping would be kept.
A2A_REASON_TO_ERROR = {
    rest_entry.reason: error_type
    for error_type, rest_entry in A2A_REST_ERROR_MAPPING.items()
}