diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java index 8936863716..6f40796318 100644 --- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java @@ -58,6 +58,7 @@ import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.concurrent.CancellableDependency; +import org.apache.hc.core5.concurrent.ComplexCancellable; import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.ContentType; @@ -98,6 +99,8 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler private final HttpAsyncCache responseCache; private final DefaultAsyncCacheRevalidator cacheRevalidator; private final ConditionalRequestBuilder conditionalRequestBuilder; + private final boolean requestCollapsingEnabled; + private final CacheRequestCollapser collapser; AsyncCachingExec(final HttpAsyncCache cache, final DefaultAsyncCacheRevalidator cacheRevalidator, final CacheConfig config) { super(config); @@ -105,6 +108,8 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler this.cacheRevalidator = cacheRevalidator; this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(request -> BasicRequestBuilder.copy(request).build()); + this.requestCollapsingEnabled = config.isRequestCollapsingEnabled(); + this.collapser = this.requestCollapsingEnabled ? new CacheRequestCollapser() : null; } AsyncCachingExec( @@ -274,6 +279,96 @@ public void completed(final CacheMatch result) { final CacheHit hit = result != null ? result.hit : null; final CacheHit root = result != null ? result.root : null; if (hit == null) { + if (requestCollapsingEnabled && root == null && entityProducer == null && !requestCacheControl.isOnlyIfCached()) { + final String cacheKey = CacheKeyGenerator.INSTANCE.generateKey(target, request); + final CacheRequestCollapser.Token token = collapser.enter(cacheKey); + if (token.isLeader()) { + handleCacheMiss(requestCacheControl, null, target, request, null, scope, chain, new AsyncExecCallback() { + + @Override + public AsyncDataConsumer handleResponse( + final HttpResponse response, + final EntityDetails entityDetails) throws HttpException, IOException { + try { + return asyncExecCallback.handleResponse(response, entityDetails); + } catch (final HttpException | IOException ex) { + token.complete(); + throw ex; + } + } + + @Override + public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { + try { + asyncExecCallback.handleInformationResponse(response); + } catch (final HttpException | IOException ex) { + token.complete(); + throw ex; + } + } + + @Override + public void completed() { + try { + asyncExecCallback.completed(); + } finally { + token.complete(); + } + } + + @Override + public void failed(final Exception cause) { + try { + asyncExecCallback.failed(cause); + } finally { + token.complete(); + } + } + + }); + } else { + // Stable holder owned by the follower: registered with the outer operation + // exactly once so the cache-lookup dependency installed by the await task + // is never overwritten from the outside. + final ComplexCancellable follower = new ComplexCancellable(); + operation.setDependency(follower); + collapser.await(token, follower, () -> { + if (follower.isCancelled()) { + return; + } + follower.setDependency(responseCache.match(target, request, new FutureCallback() { + + @Override + public void completed(final CacheMatch result) { + final CacheHit hit = result != null ? result.hit : null; + final CacheHit root = result != null ? result.root : null; + if (hit == null) { + handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback); + } else { + final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry); + if (LOG.isDebugEnabled()) { + LOG.debug("{} response cache control: {}", exchangeId, responseCacheControl); + } + context.setResponseCacheControl(responseCacheControl); + handleCacheHit(requestCacheControl, responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback); + } + } + + @Override + public void failed(final Exception cause) { + asyncExecCallback.failed(cause); + } + + @Override + public void cancelled() { + asyncExecCallback.failed(new InterruptedIOException()); + } + + })); + }); + } + return; + } handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback); } else { final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry); diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheConfig.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheConfig.java index 8d31913e6a..eb6c769188 100644 --- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheConfig.java +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheConfig.java @@ -120,6 +120,9 @@ public class CacheConfig implements Cloneable { */ public static final int DEFAULT_ASYNCHRONOUS_WORKERS = 1; + /** Default setting for the request-collapsing hint. */ + public static final boolean DEFAULT_REQUEST_COLLAPSING_ENABLED = false; + public static final CacheConfig DEFAULT = new Builder().build(); private final long maxObjectSize; @@ -133,6 +136,7 @@ public class CacheConfig implements Cloneable { private final int asynchronousWorkers; private final boolean neverCacheHTTP10ResponsesWithQuery; private final boolean staleIfErrorEnabled; + private final boolean requestCollapsingEnabled; /** @@ -153,7 +157,8 @@ public class CacheConfig implements Cloneable { final int asynchronousWorkers, final boolean neverCacheHTTP10ResponsesWithQuery, final boolean neverCacheHTTP11ResponsesWithQuery, - final boolean staleIfErrorEnabled) { + final boolean staleIfErrorEnabled, + final boolean requestCollapsingEnabled) { super(); this.maxObjectSize = maxObjectSize; this.maxCacheEntries = maxCacheEntries; @@ -167,6 +172,7 @@ public class CacheConfig implements Cloneable { this.neverCacheHTTP10ResponsesWithQuery = neverCacheHTTP10ResponsesWithQuery; this.neverCacheHTTP11ResponsesWithQuery = neverCacheHTTP11ResponsesWithQuery; this.staleIfErrorEnabled = staleIfErrorEnabled; + this.requestCollapsingEnabled = requestCollapsingEnabled; } /** @@ -301,6 +307,21 @@ public int getAsynchronousWorkers() { return asynchronousWorkers; } + /** + * Returns whether the caching module should attempt to collapse concurrent + * requests for the same cache key so that only one request goes to the + * backend while the others wait and then re-check the cache. + *

+ * This is a hint. Individual caching implementations may choose to honour + * it or ignore it; the asynchronous caching exec honours it, while the + * classic caching exec currently ignores it. + * + * @since 5.7 + */ + public boolean isRequestCollapsingEnabled() { + return requestCollapsingEnabled; + } + @Override protected CacheConfig clone() throws CloneNotSupportedException { return (CacheConfig) super.clone(); @@ -323,7 +344,8 @@ public static Builder copy(final CacheConfig config) { .setAsynchronousWorkers(config.getAsynchronousWorkers()) .setNeverCacheHTTP10ResponsesWithQueryString(config.isNeverCacheHTTP10ResponsesWithQuery()) .setNeverCacheHTTP11ResponsesWithQueryString(config.isNeverCacheHTTP11ResponsesWithQuery()) - .setStaleIfErrorEnabled(config.isStaleIfErrorEnabled()); + .setStaleIfErrorEnabled(config.isStaleIfErrorEnabled()) + .setRequestCollapsingEnabled(config.isRequestCollapsingEnabled()); } public static class Builder { @@ -340,6 +362,7 @@ public static class Builder { private boolean neverCacheHTTP10ResponsesWithQuery; private boolean neverCacheHTTP11ResponsesWithQuery; private boolean staleIfErrorEnabled; + private boolean requestCollapsingEnabled; Builder() { this.maxObjectSize = DEFAULT_MAX_OBJECT_SIZE_BYTES; @@ -352,6 +375,7 @@ public static class Builder { this.freshnessCheckEnabled = true; this.asynchronousWorkers = DEFAULT_ASYNCHRONOUS_WORKERS; this.staleIfErrorEnabled = false; + this.requestCollapsingEnabled = DEFAULT_REQUEST_COLLAPSING_ENABLED; } /** @@ -518,6 +542,23 @@ public Builder setNeverCacheHTTP11ResponsesWithQueryString( return this; } + /** + * Enables request collapsing for cacheable requests. When enabled, concurrent + * requests for the same cache key are coalesced so that only one request goes + * to the backend while the others wait and then re-check the cache. + *

+ * This setting is a hint. Individual caching implementations may honour it or + * ignore it; the asynchronous caching exec honours it, while the classic + * caching exec currently ignores it. + * + * @return this instance. + * @since 5.7 + */ + public Builder setRequestCollapsingEnabled(final boolean requestCollapsingEnabled) { + this.requestCollapsingEnabled = requestCollapsingEnabled; + return this; + } + public CacheConfig build() { return new CacheConfig( maxObjectSize, @@ -531,7 +572,8 @@ public CacheConfig build() { asynchronousWorkers, neverCacheHTTP10ResponsesWithQuery, neverCacheHTTP11ResponsesWithQuery, - staleIfErrorEnabled); + staleIfErrorEnabled, + requestCollapsingEnabled); } } @@ -551,6 +593,7 @@ public String toString() { .append(", neverCacheHTTP10ResponsesWithQuery=").append(this.neverCacheHTTP10ResponsesWithQuery) .append(", neverCacheHTTP11ResponsesWithQuery=").append(this.neverCacheHTTP11ResponsesWithQuery) .append(", staleIfErrorEnabled=").append(this.staleIfErrorEnabled) + .append(", requestCollapsingEnabled=").append(this.requestCollapsingEnabled) .append("]"); return builder.toString(); } diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapser.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapser.java new file mode 100644 index 0000000000..6766d55630 --- /dev/null +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapser.java @@ -0,0 +1,158 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.concurrent.CancellableDependency; +import org.apache.hc.core5.util.Args; + +/** + * Coordinates concurrent requests for the same cache key so that only one request + * goes to the backend while others wait for it to complete and then re-check the cache. + *

+ * Each {@link AsyncCachingExec} owns its own instance; collapse state is never shared + * across executors or clients. + */ +@Internal +final class CacheRequestCollapser { + + static final class Token { + + private final ConcurrentHashMap inflight; + private final String key; + private final Entry entry; + private final boolean leader; + + private Token(final ConcurrentHashMap inflight, final String key, final Entry entry, final boolean leader) { + this.inflight = inflight; + this.key = key; + this.entry = entry; + this.leader = leader; + } + + boolean isLeader() { + return leader; + } + + void complete() { + if (entry.completed.compareAndSet(false, true)) { + inflight.remove(key, entry); + entry.drain(); + } + } + + } + + private static final class Waiter implements Cancellable { + + private final AtomicBoolean cancelled; + private final Runnable task; + + private Waiter(final Runnable task) { + this.cancelled = new AtomicBoolean(false); + this.task = task; + } + + @Override + public boolean cancel() { + return cancelled.compareAndSet(false, true); + } + + void runIfNotCancelled() { + if (!cancelled.get()) { + task.run(); + } + } + + } + + private static final class Entry { + + private final AtomicBoolean completed; + private final ConcurrentLinkedQueue waiters; + + private Entry() { + this.completed = new AtomicBoolean(false); + this.waiters = new ConcurrentLinkedQueue<>(); + } + + private void await(final CancellableDependency holder, final Runnable task) { + if (completed.get()) { + task.run(); + return; + } + final Waiter waiter = new Waiter(task); + // Install the waiter into the holder before publishing it to the queue so + // a concurrent drain cannot run the task before cancellation is wired up. + holder.setDependency(waiter); + waiters.add(waiter); + if (completed.get()) { + drain(); + } + } + + private void drain() { + for (; ; ) { + final Waiter waiter = waiters.poll(); + if (waiter == null) { + return; + } + waiter.runIfNotCancelled(); + } + } + + } + + private final ConcurrentHashMap inflight; + + CacheRequestCollapser() { + this.inflight = new ConcurrentHashMap<>(); + } + + Token enter(final String key) { + Args.notEmpty(key, "Key"); + final Entry created = new Entry(); + final Entry existing = inflight.putIfAbsent(key, created); + if (existing == null) { + return new Token(inflight, key, created, true); + } + return new Token(inflight, key, existing, false); + } + + void await(final Token token, final CancellableDependency holder, final Runnable task) { + Args.notNull(token, "Token"); + Args.notNull(holder, "Holder"); + Args.notNull(task, "Task"); + token.entry.await(holder, task); + } + +} diff --git a/httpclient5-cache/src/test/java/org/apache/hc/client5/http/cache/example/AsyncClientCacheRequestCollapsing.java b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/cache/example/AsyncClientCacheRequestCollapsing.java new file mode 100644 index 0000000000..7042b55f7a --- /dev/null +++ b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/cache/example/AsyncClientCacheRequestCollapsing.java @@ -0,0 +1,119 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.cache.example; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.cache.CacheContextBuilder; +import org.apache.hc.client5.http.cache.HttpCacheContext; +import org.apache.hc.client5.http.cache.RequestCacheControl; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.cache.CacheConfig; +import org.apache.hc.client5.http.impl.cache.CachingHttpAsyncClients; +import org.apache.hc.client5.http.impl.cache.HeapResourceFactory; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.io.CloseMode; + +/** + * This is an example demonstrating how to enable request collapsing in the async HTTP cache. + * When enabled, concurrent requests for the same cache key are coalesced so that only one + * request goes to the backend while the others wait and then re-check the cache. + */ +public class AsyncClientCacheRequestCollapsing { + + public static void main(final String[] args) throws Exception { + + final HttpHost target = new HttpHost("https", "www.apache.org"); + + try (final CloseableHttpAsyncClient httpclient = CachingHttpAsyncClients.custom() + .setCacheConfig(CacheConfig.custom() + .setMaxObjectSize(200000) + .setHeuristicCachingEnabled(true) + .setRequestCollapsingEnabled(true) + .build()) + .setResourceFactory(HeapResourceFactory.INSTANCE) + .build()) { + + httpclient.start(); + + final int burst = 5; + final List> futures = new ArrayList<>(burst); + + for (int i = 0; i < burst; i++) { + final SimpleHttpRequest httpget = SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath("/") + .build(); + + // One context per request: HttpCacheContext is not thread-safe. + final HttpCacheContext context = CacheContextBuilder.create() + .setCacheControl(RequestCacheControl.DEFAULT) + .build(); + + System.out.println("Executing request " + httpget.getMethod() + " " + httpget.getUri()); + futures.add(httpclient.execute( + SimpleRequestProducer.create(httpget), + SimpleResponseConsumer.create(), + context, + new FutureCallback() { + + @Override + public void completed(final SimpleHttpResponse response) { + System.out.println(httpget + "->" + new StatusLine(response)); + System.out.println("Cache status: " + context.getCacheResponseStatus()); + } + + @Override + public void failed(final Exception ex) { + System.out.println(httpget + "->" + ex); + } + + @Override + public void cancelled() { + System.out.println(httpget + " cancelled"); + } + + })); + } + + for (final Future future : futures) { + future.get(); + } + + httpclient.close(CloseMode.GRACEFUL); + } + } +} diff --git a/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAsyncCachingExecRequestCollapsing.java b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAsyncCachingExecRequestCollapsing.java new file mode 100644 index 0000000000..474b6b4614 --- /dev/null +++ b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAsyncCachingExecRequestCollapsing.java @@ -0,0 +1,215 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.cache.CacheResponseStatus; +import org.apache.hc.client5.http.cache.HttpCacheContext; +import org.apache.hc.client5.http.cache.RequestCacheControl; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.utils.DateUtils; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.io.CloseMode; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestAsyncCachingExecRequestCollapsing { + + private static final class RoundResult { + private final int originHits; + private final int cacheMisses; + private final int cacheHits; + + private RoundResult(final int originHits, final int cacheMisses, final int cacheHits) { + this.originHits = originHits; + this.cacheMisses = cacheMisses; + this.cacheHits = cacheHits; + } + } + + @Test + void testRequestCollapsingPreventsThunderingHerdOnColdMiss() throws Exception { + final AtomicInteger originHits = new AtomicInteger(0); + + final HttpServer server = HttpServer.create(new InetSocketAddress(0), 0); + server.createContext("/", exchange -> handleOrigin(exchange, originHits)); + final ExecutorService executorService = Executors.newCachedThreadPool(); + server.setExecutor(executorService); + server.start(); + + try { + final int port = server.getAddress().getPort(); + final HttpHost target = new HttpHost("http", "localhost", port); + final int concurrent = 20; + + originHits.set(0); + final RoundResult baseline = runRound(target, concurrent, false, originHits); + Assertions.assertEquals(concurrent, baseline.originHits, "Baseline must hit origin N times"); + Assertions.assertEquals(concurrent, baseline.cacheMisses, "Baseline must be all CACHE_MISS on cold miss"); + Assertions.assertEquals(0, baseline.cacheHits, "Baseline must have no CACHE_HIT on cold miss"); + + originHits.set(0); + final RoundResult collapsed = runRound(target, concurrent, true, originHits); + Assertions.assertEquals(1, collapsed.originHits, "Collapsing must allow only one origin request"); + Assertions.assertEquals(1, collapsed.cacheMisses, "Collapsing must have exactly one CACHE_MISS leader"); + Assertions.assertEquals(concurrent - 1, collapsed.cacheHits, "Collapsing must serve followers from cache"); + } finally { + server.stop(0); + executorService.shutdownNow(); + } + } + + private static void handleOrigin(final HttpExchange exchange, final AtomicInteger originHits) throws IOException { + originHits.incrementAndGet(); + + // Keep the origin "busy" so concurrent client requests overlap and all see a cold cache. + try { + Thread.sleep(250); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + final byte[] body = "OK".getBytes(StandardCharsets.US_ASCII); + + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=us-ascii"); + exchange.getResponseHeaders().add("Cache-Control", "public, max-age=60"); + exchange.getResponseHeaders().add("Date", DateUtils.formatStandardDate(Instant.now())); + + exchange.sendResponseHeaders(200, body.length); + try (final OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + } + + private static RoundResult runRound( + final HttpHost target, + final int concurrent, + final boolean requestCollapsingEnabled, + final AtomicInteger originHits) throws Exception { + + final CacheConfig cacheConfig = CacheConfig.custom() + .setHeuristicCachingEnabled(false) + .setRequestCollapsingEnabled(requestCollapsingEnabled) + .build(); + + final AtomicInteger cacheMisses = new AtomicInteger(0); + final AtomicInteger cacheHits = new AtomicInteger(0); + + final CloseableHttpAsyncClient client = CachingHttpAsyncClients.custom() + .setCacheConfig(cacheConfig) + .setResourceFactory(HeapResourceFactory.INSTANCE) + .build(); + + client.start(); + + try { + final List> futures = new ArrayList<>(concurrent); + final CountDownLatch done = new CountDownLatch(concurrent); + final AtomicInteger failures = new AtomicInteger(0); + + for (int i = 0; i < concurrent; i++) { + final SimpleHttpRequest request = SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath("/") + .build(); + + // Use one context per request. + final HttpCacheContext context = HttpCacheContext.create(); + context.setRequestCacheControl(RequestCacheControl.DEFAULT); + + futures.add(client.execute( + SimpleRequestProducer.create(request), + SimpleResponseConsumer.create(), + context, + new FutureCallback() { + + @Override + public void completed(final SimpleHttpResponse result) { + final CacheResponseStatus status = context.getCacheResponseStatus(); + if (status == CacheResponseStatus.CACHE_MISS) { + cacheMisses.incrementAndGet(); + } else if (status == CacheResponseStatus.CACHE_HIT) { + cacheHits.incrementAndGet(); + } else { + // For this test we only expect HIT or MISS. + failures.incrementAndGet(); + } + done.countDown(); + } + + @Override + public void failed(final Exception ex) { + failures.incrementAndGet(); + done.countDown(); + } + + @Override + public void cancelled() { + failures.incrementAndGet(); + done.countDown(); + } + + })); + } + + Assertions.assertTrue(done.await(30, TimeUnit.SECONDS), "Requests did not complete in time"); + Assertions.assertEquals(0, failures.get(), "Unexpected failures / cache statuses"); + + // Also ensure futures are all done / propagate any hidden exception. + for (final Future f : futures) { + f.get(5, TimeUnit.SECONDS); + } + + return new RoundResult(originHits.get(), cacheMisses.get(), cacheHits.get()); + } finally { + client.close(CloseMode.GRACEFUL); + } + } + +} diff --git a/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestCacheRequestCollapser.java b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestCacheRequestCollapser.java new file mode 100644 index 0000000000..db4769ab6b --- /dev/null +++ b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestCacheRequestCollapser.java @@ -0,0 +1,222 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.concurrent.ComplexCancellable; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestCacheRequestCollapser { + + @Test + void testSingleLeaderForSameKey() throws Exception { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + final int threads = 32; + + final ExecutorService executor = Executors.newFixedThreadPool(threads); + try { + final CountDownLatch ready = new CountDownLatch(threads); + final CountDownLatch start = new CountDownLatch(1); + + final AtomicInteger leaders = new AtomicInteger(0); + final List> futures = new ArrayList<>(threads); + + for (int i = 0; i < threads; i++) { + futures.add(executor.submit(() -> { + ready.countDown(); + start.await(5, TimeUnit.SECONDS); + final CacheRequestCollapser.Token token = collapser.enter(key); + if (token.isLeader()) { + leaders.incrementAndGet(); + } + return token; + })); + } + + Assertions.assertTrue(ready.await(5, TimeUnit.SECONDS)); + start.countDown(); + + final List tokens = new ArrayList<>(threads); + for (final Future f : futures) { + tokens.add(f.get(5, TimeUnit.SECONDS)); + } + + Assertions.assertEquals(1, leaders.get(), "Expected exactly one leader"); + + CacheRequestCollapser.Token leaderToken = null; + for (final CacheRequestCollapser.Token t : tokens) { + if (t.isLeader()) { + leaderToken = t; + break; + } + } + Assertions.assertNotNull(leaderToken); + leaderToken.complete(); + + // After completion, the next enter must produce a fresh leader. + final CacheRequestCollapser.Token next = collapser.enter(key); + Assertions.assertTrue(next.isLeader()); + next.complete(); + } finally { + executor.shutdownNow(); + } + } + + @Test + void testAwaitRunsAfterComplete() throws Exception { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + Assertions.assertTrue(leader.isLeader()); + + final CacheRequestCollapser.Token follower = collapser.enter(key); + Assertions.assertFalse(follower.isLeader()); + + final AtomicInteger runs = new AtomicInteger(0); + final ComplexCancellable holder = new ComplexCancellable(); + collapser.await(follower, holder, runs::incrementAndGet); + Assertions.assertEquals(0, runs.get()); + + leader.complete(); + + // drain() runs synchronously inside complete(), so the task has already executed. + Assertions.assertEquals(1, runs.get()); + } + + @Test + void testCancelledWaiterDoesNotRun() { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + final CacheRequestCollapser.Token follower = collapser.enter(key); + + final AtomicInteger runs = new AtomicInteger(0); + final ComplexCancellable holder = new ComplexCancellable(); + collapser.await(follower, holder, runs::incrementAndGet); + Assertions.assertTrue(holder.cancel()); + + leader.complete(); + + Assertions.assertEquals(0, runs.get(), "Cancelled waiter must not run"); + } + + @Test + void testCompleteIsIdempotent() { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + final CacheRequestCollapser.Token follower = collapser.enter(key); + + final AtomicInteger runs = new AtomicInteger(0); + final ComplexCancellable holder = new ComplexCancellable(); + collapser.await(follower, holder, runs::incrementAndGet); + + leader.complete(); + leader.complete(); + leader.complete(); + + Assertions.assertEquals(1, runs.get(), "Waiters must run exactly once"); + } + + @Test + void testLeaderCompletesBetweenEnterAndAwait() { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + final CacheRequestCollapser.Token follower = collapser.enter(key); + Assertions.assertFalse(follower.isLeader()); + + leader.complete(); + + final AtomicInteger runs = new AtomicInteger(0); + final ComplexCancellable holder = new ComplexCancellable(); + collapser.await(follower, holder, runs::incrementAndGet); + + Assertions.assertEquals(1, runs.get(), "Task must run synchronously when the leader is already complete"); + } + + @Test + void testFollowerCancelWhileWaiting() { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + final CacheRequestCollapser.Token follower = collapser.enter(key); + + final AtomicInteger runs = new AtomicInteger(0); + final ComplexCancellable holder = new ComplexCancellable(); + collapser.await(follower, holder, runs::incrementAndGet); + + Assertions.assertTrue(holder.cancel(), "Outer cancel via the holder must succeed"); + Assertions.assertTrue(holder.isCancelled()); + + leader.complete(); + + Assertions.assertEquals(0, runs.get(), "Task must not run after the holder has been cancelled"); + } + + @Test + void testLeaderFailureReleasesFollowers() { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + final CacheRequestCollapser.Token f1 = collapser.enter(key); + final CacheRequestCollapser.Token f2 = collapser.enter(key); + final CacheRequestCollapser.Token f3 = collapser.enter(key); + + final AtomicInteger runs = new AtomicInteger(0); + collapser.await(f1, new ComplexCancellable(), runs::incrementAndGet); + collapser.await(f2, new ComplexCancellable(), runs::incrementAndGet); + collapser.await(f3, new ComplexCancellable(), runs::incrementAndGet); + + // Simulate the leader failure path: callback.failed() ends with token.complete(). + leader.complete(); + + Assertions.assertEquals(3, runs.get(), "All followers must be released when the leader completes (success or failure)"); + + // After release the key is free and the next caller becomes a fresh leader. + final CacheRequestCollapser.Token next = collapser.enter(key); + Assertions.assertTrue(next.isLeader()); + next.complete(); + } + +} diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/AsyncClientCacheRequestCollapsingSmokeTest.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/AsyncClientCacheRequestCollapsingSmokeTest.java new file mode 100644 index 0000000000..d49a0a2eb0 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/AsyncClientCacheRequestCollapsingSmokeTest.java @@ -0,0 +1,412 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.extension.async; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.cache.CacheConfig; +import org.apache.hc.client5.http.impl.cache.CachingHttpAsyncClientBuilder; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.io.CloseMode; + +public class AsyncClientCacheRequestCollapsingSmokeTest { + + private static final int REQUEST_COUNT = 20; + + public static void main(final String[] args) throws Exception { + final AtomicInteger cacheableHits = new AtomicInteger(); + final AtomicInteger nonCacheableHits = new AtomicInteger(); + final AtomicInteger varyHits = new AtomicInteger(); + final AtomicInteger postHits = new AtomicInteger(); + final AtomicInteger onlyIfCachedHits = new AtomicInteger(); + final AtomicInteger flakyHits = new AtomicInteger(); + final AtomicInteger cancelHits = new AtomicInteger(); + + final CountDownLatch cancelLeaderAccepted = new CountDownLatch(1); + final CountDownLatch cancelLeaderRelease = new CountDownLatch(1); + + final ExecutorService executorService = Executors.newFixedThreadPool(REQUEST_COUNT); + final HttpServer server = HttpServer.create(new InetSocketAddress(0), 0); + server.setExecutor(executorService); + + server.createContext("/cacheable", exchange -> { + final int count = cacheableHits.incrementAndGet(); + sleep(500); + send(exchange, 200, "payload-" + count, "public, max-age=60", null); + }); + + server.createContext("/non-cacheable", exchange -> { + final int count = nonCacheableHits.incrementAndGet(); + sleep(250); + send(exchange, 200, "non-cacheable-" + count, "no-store", null); + }); + + server.createContext("/vary", exchange -> { + final int count = varyHits.incrementAndGet(); + final String lang = exchange.getRequestHeaders().getFirst("Accept-Language"); + sleep(250); + send(exchange, 200, "variant-" + lang + "-" + count, "public, max-age=60", "Accept-Language"); + }); + + server.createContext("/post", exchange -> { + final int count = postHits.incrementAndGet(); + sleep(250); + send(exchange, 200, "post-" + count, "public, max-age=60", null); + }); + + server.createContext("/only-if-cached", exchange -> { + onlyIfCachedHits.incrementAndGet(); + send(exchange, 200, "must-not-be-called", "public, max-age=60", null); + }); + + server.createContext("/flaky", exchange -> { + final int count = flakyHits.incrementAndGet(); + sleep(300); + if (count == 1) { + send(exchange, 500, "boom", "no-store", null); + } else { + send(exchange, 200, "recovered-" + count, "public, max-age=60", null); + } + }); + + server.createContext("/cancel", exchange -> { + final int count = cancelHits.incrementAndGet(); + cancelLeaderAccepted.countDown(); + + try { + cancelLeaderRelease.await(5, TimeUnit.SECONDS); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + send(exchange, 200, "cancel-" + count, "public, max-age=60", null); + }); + + try { + server.start(); + + final String baseUri = "http://localhost:" + server.getAddress().getPort(); + + assertCacheableColdMissesCollapse(baseUri + "/cacheable", cacheableHits); + assertFailureDoesNotTrapFollowers(baseUri + "/flaky", flakyHits); + assertCancellationDoesNotTrapFollowers( + baseUri + "/cancel", + cancelHits, + cancelLeaderAccepted, + cancelLeaderRelease); + assertNonCacheableIsNotIncorrectlyCached(baseUri + "/non-cacheable", nonCacheableHits); + assertVaryDoesNotMixVariants(baseUri + "/vary", varyHits); + assertEntityRequestsAreNotCollapsed(baseUri + "/post", postHits); + assertOnlyIfCachedDoesNotHitOrigin(baseUri + "/only-if-cached", onlyIfCachedHits); + + System.out.println("All request collapsing smoke checks passed"); + } finally { + server.stop(0); + executorService.shutdownNow(); + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("Server executor did not terminate"); + } + } + } + + private static void assertCacheableColdMissesCollapse( + final String uri, + final AtomicInteger originHits) throws Exception { + + originHits.set(0); + + try (final ClientResource clientResource = new ClientResource(true)) { + final List> futures = new ArrayList<>(); + + for (int i = 0; i < REQUEST_COUNT; i++) { + futures.add(clientResource.execute(SimpleRequestBuilder.get(uri).build())); + } + + for (final Future future : futures) { + final SimpleHttpResponse response = future.get(5, TimeUnit.SECONDS); + assertStatus(response, 200); + } + } + + assertEquals("cacheable collapsed origin hits", 1, originHits.get()); + System.out.println("cacheable concurrent GETs collapsed to 1 origin hit"); + } + + private static void assertFailureDoesNotTrapFollowers( + final String uri, + final AtomicInteger originHits) throws Exception { + + originHits.set(0); + + try (final ClientResource clientResource = new ClientResource(true)) { + final Future first = clientResource.execute(SimpleRequestBuilder.get(uri).build()); + final Future second = clientResource.execute(SimpleRequestBuilder.get(uri).build()); + + final SimpleHttpResponse firstResponse = first.get(5, TimeUnit.SECONDS); + final SimpleHttpResponse secondResponse = second.get(5, TimeUnit.SECONDS); + + assertTrue( + "failure smoke must complete both futures", + firstResponse.getCode() >= 500 || secondResponse.getCode() >= 500 || originHits.get() > 1); + + final SimpleHttpResponse retry = clientResource.execute(SimpleRequestBuilder.get(uri).build()) + .get(5, TimeUnit.SECONDS); + assertStatus(retry, 200); + } + + assertTrue("failure must not leave cache key permanently blocked", originHits.get() >= 2); + System.out.println("leader failure did not trap followers"); + } + + private static void assertCancellationDoesNotTrapFollowers( + final String uri, + final AtomicInteger originHits, + final CountDownLatch leaderAccepted, + final CountDownLatch leaderRelease) throws Exception { + + originHits.set(0); + + try (final ClientResource clientResource = new ClientResource(true)) { + final Future leader = clientResource.execute(SimpleRequestBuilder.get(uri).build()); + + assertTrue("leader request must reach origin", leaderAccepted.await(5, TimeUnit.SECONDS)); + + final Future follower = clientResource.execute(SimpleRequestBuilder.get(uri).build()); + + leader.cancel(true); + leaderRelease.countDown(); + + try { + follower.get(5, TimeUnit.SECONDS); + } catch (final Exception ex) { + // Acceptable here. The smoke check only verifies there is no hang and no retained key. + } + + final SimpleHttpResponse retry = clientResource.execute(SimpleRequestBuilder.get(uri).build()) + .get(5, TimeUnit.SECONDS); + assertStatus(retry, 200); + } + + assertTrue("cancellation must not leave cache key permanently blocked", originHits.get() >= 1); + System.out.println("leader cancellation did not trap followers"); + } + + private static void assertNonCacheableIsNotIncorrectlyCached( + final String uri, + final AtomicInteger originHits) throws Exception { + + originHits.set(0); + + try (final ClientResource clientResource = new ClientResource(true)) { + final SimpleHttpResponse first = clientResource.execute(SimpleRequestBuilder.get(uri).build()) + .get(5, TimeUnit.SECONDS); + final SimpleHttpResponse second = clientResource.execute(SimpleRequestBuilder.get(uri).build()) + .get(5, TimeUnit.SECONDS); + + assertStatus(first, 200); + assertStatus(second, 200); + + assertTrue( + "non-cacheable response must not be reused from cache", + !first.getBodyText().equals(second.getBodyText())); + } + + assertEquals("non-cacheable origin hits", 2, originHits.get()); + System.out.println("non-cacheable response was not incorrectly cached"); + } + + private static void assertVaryDoesNotMixVariants( + final String uri, + final AtomicInteger originHits) throws Exception { + + originHits.set(0); + + try (final ClientResource clientResource = new ClientResource(true)) { + final SimpleHttpRequest english = SimpleRequestBuilder.get(uri) + .addHeader("Accept-Language", "en") + .build(); + final SimpleHttpRequest spanish = SimpleRequestBuilder.get(uri) + .addHeader("Accept-Language", "es") + .build(); + + final Future englishFuture = clientResource.execute(english); + final Future spanishFuture = clientResource.execute(spanish); + + final SimpleHttpResponse englishResponse = englishFuture.get(5, TimeUnit.SECONDS); + final SimpleHttpResponse spanishResponse = spanishFuture.get(5, TimeUnit.SECONDS); + + assertStatus(englishResponse, 200); + assertStatus(spanishResponse, 200); + + assertTrue("English variant must be preserved", englishResponse.getBodyText().contains("variant-en")); + assertTrue("Spanish variant must be preserved", spanishResponse.getBodyText().contains("variant-es")); + } + + assertTrue("Vary requests should require distinct origin variants", originHits.get() >= 2); + System.out.println("Vary responses did not mix variants"); + } + + private static void assertEntityRequestsAreNotCollapsed( + final String uri, + final AtomicInteger originHits) throws Exception { + + originHits.set(0); + + try (final ClientResource clientResource = new ClientResource(true)) { + final List> futures = new ArrayList<>(); + + for (int i = 0; i < REQUEST_COUNT; i++) { + final SimpleHttpRequest request = SimpleRequestBuilder.post(uri) + .setBody("request-" + i, ContentType.TEXT_PLAIN) + .build(); + futures.add(clientResource.execute(request)); + } + + for (final Future future : futures) { + assertStatus(future.get(5, TimeUnit.SECONDS), 200); + } + } + + assertEquals("entity requests must not collapse", REQUEST_COUNT, originHits.get()); + System.out.println("entity requests were not collapsed"); + } + + private static void assertOnlyIfCachedDoesNotHitOrigin( + final String uri, + final AtomicInteger originHits) throws Exception { + + originHits.set(0); + + try (final ClientResource clientResource = new ClientResource(true)) { + final SimpleHttpRequest request = SimpleRequestBuilder.get(uri) + .addHeader(HttpHeaders.CACHE_CONTROL, "only-if-cached") + .build(); + + final SimpleHttpResponse response = clientResource.execute(request).get(5, TimeUnit.SECONDS); + + assertEquals("only-if-cached cold miss must not hit origin", 0, originHits.get()); + assertStatus(response, 504); + } + + System.out.println("only-if-cached cold miss did not hit origin"); + } + + private static void send( + final HttpExchange exchange, + final int status, + final String body, + final String cacheControl, + final String vary) throws IOException { + + final byte[] bytes = body.getBytes(StandardCharsets.UTF_8); + + if (cacheControl != null) { + exchange.getResponseHeaders().add(HttpHeaders.CACHE_CONTROL, cacheControl); + } + if (vary != null) { + exchange.getResponseHeaders().add(HttpHeaders.VARY, vary); + } + exchange.getResponseHeaders().add(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8"); + + exchange.sendResponseHeaders(status, bytes.length); + try (final OutputStream outputStream = exchange.getResponseBody()) { + outputStream.write(bytes); + } + } + + private static void sleep(final long millis) { + try { + Thread.sleep(millis); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + private static void assertStatus(final SimpleHttpResponse response, final int expectedCode) { + assertEquals("response code", expectedCode, response.getCode()); + } + + private static void assertEquals(final String message, final int expected, final int actual) { + if (expected != actual) { + throw new IllegalStateException(message + ": expected <" + expected + "> but was <" + actual + ">"); + } + } + + private static void assertTrue(final String message, final boolean condition) { + if (!condition) { + throw new IllegalStateException(message); + } + } + + private static final class ClientResource implements AutoCloseable { + + private final CloseableHttpAsyncClient client; + + ClientResource(final boolean requestCollapsingEnabled) { + this.client = CachingHttpAsyncClientBuilder.create() + .setCacheConfig(CacheConfig.custom() + .setRequestCollapsingEnabled(requestCollapsingEnabled) + .build()) + .build(); + this.client.start(); + } + + Future execute(final SimpleHttpRequest request) { + return this.client.execute( + SimpleRequestProducer.create(request), + SimpleResponseConsumer.create(), + null); + } + + @Override + public void close() { + this.client.close(CloseMode.IMMEDIATE); + } + + } + +} \ No newline at end of file