diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AbstractLookupReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AbstractLookupReader.java
new file mode 100644
index 000000000000..c1059a4d796c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AbstractLookupReader.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.SplitsParallelReadUtil;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.ReaderSupplier;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FunctionWithIOException;
+import org.apache.paimon.utils.TypeUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.IntUnaryOperator;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PARALLELISM;
+import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
+
+/** Reader to load data into {@link LookupTable}. */
+abstract class AbstractLookupReader {
+
+    protected final Table table;
+    protected final int[] projection;
+    protected final ReadBuilder readBuilder;
+    @Nullable protected final Predicate projectedPredicate;
+
+    protected static final List<ConfigOption<?>> TIME_TRAVEL_OPTIONS =
+            Arrays.asList(
+                    CoreOptions.SCAN_TIMESTAMP_MILLIS,
+                    CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
+                    CoreOptions.SCAN_SNAPSHOT_ID,
+                    CoreOptions.SCAN_TAG_NAME,
+                    CoreOptions.SCAN_VERSION);
+
+    protected AbstractLookupReader(
+            Table table,
+            int[] projection,
+            @Nullable Predicate predicate,
+            Set<Integer> requireCachedBucketIds) {
+        this.table = unsetTimeTravelOptions(table);
+        this.projection = projection;
+        this.readBuilder =
+                this.table
+                        .newReadBuilder()
+                        .withProjection(projection)
+                        .withFilter(predicate)
+                        .withBucketFilter(
+                                requireCachedBucketIds == null
+                                        ? null
+                                        : requireCachedBucketIds::contains);
+
+        if (predicate != null) {
+            List<String> fieldNames = table.rowType().getFieldNames();
+            List<String> primaryKeys = table.primaryKeys();
+
+            // for pk table: only filter by pk, the stream is upsert instead of changelog
+            // for non-pk table: filter all
+            IntUnaryOperator operator =
+                    i -> {
+                        int index = Ints.indexOf(projection, i);
+                        boolean safeFilter =
+                                primaryKeys.isEmpty() || primaryKeys.contains(fieldNames.get(i));
+                        return safeFilter ? index : -1;
+                    };
+
+            int[] fieldIdxToProjectionIdx =
+                    IntStream.range(0, table.rowType().getFieldCount()).map(operator).toArray();
+
+            this.projectedPredicate =
+                    transformFieldMapping(predicate, fieldIdxToProjectionIdx).orElse(null);
+        } else {
+            this.projectedPredicate = null;
+        }
+    }
+
+    protected Table unsetTimeTravelOptions(Table origin) {
+        FileStoreTable fileStoreTable = (FileStoreTable) origin;
+        Map<String, String> newOptions = new HashMap<>(fileStoreTable.options());
+        TIME_TRAVEL_OPTIONS.stream().map(ConfigOption::key).forEach(newOptions::remove);
+
+        CoreOptions.StartupMode startupMode = CoreOptions.fromMap(newOptions).startupMode();
+        if (startupMode != CoreOptions.StartupMode.COMPACTED_FULL) {
+            startupMode = CoreOptions.StartupMode.LATEST_FULL;
+        }
+        newOptions.put(CoreOptions.SCAN_MODE.key(), startupMode.toString());
+
+        TableSchema newSchema = fileStoreTable.schema().copy(newOptions);
+        return fileStoreTable.copy(newSchema);
+    }
+
+    protected abstract List<Split> nextBatchSplits();
+
+    protected abstract Long nextSnapshotId();
+
+    public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
+        List<Split> splits = nextBatchSplits();
+        return nextBatch(useParallelism, splits);
+    }
+
+    public RecordReader<InternalRow> nextBatch(boolean useParallelism, List<Split> splits)
+            throws Exception {
+
+        CoreOptions options = CoreOptions.fromMap(table.options());
+        FunctionWithIOException<Split, RecordReader<InternalRow>> readerSupplier =
+                split -> readBuilder.newRead().createReader(split);
+
+        RowType readType = TypeUtils.project(table.rowType(), projection);
+
+        RecordReader<InternalRow> reader;
+        if (useParallelism) {
+            reader =
+                    SplitsParallelReadUtil.parallelExecute(
+                            readType,
+                            readerSupplier,
+                            splits,
+                            options.pageSize(),
+                            new Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM));
+        } else {
+            List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
+            for (Split split : splits) {
+                readers.add(() -> readerSupplier.apply(split));
+            }
+            reader = ConcatRecordReader.create(readers);
+        }
+
+        if (projectedPredicate != null) {
+            reader = reader.filter(projectedPredicate::test);
+        }
+        return reader;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 01ebbde20154..17d23830ed12 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -25,7 +25,6 @@
 import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
@@ -98,8 +97,6 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
     public FileStoreLookupFunction(
             Table table,
             int[] projection,
             int[] joinKeyIndex,
             @Nullable Predicate predicate) {
-        TableScanUtils.streamingReadingValidate(table);
-
         this.table = table;
         this.partitionLoader = DynamicPartitionLoader.of(table);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 28b0da0d150c..af1b2f2a4d3b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -62,6 +62,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.paimon.CoreOptions.ChangelogProducer.NONE;
+import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;
 
@@ -83,7 +85,7 @@ public abstract class FullCacheLookupTable implements LookupTable {
     private final int maxPendingSnapshotCount;
     private final FileStoreTable table;
     private Future<?> refreshFuture;
-    private LookupStreamingReader reader;
+    private AbstractLookupReader reader;
     private Predicate specificPartition;
 
     public FullCacheLookupTable(Context context) {
@@ -149,12 +151,20 @@ protected void openStateFactory() throws Exception {
     protected void bootstrap() throws Exception {
         Predicate scanPredicate =
                 PredicateBuilder.andNullable(context.tablePredicate, specificPartition);
+
+        CoreOptions options = CoreOptions.fromMap(table.options());
         this.reader =
-                new LookupStreamingReader(
-                        context.table,
-                        context.projection,
-                        scanPredicate,
-                        context.requiredCachedBucketIds);
+                (options.mergeEngine() == PARTIAL_UPDATE && options.changelogProducer() == NONE)
+                        ? new LookupBatchReader(
+                                context.table,
+                                context.projection,
+                                scanPredicate,
+                                context.requiredCachedBucketIds)
+                        : new LookupStreamingReader(
+                                context.table,
+                                context.projection,
+                                scanPredicate,
+                                context.requiredCachedBucketIds);
         BinaryExternalSortBuffer bulkLoadSorter =
                 RocksDBState.createBulkLoadSorter(
                         IOManager.create(context.tempPath.toString()), context.table.coreOptions());
@@ -236,7 +246,7 @@ public void refresh() throws Exception {
     }
 
     private void doRefresh() throws Exception {
-        while (true) {
+        if (reader instanceof LookupBatchReader) {
             try (RecordReaderIterator<InternalRow> batch =
                     new RecordReaderIterator<>(reader.nextBatch(false))) {
                 if (!batch.hasNext()) {
@@ -244,6 +254,16 @@ private void doRefresh() throws Exception {
                 }
                 refresh(batch);
             }
+        } else {
+            while (true) {
+                try (RecordReaderIterator<InternalRow> batch =
+                        new RecordReaderIterator<>(reader.nextBatch(false))) {
+                    if (!batch.hasNext()) {
+                        return;
+                    }
+                    refresh(batch);
+                }
+            }
         }
     }
 
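Why the split in bootstrap(): with 'merge-engine' = 'partial-update' and 'changelog-producer' = 'none', the table cannot emit a correct changelog stream (exactly the case the removed streamingReadingValidate check used to reject), so an incremental refresh would miss columns merged across separate writes. The batch reader instead re-reads the full merged snapshot on each refresh, while all other table configurations keep the incremental streaming path. A condensed sketch of the new dispatch, where ctx stands for the surrounding Context and the remaining names are the ones in the diff:

CoreOptions options = CoreOptions.fromMap(table.options());
boolean mustRescan =
        options.mergeEngine() == CoreOptions.MergeEngine.PARTIAL_UPDATE
                && options.changelogProducer() == CoreOptions.ChangelogProducer.NONE;
AbstractLookupReader reader =
        mustRescan
                ? new LookupBatchReader( // full snapshot re-read per refresh
                        ctx.table, ctx.projection, scanPredicate, ctx.requiredCachedBucketIds)
                : new LookupStreamingReader( // incremental changelog consumption
                        ctx.table, ctx.projection, scanPredicate, ctx.requiredCachedBucketIds);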
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupBatchReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupBatchReader.java
new file mode 100644
index 000000000000..dbbeabf55739
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupBatchReader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Set;
+
+/** A batch reader to load data into {@link LookupTable}. */
+public class LookupBatchReader extends AbstractLookupReader {
+    private TableScan scan;
+    private Long nextSnapshotId;
+
+    public LookupBatchReader(
+            Table table,
+            int[] projection,
+            @Nullable Predicate predicate,
+            Set<Integer> requireCachedBucketIds) {
+        super(table, projection, predicate, requireCachedBucketIds);
+    }
+
+    @Override
+    protected List<Split> nextBatchSplits() {
+        SnapshotReader.Plan plan = (SnapshotReader.Plan) readBuilder.newScan().plan();
+        nextSnapshotId = plan.snapshotId() == null ? 1 : plan.snapshotId() + 1;
+        return readBuilder.newScan().plan().splits();
+    }
+
+    @Nullable
+    public Long nextSnapshotId() {
+        return nextSnapshotId;
+    }
+}
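One detail worth flagging in nextBatchSplits() above: readBuilder.newScan().plan() is invoked twice, once to record the snapshot id and once to fetch the splits, so the two calls can observe different snapshots if a commit lands in between. A variant that reuses the single plan keeps the splits and the recorded id consistent (a sketch, not what the PR merges):

    @Override
    protected List<Split> nextBatchSplits() {
        SnapshotReader.Plan plan = (SnapshotReader.Plan) readBuilder.newScan().plan();
        nextSnapshotId = plan.snapshotId() == null ? 1 : plan.snapshotId() + 1;
        // reuse the plan so the splits match the snapshot id recorded above
        return plan.splits();
    }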
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index ceb40c1a864f..b2a5a19460d3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -18,144 +18,31 @@
 
 package org.apache.paimon.flink.lookup;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.SplitsParallelReadUtil;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.ReaderSupplier;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FunctionWithIOException;
-import org.apache.paimon.utils.TypeUtils;
-
-import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.function.IntUnaryOperator;
-import java.util.stream.IntStream;
-
-import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PARALLELISM;
-import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
 /** A streaming reader to load data into {@link LookupTable}. */
-public class LookupStreamingReader {
-
-    private final Table table;
-    private final int[] projection;
-    private final ReadBuilder readBuilder;
-    @Nullable private final Predicate projectedPredicate;
+public class LookupStreamingReader extends AbstractLookupReader {
 
     private final StreamTableScan scan;
 
-    private static final List<ConfigOption<?>> TIME_TRAVEL_OPTIONS =
-            Arrays.asList(
-                    CoreOptions.SCAN_TIMESTAMP_MILLIS,
-                    CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
-                    CoreOptions.SCAN_SNAPSHOT_ID,
-                    CoreOptions.SCAN_TAG_NAME,
-                    CoreOptions.SCAN_VERSION);
-
     public LookupStreamingReader(
             Table table,
             int[] projection,
             @Nullable Predicate predicate,
             Set<Integer> requireCachedBucketIds) {
-        this.table = unsetTimeTravelOptions(table);
-        this.projection = projection;
-        this.readBuilder =
-                this.table
-                        .newReadBuilder()
-                        .withProjection(projection)
-                        .withFilter(predicate)
-                        .withBucketFilter(
-                                requireCachedBucketIds == null
-                                        ? null
-                                        : requireCachedBucketIds::contains);
+        super(table, projection, predicate, requireCachedBucketIds);
         scan = readBuilder.newStreamScan();
-
-        if (predicate != null) {
-            List<String> fieldNames = table.rowType().getFieldNames();
-            List<String> primaryKeys = table.primaryKeys();
-
-            // for pk table: only filter by pk, the stream is upsert instead of changelog
-            // for non-pk table: filter all
-            IntUnaryOperator operator =
-                    i -> {
-                        int index = Ints.indexOf(projection, i);
-                        boolean safeFilter =
-                                primaryKeys.isEmpty() || primaryKeys.contains(fieldNames.get(i));
-                        return safeFilter ? index : -1;
-                    };
-
-            int[] fieldIdxToProjectionIdx =
-                    IntStream.range(0, table.rowType().getFieldCount()).map(operator).toArray();
-
-            this.projectedPredicate =
-                    transformFieldMapping(predicate, fieldIdxToProjectionIdx).orElse(null);
-        } else {
-            this.projectedPredicate = null;
-        }
     }
 
-    private Table unsetTimeTravelOptions(Table origin) {
-        FileStoreTable fileStoreTable = (FileStoreTable) origin;
-        Map<String, String> newOptions = new HashMap<>(fileStoreTable.options());
-        TIME_TRAVEL_OPTIONS.stream().map(ConfigOption::key).forEach(newOptions::remove);
-
-        CoreOptions.StartupMode startupMode = CoreOptions.fromMap(newOptions).startupMode();
-        if (startupMode != CoreOptions.StartupMode.COMPACTED_FULL) {
-            startupMode = CoreOptions.StartupMode.LATEST_FULL;
-        }
-        newOptions.put(CoreOptions.SCAN_MODE.key(), startupMode.toString());
-
-        TableSchema newSchema = fileStoreTable.schema().copy(newOptions);
-        return fileStoreTable.copy(newSchema);
-    }
-
-    public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
-        List<Split> splits = scan.plan().splits();
-        CoreOptions options = CoreOptions.fromMap(table.options());
-        FunctionWithIOException<Split, RecordReader<InternalRow>> readerSupplier =
-                split -> readBuilder.newRead().createReader(split);
-
-        RowType readType = TypeUtils.project(table.rowType(), projection);
-
-        RecordReader<InternalRow> reader;
-        if (useParallelism) {
-            reader =
-                    SplitsParallelReadUtil.parallelExecute(
-                            readType,
-                            readerSupplier,
-                            splits,
-                            options.pageSize(),
-                            new Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM));
-        } else {
-            List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
-            for (Split split : splits) {
-                readers.add(() -> readerSupplier.apply(split));
-            }
-            reader = ConcatRecordReader.create(readers);
-        }
-
-        if (projectedPredicate != null) {
-            reader = reader.filter(projectedPredicate::test);
-        }
-        return reader;
+    public List<Split> nextBatchSplits() {
+        return scan.plan().splits();
     }
 
     @Nullable
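After this refactor each subclass only decides where splits come from (an incremental StreamTableScan plan here, a full snapshot plan in LookupBatchReader); the shared nextBatch() in AbstractLookupReader turns those splits into a RecordReader and applies the projected predicate. An illustrative sketch of how one refresh round is drained by a caller, using the Paimon types from the diff:

try (RecordReaderIterator<InternalRow> batch =
        new RecordReaderIterator<>(reader.nextBatch(false))) {
    while (batch.hasNext()) {
        InternalRow row = batch.next();
        // apply the row to the lookup cache
    }
}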
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 5b35cb131393..5adcff1f2fa7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -32,7 +32,6 @@
 import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for lookup join. */
 public class LookupJoinITCase extends CatalogITCaseBase {
@@ -517,21 +516,6 @@ public void testRepeatRefresh(LookupCacheMode cacheMode) throws Exception {
         iterator.close();
     }
 
-    @Test
-    public void testLookupPartialUpdateIllegal() {
-        sql(
-                "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH"
-                        + " ('merge-engine'='partial-update','continuous.discovery-interval'='1 ms')");
-        String query =
-                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
-        assertThatThrownBy(() -> sEnv.executeSql(query))
-                .hasRootCauseMessage(
-                        "Partial update streaming"
-                                + " reading is not supported. "
-                                + "You can use 'lookup' or 'full-compaction' changelog producer to support streaming reading. "
-                                + "('input' changelog producer is also supported, but only returns input records.)");
-    }
-
     @Test
     public void testLookupPartialUpdate() throws Exception {
         testLookupPartialUpdate("none");
@@ -564,6 +548,40 @@ private void testLookupPartialUpdate(String compression) throws Exception {
         sql("TRUNCATE TABLE T");
     }
 
+    @Test
+    public void testLookupPartialUpdateNoneChangelog() throws Exception {
+        sql(
+                "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms','merge-engine' = 'partial-update','changelog-producer'='none')");
+
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+        String query =
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222),
+                        Row.of(3, null, null, null));
+
+        sql("INSERT INTO DIM VALUES (2, 44, cast(NULL as int), 4444),(3,33,333,3333)");
+        sql("INSERT INTO DIM VALUES (2, cast(NULL as int), 444, 4444)");
+        Thread.sleep(2000); // wait refresh
+        sql("INSERT INTO T VALUES (1), (2), (3),(4)");
+
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 44, 444, 4444),
+                        Row.of(3, 33, 333, 3333),
+                        Row.of(4, null, null, null));
+
+        iterator.close();
+    }
+
     @ParameterizedTest
     @EnumSource(LookupCacheMode.class)
     public void testRetryLookup(LookupCacheMode cacheMode) throws Exception {
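The new ITCase exercises exactly the cross-write merge that forces the full-snapshot refresh: the two DIM writes for key 2, (2, 44, NULL, 4444) followed by (2, NULL, 444, 4444), must be observed as the single merged row (2, 44, 444, 4444). A plain-Java illustration of that partial-update merge, where a map stands in for Paimon's merge engine and the values mirror the test:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class PartialUpdateSketch {
    public static void main(String[] args) {
        Map<Integer, Integer[]> table = new HashMap<>();
        // write 1: (2, 44, NULL, 4444) -- k1 left unset
        partialUpdate(table, 2, new Integer[] {44, null, 4444});
        // write 2: (2, NULL, 444, 4444) -- j left unset
        partialUpdate(table, 2, new Integer[] {null, 444, 4444});
        // merged snapshot row: prints [44, 444, 4444], i.e. Row.of(2, 44, 444, 4444)
        System.out.println(Arrays.toString(table.get(2)));
    }

    static void partialUpdate(Map<Integer, Integer[]> table, int pk, Integer[] row) {
        table.merge(
                pk,
                row,
                (old, upd) -> {
                    // non-null fields of the update overwrite the old row
                    Integer[] merged = old.clone();
                    for (int i = 0; i < merged.length; i++) {
                        if (upd[i] != null) {
                            merged[i] = upd[i];
                        }
                    }
                    return merged;
                });
    }
}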