|
23 | 23 |
|
24 | 24 | import com.google.cloud.NoCredentials; |
25 | 25 | import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; |
26 | | -import com.google.common.util.concurrent.ListeningExecutorService; |
27 | | -import com.google.common.util.concurrent.MoreExecutors; |
28 | 26 | import com.google.protobuf.ListValue; |
29 | 27 | import com.google.spanner.v1.ResultSetMetadata; |
30 | 28 | import com.google.spanner.v1.SpannerGrpc; |
|
40 | 38 | import io.grpc.ServerInterceptor; |
41 | 39 | import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; |
42 | 40 | import java.net.InetSocketAddress; |
43 | | -import java.time.Duration; |
44 | 41 | import java.util.Arrays; |
45 | 42 | import java.util.Collection; |
| 43 | +import java.util.Deque; |
46 | 44 | import java.util.Set; |
47 | 45 | import java.util.concurrent.ConcurrentHashMap; |
48 | | -import java.util.concurrent.CountDownLatch; |
49 | | -import java.util.concurrent.Executors; |
| 46 | +import java.util.concurrent.ConcurrentLinkedDeque; |
50 | 47 | import java.util.logging.Level; |
51 | 48 | import java.util.logging.Logger; |
52 | 49 | import org.junit.After; |
@@ -103,6 +100,7 @@ public static Collection<Object[]> data() { |
103 | 100 | // Track channel hints (from X-Goog-Spanner-Request-Id header) per RPC method |
104 | 101 | private static final Set<Long> batchCreateSessionChannelHints = ConcurrentHashMap.newKeySet(); |
105 | 102 | private static final Set<Long> executeSqlChannelHints = ConcurrentHashMap.newKeySet(); |
| 103 | + private static final Deque<Long> allExecuteSqlChannelHints = new ConcurrentLinkedDeque<>(); |
106 | 104 |
|
107 | 105 | private static Level originalLogLevel; |
108 | 106 |
|
@@ -148,6 +146,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( |
148 | 146 | if (call.getMethodDescriptor() |
149 | 147 | .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { |
150 | 148 | executeSqlChannelHints.add(channelHint); |
| 149 | + allExecuteSqlChannelHints.add(channelHint); |
151 | 150 | } |
152 | 151 | } catch (NumberFormatException e) { |
153 | 152 | // Ignore parse errors |
@@ -185,6 +184,7 @@ public void reset() { |
185 | 184 | mockSpanner.reset(); |
186 | 185 | batchCreateSessionChannelHints.clear(); |
187 | 186 | executeSqlChannelHints.clear(); |
| 187 | + allExecuteSqlChannelHints.clear(); |
188 | 188 | } |
189 | 189 |
|
190 | 190 | private SpannerOptions createSpannerOptions() { |
@@ -215,35 +215,20 @@ public void testUsesAllChannels() throws InterruptedException { |
215 | 215 | final int multiplier = 10; |
216 | 216 | try (Spanner spanner = createSpannerOptions().getService()) { |
217 | 217 | DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); |
218 | | - ListeningExecutorService executor = |
219 | | - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numChannels * multiplier)); |
220 | | - CountDownLatch latch = new CountDownLatch(numChannels * multiplier); |
221 | 218 | for (int run = 0; run < numChannels * multiplier; run++) { |
222 | | - executor.submit( |
223 | | - () -> { |
224 | | - // Use a multi-use read-only transaction to make sure we keep a session in use for |
225 | | - // a longer period of time. |
226 | | - try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) { |
227 | | - try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { |
228 | | - while (resultSet.next()) {} |
229 | | - } |
230 | | - latch.countDown(); |
231 | | - // Wait here until we now that all threads have reached this point and have a |
232 | | - // session in use. |
233 | | - latch.await(); |
234 | | - try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { |
235 | | - while (resultSet.next()) {} |
236 | | - } |
237 | | - } |
238 | | - return true; |
239 | | - }); |
| 219 | + try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) { |
| 220 | + for (int i = 0; i < 2; i++) { |
| 221 | + try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { |
| 222 | + while (resultSet.next()) {} |
| 223 | + } |
| 224 | + } |
| 225 | + } |
240 | 226 | } |
241 | | - executor.shutdown(); |
242 | | - assertTrue(executor.awaitTermination(Duration.ofSeconds(10L))); |
243 | 227 | } |
244 | 228 | // Bound the channel hints to numChannels (matching gRPC-GCP behavior) and verify |
245 | 229 | // that channels are being distributed. The raw channel hints may be unbounded (based on |
246 | 230 | // session index), but gRPC-GCP bounds them to the actual number of channels. |
| 231 | + assertEquals(2 * numChannels * multiplier, allExecuteSqlChannelHints.size()); |
247 | 232 | Set<Long> boundedChannelHints = |
248 | 233 | executeSqlChannelHints.stream().map(hint -> hint % numChannels).collect(toSet()); |
249 | 234 | // Verify that channel distribution is working: |
|
0 commit comments