diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java index 5fad72a2aee9..6459cd4ba65f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java @@ -182,7 +182,9 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { return new IncrementalTagStartingScanner( snapshotManager, incrementalBetween.getLeft(), - incrementalBetween.getRight()); + incrementalBetween.getRight(), + scanMode, + options.tagCreationMode()); } } else { return new IncrementalTimeStampStartingScanner( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index 2cdf5bff9d26..c6c906c3473b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -18,43 +18,175 @@ package org.apache.paimon.table.source.snapshot; +import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import java.util.*; + /** {@link StartingScanner} for incremental changes by tag. */ public class IncrementalTagStartingScanner extends AbstractStartingScanner { private final String start; private final String end; + private final ScanMode scanMode; + private final CoreOptions.TagCreationMode tagCreationMode; public IncrementalTagStartingScanner( - SnapshotManager snapshotManager, String start, String end) { + SnapshotManager snapshotManager, + String start, + String end, + ScanMode scanMode, + CoreOptions.TagCreationMode tagCreationMode) { super(snapshotManager); this.start = start; this.end = end; - TagManager tagManager = - new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - Snapshot startingSnapshot = tagManager.taggedSnapshot(start); - if (startingSnapshot != null) { - this.startingSnapshotId = startingSnapshot.id(); - } + this.scanMode = scanMode; + this.tagCreationMode = tagCreationMode; } @Override public Result scan(SnapshotReader reader) { TagManager tagManager = new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - Snapshot tag1 = tagManager.taggedSnapshot(start); - Snapshot tag2 = tagManager.taggedSnapshot(end); - - if (tag2.id() <= tag1.id()) { - throw new IllegalArgumentException( - String.format( - "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s", - end, tag2.id(), start, tag1.id())); + + if (tagCreationMode == CoreOptions.TagCreationMode.BATCH) { + Snapshot tagStart; + Snapshot tagEnd; + try { + tagStart = tagManager.taggedSnapshot(start); + } catch (IllegalArgumentException e) { + tagStart = null; + } + try { + tagEnd = tagManager.taggedSnapshot(end); + } catch (IllegalArgumentException e) { + tagEnd = null; + } + + if (tagStart == null && tagEnd == null) { + throw new NullPointerException( + "The tags of the incremental query cannot all be empty"); + } + Snapshot tagResult = tagEnd == null ? tagStart : tagEnd; + Map, List> grouped = new HashMap<>(); + // 兼容先知场景首次创建的情况 + if (tagStart == null || tagEnd == null) { + List currentSplits = readSplits(reader, tagResult); + for (DataSplit split : currentSplits) { + grouped.computeIfAbsent( + Pair.of(split.partition(), split.bucket()), + k -> new ArrayList<>()) + .addAll(split.dataFiles()); + } + } else { + if (tagEnd.id() <= tagStart.id()) { + throw new IllegalArgumentException( + String.format( + "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s", + end, tagEnd.id(), start, tagStart.id())); + } + SortedMap tags = tagManager.tags(); + for (Snapshot snapshot : tags.keySet()) { + if (snapshot.id() > tagStart.id() && snapshot.id() <= tagEnd.id()) { + List currentSplits = readSplits(reader, snapshot); + for (DataSplit split : currentSplits) { + grouped.computeIfAbsent( + Pair.of(split.partition(), split.bucket()), + k -> new ArrayList<>()) + .addAll(split.dataFiles()); + } + } + } + } + + List result = new ArrayList<>(); + for (Map.Entry, List> entry : + grouped.entrySet()) { + BinaryRow partition = entry.getKey().getLeft(); + int bucket = entry.getKey().getRight(); + for (List files : + reader.splitGenerator().splitForBatch(entry.getValue())) { + result.add( + DataSplit.builder() + .withSnapshot(tagResult.id()) + .withPartition(partition) + .withBucket(bucket) + .withDataFiles(files) + .build()); + } + } + return StartingScanner.fromPlan( + new SnapshotReader.Plan() { + @Override + public Long watermark() { + return null; + } + + @Override + public Long snapshotId() { + return tagResult.id(); + } + + @Override + public List splits() { + return (List) result; + } + }); + } else { + Snapshot tagStart = tagManager.taggedSnapshot(start); + Snapshot tagEnd = tagManager.taggedSnapshot(end); + + if (tagEnd.id() <= tagStart.id()) { + throw new IllegalArgumentException( + String.format( + "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s", + end, tagEnd.id(), start, tagStart.id())); + } + + if (tagEnd.id() <= tagStart.id()) { + throw new IllegalArgumentException( + String.format( + "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s", + end, tagEnd.id(), start, tagStart.id())); + } + return StartingScanner.fromPlan( + reader.withSnapshot(tagEnd).readIncrementalDiff(tagStart)); } + } + + private List readSplits(SnapshotReader reader, Snapshot s) { + switch (scanMode) { + case CHANGELOG: + return readChangeLogSplits(reader, s); + case DELTA: + return readDeltaSplits(reader, s); + default: + throw new UnsupportedOperationException("Unsupported scan kind: " + scanMode); + } + } - return StartingScanner.fromPlan(reader.withSnapshot(tag2).readIncrementalDiff(tag1)); + private List readDeltaSplits(SnapshotReader reader, Snapshot s) { + if (s.commitKind() == Snapshot.CommitKind.OVERWRITE) { + // ignore OVERWRITE + return Collections.emptyList(); + } + return (List) reader.withSnapshot(s).withMode(ScanMode.DELTA).read().splits(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private List readChangeLogSplits(SnapshotReader reader, Snapshot s) { + if (s.commitKind() == Snapshot.CommitKind.OVERWRITE) { + // ignore OVERWRITE + return Collections.emptyList(); + } + return (List) reader.withSnapshot(s).withMode(ScanMode.CHANGELOG).read().splits(); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index dff75ff29b33..033091f1f176 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.FileStoreTable; @@ -54,8 +55,8 @@ */ public class BatchWriteGeneratorTagOperator implements OneInputStreamOperator, - SetupableStreamOperator, - BoundedOneInput { + SetupableStreamOperator, + BoundedOneInput { private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-"; @@ -112,12 +113,24 @@ private void createTag() { BATCH_WRITE_TAG_PREFIX + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); try { - // If the tag already exists, delete the tag + // Check whether the tag exists if (tagManager.tagExists(tagName)) { - tagManager.deleteTag(tagName, tagDeletion, snapshotManager); + // When tag already exists, we want to consider different scenarios. + // If the table type is lookup or full-compaction, and there is no changelog created, + // then the data hasn't changed and compaction doesn't create a tag + CoreOptions.ChangelogProducer changelogProducer = + this.table.coreOptions().changelogProducer(); + if (!((changelogProducer == CoreOptions.ChangelogProducer.LOOKUP + || changelogProducer + == CoreOptions.ChangelogProducer.FULL_COMPACTION) + && snapshot.changelogRecordCount() == 0)) { + tagManager.deleteTag(tagName, tagDeletion, snapshotManager); + tagManager.createTag(snapshot, tagName); + } + } else { + // If the tag does not exist, it is created + tagManager.createTag(snapshot, tagName); } - // Create a new tag - tagManager.createTag(snapshot, tagName); // Expire the tag expireTag(); } catch (Exception e) {