Skip to content

Commit

Permalink
[core] Fix collecting wrong sorted runs when compacting on custom branch
Browse files: browse the repository at this point in the history
  • Loading branch information
schnappi17 committed Mar 6, 2024
1 parent 7062d7a commit 56dc139
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ public AppendOnlyFileStoreWrite newWrite(String commitUser) {
/**
 * Overload without a branch name: forwards to the three-argument {@code newWrite}, passing
 * {@code DEFAULT_MAIN_BRANCH} as the branch.
 *
 * @param commitUser passed through unchanged to the three-argument overload
 * @param manifestFilter passed through unchanged to the three-argument overload
 * @return whatever the three-argument overload returns for {@code DEFAULT_MAIN_BRANCH}
 */
@Override
public AppendOnlyFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH);
}

@Override
public AppendOnlyFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName) {
return new AppendOnlyFileStoreWrite(
fileIO,
newRead(),
Expand All @@ -104,9 +110,10 @@ public AppendOnlyFileStoreWrite newWrite(
rowType,
pathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true, branchName).withManifestCacheFilter(manifestFilter),
options,
tableName);
tableName,
branchName);
}

private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) {
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public interface FileStore<T> extends Serializable {

FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter manifestFilter);

/**
 * Branch-aware variant of {@code newWrite(commitUser, manifestFilter)}.
 *
 * <p>The implementations shown in this change pass {@code branchName} both to the scan they
 * build ({@code newScan(true, branchName)}) and to the constructor of the write they return.
 *
 * @param commitUser forwarded to the created write
 * @param manifestFilter applied to the scan via {@code withManifestCacheFilter}
 * @param branchName branch used for the scan and handed to the created write
 */
FileStoreWrite<T> newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName);

FileStoreCommit newCommit(String commitUser);

FileStoreCommit newCommit(String commitUser, String branchName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ public KeyValueFileStoreWrite newWrite(String commitUser) {

/**
 * Overload without a branch name: forwards to the three-argument {@code newWrite}, passing
 * {@code DEFAULT_MAIN_BRANCH} as the branch.
 *
 * @param commitUser passed through unchanged to the three-argument overload
 * @param manifestFilter passed through unchanged to the three-argument overload
 * @return whatever the three-argument overload returns for {@code DEFAULT_MAIN_BRANCH}
 */
@Override
public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH);
}

@Override
public KeyValueFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName) {
IndexMaintainer.Factory<KeyValue> indexFactory = null;
if (bucketMode() == BucketMode.DYNAMIC) {
indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
Expand All @@ -164,11 +170,12 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
pathFactory(),
format2PathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true, branchName).withManifestCacheFilter(manifestFilter),
indexFactory,
options,
keyValueFieldsExtractor,
tableName);
tableName,
branchName);
}

private Map<String, FileStorePathFactory> format2PathFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,16 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {

protected CompactionMetrics compactionMetrics = null;
protected final String tableName;
private final String branchName;

protected AbstractFileStoreWrite(
String commitUser,
SnapshotManager snapshotManager,
FileStoreScan scan,
@Nullable IndexMaintainer.Factory<T> indexFactory,
String tableName,
int writerNumberMax) {
int writerNumberMax,
String branchName) {
this.commitUser = commitUser;
this.snapshotManager = snapshotManager;
this.scan = scan;
Expand All @@ -93,6 +95,7 @@ protected AbstractFileStoreWrite(
this.writers = new HashMap<>();
this.tableName = tableName;
this.writerNumberMax = writerNumberMax;
this.branchName = branchName;
}

@Override
Expand Down Expand Up @@ -350,7 +353,7 @@ public WriterContainer<T> createWriterContainer(
}
}

Long latestSnapshotId = snapshotManager.latestSnapshotId();
Long latestSnapshotId = snapshotManager.latestSnapshotId(branchName);
List<DataFileMeta> restoreFiles = new ArrayList<>();
if (!ignorePreviousFiles && latestSnapshotId != null) {
restoreFiles = scanExistingFileMetas(latestSnapshotId, partition, bucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ public AppendOnlyFileStoreWrite(
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
String tableName) {
super(commitUser, snapshotManager, scan, options, null, tableName);
String tableName,
String branchName) {
super(commitUser, snapshotManager, scan, options, null, tableName, branchName);
this.fileIO = fileIO;
this.read = read;
this.schemaId = schemaId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ public KeyValueFileStoreWrite(
@Nullable IndexMaintainer.Factory<KeyValue> indexFactory,
CoreOptions options,
KeyValueFieldsExtractor extractor,
String tableName) {
super(commitUser, snapshotManager, scan, options, indexFactory, tableName);
String tableName,
String branchName) {
super(commitUser, snapshotManager, scan, options, indexFactory, tableName, branchName);
this.fileIO = fileIO;
this.keyType = keyType;
this.valueType = valueType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ public MemoryFileStoreWrite(
FileStoreScan scan,
CoreOptions options,
@Nullable IndexMaintainer.Factory<T> indexFactory,
String tableName) {
String tableName,
String branchName) {
super(
commitUser,
snapshotManager,
scan,
indexFactory,
tableName,
options.writeMaxWritersToSpill());
options.writeMaxWritersToSpill(),
branchName);
this.options = options;
this.cacheManager = new CacheManager(options.lookupCacheMaxMemory());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.io.IOException;
import java.util.function.BiConsumer;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/** {@link FileStoreTable} for append table. */
class AppendOnlyFileStoreTable extends AbstractFileStoreTable {

Expand Down Expand Up @@ -135,9 +137,16 @@ public TableWriteImpl<InternalRow> newWrite(String commitUser) {
/**
 * Overload without a branch name: forwards to the three-argument {@code newWrite}, passing
 * {@code DEFAULT_MAIN_BRANCH} as the branch.
 *
 * @param commitUser passed through unchanged to the three-argument overload
 * @param manifestFilter passed through unchanged to the three-argument overload
 * @return whatever the three-argument overload returns for {@code DEFAULT_MAIN_BRANCH}
 */
@Override
public TableWriteImpl<InternalRow> newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH);
}

@Override
public TableWriteImpl<InternalRow> newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName) {
// if this table is unaware-bucket table, we skip compaction and restored files searching
AppendOnlyFileStoreWrite writer =
store().newWrite(commitUser, manifestFilter).withBucketMode(bucketMode());
store().newWrite(commitUser, manifestFilter, branchName)
.withBucketMode(bucketMode());
return new TableWriteImpl<>(
writer,
createRowKeyExtractor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ default Optional<String> comment() {

TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter manifestFilter);

/**
 * Branch-aware variant of {@code newWrite(commitUser, manifestFilter)}.
 *
 * <p>The table implementations shown in this change forward all three arguments to
 * {@code store().newWrite(commitUser, manifestFilter, branchName)}.
 *
 * @param commitUser forwarded to the underlying store write
 * @param manifestFilter forwarded to the underlying store write
 * @param branchName branch forwarded to the underlying store write
 */
TableWriteImpl<?> newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName);

@Override
TableCommitImpl newCommit(String commitUser);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/** {@link FileStoreTable} for primary key table. */
class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
Expand Down Expand Up @@ -184,12 +185,18 @@ public TableWriteImpl<KeyValue> newWrite(String commitUser) {
/**
 * Overload without a branch name: forwards to the three-argument {@code newWrite}, passing
 * {@code DEFAULT_MAIN_BRANCH} as the branch.
 *
 * @param commitUser passed through unchanged to the three-argument overload
 * @param manifestFilter passed through unchanged to the three-argument overload
 * @return whatever the three-argument overload returns for {@code DEFAULT_MAIN_BRANCH}
 */
@Override
public TableWriteImpl<KeyValue> newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH);
}

@Override
public TableWriteImpl<KeyValue> newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName) {
TableSchema schema = schema();
CoreOptions options = store().options();
RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options);
KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store().newWrite(commitUser, manifestFilter),
store().newWrite(commitUser, manifestFilter, branchName),
createRowKeyExtractor(),
record -> {
InternalRow row = record.row();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
table.newWrite(
commitUser,
(part, bucket) ->
state.stateValueFilter().filter(table.name(), part, bucket))
state.stateValueFilter().filter(table.name(), part, bucket),
table.coreOptions().branchName())
.withIOManager(paimonIOManager)
.withIgnorePreviousFiles(ignorePreviousFiles)
.withExecutionMode(isStreamingMode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRow partition, int buc
null,
options,
EXTRACTOR,
tablePath.getName())
tablePath.getName(),
"main")
.createWriterContainer(partition, bucket, true)
.writer;
((MemoryOwner) writer)
Expand Down

0 comments on commit 56dc139

Please sign in to comment.