-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Expand file tree
/
Copy pathtest_88_random_error.py
More file actions
116 lines (96 loc) · 4.8 KB
/
test_88_random_error.py
File metadata and controls
116 lines (96 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""Test to reproduce issue #88: Random error thrown on response."""
from pathlib import Path
import anyio
import pytest
from anyio.abc import TaskStatus
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp import types
from mcp.client.session import ClientSession
from mcp.server import Server, ServerRequestContext
from mcp.shared.exceptions import MCPError
from mcp.shared.message import SessionMessage
from mcp.types import CallToolRequestParams, CallToolResult, ListToolsResult, PaginatedRequestParams, TextContent
@pytest.mark.anyio
async def test_notification_validation_error(tmp_path: Path):
"""Test that timeouts are handled gracefully and don't break the server.
This test verifies that when a client request times out:
1. The server task stays alive
2. The server can still handle new requests
3. The client can make new requests
4. No resources are leaked
Uses per-request timeouts to avoid race conditions:
- Fast operations use no timeout (reliable in any environment)
- Slow operations use minimal timeout (10ms) for quick test execution
"""
request_count = 0
slow_request_lock = anyio.Event()
async def handle_list_tools(ctx: ServerRequestContext, params: PaginatedRequestParams | None) -> ListToolsResult:
return ListToolsResult(
tools=[
types.Tool(
name="slow",
description="A slow tool",
input_schema={"type": "object"},
),
types.Tool(
name="fast",
description="A fast tool",
input_schema={"type": "object"},
),
]
)
async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
nonlocal request_count
request_count += 1
assert params.name in ("slow", "fast"), f"Unknown tool: {params.name}"
if params.name == "slow":
await slow_request_lock.wait() # it should timeout here
text = f"slow {request_count}"
else:
text = f"fast {request_count}"
return CallToolResult(content=[TextContent(type="text", text=text)])
server = Server(name="test", on_list_tools=handle_list_tools, on_call_tool=handle_call_tool)
async def server_handler(
read_stream: MemoryObjectReceiveStream[SessionMessage | Exception],
write_stream: MemoryObjectSendStream[SessionMessage],
task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED,
):
with anyio.CancelScope() as scope:
task_status.started(scope) # type: ignore
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
raise_exceptions=True,
)
async def client(
read_stream: MemoryObjectReceiveStream[SessionMessage | Exception],
write_stream: MemoryObjectSendStream[SessionMessage],
scope: anyio.CancelScope,
):
# No session-level timeout to avoid race conditions with fast operations
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
# First call should work (fast operation, no timeout)
result = await session.call_tool("fast", read_timeout_seconds=None)
assert result.content == [TextContent(type="text", text="fast 1")]
assert not slow_request_lock.is_set()
# Second call should timeout (slow operation with minimal timeout)
# Use very small timeout to trigger quickly without waiting
with pytest.raises(MCPError) as exc_info:
await session.call_tool("slow", read_timeout_seconds=0.000001) # artificial timeout that always fails
assert "Timed out while waiting" in str(exc_info.value)
# release the slow request not to have hanging process
slow_request_lock.set()
# Third call should work (fast operation, no timeout),
# proving server is still responsive
result = await session.call_tool("fast", read_timeout_seconds=None)
assert result.content == [TextContent(type="text", text="fast 3")]
scope.cancel() # pragma: lax no cover
# Run server and client in separate task groups to avoid cancellation
server_writer, server_reader = anyio.create_memory_object_stream[SessionMessage](1)
client_writer, client_reader = anyio.create_memory_object_stream[SessionMessage](1)
async with anyio.create_task_group() as tg:
scope = await tg.start(server_handler, server_reader, client_writer)
# Run client in a separate task to avoid cancellation
tg.start_soon(client, client_reader, server_writer, scope)