Skip to content

Commit

Permalink
PROTON-2850 Fix some sporadic test failures that appear in CI
Browse files Browse the repository at this point in the history
  • Loading branch information
tabish121 committed Sep 16, 2024
1 parent ef61b41 commit e662d7f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -2458,23 +2459,31 @@ public void testCannotReadFromStreamDeliveredBeforeConnectionDrop() throws Excep
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.dropAfterLastHandler();
peer.expectDisposition().optional();
peer.dropAfterLastHandler(1);
peer.start();

URI remoteURI = peer.getServerURI();

LOG.info("Test started, peer listening on: {}", remoteURI);

final CountDownLatch disconnected = new CountDownLatch(1);

Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
ConnectionOptions options = new ConnectionOptions();
options.disconnectedHandler((c, e) -> disconnected.countDown());
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
final Receiver receiver = connection.openReceiver("test-queue");
final Delivery delivery = receiver.receive();

peer.waitForScriptToComplete();

assertNotNull(delivery);

// Data already read so it will be already available for read.
assertTrue(disconnected.await(5, TimeUnit.SECONDS));

// Data already read so it will be already available for read but should be unable to
// since the connection has dropped.
assertNotEquals(-1, delivery.rawInputStream().read());

connection.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ public void testOpenStreamSenderAppliesConfiguredSessionOutgoingWindow() throws
}
}

@SuppressWarnings("resource")
@Test
public void testSendCustomMessageWithMultipleAmqpValueSections() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
Expand Down Expand Up @@ -397,7 +396,6 @@ public void testSendCustomMessageWithMultipleAmqpValueSections() throws Exceptio
}
}

@SuppressWarnings("resource")
@Test
public void testClearBodySectionsIsNoOpForStreamSenderMessage() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
Expand Down Expand Up @@ -1478,7 +1476,6 @@ void testCompleteStreamClosureCausesTransferCompleted() throws Exception {
}
}

@SuppressWarnings("resource")
@Test
void testRawOutputStreamFromMessageWritesUnmodifiedBytes() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
Expand Down Expand Up @@ -1677,7 +1674,6 @@ public void testStreamSenderMessageWithDeliveryAnnotations() throws Exception {
}
}

@SuppressWarnings("resource")
@Test
public void testStreamSenderWritesFooterAfterStreamClosed() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
Expand Down Expand Up @@ -1893,7 +1889,6 @@ void testAutoFlushDuringMessageSendThatExceedConfiguredBufferLimitSessionCreditL
});

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).queue();
peer.expectTransfer().withNonNullPayload().withMore(true);
Expand All @@ -1904,6 +1899,9 @@ void testAutoFlushDuringMessageSendThatExceedConfiguredBufferLimitSessionCreditL
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(5).withLinkCredit(10).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).accept();

// Initiate message flow by now granting the first credit
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
Expand Down Expand Up @@ -2037,11 +2035,13 @@ void testConcurrentMessageSendsBlocksBehindSendWaitingForCredit() throws Excepti
});

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(2).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();

// Initiate message flow by now granting the first credit
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();

assertTrue(send2Completed.await(10, TimeUnit.SECONDS));

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -2108,7 +2108,6 @@ void testConcurrentMessageSendWaitingOnSplitFramedSendToCompleteIsSentAfterCredi
});

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(2).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
Expand All @@ -2117,6 +2116,9 @@ void testConcurrentMessageSendWaitingOnSplitFramedSendToCompleteIsSentAfterCredi
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(4).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();

// Initiate message flow by now granting the first credits
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();

assertTrue(send2Completed.await(10, TimeUnit.SECONDS));

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
Expand All @@ -2133,7 +2135,6 @@ void testConcurrentMessageSendWaitingOnSplitFramedSendToCompleteIsSentAfterCredi
}
}

@SuppressWarnings("resource")
@Test
void testMessageSendWhileStreamSendIsOpenShouldBlock() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
Expand Down Expand Up @@ -2199,7 +2200,6 @@ void testMessageSendWhileStreamSendIsOpenShouldBlock() throws Exception {
}
}

@SuppressWarnings("resource")
@Test
public void testStreamSenderSessionCannotCreateNewResources() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
Expand Down

0 comments on commit e662d7f

Please sign in to comment.