Skip to content

Commit

Permalink
PROTON-2845 Expand the test peer APIs for handling dispositions
Browse files Browse the repository at this point in the history
Expands the test peer API for scripting sends of and matching of
dispositions.
  • Loading branch information
tabish121 committed Aug 5, 2024
1 parent e9de35d commit e65f05b
Show file tree
Hide file tree
Showing 6 changed files with 415 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ void testStreamSenderMessageCanOperatesWithinTransaction() throws Exception {
}

@Test
public void testAcceptAndRejectInSameTransaction() throws Exception {
public void testAcceptAndReleaseInSameTransaction() throws Exception {
final byte[] txnId = new byte[] { 0, 1, 2, 3 };

try (ProtonTestServer peer = new ProtonTestServer()) {
Expand Down Expand Up @@ -1603,4 +1603,63 @@ public void testAcceptAndRejectInSameTransaction() throws Exception {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}

@Test
public void testModifiedDispositionInTransaction() throws Exception {
final byte[] txnId = new byte[] { 0, 1, 2, 3 };

try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofReceiver().respond();
peer.expectFlow();
peer.start();

final URI remoteURI = peer.getServerURI();
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));

LOG.info("Test started, peer listening on: {}", remoteURI);

Client container = Client.create();
Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
Session session = connection.openSession();
ReceiverOptions options = new ReceiverOptions().autoAccept(false).autoSettle(false);
Receiver receiver = session.openReceiver("test-queue", options).openFuture().get();

peer.expectCoordinatorAttach().respond();
peer.remoteFlow().withLinkCredit(2).queue();
peer.expectDeclare().accept(txnId);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withDeliveryTag(new byte[] { 1 })
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
peer.expectDisposition().withSettled(true)
.withState()
.transactional()
.withTxnId(txnId)
.withModified(true, true);
peer.expectDischarge().withFail(false).withTxnId(txnId).accept();
peer.expectDetach().respond();
peer.expectClose().respond();

session.beginTransaction();

final Delivery delivery = receiver.receive(100, TimeUnit.MILLISECONDS);

assertNotNull(delivery);
assertFalse(delivery.settled());
assertNull(delivery.state());

delivery.modified(true, true);

session.commitTransaction();
receiver.closeAsync();
connection.closeAsync().get();

peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.qpid.protonj2.test.driver.actions;

import java.util.Map;

import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
import org.apache.qpid.protonj2.test.driver.SessionTracker;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Accepted;
Expand All @@ -31,6 +33,7 @@
import org.apache.qpid.protonj2.test.driver.codec.transport.Disposition;
import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition;
import org.apache.qpid.protonj2.test.driver.codec.transport.Role;
import org.apache.qpid.protonj2.test.driver.codec.util.TypeMapper;

/**
* AMQP Disposition injection action which can be added to a driver for write at a specific time or
Expand Down Expand Up @@ -169,6 +172,11 @@ public DispositionInjectAction modified(boolean failed, boolean undeliverableHer
return DispositionInjectAction.this;
}

public DispositionInjectAction modified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) {
withState(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
return DispositionInjectAction.this;
}

public TransactionalStateBuilder transactional() {
TransactionalStateBuilder builder = new TransactionalStateBuilder(DispositionInjectAction.this);
withState(builder.getState());
Expand Down Expand Up @@ -236,6 +244,21 @@ public TransactionalStateBuilder withRejected(String condition, String descripti
return this;
}

public TransactionalStateBuilder withRejected(Symbol condition, String description) {
withOutcome(new Rejected().setError(new ErrorCondition(condition, description)));
return this;
}

public TransactionalStateBuilder withRejected(String condition, String description, Map<String, Object> info) {
withOutcome(new Rejected().setError(new ErrorCondition(Symbol.valueOf(condition), description, TypeMapper.toSymbolKeyedMap(info))));
return this;
}

public TransactionalStateBuilder withRejected(Symbol condition, String description, Map<Symbol, Object> info) {
withOutcome(new Rejected().setError(new ErrorCondition(condition, description, info)));
return this;
}

public TransactionalStateBuilder withModified() {
withOutcome(new Modified());
return this;
Expand All @@ -250,5 +273,10 @@ public TransactionalStateBuilder withModified(boolean failed, boolean undelivera
withOutcome(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
return this;
}

public TransactionalStateBuilder withModified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) {
withOutcome(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition;
import org.apache.qpid.protonj2.test.driver.codec.transport.ReceiverSettleMode;
import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer;
import org.apache.qpid.protonj2.test.driver.codec.util.TypeMapper;

/**
* AMQP Close injection action which can be added to a driver for write at a specific time or
Expand Down Expand Up @@ -347,6 +348,10 @@ protected abstract class SectionBuilder {
public TransferInjectAction also() {
return TransferInjectAction.this;
}

public TransferInjectAction and() {
return TransferInjectAction.this;
}
}

public final class HeaderBuilder extends SectionBuilder {
Expand Down Expand Up @@ -582,18 +587,38 @@ public TransferInjectAction rejected(String condition, String description) {
return TransferInjectAction.this;
}

public TransferInjectAction rejected(Symbol condition, String description) {
withState(new Rejected().setError(new ErrorCondition(condition, description)));
return TransferInjectAction.this;
}

public TransferInjectAction rejected(String condition, String description, Map<String, Object> info) {
withState(new Rejected().setError(new ErrorCondition(Symbol.valueOf(condition), description, TypeMapper.toSymbolKeyedMap(info))));
return TransferInjectAction.this;
}

public TransferInjectAction rejected(Symbol condition, String description, Map<Symbol, Object> info) {
withState(new Rejected().setError(new ErrorCondition(condition, description, info)));
return TransferInjectAction.this;
}

public TransferInjectAction modified() {
withState(new Modified());
return TransferInjectAction.this;
}

public TransferInjectAction modified(boolean failed) {
withState(new Modified());
withState(new Modified().setDeliveryFailed(failed));
return TransferInjectAction.this;
}

public TransferInjectAction modified(boolean failed, boolean undeliverableHere) {
withState(new Modified());
withState(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
return TransferInjectAction.this;
}

public TransferInjectAction modified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) {
withState(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
return TransferInjectAction.this;
}

Expand Down Expand Up @@ -664,6 +689,21 @@ public TransactionalStateBuilder withRejected(String condition, String descripti
return this;
}

public TransactionalStateBuilder withRejected(Symbol condition, String description) {
withOutcome(new Rejected().setError(new ErrorCondition(condition, description)));
return this;
}

public TransactionalStateBuilder withRejected(String condition, String description, Map<String, Object> info) {
withOutcome(new Rejected().setError(new ErrorCondition(Symbol.valueOf(condition), description, TypeMapper.toSymbolKeyedMap(info))));
return this;
}

public TransactionalStateBuilder withRejected(Symbol condition, String description, Map<Symbol, Object> info) {
withOutcome(new Rejected().setError(new ErrorCondition(condition, description, info)));
return this;
}

public TransactionalStateBuilder withModified() {
withOutcome(new Modified());
return this;
Expand All @@ -678,6 +718,11 @@ public TransactionalStateBuilder withModified(boolean failed, boolean undelivera
withOutcome(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
return this;
}

public TransactionalStateBuilder modified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) {
withOutcome(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
return this;
}
}

public final class MessageBuilder extends SectionBuilder {
Expand Down
Loading

0 comments on commit e65f05b

Please sign in to comment.