diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 8e937ffe1a86..9ed83a9bb457 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -53,22 +53,17 @@ public abstract class FullCacheLookupTable implements LookupTable { protected final Context context; - protected final RocksDBStateFactory stateFactory; protected final RowType projectedType; @Nullable protected final FieldsComparator userDefinedSeqComparator; protected final int appendUdsFieldNumber; + protected RocksDBStateFactory stateFactory; private LookupStreamingReader reader; private Predicate specificPartition; - public FullCacheLookupTable(Context context) throws IOException { + public FullCacheLookupTable(Context context) { this.context = context; - this.stateFactory = - new RocksDBStateFactory( - context.tempPath.toString(), - context.table.coreOptions().toConfiguration(), - null); FileStoreTable table = context.table; List sequenceFields = new ArrayList<>(); if (table.primaryKeys().size() > 0) { @@ -104,8 +99,15 @@ public void specificPartitionFilter(Predicate filter) { this.specificPartition = filter; } - @Override - public void open() throws Exception { + protected void openStateFactory() throws Exception { + this.stateFactory = + new RocksDBStateFactory( + context.tempPath.toString(), + context.table.coreOptions().toConfiguration(), + null); + } + + protected void bootstrap() throws Exception { Predicate scanPredicate = PredicateBuilder.andNullable(context.tablePredicate, specificPartition); this.reader = new LookupStreamingReader(context.table, context.projection, scanPredicate); @@ -198,7 +200,7 @@ public interface TableBulkLoader { void finish() throws IOException; } - static FullCacheLookupTable create(Context context, long lruCacheSize) throws IOException { + static FullCacheLookupTable create(Context context, long lruCacheSize) { List primaryKeys = context.table.primaryKeys(); if (primaryKeys.isEmpty()) { return new NoPrimaryKeyLookupTable(context, lruCacheSize); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java index 7f5d036b183d..eaad549eef77 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java @@ -42,7 +42,7 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable { private RocksDBListState state; - public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) throws IOException { + public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) { super(context); this.lruCacheSize = lruCacheSize; List fieldNames = projectedType.getFieldNames(); @@ -52,6 +52,7 @@ public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) throws IOExce @Override public void open() throws Exception { + openStateFactory(); this.state = stateFactory.listState( "join-key-index", @@ -59,7 +60,7 @@ public void open() throws Exception { TypeUtils.project(projectedType, joinKeyRow.indexMapping())), InternalSerializers.create(projectedType), lruCacheSize); - super.open(); + bootstrap(); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java index 889e1e35ba8e..375b934615b7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java @@ -47,8 +47,7 @@ public class PrimaryKeyLookupTable extends FullCacheLookupTable { protected RocksDBValueState tableState; - public PrimaryKeyLookupTable(Context context, long lruCacheSize, List joinKey) - throws IOException { + public PrimaryKeyLookupTable(Context context, long lruCacheSize, List joinKey) { super(context); this.lruCacheSize = lruCacheSize; List fieldNames = projectedType.getFieldNames(); @@ -71,6 +70,12 @@ public PrimaryKeyLookupTable(Context context, long lruCacheSize, List jo @Override public void open() throws Exception { + openStateFactory(); + createTableState(); + bootstrap(); + } + + protected void createTableState() throws IOException { this.tableState = stateFactory.valueState( "table", @@ -78,7 +83,6 @@ public void open() throws Exception { TypeUtils.project(projectedType, primaryKeyRow.indexMapping())), InternalSerializers.create(projectedType), lruCacheSize); - super.open(); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java index d4fb22c4b76a..f551f17ccbc4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java @@ -38,7 +38,7 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable { private RocksDBSetState indexState; - public SecondaryIndexLookupTable(Context context, long lruCacheSize) throws IOException { + public SecondaryIndexLookupTable(Context context, long lruCacheSize) { super(context, lruCacheSize / 2, context.table.primaryKeys()); List fieldNames = projectedType.getFieldNames(); int[] secKeyMapping = context.joinKey.stream().mapToInt(fieldNames::indexOf).toArray(); @@ -47,6 +47,8 @@ public SecondaryIndexLookupTable(Context context, long lruCacheSize) throws IOEx @Override public void open() throws Exception { + openStateFactory(); + createTableState(); this.indexState = stateFactory.setState( "sec-index", @@ -55,7 +57,7 @@ public void open() throws Exception { InternalSerializers.create( TypeUtils.project(projectedType, primaryKeyRow.indexMapping())), lruCacheSize); - super.open(); + bootstrap(); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 065ace85a6e9..d9cb58b43ec8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -118,6 +118,10 @@ public void testPkTable() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); + // test re-open + table.close(); + table.open(); + // test bulk load error { TableBulkLoader bulkLoader = table.createBulkLoader(); @@ -175,6 +179,10 @@ public void testPkTableWithSequenceField() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); + // test re-open + table.close(); + table.open(); + List> records = new ArrayList<>(); for (int i = 1; i <= 10; i++) { InternalRow row = row(i, 11 * i, 111 * i); @@ -219,6 +227,10 @@ public void testPkTablePkFilter() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); + // test re-open + table.close(); + table.open(); + table.refresh(singletonList(row(1, 11, 111)).iterator()); List result = table.get(row(1)); assertThat(result).hasSize(1); @@ -250,6 +262,10 @@ public void testPkTableNonPkFilter() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); + // test re-open + table.close(); + table.open(); + table.refresh(singletonList(row(1, 11, 111)).iterator()); List result = table.get(row(1)); assertThat(result).hasSize(1); @@ -274,6 +290,10 @@ public void testSecKeyTable() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); + // test re-open + table.close(); + table.open(); + // test bulk load 100_000 records List> records = new ArrayList<>(); Random rnd = new Random(); @@ -319,6 +339,10 @@ public void testSecKeyTableWithSequenceField() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); + // test re-open + table.close(); + table.open(); + List> records = new ArrayList<>(); Random rnd = new Random(); Map> secKeyToPk = new HashMap<>(); @@ -367,6 +391,10 @@ public void testSecKeyTablePkFilter() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); + // test re-open + table.close(); + table.open(); + table.refresh(singletonList(row(1, 11, 111)).iterator()); List result = table.get(row(11)); assertThat(result).hasSize(1); @@ -407,6 +435,10 @@ public void testNoPrimaryKeyTable() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); + // test re-open + table.close(); + table.open(); + // test bulk load 100_000 records List> records = new ArrayList<>(); Random rnd = new Random(); @@ -450,6 +482,10 @@ public void testNoPrimaryKeyTableFilter() throws Exception { table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); + // test re-open + table.close(); + table.open(); + table.refresh(singletonList(row(1, 11, 333)).iterator()); List result = table.get(row(11)); assertThat(result).hasSize(0); @@ -508,6 +544,11 @@ public void testPartialLookupTableWithProjection() throws Exception { tempDir.toFile(), ImmutableList.of("pk1", "pk2")); table.open(); + + // test re-open + table.close(); + table.open(); + List result = table.get(row(1, -1)); assertThat(result).hasSize(0); @@ -535,6 +576,10 @@ public void testPartialLookupTableJoinKeyOrder() throws Exception { ImmutableList.of("pk2", "pk1")); table.open(); + // test re-open + table.close(); + table.open(); + List result = table.get(row(-1, 1)); assertThat(result).hasSize(0);