Skip to content

Commit

Permalink
[BUG] org.opensearch.client.RestClientSingleHostIntegTests.testReques…
Browse files Browse the repository at this point in the history
…tResetAndAbort is flaky (opensearch-project#9206)

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored and linuxpi committed Aug 14, 2023
1 parent 321dffa commit d468b78
Showing 1 changed file with 64 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.http.message.BasicHeader;
Expand All @@ -73,6 +74,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -298,37 +300,70 @@ public void testRequestResetAndAbort() throws Exception {
httpGet.reset();
assertFalse(httpGet.isAborted());

Future<ClassicHttpResponse> future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null);
httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future);
httpGet.abort();
final Phaser phaser = new Phaser(2);
phaser.register();

try {
future.get();
fail("expected cancellation exception");
} catch (CancellationException e) {
// expected
Future<ClassicHttpResponse> future = client.execute(
getRequestProducer(httpGet, httpHost),
getResponseConsumer(phaser),
null
);
httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future);
httpGet.abort();

try {
phaser.arriveAndDeregister();
future.get();
fail("expected cancellation exception");
} catch (CancellationException e) {
// expected
}
assertTrue(future.isCancelled());
} finally {
// Forcing termination since the AsyncResponseConsumer may not be reached,
// the request is aborted right before
phaser.forceTermination();
}
assertTrue(future.isCancelled());
}
{
httpGet.reset();
Future<ClassicHttpResponse> future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null);
assertFalse(httpGet.isAborted());
httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future);
httpGet.abort();
assertTrue(httpGet.isAborted());
final Phaser phaser = new Phaser(2);
phaser.register();

try {
assertTrue(future.isCancelled());
future.get();
throw new AssertionError("exception should have been thrown");
} catch (CancellationException e) {
// expected
httpGet.reset();
Future<ClassicHttpResponse> future = client.execute(
getRequestProducer(httpGet, httpHost),
getResponseConsumer(phaser),
null
);
assertFalse(httpGet.isAborted());
httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future);
httpGet.abort();
assertTrue(httpGet.isAborted());
try {
phaser.arriveAndDeregister();
assertTrue(future.isCancelled());
future.get();
throw new AssertionError("exception should have been thrown");
} catch (CancellationException e) {
// expected
}
} finally {
// Forcing termination since the AsyncResponseConsumer may not be reached,
// the request is aborted right before
phaser.forceTermination();
}
}
{
httpGet.reset();
assertFalse(httpGet.isAborted());
Future<ClassicHttpResponse> future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null);
final Phaser phaser = new Phaser(0);
Future<ClassicHttpResponse> future = client.execute(
getRequestProducer(httpGet, httpHost),
getResponseConsumer(phaser),
null
);
assertFalse(httpGet.isAborted());
assertEquals(200, future.get().getCode());
assertFalse(future.isCancelled());
Expand Down Expand Up @@ -554,8 +589,15 @@ private Response bodyTest(RestClient restClient, String method, int statusCode,
return esResponse;
}

private AsyncResponseConsumer<ClassicHttpResponse> getResponseConsumer() {
return new HeapBufferedAsyncResponseConsumer(1024);
private AsyncResponseConsumer<ClassicHttpResponse> getResponseConsumer(Phaser phaser) {
phaser.register();
return new HeapBufferedAsyncResponseConsumer(1024) {
@Override
protected ClassicHttpResponse buildResult(HttpResponse response, byte[] entity, ContentType contentType) {
phaser.arriveAndAwaitAdvance();
return super.buildResult(response, entity, contentType);
}
};
}

private HttpUriRequestProducer getRequestProducer(HttpUriRequestBase request, HttpHost host) {
Expand Down

0 comments on commit d468b78

Please sign in to comment.