diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 565c36047474b..8d2cbd8e74e14 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -189,7 +189,14 @@ public void start() { } }); runnerThread.setUncaughtExceptionHandler( - (t, e) -> LOG.error("[{}] Error while consuming records", t.getName(), e)); + (t, e) -> { + LOG.error("[{}] Error while consuming records", t.getName(), e); + try { + this.close(); + } catch (InterruptedException ex) { + // The interrupted exception is thrown by the runnerThread itself. Ignore it. + } + }); runnerThread.setName("Kafka Source Thread"); runnerThread.start(); } diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index 612cf0bc6d2b1..bc06c3e1935b4 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -20,7 +20,10 @@ import com.google.common.collect.ImmutableMap; +import java.util.Collection; import java.util.Collections; +import java.lang.reflect.Field; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.security.auth.SecurityProtocol; @@ -28,6 +31,7 @@ import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.kafka.KafkaAbstractSource; import org.apache.pulsar.io.kafka.KafkaSourceConfig; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -153,6 +157,25 @@ public final void loadFromSaslYamlFileTest() throws IOException { assertEquals(config.getSslTruststorePassword(), "cert_pwd"); } + @Test + public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Exception { + KafkaAbstractSource source = new DummySource(); + Consumer consumer = mock(Consumer.class); + Mockito.doThrow(new RuntimeException("Uncaught exception")).when(consumer) + .subscribe(Mockito.any(Collection.class)); + + Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer"); + consumerField.setAccessible(true); + consumerField.set(source, consumer); + + source.start(); + + Field runningField = KafkaAbstractSource.class.getDeclaredField("running"); + runningField.setAccessible(true); + + Assert.assertFalse((boolean) runningField.get(source)); + } + private File getFile(String name) { ClassLoader classLoader = getClass().getClassLoader(); return new File(classLoader.getResource(name).getFile());