Skip to content

Commit

Permalink
[Core] Support branch batch/streaming read and write (apache#2748)
Browse files Browse the repository at this point in the history
[Core] Support branch batch/streaming read and write
  • Loading branch information
TaoZex authored Feb 7, 2024
1 parent e8aa707 commit 9bd6020
Show file tree
Hide file tree
Showing 25 changed files with 451 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import java.util.Comparator;
import java.util.List;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/**
* Base {@link FileStore} implementation.
*
Expand Down Expand Up @@ -169,6 +171,10 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {

/**
 * Creates a commit for {@code commitUser} without an explicit branch.
 *
 * <p>Delegates to {@link #newCommit(String, String)}, passing {@code DEFAULT_MAIN_BRANCH} as
 * the branch name.
 */
@Override
public FileStoreCommitImpl newCommit(String commitUser) {
    final String targetBranch = DEFAULT_MAIN_BRANCH;
    return newCommit(commitUser, targetBranch);
}

public FileStoreCommitImpl newCommit(String commitUser, String branchName) {
return new FileStoreCommitImpl(
fileIO,
schemaManager,
Expand All @@ -186,6 +192,7 @@ public FileStoreCommitImpl newCommit(String commitUser) {
options.manifestMergeMinCount(),
partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(),
newKeyComparator(),
branchName,
newStatsFileHandler());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/** {@link FileStore} for reading and writing {@link InternalRow}. */
public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
Expand Down Expand Up @@ -69,7 +70,11 @@ public BucketMode bucketMode() {

@Override
public AppendOnlyFileStoreScan newScan() {
return newScan(false);
return newScan(DEFAULT_MAIN_BRANCH);
}

/**
 * Creates a scan for the given branch.
 *
 * <p>Delegates to the private {@code newScan(boolean, String)} overload with {@code forWrite}
 * set to {@code false}.
 *
 * @param branchName name of the branch handed to the scan
 */
public AppendOnlyFileStoreScan newScan(String branchName) {
    final boolean forWrite = false;
    return newScan(forWrite, branchName);
}

@Override
Expand Down Expand Up @@ -99,12 +104,12 @@ public AppendOnlyFileStoreWrite newWrite(
rowType,
pathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
options,
tableName);
}

private AppendOnlyFileStoreScan newScan(boolean forWrite) {
private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
Expand Down Expand Up @@ -138,7 +143,8 @@ public void pushdown(Predicate predicate) {
manifestListFactory(forWrite),
options.bucket(),
forWrite,
options.scanManifestParallelism());
options.scanManifestParallelism(),
branchName);
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public interface FileStore<T> extends Serializable {

FileStoreScan newScan();

FileStoreScan newScan(String branchName);

ManifestList.Factory manifestListFactory();

ManifestFile.Factory manifestFileFactory();
Expand All @@ -79,6 +81,8 @@ public interface FileStore<T> extends Serializable {

FileStoreCommit newCommit(String commitUser);

FileStoreCommit newCommit(String commitUser, String branchName);

SnapshotDeletion newSnapshotDeletion();

TagManager newTagManager();
Expand Down
14 changes: 10 additions & 4 deletions paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** {@link FileStore} for querying and updating {@link KeyValue}s. */
Expand Down Expand Up @@ -107,7 +108,11 @@ public BucketMode bucketMode() {

@Override
public KeyValueFileStoreScan newScan() {
return newScan(false);
return newScan(DEFAULT_MAIN_BRANCH);
}

/**
 * Creates a scan for the given branch.
 *
 * <p>Delegates to the private {@code newScan(boolean, String)} overload with {@code forWrite}
 * set to {@code false}.
 *
 * @param branchName name of the branch handed to the scan
 */
public KeyValueFileStoreScan newScan(String branchName) {
    final boolean forWrite = false;
    return newScan(forWrite, branchName);
}

@Override
Expand Down Expand Up @@ -159,7 +164,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
pathFactory(),
format2PathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
indexFactory,
options,
keyValueFieldsExtractor,
Expand All @@ -182,7 +187,7 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
return pathFactoryMap;
}

private KeyValueFileStoreScan newScan(boolean forWrite) {
private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
Expand Down Expand Up @@ -212,7 +217,8 @@ public void pushdown(Predicate keyFilter) {
manifestListFactory(forWrite),
options.bucket(),
forWrite,
options.scanManifestParallelism());
options.scanManifestParallelism(),
branchName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private final Integer scanManifestParallelism;

private ScanMetrics scanMetrics = null;
private String branchName;

public AbstractFileStoreScan(
RowType partitionType,
Expand All @@ -94,7 +95,8 @@ public AbstractFileStoreScan(
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism) {
Integer scanManifestParallelism,
String branchName) {
this.partitionType = partitionType;
this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
Expand All @@ -105,6 +107,7 @@ public AbstractFileStoreScan(
this.checkNumOfBuckets = checkNumOfBuckets;
this.tableSchemas = new ConcurrentHashMap<>();
this.scanManifestParallelism = scanManifestParallelism;
this.branchName = branchName;
}

@Override
Expand Down Expand Up @@ -245,7 +248,7 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
if (manifests == null) {
snapshot =
specifiedSnapshot == null
? snapshotManager.latestSnapshot()
? snapshotManager.latestSnapshot(branchName)
: specifiedSnapshot;
if (snapshot == null) {
manifests = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public AppendOnlyFileStoreScan(
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism) {
Integer scanManifestParallelism,
String branchName) {
super(
partitionType,
bucketFilter,
Expand All @@ -59,7 +60,8 @@ public AppendOnlyFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism);
scanManifestParallelism,
branchName);
this.fieldStatsConverters =
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schemaId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/**
* Default implementation of {@link FileStoreCommit}.
*
Expand Down Expand Up @@ -112,6 +114,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final int manifestMergeMinCount;
private final boolean dynamicPartitionOverwrite;
@Nullable private final Comparator<InternalRow> keyComparator;
private final String branchName;

@Nullable private Lock lock;
private boolean ignoreEmptyCommit;
Expand All @@ -137,6 +140,7 @@ public FileStoreCommitImpl(
int manifestMergeMinCount,
boolean dynamicPartitionOverwrite,
@Nullable Comparator<InternalRow> keyComparator,
String branchName,
StatsFileHandler statsFileHandler) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
Expand All @@ -155,6 +159,8 @@ public FileStoreCommitImpl(
this.manifestMergeMinCount = manifestMergeMinCount;
this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
this.keyComparator = keyComparator;
this.branchName = branchName;

this.lock = null;
this.ignoreEmptyCommit = true;
this.commitMetrics = null;
Expand Down Expand Up @@ -233,7 +239,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot();
latestSnapshot = snapshotManager.latestSnapshot(branchName);
if (latestSnapshot != null) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
Expand All @@ -254,6 +260,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
committable.logOffsets(),
Snapshot.CommitKind.APPEND,
safeLatestSnapshotId,
branchName,
null);
generatedSnapshot += 1;
}
Expand Down Expand Up @@ -283,6 +290,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
safeLatestSnapshotId,
branchName,
null);
generatedSnapshot += 1;
}
Expand Down Expand Up @@ -428,6 +436,7 @@ public void overwrite(
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
null,
branchName,
null);
generatedSnapshot += 1;
}
Expand Down Expand Up @@ -523,6 +532,7 @@ public void commitStatistics(Statistics stats, long commitIdentifier) {
Collections.emptyMap(),
Snapshot.CommitKind.ANALYZE,
null,
branchName,
statsFileName);
}

Expand Down Expand Up @@ -596,10 +606,11 @@ private int tryCommit(
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
@Nullable Long safeLatestSnapshotId,
String branchName,
@Nullable String statsFileName) {
int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName);
cnt++;
if (tryCommitOnce(
tableFiles,
Expand All @@ -611,6 +622,7 @@ private int tryCommit(
commitKind,
latestSnapshot,
safeLatestSnapshotId,
branchName,
statsFileName)) {
break;
}
Expand Down Expand Up @@ -672,6 +684,7 @@ private int tryOverwrite(
Snapshot.CommitKind.OVERWRITE,
latestSnapshot,
null,
branchName,
null)) {
break;
}
Expand All @@ -690,10 +703,14 @@ public boolean tryCommitOnce(
Snapshot.CommitKind commitKind,
@Nullable Snapshot latestSnapshot,
@Nullable Long safeLatestSnapshotId,
String branchName,
@Nullable String newStatsFileName) {
long newSnapshotId =
latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
Path newSnapshotPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotManager.snapshotPath(newSnapshotId)
: snapshotManager.branchSnapshotPath(branchName, newSnapshotId);

if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
Expand Down Expand Up @@ -775,7 +792,7 @@ public boolean tryCommitOnce(
newIndexManifest = indexManifest;
}

long latestSchemaId = schemaManager.latest().get().id();
long latestSchemaId = schemaManager.latest(branchName).get().id();

// write new stats or inherit from the previous snapshot
String statsFileName = null;
Expand Down Expand Up @@ -840,7 +857,7 @@ public boolean tryCommitOnce(
boolean committed =
fileIO.writeFileUtf8(newSnapshotPath, newSnapshot.toJson());
if (committed) {
snapshotManager.commitLatestHint(newSnapshotId);
snapshotManager.commitLatestHint(newSnapshotId, branchName);
}
return committed;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public KeyValueFileStoreScan(
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism) {
Integer scanManifestParallelism,
String branchName) {
super(
partitionType,
bucketFilter,
Expand All @@ -63,7 +64,8 @@ public KeyValueFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism);
scanManifestParallelism,
branchName);
this.fieldKeyStatsConverters =
new FieldStatsConverters(
sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.Preconditions.checkState;
Expand Down Expand Up @@ -91,8 +92,16 @@ public SchemaManager withLock(@Nullable Lock lock) {

/**
 * Looks up the latest schema without an explicit branch, i.e. for {@code
 * DEFAULT_MAIN_BRANCH}.
 *
 * @return latest schema, as produced by {@link #latest(String)}.
 */
public Optional<TableSchema> latest() {
    Optional<TableSchema> mainBranchSchema = latest(DEFAULT_MAIN_BRANCH);
    return mainBranchSchema;
}

public Optional<TableSchema> latest(String branchName) {
Path directoryPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? schemaDirectory()
: branchSchemaDirectory(branchName);
try {
return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX)
.reduce(Math::max)
.map(this::schema);
} catch (IOException e) {
Expand Down Expand Up @@ -482,6 +491,10 @@ public Path toSchemaPath(long id) {
return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
}

/**
 * Builds the schema directory path of a branch: the branch path under the table root,
 * followed by {@code /schema}.
 *
 * @param branchName name of the branch
 * @return path of the branch's schema directory
 */
public Path branchSchemaDirectory(String branchName) {
    String schemaDir = getBranchPath(tableRoot, branchName) + "/schema";
    return new Path(schemaDir);
}

public Path branchSchemaPath(String branchName, long schemaId) {
return new Path(
getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId);
Expand Down
Loading

0 comments on commit 9bd6020

Please sign in to comment.