diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml
index 59e6088dd2353..b2fde37577426 100644
--- a/pulsar-io/kafka/pom.xml
+++ b/pulsar-io/kafka/pom.xml
@@ -103,6 +103,12 @@
jjwt-jackson
+
+ org.awaitility
+ awaitility
+ test
+
+
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index ba6498ba5b771..e7934891cc340 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -188,7 +188,16 @@ public void start() {
}
});
runnerThread.setUncaughtExceptionHandler(
- (t, e) -> LOG.error("[{}] Error while consuming records", t.getName(), e));
+ (t, e) -> {
+ new Thread(() -> {
+ LOG.error("[{}] Error while consuming records", t.getName(), e);
+ try {
+ this.close();
+ } catch (Exception ex) {
+ LOG.error("[{}] Close kafka source error", t.getName(), e);
+ }
+ }, "Kafka Source Close Task Thread").start();
+ });
runnerThread.setName("Kafka Source Thread");
runnerThread.start();
}
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 4eb30447fbf35..ef185d6af0377 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -21,7 +21,10 @@
import com.google.common.collect.ImmutableMap;
+import java.lang.reflect.Field;
+import java.util.Collection;
import java.util.Collections;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -29,6 +32,8 @@
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -154,6 +159,28 @@ public final void loadFromSaslYamlFileTest() throws IOException {
assertEquals(config.getSslTruststorePassword(), "cert_pwd");
}
+ @Test
+ public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Exception {
+ KafkaAbstractSource source = new DummySource();
+ Consumer consumer = mock(Consumer.class);
+ Mockito.doThrow(new RuntimeException("Uncaught exception")).when(consumer)
+ .subscribe(Mockito.any(Collection.class));
+
+ Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
+ consumerField.setAccessible(true);
+ consumerField.set(source, consumer);
+
+ source.start();
+
+ Field runningField = KafkaAbstractSource.class.getDeclaredField("running");
+ runningField.setAccessible(true);
+
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertFalse((boolean) runningField.get(source));
+ Assert.assertNull(consumerField.get(source));
+ });
+ }
+
private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());