Skip to content

Commit

Permalink
[flink] Fix unstable RemoteLookupJoinITCase.testServiceFileCleaned te…
Browse files Browse the repository at this point in the history
…st method. (apache#3653)
  • Loading branch information
zhuangchong authored Jul 2, 2024
1 parent 5598066 commit 9d8aa9e
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.query;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
Expand All @@ -35,6 +36,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;
Expand Down Expand Up @@ -109,4 +111,9 @@ public InternalRowSerializer createValueSerializer() {
public void close() throws IOException {
client.shutdown();
}

@VisibleForTesting
public CompletableFuture<Void> cancel() {
return client.shutdownFuture();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.Closeable;
Expand Down Expand Up @@ -95,7 +94,7 @@ public void testQueryServiceLookup() throws Exception {
assertThat(query.lookup(row(), 0, row(5))).isNull();

service.close();
query.close();
query.cancel().get();
}

@Test
Expand Down Expand Up @@ -135,7 +134,6 @@ public void testLookupRemoteTable() throws Throwable {
proxy.close();
}

@Disabled // TODO Fix unstable
@Test
public void testServiceFileCleaned() throws Exception {
sql(
Expand All @@ -153,7 +151,7 @@ public void testServiceFileCleaned() throws Exception {
.isEqualTo(11);

client.cancel().get();
query.close();
query.cancel().get();
ServiceManager serviceManager = paimonTable("DIM").store().newServiceManager();
assertThat(serviceManager.service(PRIMARY_KEY_LOOKUP).isPresent()).isFalse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,14 @@ private CompletableFuture<KvResponse> getResponse(

public void shutdown() {
try {
networkClient.shutdown().get(10L, TimeUnit.SECONDS);
shutdownFuture().get(60L, TimeUnit.SECONDS);
LOG.info("{} was shutdown successfully.", networkClient.getClientName());
} catch (Exception e) {
LOG.warn(String.format("%s shutdown failed.", networkClient.getClientName()), e);
}
}

public CompletableFuture<Void> shutdownFuture() {
return networkClient.shutdown();
}
}

0 comments on commit 9d8aa9e

Please sign in to comment.