Skip to content

Commit

Permalink
[core] Introduce FileMonitorTable to stream read file changes (#2101)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Oct 10, 2023
1 parent bbb7c4f commit cc3f8b2
Show file tree
Hide file tree
Showing 13 changed files with 442 additions and 70 deletions.
41 changes: 14 additions & 27 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -671,12 +671,13 @@ public class CoreOptions implements Serializable {
"Full compaction will be constantly triggered after delta commits.");

@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<StreamingCompactionType> STREAMING_COMPACT =
key("streaming-compact")
.enumType(StreamingCompactionType.class)
.defaultValue(StreamingCompactionType.NONE)
public static final ConfigOption<StreamScanMode> STREAM_SCAN_MODE =
key("stream-scan-mode")
.enumType(StreamScanMode.class)
.defaultValue(StreamScanMode.NONE)
.withDescription(
"Only used to force TableScan to construct suitable 'StartingUpScanner' and 'FollowUpScanner' dedicated streaming compaction job.");
"Only used to force TableScan to construct suitable 'StartingUpScanner' and 'FollowUpScanner' "
+ "dedicated internal streaming scan.");

public static final ConfigOption<StreamingReadMode> STREAMING_READ_MODE =
key("streaming-read-mode")
Expand Down Expand Up @@ -1615,16 +1616,18 @@ public static StreamingReadMode fromValue(String value) {
}
}

/** Compaction type when trigger a compaction action. */
public enum StreamingCompactionType implements DescribedEnum {
NONE("none", "Not a streaming compaction."),
NORMAL("normal", "Compaction for traditional bucket table."),
BUCKET_UNAWARE("unaware", "Compaction for unaware bucket table.");
/** Inner stream scan mode for some internal requirements. */
public enum StreamScanMode implements DescribedEnum {
NONE("none", "No requirement."),
COMPACT_BUCKET_TABLE("compact-bucket-table", "Compaction for traditional bucket table."),
COMPACT_APPEND_NO_BUCKET(
"compact-append-no-bucket", "Compaction for append table with bucket unaware."),
FILE_MONITOR("file-monitor", "Monitor data file changes.");

private final String value;
private final String description;

StreamingCompactionType(String value, String description) {
StreamScanMode(String value, String description) {
this.value = value;
this.description = description;
}
Expand All @@ -1642,22 +1645,6 @@ public InlineElement getDescription() {
public String getValue() {
return value;
}

@VisibleForTesting
public static StreamingCompactionType fromValue(String value) {
for (StreamingCompactionType formatType : StreamingCompactionType.values()) {
if (formatType.value.equals(value)) {
return formatType;
}
}
throw new IllegalArgumentException(
String.format(
"Invalid format type %s, only support [%s]",
value,
StringUtils.join(
Arrays.stream(StreamingCompactionType.values()).iterator(),
",")));
}
}

/** Specifies this scan type for incremental scan . */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ private Map<String, String> compactScanType() {
return new HashMap<String, String>() {
{
put(
CoreOptions.STREAMING_COMPACT.key(),
CoreOptions.StreamingCompactionType.BUCKET_UNAWARE.getValue());
CoreOptions.STREAM_SCAN_MODE.key(),
CoreOptions.StreamScanMode.COMPACT_APPEND_NO_BUCKET.getValue());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,16 @@ public CoreOptions options() {

protected StartingScanner createStartingScanner(boolean isStreaming) {
SnapshotManager snapshotManager = snapshotReader.snapshotManager();
CoreOptions.StreamingCompactionType type =
options.toConfiguration().get(CoreOptions.STREAMING_COMPACT);
CoreOptions.StreamScanMode type =
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
switch (type) {
case NORMAL:
{
checkArgument(
isStreaming,
"Set 'streaming-compact' in batch mode. This is unexpected.");
return new ContinuousCompactorStartingScanner(snapshotManager);
}
case BUCKET_UNAWARE:
{
return new FullStartingScanner(snapshotManager);
}
case COMPACT_BUCKET_TABLE:
checkArgument(
isStreaming, "Set 'streaming-compact' in batch mode. This is unexpected.");
return new ContinuousCompactorStartingScanner(snapshotManager);
case COMPACT_APPEND_NO_BUCKET:
case FILE_MONITOR:
return new FullStartingScanner(snapshotManager);
}

// read from consumer id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.ContinuousAppendAndCompactFollowUpScanner;
Expand Down Expand Up @@ -185,17 +186,15 @@ private Plan nextPlan() {
}

private FollowUpScanner createFollowUpScanner() {
CoreOptions.StreamingCompactionType type =
options.toConfiguration().get(CoreOptions.STREAMING_COMPACT);
CoreOptions.StreamScanMode type =
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
switch (type) {
case NORMAL:
{
return new ContinuousCompactorFollowUpScanner();
}
case BUCKET_UNAWARE:
{
return new ContinuousAppendAndCompactFollowUpScanner();
}
case COMPACT_BUCKET_TABLE:
return new ContinuousCompactorFollowUpScanner();
case COMPACT_APPEND_NO_BUCKET:
return new ContinuousAppendAndCompactFollowUpScanner();
case FILE_MONITOR:
return new AllDeltaFollowUpScanner();
}

CoreOptions.ChangelogProducer changelogProducer = options.changelogProducer();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.source.snapshot;

import org.apache.paimon.Snapshot;
import org.apache.paimon.table.source.ScanMode;

/**
 * A {@link FollowUpScanner} that reads all data file changes from every new snapshot.
 *
 * <p>Unlike the compaction-oriented scanners, this scanner never filters snapshots: each
 * incoming snapshot is scanned in {@link ScanMode#DELTA} mode so that every file change
 * (both additions and deletions) is surfaced to the downstream reader.
 */
public class AllDeltaFollowUpScanner implements FollowUpScanner {

    @Override
    public boolean shouldScanSnapshot(Snapshot snapshot) {
        // Monitor everything: every snapshot carries file changes we want to read.
        return true;
    }

    @Override
    public SnapshotReader.Plan scan(long snapshotId, SnapshotReader snapshotReader) {
        // Configure the reader to look only at the delta (changed files) of the
        // requested snapshot, then build the plan of file changes.
        SnapshotReader deltaReader = snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshotId);
        return deltaReader.readChanges();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ public interface FollowUpScanner {
Plan scan(long snapshotId, SnapshotReader snapshotReader);

default Plan getOverwriteChangesPlan(long snapshotId, SnapshotReader snapshotReader) {
return snapshotReader.withSnapshot(snapshotId).readOverwrittenChanges();
return snapshotReader.withSnapshot(snapshotId).readChanges();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public interface SnapshotReader {
/** Get splits plan from snapshot. */
Plan read();

/** Get splits plan from an overwritten snapshot. */
Plan readOverwrittenChanges();
/** Get splits plan from file changes. */
Plan readChanges();

Plan readIncrementalDiff(Snapshot before);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,10 @@ public List<BinaryRow> partitions() {
.collect(Collectors.toList());
}

/** Get splits from an overwritten snapshot files. */
@Override
public Plan readOverwrittenChanges() {
public Plan readChanges() {
withMode(ScanMode.DELTA);
FileStoreScan.Plan plan = scan.plan();
long snapshotId = plan.snapshotId();

Snapshot snapshot = snapshotManager.snapshot(snapshotId);
if (snapshot.commitKind() != Snapshot.CommitKind.OVERWRITE) {
throw new IllegalStateException(
"Cannot read overwrite splits from a non-overwrite snapshot.");
}

Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
groupByPartFiles(plan.files(FileKind.DELETE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ public Plan read() {
}

@Override
public Plan readOverwrittenChanges() {
return snapshotReader.readOverwrittenChanges();
public Plan readChanges() {
return snapshotReader.readChanges();
}

@Override
Expand Down
Loading

0 comments on commit cc3f8b2

Please sign in to comment.