From 499afd97bd8e64c38b7b389227ba48cbd1ce8e4b Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Thu, 21 Nov 2024 13:22:19 +0800 Subject: [PATCH] [flink] Fix that dim table cannot refresh overwrite changes (#4558) --- .../table/source/DataTableStreamScan.java | 32 +++++++++++++------ .../flink/lookup/FileStoreLookupFunction.java | 2 +- .../flink/lookup/LookupDataTableScan.java | 13 ++++++++ .../paimon/flink/lookup/ReopenException.java | 29 +++++++++++++++++ .../apache/paimon/flink/LookupJoinITCase.java | 29 +++++++++++++++++ 5 files changed, 94 insertions(+), 11 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ReopenException.java diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java index a68c7b1cb46d..e8c4ddfa1c7c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java @@ -194,16 +194,16 @@ private Plan nextPlan() { return SnapshotNotExistPlan.INSTANCE; } - // first check changes of overwrite - if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE - && supportStreamingReadOverwrite) { - LOG.debug("Find overwrite snapshot id {}.", nextSnapshotId); - SnapshotReader.Plan overwritePlan = - followUpScanner.getOverwriteChangesPlan(snapshot, snapshotReader); - currentWatermark = overwritePlan.watermark(); - nextSnapshotId++; - return overwritePlan; - } else if (followUpScanner.shouldScanSnapshot(snapshot)) { + // first try to get overwrite changes + if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) { + SnapshotReader.Plan overwritePlan = handleOverwriteSnapshot(snapshot); + if (overwritePlan != null) { + nextSnapshotId++; + return overwritePlan; + } + } + + if (followUpScanner.shouldScanSnapshot(snapshot)) { LOG.debug("Find snapshot id {}.", nextSnapshotId); SnapshotReader.Plan plan = followUpScanner.scan(snapshot, snapshotReader); currentWatermark = plan.watermark(); @@ -228,6 +228,18 @@ private boolean shouldDelaySnapshot(long snapshotId) { return false; } + @Nullable + protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) { + if (supportStreamingReadOverwrite) { + LOG.debug("Find overwrite snapshot id {}.", nextSnapshotId); + SnapshotReader.Plan overwritePlan = + followUpScanner.getOverwriteChangesPlan(snapshot, snapshotReader); + currentWatermark = overwritePlan.watermark(); + return overwritePlan; + } + return null; + } + protected FollowUpScanner createFollowUpScanner() { CoreOptions.StreamScanMode type = options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 9503960fbe17..e3f2fe110c6c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -249,7 +249,7 @@ public Collection lookup(RowData keyRow) { rows.add(new FlinkRowData(matchedRow)); } return rows; - } catch (OutOfRangeException e) { + } catch (OutOfRangeException | ReopenException e) { reopen(); return lookup(keyRow); } catch (Exception e) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java index 908884a573c0..48cb64e70be1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.operation.DefaultValueAssigner; import org.apache.paimon.table.source.DataTableStreamScan; import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner; @@ -29,6 +30,8 @@ import org.apache.paimon.table.source.snapshot.StartingScanner; import org.apache.paimon.utils.SnapshotManager; +import javax.annotation.Nullable; + import static org.apache.paimon.CoreOptions.StartupMode; import static org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamScanMode; @@ -58,6 +61,16 @@ public LookupDataTableScan( this.lookupScanMode = lookupScanMode; } + @Override + @Nullable + protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) { + SnapshotReader.Plan plan = super.handleOverwriteSnapshot(snapshot); + if (plan != null) { + return plan; + } + throw new ReopenException(); + } + @Override protected StartingScanner createStartingScanner(boolean isStreaming) { return startupMode != CoreOptions.StartupMode.COMPACTED_FULL diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ReopenException.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ReopenException.java new file mode 100644 index 000000000000..7149d591f8df --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ReopenException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lookup; + +/** Signals that dim table source need to reopen. */ +public class ReopenException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public ReopenException() { + super(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index 3e9ba2194aed..a6abde57b80c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Collections; import java.util.List; @@ -977,4 +978,32 @@ public void testPartialCacheBucketKeyOrder(LookupCacheMode mode) throws Exceptio iterator.close(); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testOverwriteDimTable(boolean isPkTable) throws Exception { + sql( + "CREATE TABLE DIM (i INT %s, v int, pt STRING) " + + "PARTITIONED BY (pt) WITH ('continuous.discovery-interval'='1 ms')", + isPkTable ? "PRIMARY KEY NOT ENFORCED" : ""); + + BlockingIterator iterator = + streamSqlBlockIter( + "SELECT T.i, D.v, D.pt FROM T LEFT JOIN DIM FOR SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i"); + + sql("INSERT INTO DIM VALUES (1, 11, 'A'), (2, 22, 'B')"); + sql("INSERT INTO T VALUES (1), (2)"); + + List result = iterator.collect(2); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11, "A"), Row.of(2, 22, "B")); + + sql("INSERT OVERWRITE DIM PARTITION (pt='B') VALUES (3, 33)"); + Thread.sleep(2000); // wait refresh + sql("INSERT INTO T VALUES (3)"); + + result = iterator.collect(1); + assertThat(result).containsExactlyInAnyOrder(Row.of(3, 33, "B")); + + iterator.close(); + } }