diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 80c732f46e..30457e6689 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -94,7 +94,7 @@ public abstract class AbstractMessageListenerContainer protected final ReentrantLock lifecycleLock = new ReentrantLock(); // NOSONAR - protected final AtomicBoolean enforceRebalanceRequested = new AtomicBoolean(); + private final AtomicBoolean enforceRebalanceRequested = new AtomicBoolean(); private final Set pauseRequestedPartitions = ConcurrentHashMap.newKeySet(); @@ -193,6 +193,14 @@ protected AbstractMessageListenerContainer(ConsumerFactory } } + /** + * To be used only with {@link ConcurrentMessageListenerContainerRef}. + */ + AbstractMessageListenerContainer() { + this.containerProperties = null; + this.consumerFactory = null; + } + @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; @@ -278,15 +286,27 @@ public boolean isRunning() { return this.running; } - protected void setFenced(boolean fenced) { + void setFenced(boolean fenced) { this.fenced = fenced; } + protected boolean isFenced() { + return this.fenced; + } + @Deprecated(since = "3.2", forRemoval = true) protected boolean isPaused() { return this.paused; } + protected boolean isEnforceRebalanceRequested() { + return this.enforceRebalanceRequested.get(); + } + + protected void setEnforceRebalanceRequested(boolean enforceRebalance) { + this.enforceRebalanceRequested.set(enforceRebalance); + } + @Override public boolean isPartitionPauseRequested(TopicPartition topicPartition) { return this.pauseRequestedPartitions.contains(topicPartition); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index 06b136f434..b4d2098acc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -305,13 +305,17 @@ private KafkaMessageListenerContainer constructContainer(ContainerProperti @Nullable TopicPartitionOffset[] topicPartitions, int i) { KafkaMessageListenerContainer container; + ConcurrentMessageListenerContainerRef concurrentMessageListenerContainerRef = + new ConcurrentMessageListenerContainerRef<>(this, this.lifecycleLock); if (topicPartitions == null) { - container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties); // NOSONAR + container = new KafkaMessageListenerContainer<>(concurrentMessageListenerContainerRef, this, + this.consumerFactory, containerProperties); // NOSONAR } else { - container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, // NOSONAR - containerProperties, partitionSubset(containerProperties, i)); + container = new KafkaMessageListenerContainer<>(concurrentMessageListenerContainerRef, this, + this.consumerFactory, containerProperties, partitionSubset(containerProperties, i)); // NOSONAR } + concurrentMessageListenerContainerRef.setKafkaMessageListenerContainer(container); return container; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerRef.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerRef.java new file mode 100644 index 0000000000..7bace96a64 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerRef.java @@ -0,0 +1,413 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; + +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.event.ConsumerStoppedEvent; +import org.springframework.lang.Nullable; + +/** + * Reference of {@link ConcurrentMessageListenerContainer} to be passed to the {@link KafkaMessageListenerContainer}. + * This container is used for internal purpose. Detects if the {@link KafkaMessageListenerContainer} is fenced and + * forbids `stop` calls on {@link ConcurrentMessageListenerContainer} + * + * @param the key type. + * @param the value type. + * @author Lokesh Alamuri + */ +class ConcurrentMessageListenerContainerRef extends AbstractMessageListenerContainer { + + protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); // NOSONAR + + private final ConcurrentMessageListenerContainer concurrentMessageListenerContainer; + + private final ReentrantLock lifecycleLock; + + private KafkaMessageListenerContainer kafkaMessageListenerContainer; + + ConcurrentMessageListenerContainerRef(ConcurrentMessageListenerContainer concurrentMessageListenerContainer, + ReentrantLock lifecycleLock) { + super(); + this.concurrentMessageListenerContainer = concurrentMessageListenerContainer; + this.lifecycleLock = lifecycleLock; + } + + void setKafkaMessageListenerContainer(KafkaMessageListenerContainer kafkaMessageListenerContainer) { + this.kafkaMessageListenerContainer = kafkaMessageListenerContainer; + } + + @Override + public void setupMessageListener(Object messageListener) { + throw new UnsupportedOperationException("This container doesn't support setting up MessageListener"); + } + + @Override + public Map> metrics() { + return this.concurrentMessageListenerContainer.metrics(); + } + + @Override + public ContainerProperties getContainerProperties() { + return this.concurrentMessageListenerContainer.getContainerProperties(); + } + + @Override + public Collection getAssignedPartitions() { + return this.concurrentMessageListenerContainer.getAssignedPartitions(); + } + + @Override + public Map> getAssignmentsByClientId() { + return this.concurrentMessageListenerContainer.getAssignmentsByClientId(); + } + + @Override + public void enforceRebalance() { + this.concurrentMessageListenerContainer.enforceRebalance(); + } + + @Override + public void pause() { + this.concurrentMessageListenerContainer.pause(); + } + + @Override + public void resume() { + this.concurrentMessageListenerContainer.resume(); + } + + @Override + public void pausePartition(TopicPartition topicPartition) { + this.concurrentMessageListenerContainer.pausePartition(topicPartition); + } + + @Override + public void resumePartition(TopicPartition topicPartition) { + this.concurrentMessageListenerContainer.resumePartition(topicPartition); + } + + @Override + public boolean isPartitionPauseRequested(TopicPartition topicPartition) { + return this.concurrentMessageListenerContainer.isPartitionPauseRequested(topicPartition); + } + + @Override + public boolean isPartitionPaused(TopicPartition topicPartition) { + return this.concurrentMessageListenerContainer.isPartitionPaused(topicPartition); + } + + @Override + public boolean isPauseRequested() { + return this.concurrentMessageListenerContainer.isPauseRequested(); + } + + @Override + public boolean isContainerPaused() { + return this.concurrentMessageListenerContainer.isContainerPaused(); + } + + @Override + public String getGroupId() { + return this.concurrentMessageListenerContainer.getGroupId(); + } + + @Override + public String getListenerId() { + return this.concurrentMessageListenerContainer.getListenerId(); + } + + @Override + public String getMainListenerId() { + return this.concurrentMessageListenerContainer.getMainListenerId(); + } + + @Override + public byte[] getListenerInfo() { + return this.concurrentMessageListenerContainer.getListenerInfo(); + } + + @Override + public boolean isChildRunning() { + return this.concurrentMessageListenerContainer.isChildRunning(); + } + + @Override + public boolean isInExpectedState() { + return this.concurrentMessageListenerContainer.isInExpectedState(); + } + + @Override + public void stopAbnormally(Runnable callback) { + this.lifecycleLock.lock(); + try { + if (!this.kafkaMessageListenerContainer.isFenced()) { + // kafkaMessageListenerContainer is not fenced. Allow stopAbnormally call on + // concurrentMessageListenerContainer + this.concurrentMessageListenerContainer.stopAbnormally(callback); + } + else if (this.concurrentMessageListenerContainer.isFenced() && + !this.concurrentMessageListenerContainer.isRunning()) { + // kafkaMessageListenerContainer is fenced and concurrentMessageListenerContainer is not running. Allow + // callback to run + callback.run(); + } + else { + this.logger.error(() -> String.format("Suppressed `stopAbnormal` operation called by " + + "MessageListenerContainer [" + this.kafkaMessageListenerContainer.getBeanName() + "]")); + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + @Override + protected void doStop(Runnable callback, boolean normal) { + this.lifecycleLock.lock(); + try { + if (!this.kafkaMessageListenerContainer.isFenced()) { + // kafkaMessageListenerContainer is not fenced. Allow doStop call on + // concurrentMessageListenerContainer + this.concurrentMessageListenerContainer.doStop(callback, normal); + } + else if (this.concurrentMessageListenerContainer.isFenced() && + !this.concurrentMessageListenerContainer.isRunning()) { + // kafkaMessageListenerContainer is fenced and concurrentMessageListenerContainer is not running. Allow + // callback to run + callback.run(); + } + else { + this.logger.error(() -> String.format("Suppressed `doStop` operation called by " + + "MessageListenerContainer [" + this.kafkaMessageListenerContainer.getBeanName() + "]")); + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + @Override + public MessageListenerContainer getContainerFor(String topic, int partition) { + return this.concurrentMessageListenerContainer.getContainerFor(topic, partition); + } + + @Override + public void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason) { + this.concurrentMessageListenerContainer.childStopped(child, reason); + } + + @Override + public void childStarted(MessageListenerContainer child) { + this.concurrentMessageListenerContainer.childStarted(child); + } + + @Override + protected void doStart() { + this.concurrentMessageListenerContainer.doStart(); + } + + @Override + public boolean isRunning() { + return this.concurrentMessageListenerContainer.isRunning(); + } + + @Override + public boolean isAutoStartup() { + return this.concurrentMessageListenerContainer.isAutoStartup(); + } + + @Override + public void setAutoStartup(boolean autoStartup) { + throw new UnsupportedOperationException("This container doesn't support `setAutoStartup`"); + } + + @Override + public void stop(Runnable callback) { + this.lifecycleLock.lock(); + try { + if (!this.kafkaMessageListenerContainer.isFenced()) { + // kafkaMessageListenerContainer is not fenced. Allow stop call on + // concurrentMessageListenerContainer + this.concurrentMessageListenerContainer.stop(callback); + } + else if (this.concurrentMessageListenerContainer.isFenced() && + !this.concurrentMessageListenerContainer.isRunning()) { + // kafkaMessageListenerContainer is fenced and concurrentMessageListenerContainer is not running. Allow + // callback to run + callback.run(); + } + else { + this.logger.error(() -> String.format("Suppressed `stop` operation called by " + + "MessageListenerContainer [" + this.kafkaMessageListenerContainer.getBeanName() + "]")); + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + @Nullable + protected ApplicationContext getApplicationContext() { + return this.concurrentMessageListenerContainer.getApplicationContext(); + } + + /** + * Get the event publisher. + * @return the publisher + */ + @Nullable + public ApplicationEventPublisher getApplicationEventPublisher() { + return this.concurrentMessageListenerContainer.getApplicationEventPublisher(); + } + + /** + * Get the {@link CommonErrorHandler}. + * @return the handler. + * @since 2.8 + */ + @Nullable + public CommonErrorHandler getCommonErrorHandler() { + return this.concurrentMessageListenerContainer.getCommonErrorHandler(); + } + + protected boolean isStoppedNormally() { + return this.concurrentMessageListenerContainer.isStoppedNormally(); + } + + protected void setStoppedNormally(boolean stoppedNormally) { + this.concurrentMessageListenerContainer.setStoppedNormally(stoppedNormally); + } + + protected void setRunning(boolean running) { + this.concurrentMessageListenerContainer.setRunning(running); + } + + protected boolean isEnforceRebalanceRequested() { + return this.concurrentMessageListenerContainer.isEnforceRebalanceRequested(); + } + + protected void setEnforceRebalanceRequested(boolean enforceRebalance) { + this.concurrentMessageListenerContainer.setEnforceRebalanceRequested(enforceRebalance); + } + + /** + * Return the currently configured {@link AfterRollbackProcessor}. + * @return the after rollback processor. + * @since 2.2.14 + */ + public AfterRollbackProcessor getAfterRollbackProcessor() { + return this.concurrentMessageListenerContainer.getAfterRollbackProcessor(); + } + + public boolean isChangeConsumerThreadName() { + return this.concurrentMessageListenerContainer.isChangeConsumerThreadName(); + } + + /** + * Set to true to instruct the container to change the consumer thread name during + * initialization. + * @param changeConsumerThreadName true to change. + * @since 3.0.1 + * @see #setThreadNameSupplier(Function) + */ + public void setChangeConsumerThreadName(boolean changeConsumerThreadName) { + this.concurrentMessageListenerContainer.setChangeConsumerThreadName(changeConsumerThreadName); + } + + /** + * Return the {@link KafkaAdmin}, used to find the cluster id for observation, if + * present. + * @return the kafkaAdmin + * @since 3.0.5 + */ + @Nullable + public KafkaAdmin getKafkaAdmin() { + return this.concurrentMessageListenerContainer.getKafkaAdmin(); + } + + public void setKafkaAdmin(KafkaAdmin kafkaAdmin) { + this.concurrentMessageListenerContainer.setKafkaAdmin(kafkaAdmin); + } + + protected RecordInterceptor getRecordInterceptor() { + return this.concurrentMessageListenerContainer.getRecordInterceptor(); + } + + /** + * Set an interceptor to be called before calling the record listener. + * Does not apply to batch listeners. + * @param recordInterceptor the interceptor. + * @since 2.2.7 + * @see #setInterceptBeforeTx(boolean) + */ + public void setRecordInterceptor(RecordInterceptor recordInterceptor) { + this.concurrentMessageListenerContainer.setRecordInterceptor(recordInterceptor); + } + + protected BatchInterceptor getBatchInterceptor() { + return this.concurrentMessageListenerContainer.getBatchInterceptor(); + } + + /** + * Set an interceptor to be called before calling the record listener. + * @param batchInterceptor the interceptor. + * @since 2.6.6 + * @see #setInterceptBeforeTx(boolean) + */ + public void setBatchInterceptor(BatchInterceptor batchInterceptor) { + this.concurrentMessageListenerContainer.setBatchInterceptor(batchInterceptor); + } + + protected boolean isInterceptBeforeTx() { + return this.concurrentMessageListenerContainer.isInterceptBeforeTx(); + } + + /** + * When false, invoke the interceptor after the transaction starts. + * @param interceptBeforeTx false to intercept within the transaction. + * Default true since 2.8. + * @since 2.3.4 + * @see #setRecordInterceptor(RecordInterceptor) + * @see #setBatchInterceptor(BatchInterceptor) + */ + public void setInterceptBeforeTx(boolean interceptBeforeTx) { + this.concurrentMessageListenerContainer.setInterceptBeforeTx(interceptBeforeTx); + } + + /** + * Return this or a parent container if this has a parent. + * @return the parent or this. + * @since 2.2.1 + */ + protected AbstractMessageListenerContainer parentOrThis() { + return this; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index bbc8ae8ccb..0a50992e70 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -181,6 +181,8 @@ public class KafkaMessageListenerContainer // NOSONAR line count private final AbstractMessageListenerContainer thisOrParentContainer; + private AbstractMessageListenerContainer thisOrParentContainerRef; + private final TopicPartitionOffset[] topicPartitions; private String clientIdSuffix; @@ -205,6 +207,20 @@ public KafkaMessageListenerContainer(ConsumerFactory consu this(null, consumerFactory, containerProperties, (TopicPartitionOffset[]) null); } + /** + * Construct an instance with the supplied configuration properties. + * @param containerRef a delegating container referece (if this is a sub-container). + * @param container a delegating container (if this is a sub-container). + * @param consumerFactory the consumer factory. + * @param containerProperties the container properties. + */ + KafkaMessageListenerContainer(AbstractMessageListenerContainer containerRef, + AbstractMessageListenerContainer container, ConsumerFactory consumerFactory, + ContainerProperties containerProperties) { + this(container, consumerFactory, containerProperties); + this.thisOrParentContainerRef = containerRef == null ? this.thisOrParentContainer : containerRef; + } + /** * Construct an instance with the supplied configuration properties. * @param container a delegating container (if this is a sub-container). @@ -218,6 +234,23 @@ public KafkaMessageListenerContainer(ConsumerFactory consu this(container, consumerFactory, containerProperties, (TopicPartitionOffset[]) null); } + /** + * Construct an instance with the supplied configuration properties and specific + * topics/partitions/initialOffsets. + * @param containerRef a delegating container referece (if this is a sub-container). + * @param container a delegating container (if this is a sub-container). + * @param consumerFactory the consumer factory. + * @param containerProperties the container properties. + * @param topicPartitions the topics/partitions; duplicates are eliminated. + */ + KafkaMessageListenerContainer(@Nullable AbstractMessageListenerContainer containerRef, + @Nullable AbstractMessageListenerContainer container, + ConsumerFactory consumerFactory, ContainerProperties containerProperties, + @Nullable TopicPartitionOffset... topicPartitions) { + this(container, consumerFactory, containerProperties, topicPartitions); + this.thisOrParentContainerRef = containerRef == null ? this.thisOrParentContainer : containerRef; + } + /** * Construct an instance with the supplied configuration properties and specific * topics/partitions/initialOffsets. @@ -310,7 +343,7 @@ public boolean isInExpectedState() { @Override public void enforceRebalance() { - this.thisOrParentContainer.enforceRebalanceRequested.set(true); + this.thisOrParentContainer.setEnforceRebalanceRequested(true); consumerWakeIfNecessary(); } @@ -1449,7 +1482,7 @@ private void doProcessCommits() { records.add(kvConsumerRecord); } this.commonErrorHandler.handleRemaining(cfe, records, this.consumer, - KafkaMessageListenerContainer.this.thisOrParentContainer); + KafkaMessageListenerContainer.this.thisOrParentContainerRef); } } } @@ -1711,7 +1744,7 @@ private void sleepFor(Duration duration) { private void enforceRebalanceIfNecessary() { try { - if (KafkaMessageListenerContainer.this.thisOrParentContainer.enforceRebalanceRequested.get()) { + if (KafkaMessageListenerContainer.this.thisOrParentContainer.isEnforceRebalanceRequested()) { String enforcedRebalanceReason = String.format("Enforced rebalance requested for container: %s", KafkaMessageListenerContainer.this.getListenerId()); this.logger.info(enforcedRebalanceReason); @@ -1719,7 +1752,7 @@ private void enforceRebalanceIfNecessary() { } } finally { - KafkaMessageListenerContainer.this.thisOrParentContainer.enforceRebalanceRequested.set(false); + KafkaMessageListenerContainer.this.thisOrParentContainer.setEnforceRebalanceRequested(false); } } @@ -1926,7 +1959,7 @@ protected void handleConsumerException(Exception e) { try { if (this.commonErrorHandler != null) { this.commonErrorHandler.handleOtherException(e, this.consumer, - KafkaMessageListenerContainer.this.thisOrParentContainer, this.isBatchListener); + KafkaMessageListenerContainer.this.thisOrParentContainerRef, this.isBatchListener); } else { this.logger.error(e, "Consumer exception"); @@ -2415,12 +2448,12 @@ private void invokeBatchErrorHandler(final ConsumerRecords records, || rte instanceof CommitFailedException) { this.commonErrorHandler.handleBatch(rte, records, this.consumer, - KafkaMessageListenerContainer.this.thisOrParentContainer, + KafkaMessageListenerContainer.this.thisOrParentContainerRef, () -> invokeBatchOnMessageWithRecordsOrList(records, list)); } else { ConsumerRecords afterHandling = this.commonErrorHandler.handleBatchAndReturnRemaining(rte, - records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer, + records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainerRef, () -> invokeBatchOnMessageWithRecordsOrList(records, list)); if (afterHandling != null && !afterHandling.isEmpty()) { this.remainingRecords = afterHandling; @@ -2845,13 +2878,13 @@ private void invokeErrorHandler(final ConsumerRecord cRecord, records.add(iterator.next()); } this.commonErrorHandler.handleRemaining(rte, records, this.consumer, - KafkaMessageListenerContainer.this.thisOrParentContainer); + KafkaMessageListenerContainer.this.thisOrParentContainerRef); } else { boolean handled = false; try { handled = this.commonErrorHandler.handleOne(rte, cRecord, this.consumer, - KafkaMessageListenerContainer.this.thisOrParentContainer); + KafkaMessageListenerContainer.this.thisOrParentContainerRef); } catch (Exception ex) { this.logger.error(ex, "ErrorHandler threw unexpected exception"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java index 0a59a00b8b..1fcf136372 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java @@ -172,7 +172,6 @@ public void handleOtherException(Exception thrownException, Consumer consu }); container.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(errorContainer.get()).isSameAs(container); container.stop(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index b699f68098..7b956c718d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -49,6 +49,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.invocation.InvocationOnMock; @@ -933,7 +934,9 @@ protected Consumer createKafkaConsumer(String groupId, String c firstLatch.countDown(); - assertThat(listenerThreadNames).containsAnyOf("testAuto-0", "testAuto-1"); + Condition listenerThreadNameCondition = new Condition<>(s -> s.contains("testAuto"), + "Listener thread name has to be prefixed with testAuto"); + assertThat(listenerThreadNames).have(listenerThreadNameCondition); assertThat(concurrentContainerStopLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(container.isInExpectedState()).isTrue(); @@ -1095,4 +1098,297 @@ protected Consumer createKafkaConsumer(String groupId, String c this.logger.info("Stop containerStartStop"); } + @Test + public void testFencedContainerFailed() throws Exception { + this.logger.info("Start testFencedContainerFailed"); + Map props = KafkaTestUtils.consumerProps("test1", "true", + embeddedKafka); + AtomicReference overrides = new AtomicReference<>(); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) { + @Override + protected Consumer createKafkaConsumer(String groupId, String clientIdPrefix, + String clientIdSuffixArg, Properties properties) { + overrides.set(properties); + return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties); + } + }; + ContainerProperties containerProps = new ContainerProperties(topic1); + containerProps.setLogContainerConfig(true); + containerProps.setClientId("client"); + containerProps.setAckMode(ContainerProperties.AckMode.RECORD); + + final CountDownLatch secondRunLatch = new CountDownLatch(5); + final Set listenerThreadNames = new ConcurrentSkipListSet<>(); + final List payloads = new ArrayList<>(); + final CountDownLatch processingLatch = new CountDownLatch(1); + final CountDownLatch firstLatch = new CountDownLatch(1); + + AtomicBoolean first = new AtomicBoolean(true); + + containerProps.setMessageListener((MessageListener) message -> { + if (first.getAndSet(false)) { + try { + firstLatch.await(100, TimeUnit.SECONDS); + throw new NullPointerException(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message); + listenerThreadNames.add(Thread.currentThread().getName()); + payloads.add(message.value()); + secondRunLatch.countDown(); + processingLatch.countDown(); + }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + container.setConcurrency(2); + container.setBeanName("testAuto"); + container.setChangeConsumerThreadName(true); + container.setCommonErrorHandler(new CommonContainerStoppingErrorHandler()); + + BlockingQueue events = new LinkedBlockingQueue<>(); + CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1); + CountDownLatch consumerStoppedEventLatch = new CountDownLatch(1); + + container.setApplicationEventPublisher(e -> { + events.add((KafkaEvent) e); + if (e instanceof ConcurrentContainerStoppedEvent) { + concurrentContainerStopLatch.countDown(); + } + if (e instanceof ConsumerStoppedEvent) { + consumerStoppedEventLatch.countDown(); + } + }); + + container.start(); + + MessageListenerContainer childContainer0 = container.getContainers().get(0); + MessageListenerContainer childContainer1 = container.getContainers().get(1); + + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + assertThat(container.getAssignedPartitions()).hasSize(2); + Map> assignments = container.getAssignmentsByClientId(); + assertThat(assignments).hasSize(2); + assertThat(assignments.get("client-0")).isNotNull(); + assertThat(assignments.get("client-1")).isNotNull(); + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(topic1); + template.sendDefault(0, 0, "foo"); + template.sendDefault(1, 2, "bar"); + template.sendDefault(0, 0, "baz"); + template.sendDefault(1, 2, "qux"); + template.flush(); + + assertThat(container.metrics()).isNotNull(); + assertThat(container.isInExpectedState()).isTrue(); + assertThat(childContainer0.isRunning()).isTrue(); + assertThat(childContainer1.isRunning()).isTrue(); + assertThat(container.isChildRunning()).isTrue(); + + assertThat(processingLatch.await(60, TimeUnit.SECONDS)).isTrue(); + + container.stop(); + + assertThat(container.isChildRunning()).isTrue(); + assertThat(container.isRunning()).isFalse(); + assertThat(childContainer0.isRunning()).isFalse(); + assertThat(childContainer1.isRunning()).isFalse(); + + assertThat(consumerStoppedEventLatch.await(30, TimeUnit.SECONDS)).isTrue(); + + assertThat(container.isChildRunning()).isTrue(); + + Condition listenerThreadNameCondition = new Condition<>(s -> s.contains("testAuto"), + "Listener thread name has to be prefixed with testAuto"); + assertThat(listenerThreadNames).have(listenerThreadNameCondition); + + assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isFalse(); + + + template.sendDefault(0, 0, "FOO"); + template.sendDefault(1, 2, "BAR"); + template.sendDefault(0, 0, "BAZ"); + template.sendDefault(1, 2, "QUX"); + template.flush(); + + // permitted since stop is called prior. + container.start(); + + assertThat(container.isRunning()).isTrue(); + assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning())) + .isTrue(); + + firstLatch.countDown(); + + assertThat(container.isRunning()).isTrue(); + assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning())) + .isTrue(); + assertThat(secondRunLatch.await(30, TimeUnit.SECONDS)).isTrue(); + container.stop(); + assertThat(container.isRunning()).isFalse(); + this.logger.info("Stop testFencedContainerFailed"); + } + + @Test + public void testFencedContainerFailedWithCustomErrorHandler() throws Exception { + this.logger.info("Start testFencedContainerFailed"); + Map props = KafkaTestUtils.consumerProps("test1", "true", + embeddedKafka); + AtomicReference overrides = new AtomicReference<>(); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) { + @Override + protected Consumer createKafkaConsumer(String groupId, String clientIdPrefix, + String clientIdSuffixArg, Properties properties) { + overrides.set(properties); + return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties); + } + }; + ContainerProperties containerProps = new ContainerProperties(topic1); + containerProps.setLogContainerConfig(true); + containerProps.setClientId("client"); + containerProps.setAckMode(ContainerProperties.AckMode.RECORD); + + final CountDownLatch secondRunLatch = new CountDownLatch(5); + final Set listenerThreadNames = new ConcurrentSkipListSet<>(); + final List payloads = new ArrayList<>(); + final CountDownLatch processingLatch = new CountDownLatch(1); + final CountDownLatch firstLatch = new CountDownLatch(1); + + AtomicBoolean first = new AtomicBoolean(true); + + containerProps.setMessageListener((MessageListener) message -> { + if (first.getAndSet(false)) { + try { + firstLatch.await(100, TimeUnit.SECONDS); + throw new NullPointerException(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message); + listenerThreadNames.add(Thread.currentThread().getName()); + payloads.add(message.value()); + secondRunLatch.countDown(); + processingLatch.countDown(); + }); + + ConcurrentMessageListenerContainer container = + new ConcurrentMessageListenerContainer<>(cf, containerProps); + container.setConcurrency(2); + container.setBeanName("testAuto"); + container.setChangeConsumerThreadName(true); + + final CountDownLatch erroHandlerLatch = new CountDownLatch(1); + final CountDownLatch erroHandlerLatchInCallBack = new CountDownLatch(1); + + container.setCommonErrorHandler(new CommonErrorHandler() { + @Override + public boolean seeksAfterHandling() { + return true; + } + + @Override + public void handleRemaining(Exception thrownException, List> records, + Consumer consumer, MessageListenerContainer container) { + container.stop(() -> erroHandlerLatchInCallBack.countDown()); + erroHandlerLatch.countDown(); + } + }); + + BlockingQueue events = new LinkedBlockingQueue<>(); + CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1); + CountDownLatch consumerStoppedEventLatch = new CountDownLatch(1); + + container.setApplicationEventPublisher(e -> { + events.add((KafkaEvent) e); + if (e instanceof ConcurrentContainerStoppedEvent) { + concurrentContainerStopLatch.countDown(); + } + if (e instanceof ConsumerStoppedEvent) { + consumerStoppedEventLatch.countDown(); + } + }); + + container.start(); + + MessageListenerContainer childContainer0 = container.getContainers().get(0); + MessageListenerContainer childContainer1 = container.getContainers().get(1); + + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + assertThat(container.getAssignedPartitions()).hasSize(2); + Map> assignments = container.getAssignmentsByClientId(); + assertThat(assignments).hasSize(2); + assertThat(assignments.get("client-0")).isNotNull(); + assertThat(assignments.get("client-1")).isNotNull(); + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(topic1); + template.sendDefault(0, 0, "foo"); + template.sendDefault(1, 2, "bar"); + template.sendDefault(0, 0, "baz"); + template.sendDefault(1, 2, "qux"); + template.flush(); + + assertThat(container.metrics()).isNotNull(); + assertThat(container.isInExpectedState()).isTrue(); + assertThat(childContainer0.isRunning()).isTrue(); + assertThat(childContainer1.isRunning()).isTrue(); + assertThat(container.isChildRunning()).isTrue(); + + assertThat(processingLatch.await(60, TimeUnit.SECONDS)).isTrue(); + + container.stop(); + + assertThat(container.isChildRunning()).isTrue(); + assertThat(container.isRunning()).isFalse(); + assertThat(childContainer0.isRunning()).isFalse(); + assertThat(childContainer1.isRunning()).isFalse(); + + assertThat(consumerStoppedEventLatch.await(30, TimeUnit.SECONDS)).isTrue(); + + assertThat(container.isChildRunning()).isTrue(); + + Condition listenerThreadNameCondition = new Condition<>(s -> s.contains("testAuto"), + "Listener thread name has to be prefixed with testAuto"); + assertThat(listenerThreadNames).have(listenerThreadNameCondition); + + + assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isFalse(); + + template.sendDefault(0, 0, "FOO"); + template.sendDefault(1, 2, "BAR"); + template.sendDefault(0, 0, "BAZ"); + template.sendDefault(1, 2, "QUX"); + template.flush(); + + // permitted since stop is called prior. + container.start(); + + assertThat(container.isRunning()).isTrue(); + assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning())) + .isTrue(); + + firstLatch.countDown(); + + assertThat(erroHandlerLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(erroHandlerLatchInCallBack.await(30, TimeUnit.SECONDS)).isFalse(); + + assertThat(container.isRunning()).isTrue(); + assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning())) + .isTrue(); + assertThat(secondRunLatch.await(30, TimeUnit.SECONDS)).isTrue(); + container.stop(); + assertThat(container.isRunning()).isFalse(); + this.logger.info("Stop testFencedContainerFailed"); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerEnforceRebalanceTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerEnforceRebalanceTests.java index 8c6215ec7a..2315f1193f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerEnforceRebalanceTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerEnforceRebalanceTests.java @@ -64,11 +64,11 @@ void enforceRebalance(@Autowired Config config, @Autowired KafkaTemplate) listenerContainer).enforceRebalanceRequested).isTrue(); + assertThat(((ConcurrentMessageListenerContainer) listenerContainer).isEnforceRebalanceRequested()).isTrue(); // The test is expecting partition revoke once and assign twice. assertThat(config.partitionRevokedLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(config.partitionAssignedLatch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(((ConcurrentMessageListenerContainer) listenerContainer).enforceRebalanceRequested).isFalse(); + assertThat(((ConcurrentMessageListenerContainer) listenerContainer).isEnforceRebalanceRequested()).isFalse(); listenerContainer.pause(); await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(listenerContainer.isPauseRequested()).isTrue()); await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(listenerContainer.isContainerPaused()).isTrue());