Skip to content

Commit

Permalink
Merge pull request #12 from siyangzeng/tc-paimon-0.6-release
Browse files Browse the repository at this point in the history
先知场景功能适配
  • Loading branch information
wxplovecc authored Jan 4, 2024
2 parents 94104cf + d014d21 commit 7e3f523
Show file tree
Hide file tree
Showing 10 changed files with 533 additions and 22 deletions.
1 change: 1 addition & 0 deletions docs/content/maintenance/manage-tags.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Paimon supports automatic creation of tags in writing job.
You can set `'tag.automatic-creation'` to `process-time`, `watermark` or `batch`:
- `process-time`: Create TAG based on the time of the machine.
- `watermark`: Create TAG based on the watermark of the Sink input.
- `batch`: In a batch processing scenario, a tag is generated after the current task is completed.

{{< hint info >}}
If you choose Watermark, you may need to specify the time zone of watermark, if watermark is not in the
Expand Down
4 changes: 2 additions & 2 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@
<td><h5>metastore.tag-to-partition.preview</h5></td>
<td style="word-wrap: break-word;">none</td>
<td><p>Enum</p></td>
<td>Whether to preview tag of generated snapshots in metastore. This allows the Hive engine to query specific tag before creation.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li></ul></td>
<td>Whether to preview tag of generated snapshots in metastore. This allows the Hive engine to query specific tag before creation.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li><li>"batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.</li></ul></td>
</tr>
<tr>
<td><h5>num-levels</h5></td>
Expand Down Expand Up @@ -573,7 +573,7 @@
<td><h5>tag.automatic-creation</h5></td>
<td style="word-wrap: break-word;">none</td>
<td><p>Enum</p></td>
<td>Whether to create tag automatically. And how to generate tags.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li></ul></td>
<td>Whether to create tag automatically. And how to generate tags.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li><li>"batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.</li></ul></td>
</tr>
<tr>
<td><h5>tag.callback.#.param</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1971,8 +1971,10 @@ public enum TagCreationMode implements DescribedEnum {
"Based on the time of the machine, create TAG once the processing time passes period time plus delay."),
WATERMARK(
"watermark",
"Based on the watermark of the input, create TAG once the watermark passes period time plus delay.");

"Based on the watermark of the input, create TAG once the watermark passes period time plus delay."),
BATCH(
"batch",
"In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.");
private final String value;
private final String description;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
return new IncrementalTagStartingScanner(
snapshotManager,
incrementalBetween.getLeft(),
incrementalBetween.getRight());
incrementalBetween.getRight(),
scanMode,
options.tagCreationMode());
}
} else {
return new IncrementalTimeStampStartingScanner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,175 @@

package org.apache.paimon.table.source.snapshot;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import java.util.*;

/** {@link StartingScanner} for incremental changes by tag. */
public class IncrementalTagStartingScanner extends AbstractStartingScanner {

private final String start;
private final String end;
private final ScanMode scanMode;
private final CoreOptions.TagCreationMode tagCreationMode;

public IncrementalTagStartingScanner(
SnapshotManager snapshotManager, String start, String end) {
SnapshotManager snapshotManager,
String start,
String end,
ScanMode scanMode,
CoreOptions.TagCreationMode tagCreationMode) {
super(snapshotManager);
this.start = start;
this.end = end;
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
Snapshot startingSnapshot = tagManager.taggedSnapshot(start);
if (startingSnapshot != null) {
this.startingSnapshotId = startingSnapshot.id();
}
this.scanMode = scanMode;
this.tagCreationMode = tagCreationMode;
}

@Override
public Result scan(SnapshotReader reader) {
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
Snapshot tag1 = tagManager.taggedSnapshot(start);
Snapshot tag2 = tagManager.taggedSnapshot(end);

if (tag2.id() <= tag1.id()) {
throw new IllegalArgumentException(
String.format(
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
end, tag2.id(), start, tag1.id()));

if (tagCreationMode == CoreOptions.TagCreationMode.BATCH) {
Snapshot tagStart;
Snapshot tagEnd;
try {
tagStart = tagManager.taggedSnapshot(start);
} catch (IllegalArgumentException e) {
tagStart = null;
}
try {
tagEnd = tagManager.taggedSnapshot(end);
} catch (IllegalArgumentException e) {
tagEnd = null;
}

if (tagStart == null && tagEnd == null) {
throw new NullPointerException(
"The tags of the incremental query cannot all be empty");
}
Snapshot tagResult = tagEnd == null ? tagStart : tagEnd;
Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new HashMap<>();
// 兼容先知场景首次创建的情况
if (tagStart == null || tagEnd == null) {
List<DataSplit> currentSplits = readSplits(reader, tagResult);
for (DataSplit split : currentSplits) {
grouped.computeIfAbsent(
Pair.of(split.partition(), split.bucket()),
k -> new ArrayList<>())
.addAll(split.dataFiles());
}
} else {
if (tagEnd.id() <= tagStart.id()) {
throw new IllegalArgumentException(
String.format(
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
end, tagEnd.id(), start, tagStart.id()));
}
SortedMap<Snapshot, String> tags = tagManager.tags();
for (Snapshot snapshot : tags.keySet()) {
if (snapshot.id() > tagStart.id() && snapshot.id() <= tagEnd.id()) {
List<DataSplit> currentSplits = readSplits(reader, snapshot);
for (DataSplit split : currentSplits) {
grouped.computeIfAbsent(
Pair.of(split.partition(), split.bucket()),
k -> new ArrayList<>())
.addAll(split.dataFiles());
}
}
}
}

List<DataSplit> result = new ArrayList<>();
for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry :
grouped.entrySet()) {
BinaryRow partition = entry.getKey().getLeft();
int bucket = entry.getKey().getRight();
for (List<DataFileMeta> files :
reader.splitGenerator().splitForBatch(entry.getValue())) {
result.add(
DataSplit.builder()
.withSnapshot(tagResult.id())
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(files)
.build());
}
}
return StartingScanner.fromPlan(
new SnapshotReader.Plan() {
@Override
public Long watermark() {
return null;
}

@Override
public Long snapshotId() {
return tagResult.id();
}

@Override
public List<Split> splits() {
return (List) result;
}
});
} else {
Snapshot tagStart = tagManager.taggedSnapshot(start);
Snapshot tagEnd = tagManager.taggedSnapshot(end);

if (tagEnd.id() <= tagStart.id()) {
throw new IllegalArgumentException(
String.format(
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
end, tagEnd.id(), start, tagStart.id()));
}

if (tagEnd.id() <= tagStart.id()) {
throw new IllegalArgumentException(
String.format(
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
end, tagEnd.id(), start, tagStart.id()));
}
return StartingScanner.fromPlan(
reader.withSnapshot(tagEnd).readIncrementalDiff(tagStart));
}
}

private List<DataSplit> readSplits(SnapshotReader reader, Snapshot s) {
switch (scanMode) {
case CHANGELOG:
return readChangeLogSplits(reader, s);
case DELTA:
return readDeltaSplits(reader, s);
default:
throw new UnsupportedOperationException("Unsupported scan kind: " + scanMode);
}
}

return StartingScanner.fromPlan(reader.withSnapshot(tag2).readIncrementalDiff(tag1));
private List<DataSplit> readDeltaSplits(SnapshotReader reader, Snapshot s) {
if (s.commitKind() == Snapshot.CommitKind.OVERWRITE) {
// ignore OVERWRITE
return Collections.emptyList();
}
return (List) reader.withSnapshot(s).withMode(ScanMode.DELTA).read().splits();
}

@SuppressWarnings({"unchecked", "rawtypes"})
private List<DataSplit> readChangeLogSplits(SnapshotReader reader, Snapshot s) {
if (s.commitKind() == Snapshot.CommitKind.OVERWRITE) {
// ignore OVERWRITE
return Collections.emptyList();
}
return (List) reader.withSnapshot(s).withMode(ScanMode.CHANGELOG).read().splits();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ private TagPreview(CoreOptions options) {
}

public static TagPreview create(CoreOptions options) {
if (options.tagToPartitionPreview() != CoreOptions.TagCreationMode.NONE) {
if (options.tagToPartitionPreview() != CoreOptions.TagCreationMode.NONE
&& options.tagToPartitionPreview() != CoreOptions.TagCreationMode.BATCH) {
return new TagPreview(options);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ static TagTimeExtractor createForTagPreview(CoreOptions options) {
static TagTimeExtractor create(CoreOptions.TagCreationMode mode, CoreOptions options) {
switch (mode) {
case NONE:
case BATCH:
return null;
case PROCESS_TIME:
return new ProcessTimeExtractor();
Expand Down
Loading

0 comments on commit 7e3f523

Please sign in to comment.