Skip to content

Commit

Permalink
1. Enhance the tag read capability in offline scenarios. This is an internal custom request
Browse files Browse the repository at this point in the history

2. Update the tag generation override policy in offline scenarios
  • Loading branch information
siyang.zeng committed Jan 4, 2024
1 parent bf6e8d6 commit d014d21
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
return new IncrementalTagStartingScanner(
snapshotManager,
incrementalBetween.getLeft(),
incrementalBetween.getRight());
incrementalBetween.getRight(),
scanMode,
options.tagCreationMode());
}
} else {
return new IncrementalTimeStampStartingScanner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,175 @@

package org.apache.paimon.table.source.snapshot;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import java.util.*;

/** {@link StartingScanner} for incremental changes by tag. */
public class IncrementalTagStartingScanner extends AbstractStartingScanner {

private final String start;
private final String end;
private final ScanMode scanMode;
private final CoreOptions.TagCreationMode tagCreationMode;

/**
 * Creates a scanner that reads the incremental changes between two tags.
 *
 * <p>The snapshot tagged {@code start}, when it can be resolved, becomes the starting snapshot
 * id of this scanner. In {@code BATCH} tag creation mode the start tag may legitimately be
 * absent ({@link #scan(SnapshotReader)} falls back to the single existing tag), so the lookup
 * is skipped when the tag does not exist instead of failing construction. In every other mode
 * the lookup is unconditional, so a missing start tag is still reported eagerly by
 * {@code TagManager#taggedSnapshot}.
 *
 * @param snapshotManager snapshot manager of the table; also supplies the file IO and table
 *     path used to build the {@link TagManager}
 * @param start name of the tag the incremental read starts from (exclusive)
 * @param end name of the tag the incremental read ends at (inclusive)
 * @param scanMode scan mode used when reading individual tagged snapshots in batch mode
 * @param tagCreationMode tag creation mode of the table
 */
public IncrementalTagStartingScanner(
        SnapshotManager snapshotManager,
        String start,
        String end,
        ScanMode scanMode,
        CoreOptions.TagCreationMode tagCreationMode) {
    super(snapshotManager);
    this.start = start;
    this.end = end;
    this.scanMode = scanMode;
    this.tagCreationMode = tagCreationMode;

    TagManager tagManager =
            new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
    boolean batchMode = tagCreationMode == CoreOptions.TagCreationMode.BATCH;
    // Only the batch mode tolerates a missing start tag; keep the eager failure elsewhere.
    if (!batchMode || tagManager.tagExists(start)) {
        Snapshot startingSnapshot = tagManager.taggedSnapshot(start);
        if (startingSnapshot != null) {
            this.startingSnapshotId = startingSnapshot.id();
        }
    }
}

@Override
public Result scan(SnapshotReader reader) {
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
Snapshot tag1 = tagManager.taggedSnapshot(start);
Snapshot tag2 = tagManager.taggedSnapshot(end);

if (tag2.id() <= tag1.id()) {
throw new IllegalArgumentException(
String.format(
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
end, tag2.id(), start, tag1.id()));

if (tagCreationMode == CoreOptions.TagCreationMode.BATCH) {
Snapshot tagStart;
Snapshot tagEnd;
try {
tagStart = tagManager.taggedSnapshot(start);
} catch (IllegalArgumentException e) {
tagStart = null;
}
try {
tagEnd = tagManager.taggedSnapshot(end);
} catch (IllegalArgumentException e) {
tagEnd = null;
}

if (tagStart == null && tagEnd == null) {
throw new NullPointerException(
"The tags of the incremental query cannot all be empty");
}
Snapshot tagResult = tagEnd == null ? tagStart : tagEnd;
Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new HashMap<>();
// 兼容先知场景首次创建的情况
if (tagStart == null || tagEnd == null) {
List<DataSplit> currentSplits = readSplits(reader, tagResult);
for (DataSplit split : currentSplits) {
grouped.computeIfAbsent(
Pair.of(split.partition(), split.bucket()),
k -> new ArrayList<>())
.addAll(split.dataFiles());
}
} else {
if (tagEnd.id() <= tagStart.id()) {
throw new IllegalArgumentException(
String.format(
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
end, tagEnd.id(), start, tagStart.id()));
}
SortedMap<Snapshot, String> tags = tagManager.tags();
for (Snapshot snapshot : tags.keySet()) {
if (snapshot.id() > tagStart.id() && snapshot.id() <= tagEnd.id()) {
List<DataSplit> currentSplits = readSplits(reader, snapshot);
for (DataSplit split : currentSplits) {
grouped.computeIfAbsent(
Pair.of(split.partition(), split.bucket()),
k -> new ArrayList<>())
.addAll(split.dataFiles());
}
}
}
}

List<DataSplit> result = new ArrayList<>();
for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry :
grouped.entrySet()) {
BinaryRow partition = entry.getKey().getLeft();
int bucket = entry.getKey().getRight();
for (List<DataFileMeta> files :
reader.splitGenerator().splitForBatch(entry.getValue())) {
result.add(
DataSplit.builder()
.withSnapshot(tagResult.id())
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(files)
.build());
}
}
return StartingScanner.fromPlan(
new SnapshotReader.Plan() {
@Override
public Long watermark() {
return null;
}

@Override
public Long snapshotId() {
return tagResult.id();
}

@Override
public List<Split> splits() {
return (List) result;
}
});
} else {
Snapshot tagStart = tagManager.taggedSnapshot(start);
Snapshot tagEnd = tagManager.taggedSnapshot(end);

if (tagEnd.id() <= tagStart.id()) {
throw new IllegalArgumentException(
String.format(
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
end, tagEnd.id(), start, tagStart.id()));
}

if (tagEnd.id() <= tagStart.id()) {
throw new IllegalArgumentException(
String.format(
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
end, tagEnd.id(), start, tagStart.id()));
}
return StartingScanner.fromPlan(
reader.withSnapshot(tagEnd).readIncrementalDiff(tagStart));
}
}

/**
 * Reads the splits of a single snapshot using the scan mode configured on this scanner.
 *
 * @param reader snapshot reader used to read the snapshot
 * @param snapshot snapshot to read
 * @return the splits produced for the snapshot
 * @throws UnsupportedOperationException if the scan mode is neither DELTA nor CHANGELOG
 */
private List<DataSplit> readSplits(SnapshotReader reader, Snapshot snapshot) {
    final List<DataSplit> splits;
    switch (scanMode) {
        case DELTA:
            splits = readDeltaSplits(reader, snapshot);
            break;
        case CHANGELOG:
            splits = readChangeLogSplits(reader, snapshot);
            break;
        default:
            throw new UnsupportedOperationException("Unsupported scan kind: " + scanMode);
    }
    return splits;
}

return StartingScanner.fromPlan(reader.withSnapshot(tag2).readIncrementalDiff(tag1));
/**
 * Reads the splits of snapshot {@code s} in {@link ScanMode#DELTA} mode.
 *
 * @param reader snapshot reader used to read the snapshot
 * @param s snapshot to read
 * @return the splits read in DELTA mode, or an empty list when the snapshot's commit kind is
 *     OVERWRITE
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private List<DataSplit> readDeltaSplits(SnapshotReader reader, Snapshot s) {
    if (s.commitKind() == Snapshot.CommitKind.OVERWRITE) {
        // ignore OVERWRITE
        return Collections.emptyList();
    }
    // Raw cast converts the plan's split list to List<DataSplit>; the warning is suppressed
    // the same way as in readChangeLogSplits.
    return (List) reader.withSnapshot(s).withMode(ScanMode.DELTA).read().splits();
}

/**
 * Reads the splits of a snapshot in {@link ScanMode#CHANGELOG} mode.
 *
 * @param reader snapshot reader used to read the snapshot
 * @param snapshot snapshot to read
 * @return the splits read in CHANGELOG mode, or an empty list when the snapshot's commit kind
 *     is OVERWRITE
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private List<DataSplit> readChangeLogSplits(SnapshotReader reader, Snapshot snapshot) {
    boolean overwrite = snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE;
    if (!overwrite) {
        List changelogSplits =
                reader.withSnapshot(snapshot).withMode(ScanMode.CHANGELOG).read().splits();
        return changelogSplits;
    }
    // OVERWRITE snapshots contribute nothing to the incremental read.
    return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.FileStoreTable;
Expand Down Expand Up @@ -54,8 +55,8 @@
*/
public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT>
implements OneInputStreamOperator<CommitT, CommitT>,
SetupableStreamOperator,
BoundedOneInput {
SetupableStreamOperator,
BoundedOneInput {

private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-";

Expand Down Expand Up @@ -112,12 +113,24 @@ private void createTag() {
BATCH_WRITE_TAG_PREFIX
+ localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
try {
// If the tag already exists, delete the tag
// Check whether the tag exists
if (tagManager.tagExists(tagName)) {
tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
// When tag already exists, we want to consider different scenarios.
// If the table type is lookup or full-compaction, and there is no changelog created,
// then the data hasn't changed and compaction doesn't create a tag
CoreOptions.ChangelogProducer changelogProducer =
this.table.coreOptions().changelogProducer();
if (!((changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
|| changelogProducer
== CoreOptions.ChangelogProducer.FULL_COMPACTION)
&& snapshot.changelogRecordCount() == 0)) {
tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
tagManager.createTag(snapshot, tagName);
}
} else {
// If the tag does not exist, it is created
tagManager.createTag(snapshot, tagName);
}
// Create a new tag
tagManager.createTag(snapshot, tagName);
// Expire the tag
expireTag();
} catch (Exception e) {
Expand Down

0 comments on commit d014d21

Please sign in to comment.