Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
wxplovecc committed Sep 12, 2023
1 parent e6bd00c commit 3761866
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 16 deletions.
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
<td><h5>incremental-between-scan-mode</h5></td>
<td style="word-wrap: break-word;">delta</td>
<td><p>Enum</p></td>
<td>Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot, 'delta' for scan newly changed files of a snapshot 'changelog' scan changelog files of a snapshot.<br /><br />Possible values:<ul><li>"delta": Scan newly changed files of a snapshot.</li><li>"changelog": Scan changelog files of a snapshot.</li></ul></td>
<td>Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot, 'delta' for scan newly changed files between snapshots, 'changelog' scan changelog files between snapshots.<br /><br />Possible values:<ul><li>"delta": Scan newly changed files between snapshots.</li><li>"changelog": Scan changelog files between snapshots.</li></ul></td>
</tr>
<tr>
<td><h5>incremental-between-timestamp</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,10 +723,10 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<IncrementalBetweenScanMode> INCREMENTAL_BETWEEN_SCAN_MODE =
key("incremental-between-scan-mode")
.enumType(IncrementalBetweenScanMode.class)
.defaultValue(IncrementalBetweenScanMode.DELATA)
.defaultValue(IncrementalBetweenScanMode.DELTA)
.withDescription(
"Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot, "
+ "'delta' for scan newly changed files of a snapshot 'changelog' scan changelog files of a snapshot.");
+ "'delta' for scan newly changed files between snapshots, 'changelog' scan changelog files between snapshots.");

public static final ConfigOption<String> INCREMENTAL_BETWEEN_TIMESTAMP =
key("incremental-between-timestamp")
Expand Down Expand Up @@ -1641,8 +1641,8 @@ public static StreamingCompactionType fromValue(String value) {

/** Specifies this scan type for incremental scan . */
public enum IncrementalBetweenScanMode implements DescribedEnum {
DELATA("delta", "Scan newly changed files of a snapshot."),
CHANGELOG("changelog", "Scan changelog files of a snapshot.");
DELTA("delta", "Scan newly changed files between snapshots."),
CHANGELOG("changelog", "Scan changelog files between snapshots.");

private final String value;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
options.incrementalBetweenScanMode();
ScanMode scanMode;
switch (scanType) {
case DELATA:
case DELTA:
scanMode = ScanMode.DELTA;
break;
case CHANGELOG:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@ public class IncrementalStartingScanner extends AbstractStartingScanner {

private long endingSnapshotId;

public IncrementalStartingScanner(SnapshotManager snapshotManager, long start, long end) {
private ScanMode scanMode;

public IncrementalStartingScanner(
SnapshotManager snapshotManager, long start, long end, ScanMode scanMode) {
super(snapshotManager);
this.startingSnapshotId = start;
this.endingSnapshotId = end;
this.scanMode = scanMode;
}

@Override
public Result scan(SnapshotReader reader) {
Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new HashMap<>();
for (long i = startingSnapshotId + 1; i < endingSnapshotId + 1; i++) {
List<DataSplit> splits = readDeltaSplits(reader, snapshotManager.snapshot(i));
List<DataSplit> splits = readSplits(reader, snapshotManager.snapshot(i));
for (DataSplit split : splits) {
grouped.computeIfAbsent(
Pair.of(split.partition(), split.bucket()), k -> new ArrayList<>())
Expand Down Expand Up @@ -98,6 +102,17 @@ public List<Split> splits() {
});
}

private List<DataSplit> readSplits(SnapshotReader reader, Snapshot s) {
switch (scanMode) {
case CHANGELOG:
return readChangeLogSplits(reader, s);
case DELTA:
return readDeltaSplits(reader, s);
default:
throw new UnsupportedOperationException("Unsupported scan kind: " + scanMode);
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
private List<DataSplit> readDeltaSplits(SnapshotReader reader, Snapshot s) {
if (s.commitKind() != CommitKind.APPEND) {
Expand All @@ -106,4 +121,13 @@ private List<DataSplit> readDeltaSplits(SnapshotReader reader, Snapshot s) {
}
return (List) reader.withSnapshot(s).withMode(ScanMode.DELTA).read().splits();
}

@SuppressWarnings({"unchecked", "rawtypes"})
private List<DataSplit> readChangeLogSplits(SnapshotReader reader, Snapshot s) {
if (s.commitKind() == CommitKind.OVERWRITE) {
// ignore OVERWRITE
return Collections.emptyList();
}
return (List) reader.withSnapshot(s).withMode(ScanMode.CHANGELOG).read().splits();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@
package org.apache.paimon.table.source.snapshot;

import org.apache.paimon.Snapshot;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;

/** {@link StartingScanner} for incremental changes by timestamp. */
public class IncrementalTimeStampStartingScanner extends AbstractStartingScanner {

private final long startTimestamp;
private final long endTimestamp;
private final ScanMode scanMode;

public IncrementalTimeStampStartingScanner(
SnapshotManager snapshotManager, long startTimestamp, long endTimestamp) {
SnapshotManager snapshotManager,
long startTimestamp,
long endTimestamp,
ScanMode scanMode) {
super(snapshotManager);
this.startTimestamp = startTimestamp;
this.endTimestamp = endTimestamp;
this.scanMode = scanMode;
Snapshot startingSnapshot = snapshotManager.earlierOrEqualTimeMills(startTimestamp);
if (startingSnapshot != null) {
this.startingSnapshotId = startingSnapshot.id();
Expand All @@ -51,7 +57,8 @@ public Result scan(SnapshotReader reader) {
Snapshot endSnapshot = snapshotManager.earlierOrEqualTimeMills(endTimestamp);
Long endSnapshotId = (endSnapshot == null) ? latestSnapshot.id() : endSnapshot.id();
IncrementalStartingScanner incrementalStartingScanner =
new IncrementalStartingScanner(snapshotManager, startSnapshotId, endSnapshotId);
new IncrementalStartingScanner(
snapshotManager, startSnapshotId, endSnapshotId, scanMode);
return incrementalStartingScanner.scan(reader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,21 @@ public void testScan() throws Exception {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);

IncrementalStartingScanner deltaScanner =
new IncrementalStartingScanner(1L, 4L, ScanMode.DELTA);
new IncrementalStartingScanner(snapshotManager, 1L, 4L, ScanMode.DELTA);
StartingScanner.ScannedResult deltaResult =
(StartingScanner.ScannedResult) deltaScanner.scan(snapshotManager, snapshotReader);
(StartingScanner.ScannedResult) deltaScanner.scan(snapshotReader);
assertThat(deltaResult.currentSnapshotId()).isEqualTo(4);
assertThat(getResult(table.newRead(), toSplits(deltaResult.splits())))
.hasSameElementsAs(Arrays.asList("+I 2|20|200", "+I 1|10|100", "+I 3|40|500"));

IncrementalStartingScanner changeLogScanner =
new IncrementalStartingScanner(2L, 4L, ScanMode.CHANGELOG);
new IncrementalStartingScanner(snapshotManager, 1L, 4L, ScanMode.CHANGELOG);
StartingScanner.ScannedResult changeLogResult =
(StartingScanner.ScannedResult)
changeLogScanner.scan(snapshotManager, snapshotReader);
(StartingScanner.ScannedResult) changeLogScanner.scan(snapshotReader);
assertThat(changeLogResult.currentSnapshotId()).isEqualTo(4);
assertThat(getResult(table.newRead(), toSplits(changeLogResult.splits())))
.hasSameElementsAs(Arrays.asList("+U 3|40|500"));
.hasSameElementsAs(
Arrays.asList("+I 2|20|200", "+I 1|10|100", "+I 3|40|400", "+U 3|40|500"));

write.close();
commit.close();
Expand Down

0 comments on commit 3761866

Please sign in to comment.