Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Support branch batch/streaming read and write #2748

Merged
merged 21 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,17 @@
import java.util.Comparator;
import java.util.List;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/**
* Base {@link FileStore} implementation.
*
* @param <T> type of record to read and write.
*/
public abstract class AbstractFileStore<T> implements FileStore<T> {

public static final String DEFAULT_MAIN_BRANCH = "main";
FangYongs marked this conversation as resolved.
Show resolved Hide resolved

protected final FileIO fileIO;
protected final SchemaManager schemaManager;
protected final long schemaId;
Expand Down Expand Up @@ -169,6 +173,10 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {

/**
 * Creates a commit for {@code commitUser} that targets the default main branch.
 *
 * <p>Delegates to {@link #newCommit(String, String)} with {@code DEFAULT_MAIN_BRANCH}.
 */
@Override
public FileStoreCommitImpl newCommit(String commitUser) {
    final String mainBranch = DEFAULT_MAIN_BRANCH;
    return newCommit(commitUser, mainBranch);
}

public FileStoreCommitImpl newCommit(String commitUser, String branchName) {
return new FileStoreCommitImpl(
fileIO,
schemaManager,
Expand All @@ -186,6 +194,7 @@ public FileStoreCommitImpl newCommit(String commitUser) {
options.manifestMergeMinCount(),
partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(),
newKeyComparator(),
branchName,
newStatsFileHandler());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ public BucketMode bucketMode() {

/**
 * Creates a scan for the default main branch.
 *
 * <p>NOTE(review): this listing is a rendered diff without +/- markers. The two consecutive
 * {@code return} statements below appear to be the removed line followed by the line that
 * replaces it; only the second one should exist in the resulting file. Confirm against the
 * actual source.
 */
@Override
public AppendOnlyFileStoreScan newScan() {
// Presumably the pre-change body (removed side of the diff).
return newScan(false);
// Post-change body: delegate to the branch-aware overload using the main branch.
return newScan(DEFAULT_MAIN_BRANCH);
}

/**
 * Creates a scan for the given branch.
 *
 * <p>The scan is built with {@code forWrite = false}.
 *
 * @param branchName name of the branch handed to the scan
 */
public AppendOnlyFileStoreScan newScan(String branchName) {
    final boolean forWrite = false;
    return newScan(forWrite, branchName);
}

@Override
Expand Down Expand Up @@ -99,12 +103,12 @@ public AppendOnlyFileStoreWrite newWrite(
rowType,
pathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
options,
tableName);
}

private AppendOnlyFileStoreScan newScan(boolean forWrite) {
private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
Expand Down Expand Up @@ -138,7 +142,8 @@ public void pushdown(Predicate predicate) {
manifestListFactory(forWrite),
options.bucket(),
forWrite,
options.scanManifestParallelism());
options.scanManifestParallelism(),
branchName);
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public interface FileStore<T> extends Serializable {

FileStoreScan newScan();

/**
 * Creates a scan for the given branch.
 *
 * @param branchName name of the branch to scan
 */
FileStoreScan newScan(String branchName);

ManifestList.Factory manifestListFactory();

ManifestFile.Factory manifestFileFactory();
Expand All @@ -79,6 +81,8 @@ public interface FileStore<T> extends Serializable {

FileStoreCommit newCommit(String commitUser);

/**
 * Creates a commit for the given commit user that targets the given branch.
 *
 * @param commitUser the commit user
 * @param branchName name of the branch to commit to
 */
FileStoreCommit newCommit(String commitUser, String branchName);

SnapshotDeletion newSnapshotDeletion();

TagManager newTagManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ public BucketMode bucketMode() {

/**
 * Creates a scan for the default main branch.
 *
 * <p>NOTE(review): this listing is a rendered diff without +/- markers. The two consecutive
 * {@code return} statements below appear to be the removed line followed by the line that
 * replaces it; only the second one should exist in the resulting file. Confirm against the
 * actual source.
 */
@Override
public KeyValueFileStoreScan newScan() {
// Presumably the pre-change body (removed side of the diff).
return newScan(false);
// Post-change body: delegate to the branch-aware overload using the main branch.
return newScan(DEFAULT_MAIN_BRANCH);
}

/**
 * Creates a scan for the given branch.
 *
 * <p>The scan is built with {@code forWrite = false}.
 *
 * @param branchName name of the branch handed to the scan
 */
public KeyValueFileStoreScan newScan(String branchName) {
    final boolean forWrite = false;
    return newScan(forWrite, branchName);
}

@Override
Expand Down Expand Up @@ -159,7 +163,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
pathFactory(),
format2PathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
indexFactory,
options,
keyValueFieldsExtractor,
Expand All @@ -182,7 +186,7 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
return pathFactoryMap;
}

private KeyValueFileStoreScan newScan(boolean forWrite) {
private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
Expand Down Expand Up @@ -212,7 +216,8 @@ public void pushdown(Predicate keyFilter) {
manifestListFactory(forWrite),
options.bucket(),
forWrite,
options.scanManifestParallelism());
options.scanManifestParallelism(),
branchName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private final Integer scanManifestParallelism;

private ScanMetrics scanMetrics = null;
private String branchName;

public AbstractFileStoreScan(
RowType partitionType,
Expand All @@ -95,7 +96,8 @@ public AbstractFileStoreScan(
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism) {
Integer scanManifestParallelism,
String branchName) {
this.partitionStatsConverter = new FieldStatsArraySerializer(partitionType);
this.partitionType = partitionType;
this.bucketKeyFilter = bucketKeyFilter;
Expand All @@ -107,6 +109,7 @@ public AbstractFileStoreScan(
this.checkNumOfBuckets = checkNumOfBuckets;
this.tableSchemas = new ConcurrentHashMap<>();
this.scanManifestParallelism = scanManifestParallelism;
this.branchName = branchName;
}

@Override
Expand Down Expand Up @@ -247,7 +250,7 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
if (manifests == null) {
snapshot =
specifiedSnapshot == null
? snapshotManager.latestSnapshot()
? snapshotManager.latestSnapshot(branchName)
: specifiedSnapshot;
if (snapshot == null) {
manifests = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public AppendOnlyFileStoreScan(
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism) {
Integer scanManifestParallelism,
String branchName) {
super(
partitionType,
bucketFilter,
Expand All @@ -57,7 +58,8 @@ public AppendOnlyFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism);
scanManifestParallelism,
branchName);
this.fieldStatsConverters =
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schemaId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/**
* Default implementation of {@link FileStoreCommit}.
*
Expand Down Expand Up @@ -112,6 +114,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final int manifestMergeMinCount;
private final boolean dynamicPartitionOverwrite;
@Nullable private final Comparator<InternalRow> keyComparator;
private final String branchName;

@Nullable private Lock lock;
private boolean ignoreEmptyCommit;
Expand All @@ -137,6 +140,7 @@ public FileStoreCommitImpl(
int manifestMergeMinCount,
boolean dynamicPartitionOverwrite,
@Nullable Comparator<InternalRow> keyComparator,
String branchName,
StatsFileHandler statsFileHandler) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
Expand All @@ -155,6 +159,8 @@ public FileStoreCommitImpl(
this.manifestMergeMinCount = manifestMergeMinCount;
this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
this.keyComparator = keyComparator;
this.branchName = branchName;

this.lock = null;
this.ignoreEmptyCommit = true;
this.commitMetrics = null;
Expand Down Expand Up @@ -233,7 +239,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot();
latestSnapshot = snapshotManager.latestSnapshot(branchName);
if (latestSnapshot != null) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
Expand All @@ -254,6 +260,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
committable.logOffsets(),
Snapshot.CommitKind.APPEND,
safeLatestSnapshotId,
branchName,
null);
generatedSnapshot += 1;
}
Expand Down Expand Up @@ -283,6 +290,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
safeLatestSnapshotId,
branchName,
null);
generatedSnapshot += 1;
}
Expand Down Expand Up @@ -428,6 +436,7 @@ public void overwrite(
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
null,
branchName,
null);
generatedSnapshot += 1;
}
Expand Down Expand Up @@ -523,6 +532,7 @@ public void commitStatistics(Statistics stats, long commitIdentifier) {
Collections.emptyMap(),
Snapshot.CommitKind.ANALYZE,
null,
branchName,
statsFileName);
}

Expand Down Expand Up @@ -596,10 +606,11 @@ private int tryCommit(
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
@Nullable Long safeLatestSnapshotId,
String branchName,
@Nullable String statsFileName) {
int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName);
cnt++;
if (tryCommitOnce(
tableFiles,
Expand All @@ -611,6 +622,7 @@ private int tryCommit(
commitKind,
latestSnapshot,
safeLatestSnapshotId,
branchName,
statsFileName)) {
break;
}
Expand Down Expand Up @@ -672,6 +684,7 @@ private int tryOverwrite(
Snapshot.CommitKind.OVERWRITE,
latestSnapshot,
null,
branchName,
null)) {
break;
}
Expand All @@ -690,10 +703,14 @@ public boolean tryCommitOnce(
Snapshot.CommitKind commitKind,
@Nullable Snapshot latestSnapshot,
@Nullable Long safeLatestSnapshotId,
String branchName,
@Nullable String newStatsFileName) {
long newSnapshotId =
latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
Path newSnapshotPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotManager.snapshotPath(newSnapshotId)
: snapshotManager.branchSnapshotPath(branchName, newSnapshotId);

if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
Expand Down Expand Up @@ -775,7 +792,7 @@ public boolean tryCommitOnce(
newIndexManifest = indexManifest;
}

long latestSchemaId = schemaManager.latest().get().id();
long latestSchemaId = schemaManager.latest(branchName).get().id();

// write new stats or inherit from the previous snapshot
String statsFileName = null;
Expand Down Expand Up @@ -840,7 +857,7 @@ public boolean tryCommitOnce(
boolean committed =
fileIO.writeFileUtf8(newSnapshotPath, newSnapshot.toJson());
if (committed) {
snapshotManager.commitLatestHint(newSnapshotId);
snapshotManager.commitLatestHint(newSnapshotId, branchName);
}
return committed;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {

private Predicate keyFilter;
private Predicate valueFilter;
private String branchName;
TaoZex marked this conversation as resolved.
Show resolved Hide resolved

public KeyValueFileStoreScan(
RowType partitionType,
Expand All @@ -51,7 +52,8 @@ public KeyValueFileStoreScan(
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism) {
Integer scanManifestParallelism,
String branchName) {
super(
partitionType,
bucketFilter,
Expand All @@ -61,7 +63,8 @@ public KeyValueFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism);
scanManifestParallelism,
branchName);
this.fieldKeyStatsConverters =
new FieldStatsConverters(
sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.Preconditions.checkState;
Expand Down Expand Up @@ -91,8 +92,16 @@ public SchemaManager withLock(@Nullable Lock lock) {

/**
 * Looks up the latest schema of the default main branch.
 *
 * @return latest schema, as resolved by {@link #latest(String)} for {@code
 *     DEFAULT_MAIN_BRANCH}.
 */
public Optional<TableSchema> latest() {
    final String mainBranch = DEFAULT_MAIN_BRANCH;
    return latest(mainBranch);
}

public Optional<TableSchema> latest(String branchName) {
Path directoryPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? schemaDirectory()
: branchSchemaDirectory(branchName);
try {
return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX)
.reduce(Math::max)
.map(this::schema);
} catch (IOException e) {
Expand Down Expand Up @@ -482,6 +491,10 @@ public Path toSchemaPath(long id) {
return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
}

/**
 * Builds the schema directory of a branch: the branch path under the table root followed by
 * {@code "/schema"}.
 *
 * @param branchName name of the branch
 * @return path of the schema directory for that branch
 */
public Path branchSchemaDirectory(String branchName) {
    final String schemaDir = getBranchPath(tableRoot, branchName) + "/schema";
    return new Path(schemaDir);
}

public Path branchSchemaPath(String branchName, long schemaId) {
return new Path(
getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId);
Expand Down
Loading