Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 200 additions & 38 deletions inprocess/src/test/java/io/grpc/inprocess/MirroringInterceptorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package io.grpc.inprocess;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.grpc.CallOptions;
import io.grpc.Channel;
Expand All @@ -32,8 +34,10 @@
import io.grpc.util.MirroringInterceptor;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -61,80 +65,238 @@ public String parse(java.io.InputStream stream) {
.setResponseMarshaller(MARSHALLER)
.build();

// ─── Helper to build a simple auto-closing server ───────────────────────────

private String buildAutoCloseServer(CountDownLatch latch, AtomicBoolean headerVerified,
Metadata.Key<String> key, String expectedValue) throws Exception {
String name = InProcessServerBuilder.generateName();
grpcCleanup.register(
InProcessServerBuilder.forName(name)
.directExecutor()
.addService(
ServerServiceDefinition.builder("test")
.addMethod(method, (call, headers) -> {
if (key != null && expectedValue.equals(headers.get(key))) {
headerVerified.set(true);
}
if (latch != null) latch.countDown();
call.sendHeaders(new Metadata());
call.close(Status.OK, new Metadata());
return new ServerCall.Listener<String>() {};
})
.build())
.build()
.start());
return name;
}

// ─── Test 1: Unary call is mirrored with headers ────────────────────────────

@Test
public void unaryCallIsMirroredWithHeaders() throws Exception {
CountDownLatch mirrorLatch = new CountDownLatch(1);
Metadata.Key<String> testKey =
Metadata.Key.of("test-header", Metadata.ASCII_STRING_MARSHALLER);
AtomicBoolean mirrorHeaderVerified = new AtomicBoolean(false);

// 1. Setup Mirror Server - IMPORTANT: It must CLOSE the call
String mirrorName = buildAutoCloseServer(mirrorLatch, mirrorHeaderVerified,
testKey, "shadow-value");
String primaryName = buildAutoCloseServer(null, null, null, "");

ManagedChannel mirrorChannel =
grpcCleanup.register(InProcessChannelBuilder.forName(mirrorName).build());
ManagedChannel primaryChannel =
grpcCleanup.register(InProcessChannelBuilder.forName(primaryName).build());

Channel interceptedChannel = ClientInterceptors.intercept(
primaryChannel, new MirroringInterceptor(mirrorChannel, Runnable::run));

Metadata headers = new Metadata();
headers.put(testKey, "shadow-value");

ClientCall<String, String> call = interceptedChannel.newCall(method, CallOptions.DEFAULT);
call.start(new ClientCall.Listener<String>() {}, headers);
call.sendMessage("hello");
call.halfClose();

assertTrue("Mirror server was not reached", mirrorLatch.await(1, TimeUnit.SECONDS));
assertTrue("Headers not mirrored", mirrorHeaderVerified.get());
}

// ─── Test 2: Cancel is propagated to mirror ──────────────────────────────────

@Test
public void cancelIsPropagatedToMirror() throws Exception {
CountDownLatch mirrorStartLatch = new CountDownLatch(1);
AtomicBoolean mirrorCancelSeen = new AtomicBoolean(false);

String mirrorName = InProcessServerBuilder.generateName();
grpcCleanup.register(
InProcessServerBuilder.forName(mirrorName)
.directExecutor()
.addService(
ServerServiceDefinition.builder("test")
.addMethod(
method,
(call, headers) -> {
if ("shadow-value".equals(headers.get(testKey))) {
mirrorHeaderVerified.set(true);
}
mirrorLatch.countDown();

// CRITICAL: Close the call so the channel can shut down
call.sendHeaders(new Metadata());
call.close(Status.OK, new Metadata());
return new ServerCall.Listener<String>() {};
})
.addMethod(method, (call, headers) -> {
mirrorStartLatch.countDown();
call.sendHeaders(new Metadata());
return new ServerCall.Listener<String>() {
@Override
public void onCancel() {
mirrorCancelSeen.set(true);
}
};
})
.build())
.build()
.start());

// 2. Setup Primary Server - Also must CLOSE the call
String primaryName = InProcessServerBuilder.generateName();
String primaryName = buildAutoCloseServer(null, null, null, "");

ManagedChannel mirrorChannel =
grpcCleanup.register(InProcessChannelBuilder.forName(mirrorName).build());
ManagedChannel primaryChannel =
grpcCleanup.register(InProcessChannelBuilder.forName(primaryName).build());

Channel interceptedChannel = ClientInterceptors.intercept(
primaryChannel, new MirroringInterceptor(mirrorChannel, Runnable::run));

ClientCall<String, String> call =
interceptedChannel.newCall(method, CallOptions.DEFAULT);
call.start(new ClientCall.Listener<String>() {}, new Metadata());

assertTrue("Mirror call never started", mirrorStartLatch.await(1, TimeUnit.SECONDS));

// Now cancel — should propagate to mirror
call.cancel("test cancel", null);

// Give mirror time to process cancel
Thread.sleep(200);
assertTrue("Cancel was not propagated to mirror", mirrorCancelSeen.get());
}

// ─── Test 3: Multiple messages are all mirrored ──────────────────────────────

@Test
public void multipleMessagesAreMirrored() throws Exception {
AtomicInteger mirrorMessageCount = new AtomicInteger(0);
CountDownLatch halfCloseLatch = new CountDownLatch(1);

String mirrorName = InProcessServerBuilder.generateName();
grpcCleanup.register(
InProcessServerBuilder.forName(primaryName)
InProcessServerBuilder.forName(mirrorName)
.directExecutor()
.addService(
ServerServiceDefinition.builder("test")
.addMethod(
method,
(call, headers) -> {
call.sendHeaders(new Metadata());
.addMethod(method, (call, headers) -> {
call.sendHeaders(new Metadata());
call.request(10);
return new ServerCall.Listener<String>() {
@Override
public void onMessage(String message) {
mirrorMessageCount.incrementAndGet();
}

@Override
public void onHalfClose() {
halfCloseLatch.countDown();
call.close(Status.OK, new Metadata());
return new ServerCall.Listener<String>() {};
})
}
};
})
.build())
.build()
.start());

String primaryName = buildAutoCloseServer(null, null, null, "");

ManagedChannel mirrorChannel =
grpcCleanup.register(InProcessChannelBuilder.forName(mirrorName).build());
ManagedChannel primaryChannel =
grpcCleanup.register(InProcessChannelBuilder.forName(primaryName).build());

// Use direct executor to keep the mirror call on the same thread
java.util.concurrent.Executor directExecutor = Runnable::run;
Channel interceptedChannel = ClientInterceptors.intercept(
primaryChannel, new MirroringInterceptor(mirrorChannel, Runnable::run));

ClientCall<String, String> call =
interceptedChannel.newCall(method, CallOptions.DEFAULT);
call.start(new ClientCall.Listener<String>() {}, new Metadata());
call.request(1);
call.sendMessage("msg1");
call.sendMessage("msg2");
call.sendMessage("msg3");
call.halfClose();

Channel interceptedChannel =
ClientInterceptors.intercept(
primaryChannel, new MirroringInterceptor(mirrorChannel, directExecutor));
assertTrue("Mirror halfClose never received", halfCloseLatch.await(1, TimeUnit.SECONDS));
assertTrue("Expected 3 mirrored messages, got: " + mirrorMessageCount.get(),
mirrorMessageCount.get() >= 3);
}

// 3. Trigger call with Metadata
Metadata headers = new Metadata();
headers.put(testKey, "shadow-value");
// ─── Test 4: Null mirrorChannel throws NullPointerException ─────────────────

ClientCall<String, String> call = interceptedChannel.newCall(method, CallOptions.DEFAULT);
call.start(new ClientCall.Listener<String>() {}, headers);
@Test
public void nullMirrorChannelThrowsException() {
try {
new MirroringInterceptor(null, Runnable::run);
fail("Expected NullPointerException for null mirrorChannel");
} catch (NullPointerException e) {
assertNotNull(e);
}
}

// ─── Test 5: Null executor throws NullPointerException ──────────────────────

@Test
public void nullExecutorThrowsException() throws Exception {
String mirrorName = buildAutoCloseServer(null, null, null, "");
ManagedChannel mirrorChannel =
grpcCleanup.register(InProcessChannelBuilder.forName(mirrorName).build());
try {
new MirroringInterceptor(mirrorChannel, null);
fail("Expected NullPointerException for null executor");
} catch (NullPointerException e) {
assertNotNull(e);
}
}

// ─── Test 6: Mirror call failure is handled silently ────────────────────────

@Test
public void mirrorCallFailureDoesNotAffectPrimary() throws Exception {
CountDownLatch primaryLatch = new CountDownLatch(1);

String primaryName = InProcessServerBuilder.generateName();
grpcCleanup.register(
InProcessServerBuilder.forName(primaryName)
.directExecutor()
.addService(
ServerServiceDefinition.builder("test")
.addMethod(method, (call, headers) -> {
primaryLatch.countDown();
call.sendHeaders(new Metadata());
call.close(Status.OK, new Metadata());
return new ServerCall.Listener<String>() {};
})
.build())
.build()
.start());

// Mirror channel points to non-existent server — will fail silently
ManagedChannel brokenMirrorChannel = grpcCleanup.register(
InProcessChannelBuilder.forName("non-existent-server-xyz").build());
ManagedChannel primaryChannel =
grpcCleanup.register(InProcessChannelBuilder.forName(primaryName).build());

Channel interceptedChannel = ClientInterceptors.intercept(
primaryChannel, new MirroringInterceptor(brokenMirrorChannel, Runnable::run));

ClientCall<String, String> call =
interceptedChannel.newCall(method, CallOptions.DEFAULT);
call.start(new ClientCall.Listener<String>() {}, new Metadata());
call.sendMessage("hello");
call.halfClose();

// 4. Assertions
assertTrue("Mirror server was not reached", mirrorLatch.await(1, TimeUnit.SECONDS));
assertTrue(
"Headers were not correctly mirrored to shadow service", mirrorHeaderVerified.get());
System.out.println("FULL MIRRORING SUCCESSFUL!");
// Primary should still succeed even if mirror fails
assertTrue("Primary call was affected by mirror failure",
primaryLatch.await(1, TimeUnit.SECONDS));
}
}