Skip to content

Commit

Permalink
[flink] Fix that dim table cannot refresh overwrite changes (apache#4558
Browse files Browse the repository at this point in the history
)
  • Loading branch information
yuzelin authored Nov 21, 2024
1 parent 187825a commit 499afd9
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,16 @@ private Plan nextPlan() {
return SnapshotNotExistPlan.INSTANCE;
}

// first check changes of overwrite
if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE
&& supportStreamingReadOverwrite) {
LOG.debug("Find overwrite snapshot id {}.", nextSnapshotId);
SnapshotReader.Plan overwritePlan =
followUpScanner.getOverwriteChangesPlan(snapshot, snapshotReader);
currentWatermark = overwritePlan.watermark();
nextSnapshotId++;
return overwritePlan;
} else if (followUpScanner.shouldScanSnapshot(snapshot)) {
// first try to get overwrite changes
if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
SnapshotReader.Plan overwritePlan = handleOverwriteSnapshot(snapshot);
if (overwritePlan != null) {
nextSnapshotId++;
return overwritePlan;
}
}

if (followUpScanner.shouldScanSnapshot(snapshot)) {
LOG.debug("Find snapshot id {}.", nextSnapshotId);
SnapshotReader.Plan plan = followUpScanner.scan(snapshot, snapshotReader);
currentWatermark = plan.watermark();
Expand All @@ -228,6 +228,18 @@ private boolean shouldDelaySnapshot(long snapshotId) {
return false;
}

@Nullable
protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) {
if (supportStreamingReadOverwrite) {
LOG.debug("Find overwrite snapshot id {}.", nextSnapshotId);
SnapshotReader.Plan overwritePlan =
followUpScanner.getOverwriteChangesPlan(snapshot, snapshotReader);
currentWatermark = overwritePlan.watermark();
return overwritePlan;
}
return null;
}

protected FollowUpScanner createFollowUpScanner() {
CoreOptions.StreamScanMode type =
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public Collection<RowData> lookup(RowData keyRow) {
rows.add(new FlinkRowData(matchedRow));
}
return rows;
} catch (OutOfRangeException e) {
} catch (OutOfRangeException | ReopenException e) {
reopen();
return lookup(keyRow);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.lookup;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.table.source.DataTableStreamScan;
import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
Expand All @@ -29,6 +30,8 @@
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;

import static org.apache.paimon.CoreOptions.StartupMode;
import static org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamScanMode;

Expand Down Expand Up @@ -58,6 +61,16 @@ public LookupDataTableScan(
this.lookupScanMode = lookupScanMode;
}

@Override
@Nullable
protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) {
SnapshotReader.Plan plan = super.handleOverwriteSnapshot(snapshot);
if (plan != null) {
return plan;
}
throw new ReopenException();
}

@Override
protected StartingScanner createStartingScanner(boolean isStreaming) {
return startupMode != CoreOptions.StartupMode.COMPACTED_FULL
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lookup;

/** Signals that dim table source need to reopen. */
public class ReopenException extends RuntimeException {

private static final long serialVersionUID = 1L;

public ReopenException() {
super();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -977,4 +978,32 @@ public void testPartialCacheBucketKeyOrder(LookupCacheMode mode) throws Exceptio

iterator.close();
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testOverwriteDimTable(boolean isPkTable) throws Exception {
sql(
"CREATE TABLE DIM (i INT %s, v int, pt STRING) "
+ "PARTITIONED BY (pt) WITH ('continuous.discovery-interval'='1 ms')",
isPkTable ? "PRIMARY KEY NOT ENFORCED" : "");

BlockingIterator<Row, Row> iterator =
streamSqlBlockIter(
"SELECT T.i, D.v, D.pt FROM T LEFT JOIN DIM FOR SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i");

sql("INSERT INTO DIM VALUES (1, 11, 'A'), (2, 22, 'B')");
sql("INSERT INTO T VALUES (1), (2)");

List<Row> result = iterator.collect(2);
assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11, "A"), Row.of(2, 22, "B"));

sql("INSERT OVERWRITE DIM PARTITION (pt='B') VALUES (3, 33)");
Thread.sleep(2000); // wait refresh
sql("INSERT INTO T VALUES (3)");

result = iterator.collect(1);
assertThat(result).containsExactlyInAnyOrder(Row.of(3, 33, "B"));

iterator.close();
}
}

0 comments on commit 499afd9

Please sign in to comment.