Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.google.cloud.spanner.spi.v1;

import com.google.api.core.InternalApi;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CacheUpdate;
import com.google.spanner.v1.CommitRequest;
Expand All @@ -34,8 +36,13 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import javax.annotation.Nullable;

Expand All @@ -47,11 +54,16 @@
@InternalApi
public final class ChannelFinder {
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;
private static final int MAX_CACHE_UPDATE_THREADS =
Math.max(2, Runtime.getRuntime().availableProcessors());
private static final ExecutorService CACHE_UPDATE_POOL = createCacheUpdatePool();

private final Object updateLock = new Object();
private final AtomicLong databaseId = new AtomicLong();
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
private final KeyRangeCache rangeCache;
private final AtomicReference<CacheUpdate> pendingUpdate = new AtomicReference<>();
private volatile java.util.concurrent.CountDownLatch drainingLatch;
@Nullable private final EndpointLifecycleManager lifecycleManager;
@Nullable private final String finderKey;

Expand All @@ -77,6 +89,22 @@ void useDeterministicRandom() {
rangeCache.useDeterministicRandom();
}

/**
 * Builds the shared executor that applies cache updates off the caller's thread.
 *
 * <p>The pool is sized to {@code MAX_CACHE_UPDATE_THREADS} for both core and maximum size, is
 * backed by an unbounded queue, and uses daemon threads named {@code spanner-cache-update-N}.
 * Core threads are allowed to time out after 30 seconds of idleness.
 *
 * @return the configured executor
 */
private static ExecutorService createCacheUpdatePool() {
  java.util.concurrent.ThreadFactory daemonThreads =
      new ThreadFactoryBuilder()
          .setNameFormat("spanner-cache-update-%d")
          .setDaemon(true)
          .build();
  ThreadPoolExecutor pool =
      new ThreadPoolExecutor(
          MAX_CACHE_UPDATE_THREADS,
          MAX_CACHE_UPDATE_THREADS,
          30L,
          TimeUnit.SECONDS,
          new LinkedBlockingQueue<>(),
          daemonThreads);
  // Let idle core threads exit as well, so an idle client holds no update threads.
  pool.allowCoreThreadTimeOut(true);
  return pool;
}

public void update(CacheUpdate update) {
synchronized (updateLock) {
long currentId = databaseId.get();
Expand Down Expand Up @@ -112,6 +140,49 @@ public void update(CacheUpdate update) {
}
}

/**
 * Records {@code update} as the pending cache update and, if no drain task is currently
 * scheduled, submits one to the shared cache-update pool.
 *
 * <p>Only the most recent pending update is kept; an update that has not been picked up yet is
 * replaced by a newer one.
 *
 * <p>If the drain task cannot be submitted, the pending slot is cleared and the drain latch is
 * released before the failure propagates, so that a later call can schedule a new drain task
 * instead of finding the slot permanently occupied.
 *
 * @param update the cache update to apply asynchronously
 */
public void updateAsync(CacheUpdate update) {
  // Replace any pending update atomically. Each CacheUpdate contains the full current state,
  // so intermediate updates can be safely dropped to prevent unbounded queue growth.
  if (pendingUpdate.getAndSet(update) != null) {
    // A previously scheduled drain task has not consumed the slot yet; it will pick up this
    // replacement, so nothing more needs to be scheduled.
    return;
  }

  // No previous pending update means no drain task is scheduled yet — submit one.
  java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
  drainingLatch = latch;
  boolean submitted = false;
  try {
    CACHE_UPDATE_POOL.execute(
        () -> {
          try {
            drainPendingUpdate();
          } finally {
            latch.countDown();
          }
        });
    submitted = true;
  } finally {
    if (!submitted) {
      // Submission failed: without this reset the slot would stay non-null forever and every
      // later call would assume a drain task exists, silently discarding all future updates.
      pendingUpdate.set(null);
      // Release anyone waiting on this latch; no task will ever count it down.
      latch.countDown();
    }
  }
}

/**
 * Applies pending updates until the pending slot is empty. Each iteration atomically takes the
 * current pending update (leaving {@code null} behind) and passes it to {@link
 * #update(CacheUpdate)}.
 */
private void drainPendingUpdate() {
  for (CacheUpdate next = pendingUpdate.getAndSet(null);
      next != null;
      next = pendingUpdate.getAndSet(null)) {
    update(next);
  }
}

/**
 * Test-only hook used by {@link KeyAwareChannel#awaitPendingCacheUpdates()} to wait until the
 * async cache update worker has finished applying the latest pending update. This method must
 * only be called from test code.
 *
 * <p>The wait is bounded and best-effort: the pending slot is polled for up to 5 seconds, and
 * the most recently published drain latch (if any) is then awaited for up to another 5 seconds.
 * The method returns normally even if either wait times out.
 *
 * @throws InterruptedException if the calling thread is interrupted while sleeping or while
 *     awaiting the drain latch
 */
@VisibleForTesting
void awaitPendingUpdates() throws InterruptedException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move this method to src/test/java? Or otherwise add a comment that it should only be called from test code? (The @VisibleForTesting annotation is intended to indicate that a method has higher visibility than otherwise needed to be able to test, it does not indicate that it should only be invoked from test code.)

// Poll in 1 ms sleeps until no pending update remains or the 5-second deadline passes.
long deadline = System.nanoTime() + java.util.concurrent.TimeUnit.SECONDS.toNanos(5);
while (pendingUpdate.get() != null && System.nanoTime() < deadline) {
Thread.sleep(1);
}
// Wait for the drain task to fully complete (including the update() call). The wait is capped
// at 5 seconds and the timeout result is deliberately ignored.
java.util.concurrent.CountDownLatch latch = drainingLatch;
if (latch != null) {
latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
}
}

public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ class EndpointLifecycleManager {
*/
private static final int MAX_TRANSIENT_FAILURE_COUNT = 3;

/** Why an endpoint is being evicted; passed to {@code evictEndpoint(String, EvictionReason)}. */
private enum EvictionReason {
// Used when a probe evicts an endpoint after consecutive TRANSIENT_FAILURE results.
TRANSIENT_FAILURE,
// Used when a probe finds the channel in the SHUTDOWN state.
SHUTDOWN,
// Used by the idle-eviction check.
IDLE,
// Default reason applied by the single-argument evictEndpoint(String) overload.
STALE
}

/** Per-endpoint lifecycle state. */
static final class EndpointState {
final String address;
Expand All @@ -95,6 +102,7 @@ static final class EndpointState {

private final ChannelEndpointCache endpointCache;
private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet();

/**
* Active addresses reported by each ChannelFinder, keyed by database id.
Expand All @@ -103,8 +111,8 @@ static final class EndpointState {
* stable database-id key instead of a strong ChannelFinder reference. KeyAwareChannel unregisters
* stale entries when a finder is cleared.
*
* <p>All reads and writes to this map, and stale-endpoint eviction based on it, are synchronized
* on {@link #activeAddressLock}.
* <p>All reads and writes to this map, and all updates to {@link
* #transientFailureEvictedAddresses}, are synchronized on {@link #activeAddressLock}.
*/
private final Map<String, Set<String>> activeAddressesPerFinder = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -187,6 +195,24 @@ private boolean ensureEndpointExists(String address) {
return created[0];
}

/**
 * Drops every transient-failure eviction marker whose address is not in {@code
 * activeAddresses}. Runs while holding {@code activeAddressLock}.
 *
 * @param activeAddresses the addresses whose markers may be kept
 */
private void retainTransientFailureEvictionMarkers(Set<String> activeAddresses) {
  synchronized (activeAddressLock) {
    Set<String> markers = transientFailureEvictedAddresses;
    markers.retainAll(activeAddresses);
  }
}

/**
 * Adds {@code address} to the set of transient-failure eviction markers. Runs while holding
 * {@code activeAddressLock}.
 *
 * @param address the endpoint address to mark
 */
private void markTransientFailureEvicted(String address) {
  synchronized (activeAddressLock) {
    Set<String> markers = transientFailureEvictedAddresses;
    markers.add(address);
  }
}

/**
 * Removes {@code address} from the set of transient-failure eviction markers, if present. Runs
 * while holding {@code activeAddressLock}.
 *
 * @param address the endpoint address whose marker should be cleared
 */
private void clearTransientFailureEvictionMarker(String address) {
  synchronized (activeAddressLock) {
    Set<String> markers = transientFailureEvictedAddresses;
    markers.remove(address);
  }
}

/**
* Records that real (non-probe) traffic was routed to an endpoint. This refreshes the idle
* eviction timer for this endpoint.
Expand Down Expand Up @@ -235,6 +261,7 @@ void updateActiveAddresses(String finderKey, Set<String> activeAddresses) {
for (Set<String> addresses : activeAddressesPerFinder.values()) {
allActive.addAll(addresses);
}
retainTransientFailureEvictionMarkers(allActive);

// Evict managed endpoints not referenced by any finder.
List<String> stale = new ArrayList<>();
Expand Down Expand Up @@ -276,6 +303,7 @@ void unregisterFinder(String finderKey) {
for (Set<String> addresses : activeAddressesPerFinder.values()) {
allActive.addAll(addresses);
}
retainTransientFailureEvictionMarkers(allActive);

List<String> stale = new ArrayList<>();
for (String address : endpoints.keySet()) {
Expand Down Expand Up @@ -412,6 +440,7 @@ private void probe(String address) {
case READY:
state.lastReadyAt = clock.instant();
state.consecutiveTransientFailures = 0;
clearTransientFailureEvictionMarker(address);
break;

case IDLE:
Expand Down Expand Up @@ -439,13 +468,13 @@ private void probe(String address) {
Level.FINE,
"Evicting endpoint {0}: {1} consecutive TRANSIENT_FAILURE probes",
new Object[] {address, state.consecutiveTransientFailures});
evictEndpoint(address);
evictEndpoint(address, EvictionReason.TRANSIENT_FAILURE);
}
break;

case SHUTDOWN:
logger.log(Level.FINE, "Probe for {0}: channel SHUTDOWN, evicting endpoint", address);
evictEndpoint(address);
evictEndpoint(address, EvictionReason.SHUTDOWN);
break;

default:
Expand Down Expand Up @@ -482,16 +511,26 @@ void checkIdleEviction() {
}

for (String address : toEvict) {
evictEndpoint(address);
evictEndpoint(address, EvictionReason.IDLE);
}
}

/**
 * Evicts an endpoint using {@link EvictionReason#STALE} as the reason. Delegates to {@link
 * #evictEndpoint(String, EvictionReason)}.
 *
 * @param address the endpoint address to evict
 */
private void evictEndpoint(String address) {
  final EvictionReason defaultReason = EvictionReason.STALE;
  evictEndpoint(address, defaultReason);
}

/**
 * Evicts an endpoint: stops probing it, removes it from the tracked endpoints, updates its
 * transient-failure eviction marker, and evicts it from the endpoint cache.
 *
 * <p>The marker is set only when {@code reason} is {@link EvictionReason#TRANSIENT_FAILURE};
 * any other reason clears it.
 *
 * @param address the endpoint address to evict
 * @param reason why the endpoint is being evicted
 */
private void evictEndpoint(String address, EvictionReason reason) {
  logger.log(Level.FINE, "Evicting endpoint {0}", address);

  stopProbing(address);
  endpoints.remove(address);

  boolean keepUnhealthyMarker = reason == EvictionReason.TRANSIENT_FAILURE;
  if (!keepUnhealthyMarker) {
    clearTransientFailureEvictionMarker(address);
  } else {
    markTransientFailureEvicted(address);
  }

  endpointCache.evict(address);
}

Expand Down Expand Up @@ -526,6 +565,10 @@ boolean isManaged(String address) {
return endpoints.containsKey(address);
}

/**
 * Reports whether {@code address} currently carries a transient-failure eviction marker.
 *
 * @param address the endpoint address to look up
 * @return {@code true} if the address is in the set of transient-failure eviction markers
 */
boolean wasRecentlyEvictedTransientFailure(String address) {
  boolean marked = transientFailureEvictedAddresses.contains(address);
  return marked;
}

/** Returns the endpoint state for testing. */
@VisibleForTesting
EndpointState getEndpointState(String address) {
Expand Down Expand Up @@ -558,6 +601,7 @@ void shutdown() {
}
}
endpoints.clear();
transientFailureEvictedAddresses.clear();

scheduler.shutdown();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ private ChannelFinder getOrCreateChannelFinder(String databaseId) {
return finder;
}

/**
 * Waits on every registered channel finder by calling {@code awaitPendingUpdates()} on each one
 * in turn. References whose {@code get()} returns {@code null} are skipped.
 *
 * @throws InterruptedException if a finder's wait throws it
 */
@com.google.common.annotations.VisibleForTesting
void awaitPendingCacheUpdates() throws InterruptedException {
  for (ChannelFinderReference reference : channelFinders.values()) {
    ChannelFinder resolved = reference.get();
    if (resolved == null) {
      continue;
    }
    resolved.awaitPendingUpdates();
  }
}

/** Records real traffic to the selected endpoint for idle eviction tracking. */
private void onRequestRouted(@Nullable ChannelEndpoint selectedEndpoint) {
if (lifecycleManager == null) {
Expand Down Expand Up @@ -798,25 +808,25 @@ public void onMessage(ResponseT message) {
if (message instanceof PartialResultSet) {
PartialResultSet response = (PartialResultSet) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
transactionId = transactionIdFromMetadata(response);
} else if (message instanceof ResultSet) {
ResultSet response = (ResultSet) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
transactionId = transactionIdFromMetadata(response);
} else if (message instanceof Transaction) {
Transaction response = (Transaction) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
transactionId = transactionIdFromTransaction(response);
} else if (message instanceof CommitResponse) {
CommitResponse response = (CommitResponse) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
}
if (transactionId != null) {
Expand Down
Loading
Loading