From 119a8945c75f0351dcc4aadfd5a3866131e25c0d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 1 Jul 2023 02:23:45 +0800 Subject: [PATCH] [fix][io] Close the kafka source connector got stuck (#20698) Motivation: https://github.com/apache/pulsar/discussions/19880#discussioncomment-6026807 When Kafka connector is closing, it waits for the `runnerThread` to stop, but the task-close is running at the same thread, so it will be stuck. Modifications: run `close` in another thread. --- pulsar-io/kafka/pom.xml | 6 ++++++ .../pulsar/io/kafka/KafkaAbstractSource.java | 14 ++++++++------ .../io/kafka/source/KafkaAbstractSourceTest.java | 6 +++++- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml index 2d8fc93ed12ff..d6a6668f191c6 100644 --- a/pulsar-io/kafka/pom.xml +++ b/pulsar-io/kafka/pom.xml @@ -109,6 +109,12 @@ test + + org.awaitility + awaitility + test + + diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 8d2cbd8e74e14..012e4143744e8 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -190,12 +190,14 @@ public void start() { }); runnerThread.setUncaughtExceptionHandler( (t, e) -> { - LOG.error("[{}] Error while consuming records", t.getName(), e); - try { - this.close(); - } catch (InterruptedException ex) { - // The interrupted exception is thrown by the runnerThread itself. Ignore it. - } + new Thread(() -> { + LOG.error("[{}] Error while consuming records", t.getName(), e); + try { + this.close(); + } catch (Exception ex) { + LOG.error("[{}] Close kafka source error", t.getName(), e); + } + }, "Kafka Source Close Task Thread").start(); }); runnerThread.setName("Kafka Source Thread"); runnerThread.start(); diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index bc06c3e1935b4..402727f4ec015 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -31,6 +31,7 @@ import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.kafka.KafkaAbstractSource; import org.apache.pulsar.io.kafka.KafkaSourceConfig; +import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -173,7 +174,10 @@ public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Excep Field runningField = KafkaAbstractSource.class.getDeclaredField("running"); runningField.setAccessible(true); - Assert.assertFalse((boolean) runningField.get(source)); + Awaitility.await().untilAsserted(() -> { + Assert.assertFalse((boolean) runningField.get(source)); + Assert.assertNull(consumerField.get(source)); + }); } private File getFile(String name) {