From 9d8aa9e665222aeda8264783c5007b03f83247f3 Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Tue, 2 Jul 2024 15:26:24 +0800 Subject: [PATCH] [flink] Fix unstable RemoteLookupJoinITCase.testServiceFileCleaned test method. (#3653) --- .../org/apache/paimon/flink/query/RemoteTableQuery.java | 7 +++++++ .../org/apache/paimon/flink/RemoteLookupJoinITCase.java | 6 ++---- .../org/apache/paimon/service/client/KvQueryClient.java | 6 +++++- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java index fc07e58f91cc..62edd6bae89c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.query; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; @@ -35,6 +36,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP; @@ -109,4 +111,9 @@ public InternalRowSerializer createValueSerializer() { public void close() throws IOException { client.shutdown(); } + + @VisibleForTesting + public CompletableFuture cancel() { + return client.shutdownFuture(); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java index 13c674662c4c..336c58217100 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java @@ -39,7 +39,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.io.Closeable; @@ -95,7 +94,7 @@ public void testQueryServiceLookup() throws Exception { assertThat(query.lookup(row(), 0, row(5))).isNull(); service.close(); - query.close(); + query.cancel().get(); } @Test @@ -135,7 +134,6 @@ public void testLookupRemoteTable() throws Throwable { proxy.close(); } - @Disabled // TODO Fix unstable @Test public void testServiceFileCleaned() throws Exception { sql( @@ -153,7 +151,7 @@ public void testServiceFileCleaned() throws Exception { .isEqualTo(11); client.cancel().get(); - query.close(); + query.cancel().get(); ServiceManager serviceManager = paimonTable("DIM").store().newServiceManager(); assertThat(serviceManager.service(PRIMARY_KEY_LOOKUP).isPresent()).isFalse(); } diff --git a/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java b/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java index 60c1ac0f8bd3..a1b950c27d70 100644 --- a/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java +++ b/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java @@ -110,10 +110,14 @@ private CompletableFuture getResponse( public void shutdown() { try { - networkClient.shutdown().get(10L, TimeUnit.SECONDS); + shutdownFuture().get(60L, TimeUnit.SECONDS); LOG.info("{} was shutdown successfully.", networkClient.getClientName()); } catch (Exception e) { LOG.warn(String.format("%s shutdown failed.", networkClient.getClientName()), e); } } + + public CompletableFuture shutdownFuture() { + return networkClient.shutdown(); + } }