Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,8 @@ public StdioMcpSessionTransport() {
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {

return Mono.zip(inboundReady.asMono(), outboundReady.asMono()).then(Mono.defer(() -> {
if (outboundSink.tryEmitNext(message).isSuccess()) {
return Mono.empty();
}
else {
return Mono.error(new RuntimeException("Failed to enqueue message"));
}
outboundSink.emitNext(message, Sinks.EmitFailureHandler.busyLooping(java.time.Duration.ofSeconds(1)));
return Mono.<Void>empty();
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,47 @@ void shouldHandleSessionClose() throws Exception {
verify(mockSession).closeGracefully();
}

@Test
void shouldHandleConcurrentMessages() throws Exception {
java.io.PipedOutputStream pipedOut = new java.io.PipedOutputStream();
java.io.PipedInputStream pipedIn = new java.io.PipedInputStream(pipedOut);

ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
transportProvider = new StdioServerTransportProvider(McpJsonDefaults.getMapper(), pipedIn, outputStream);

McpServerSession.Factory realSessionFactory = transport -> {
McpServerSession session = mock(McpServerSession.class);
when(session.handle(any())).thenAnswer(invocation -> {
McpSchema.JSONRPCMessage incomingMessage = invocation.getArgument(0);
// Simulate async tool call processing with a delay
return Mono.delay(java.time.Duration.ofMillis(50))
.then(transport.sendMessage(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION,
((McpSchema.JSONRPCRequest) incomingMessage).id(), Map.of("result", "ok"), null)));
});
when(session.closeGracefully()).thenReturn(Mono.empty());
return session;
};

// Set session factory
transportProvider.setSessionFactory(realSessionFactory);

String jsonMessage1 = "{\"jsonrpc\":\"2.0\",\"method\":\"test1\",\"params\":{},\"id\":1}\n";
String jsonMessage2 = "{\"jsonrpc\":\"2.0\",\"method\":\"test2\",\"params\":{},\"id\":2}\n";
pipedOut.write(jsonMessage1.getBytes(StandardCharsets.UTF_8));
pipedOut.write(jsonMessage2.getBytes(StandardCharsets.UTF_8));
pipedOut.flush();

// Verify both concurrent responses complete without error
StepVerifier
.create(Mono.delay(java.time.Duration.ofSeconds(2))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test passes if Duration.ofSeconds(3) is provided. Preferably, instead of using delays, you'd launch a swarm of threads trying to process the messages in parallel and ensure none of them fail and all the messages are processed.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the test according to your suggestion. The delay-based test has been replaced with a swarm test, which sends 100 requests. The test verifies all response IDs are received, no failures occur, and that concurrency actually happened. Added a 15s Timeout, consistent with existing tests in the repository.

.then(Mono.fromCallable(() -> outputStream.toString(StandardCharsets.UTF_8))))
.assertNext(output -> {
assertThat(output).contains("\"id\":1");
assertThat(output).contains("\"id\":2");
})
.verifyComplete();

pipedOut.close();
}

}