Skip to content

Commit

Permalink
[flink][lookup] Use directly executor and shutdown executor in close
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs committed May 22, 2024
1 parent f243c72 commit e833a96
Showing 1 changed file with 30 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.paimon.utils.TypeUtils;
import org.apache.paimon.utils.UserDefinedSeqComparator;

import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -123,7 +125,7 @@ public FullCacheLookupTable(Context context) {
String.format(
"%s-lookup-refresh",
Thread.currentThread().getName())))
: null;
: MoreExecutors.newDirectExecutorService();
this.cachedException = new AtomicReference<>();
this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT);
}
Expand Down Expand Up @@ -196,33 +198,29 @@ public void refresh() throws Exception {
}
doRefresh();
} else {
if (refreshAsync) {
Future<?> currentFuture = null;
try {
currentFuture =
refreshExecutor.submit(
() -> {
try {
doRefresh();
} catch (Exception e) {
LOG.error(
"Refresh lookup table {} failed",
context.table.name(),
e);
cachedException.set(e);
}
});
} catch (RejectedExecutionException ignored) {
LOG.warn(
"Add refresh task for lookup table {} failed",
context.table.name(),
ignored);
}
if (currentFuture != null) {
refreshFuture = currentFuture;
}
} else {
doRefresh();
Future<?> currentFuture = null;
try {
currentFuture =
refreshExecutor.submit(
() -> {
try {
doRefresh();
} catch (Exception e) {
LOG.error(
"Refresh lookup table {} failed",
context.table.name(),
e);
cachedException.set(e);
}
});
} catch (RejectedExecutionException ignored) {
LOG.warn(
"Add refresh task for lookup table {} failed",
context.table.name(),
ignored);
}
if (currentFuture != null) {
refreshFuture = currentFuture;
}
}
}
Expand Down Expand Up @@ -292,11 +290,12 @@ public Predicate projectedPredicate() {

@Override
public void close() throws IOException {
stateFactory.close();
if (refreshExecutor != null) {
try {
stateFactory.close();
FileIOUtils.deleteDirectory(context.tempPath);
} finally {
refreshExecutor.shutdown();
}
FileIOUtils.deleteDirectory(context.tempPath);
}

/** Bulk loader for the table. */
Expand Down

0 comments on commit e833a96

Please sign in to comment.