Skip to content

Commit

Permalink
PROTON-2780 Fix race in test that leads to unexpected performative
Browse files Browse the repository at this point in the history
Fix a test and add one new one to cover a similar scenario, ensure the
test stages frames to avoid race on flow vs dispoistion.
  • Loading branch information
tabish121 committed Dec 8, 2023
1 parent d4cf3ed commit e007fc9
Showing 1 changed file with 76 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1079,17 +1079,18 @@ public void testStreamDeliveryRawInputStreamReadOpensSessionWindowForAdditionalI
// from the incoming delivery and the session window opens which allows the second chunk to
// arrive and again the session window will be opened as that chunk is moved to the reader's
// buffer for return from the read request.
final byte[] combinedPayloads = new byte[payload1.length + payload2.length];

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(false)
.withMessageFormat(0)
.withPayload(payload2).queue();
.withPayload(payload2).queue().afterDelay(100);
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);

byte[] combinedPayloads = new byte[payload1.length + payload2.length];
rawStream.read(combinedPayloads);

assertTrue(Arrays.equals(payload1, 0, payload1.length, combinedPayloads, 0, payload1.length));
Expand All @@ -1110,6 +1111,79 @@ public void testStreamDeliveryRawInputStreamReadOpensSessionWindowForAdditionalI
}
}

@Test
public void testStreamDeliveryRawInputStreamReadInChunksOpensSessionWindowForAdditionalInput() throws Exception {
final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
final byte[] payload1 = createEncodedMessage(new Data(body1));
final byte[] payload2 = createEncodedMessage(new Data(body2));

try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().withMaxFrameSize(1000).respond();
peer.expectBegin().withIncomingWindow(1).respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(true)
.withMessageFormat(0)
.withPayload(payload1).queue();
peer.start();

URI remoteURI = peer.getServerURI();

LOG.info("Test started, peer listening on: {}", remoteURI);

Client container = Client.create();
ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000);
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000);
StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions);
StreamDelivery delivery = receiver.receive();
assertNotNull(delivery);
InputStream rawStream = delivery.rawInputStream();
assertNotNull(rawStream);

// An initial frame has arrived but more than that is requested so the first chuck is pulled
// from the incoming delivery and the session window opens which allows the second chunk to
// arrive and again the session window will be opened as that chunk is moved to the reader's
// buffer for return from the read request.
final byte[] chunk1 = new byte[payload1.length];
final byte[] chunk2 = new byte[payload2.length];

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(false)
.withMessageFormat(0)
.withPayload(payload2).queue().afterDelay(100);
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);

rawStream.read(chunk1);
rawStream.read(chunk2);

assertTrue(Arrays.equals(payload1, 0, payload1.length, chunk1, 0, payload1.length));
assertTrue(Arrays.equals(payload2, 0, payload2.length, chunk2, 0, payload2.length));

rawStream.close();

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();

receiver.openFuture().get();
receiver.closeAsync().get();
connection.closeAsync().get();

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}

@Test
public void testStreamDeliveryRawInputStreamBlockedReadBytesAborted() throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
Expand Down

0 comments on commit e007fc9

Please sign in to comment.