diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 3fd73d2b96d7..65c9c7302f0c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -94,6 +94,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable { // timestamp when cache expires private transient long nextLoadTime; + protected FunctionContext functionContext; + public FileStoreLookupFunction( Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) { TableScanUtils.streamingReadingValidate(table); @@ -127,6 +129,7 @@ public FileStoreLookupFunction( } public void open(FunctionContext context) throws Exception { + this.functionContext = context; String tmpDirectory = getTmpDirectory(context); open(tmpDirectory); } @@ -352,7 +355,7 @@ private static StreamingRuntimeContext extractStreamingRuntimeContext(Object run * * @return the set of bucket IDs to be cached */ - private Set getRequireCachedBucketIds() { + protected Set getRequireCachedBucketIds() { // TODO: Implement the method when Flink support bucket shuffle for lookup join. return null; }