Skip to content

Commit

Permalink
fix: fix tests (#112)
Browse files Browse the repository at this point in the history
There were two issues preventing the tests from running:

1) The tests use the 'latest' image tag for MQ - and watch for a
particular message code in the log to recognise when the queue
manager is ready. Changes in a recent version of MQ meant that
the log output has changed, so the tests were waiting forever for
a log message that is no longer output.
I've updated the message code to match the current MQ behaviour.

2) Fix for a regression that caused a NullPointerException when
the MQSourceTaskIT test cleanup method ran - by renaming the
locally-scoped SourceTask variable to use the class-scoped
variable that the cleanup depends on.

Signed-off-by: Dale Lane <[email protected]>
  • Loading branch information
dalelane authored May 5, 2023
1 parent c06d421 commit 6dd0d47
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public String getConnectionName() {
private void waitForQueueManagerStartup() throws TimeoutException {
final WaitingConsumer logConsumer = new WaitingConsumer();
mqContainer.followOutput(logConsumer);
logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5975I"));
logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5806I"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void verifyJmsConnClosed() throws Exception {
private void waitForQueueManagerStartup() throws TimeoutException {
final WaitingConsumer logConsumer = new WaitingConsumer();
mqContainer.followOutput(logConsumer);
logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5975I"));
logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5806I"));
}

private void putAllMessagesToQueue(final List<MQMessage> messages) throws MQException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,15 @@ public void verifyMessageBatchGroupCommits() throws Exception {

@Test
public void verifyMessageBatchRollback() throws Exception {
final MQSourceTask newConnectTask = new MQSourceTask();
connectTask = new MQSourceTask();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder",
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
connectorConfigProps.put("mq.batch.size", "10");

newConnectTask.start(connectorConfigProps);
connectTask.start(connectorConfigProps);

// Test overview:
//
Expand All @@ -289,19 +289,19 @@ public void verifyMessageBatchRollback() throws Exception {
final List<SourceRecord> kafkaMessages;

// first batch should successfully retrieve messages 01-10
kafkaMessages = newConnectTask.poll();
kafkaMessages = connectTask.poll();
assertEquals(10, kafkaMessages.size());
newConnectTask.commit();
newConnectTask.commit();
connectTask.commit();
connectTask.commit();

// second batch (11-20) should fail because of message 16
final ConnectException exc = assertThrows(ConnectException.class, () -> {
newConnectTask.poll();
connectTask.poll();
});
assertTrue(exc.getMessage().equals("Unsupported JMS message type"));

// there should be 20 messages left on the MQ queue (messages 11-30)
newConnectTask.stop();
connectTask.stop();
final List<Message> remainingMQMessages = getAllMessagesFromQueue(MQ_QUEUE);
assertEquals(20, remainingMQMessages.size());
}
Expand Down

0 comments on commit 6dd0d47

Please sign in to comment.