Skip to content

Commit

Permalink
Support writing to a branch from Flink
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Apr 19, 2024
1 parent 85bd8a3 commit 541c591
Show file tree
Hide file tree
Showing 40 changed files with 1,127 additions and 269 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
<td>Boolean</td>
<td>Whether to create underlying storage when reading and writing the table.</td>
</tr>
<tr>
<td><h5>branch</h5></td>
<td style="word-wrap: break-word;">"main"</td>
<td>String</td>
<td>Specify branch name.</td>
</tr>
<tr>
<td><h5>bucket</h5></td>
<td style="word-wrap: break-word;">-1</td>
Expand Down
14 changes: 14 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The file path of this table in the filesystem.");

/** Option holding the branch name; its default value is {@code "main"}. */
public static final ConfigOption<String> BRANCH =
        key("branch")
                .stringType()
                .defaultValue("main")
                .withDescription("Specify branch name.");

public static final ConfigOption<FileFormatType> FILE_FORMAT =
key("file.format")
.enumType(FileFormatType.class)
Expand Down Expand Up @@ -1149,6 +1152,17 @@ public Path path() {
return path(options.toMap());
}

/**
 * Resolves the branch name from this instance's options.
 *
 * @return the result of {@link #branch(Map)} applied to the options rendered as a map
 */
public String branch() {
    Map<String, String> optionMap = options.toMap();
    return branch(optionMap);
}

/**
 * Resolves the branch name from a raw option map.
 *
 * @param options option map to look the {@link #BRANCH} key up in
 * @return the value stored under the {@link #BRANCH} key, or the option's default value when
 *     the key is absent; a key explicitly mapped to {@code null} yields {@code null}
 */
public static String branch(Map<String, String> options) {
    // Single lookup instead of containsKey + get; the result is the same in every case.
    return options.getOrDefault(BRANCH.key(), BRANCH.defaultValue());
}

/**
 * Builds the table path from a raw option map.
 *
 * @param options option map to look the {@link #PATH} key up in
 * @return a {@link Path} constructed from the value stored under the {@link #PATH} key
 */
public static Path path(Map<String, String> options) {
    // No presence check: a missing key hands null to the Path constructor.
    // NOTE(review): how Path treats a null argument is not visible here.
    String location = options.get(PATH.key());
    return new Path(location);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public FileStorePathFactory pathFactory() {

@Override
public SnapshotManager snapshotManager() {
return new SnapshotManager(fileIO, options.path());
return new SnapshotManager(fileIO, options.path(), options.branch());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/** {@link FileStore} for reading and writing {@link InternalRow}. */
public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
Expand Down Expand Up @@ -71,11 +70,7 @@ public BucketMode bucketMode() {

@Override
public AppendOnlyFileStoreScan newScan() {
return newScan(DEFAULT_MAIN_BRANCH);
}

public AppendOnlyFileStoreScan newScan(String branchName) {
return newScan(false, branchName);
return newScan(false);
}

@Override
Expand Down Expand Up @@ -106,12 +101,12 @@ public AppendOnlyFileStoreWrite newWrite(
rowType,
pathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true).withManifestCacheFilter(manifestFilter),
options,
tableName);
}

private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) {
private AppendOnlyFileStoreScan newScan(boolean forWrite) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
Expand Down Expand Up @@ -146,7 +141,6 @@ public void pushdown(Predicate predicate) {
options.bucket(),
forWrite,
options.scanManifestParallelism(),
branchName,
options.fileIndexReadEnabled());
}

Expand Down
2 changes: 0 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public interface FileStore<T> extends Serializable {

FileStoreScan newScan();

FileStoreScan newScan(String branchName);

ManifestList.Factory manifestListFactory();

ManifestFile.Factory manifestFileFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** {@link FileStore} for querying and updating {@link KeyValue}s. */
Expand Down Expand Up @@ -112,11 +111,7 @@ public BucketMode bucketMode() {

@Override
public KeyValueFileStoreScan newScan() {
return newScan(DEFAULT_MAIN_BRANCH);
}

public KeyValueFileStoreScan newScan(String branchName) {
return newScan(false, branchName);
return newScan(false);
}

@Override
Expand Down Expand Up @@ -185,7 +180,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
pathFactory(),
format2PathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true).withManifestCacheFilter(manifestFilter),
indexFactory,
deletionVectorsMaintainerFactory,
options,
Expand All @@ -209,7 +204,7 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
return pathFactoryMap;
}

private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) {
private KeyValueFileStoreScan newScan(boolean forWrite) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
Expand Down Expand Up @@ -240,7 +235,6 @@ public void pushdown(Predicate keyFilter) {
options.bucket(),
forWrite,
options.scanManifestParallelism(),
branchName,
options.deletionVectorsEnabled(),
options.mergeEngine());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -66,6 +67,7 @@ public abstract class AbstractCatalog implements Catalog {
protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
protected final Options catalogOptions;
protected final String branchName;

@Nullable protected final LineageMetaFactory lineageMetaFactory;

Expand All @@ -74,6 +76,7 @@ protected AbstractCatalog(FileIO fileIO) {
this.lineageMetaFactory = null;
this.tableDefaultOptions = new HashMap<>();
this.catalogOptions = new Options();
branchName = BranchManager.DEFAULT_MAIN_BRANCH;
}

protected AbstractCatalog(FileIO fileIO, Options options) {
Expand All @@ -83,6 +86,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;
this.branchName = options.get(CoreOptions.BRANCH);
}

@Override
Expand Down Expand Up @@ -325,7 +329,12 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}
return table;
} else {
return getDataTable(identifier);
Table table = getDataTable(identifier);
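// Propagate the catalog-level branch to the table options when it is not the main branch.
// NOTE(review): this mutates the map returned by table.options(); confirm that map is
// mutable and actually backs the table, otherwise the override is lost or throws.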
if (!branchName.equals(BranchManager.DEFAULT_MAIN_BRANCH)) {
table.options().put(CoreOptions.BRANCH.key(), branchName);
}
return table;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public boolean tableExists(Identifier identifier) {
}

private boolean tableExists(Path tablePath) {
return new SchemaManager(fileIO, tablePath).listAllIds().size() > 0;
return new SchemaManager(fileIO, tablePath, branchName).listAllIds().size() > 0;
}

@Override
Expand Down Expand Up @@ -153,7 +153,7 @@ private SchemaManager schemaManager(Identifier identifier) {
new RuntimeException(
"No lock context when lock is enabled."))))
.orElse(null);
return new SchemaManager(fileIO, path)
return new SchemaManager(fileIO, path, branchName)
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
updateTable(connections, catalogKey, fromTable, toTable);

Path fromPath = getDataTableLocation(fromTable);
if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
if (new SchemaManager(fileIO, fromPath, branchName).listAllIds().size() > 0) {
// Rename the file system's table directory. Maintain consistency between tables in
// the file system and tables in the Hive Metastore.
Path toPath = getDataTableLocation(toTable);
Expand Down Expand Up @@ -323,7 +323,7 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
return new SchemaManager(fileIO, tableLocation)
return new SchemaManager(fileIO, tableLocation, branchName)
.latest()
.orElseThrow(
() -> new RuntimeException("There is no paimon table in " + tableLocation));
Expand Down Expand Up @@ -374,7 +374,7 @@ public void close() throws Exception {
}

private SchemaManager getSchemaManager(Identifier identifier) {
return new SchemaManager(fileIO, getDataTableLocation(identifier))
return new SchemaManager(fileIO, getDataTableLocation(identifier), branchName)
.withLock(lock(identifier));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private final SchemaManager schemaManager;
private final TableSchema schema;
protected final ScanBucketFilter bucketKeyFilter;
private final String branchName;

private PartitionPredicate partitionFilter;
private Snapshot specifiedSnapshot = null;
Expand All @@ -98,8 +97,7 @@ public AbstractFileStoreScan(
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
String branchName) {
Integer scanManifestParallelism) {
this.partitionType = partitionType;
this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
Expand All @@ -111,7 +109,6 @@ public AbstractFileStoreScan(
this.checkNumOfBuckets = checkNumOfBuckets;
this.tableSchemas = new ConcurrentHashMap<>();
this.scanManifestParallelism = scanManifestParallelism;
this.branchName = branchName;
}

@Override
Expand Down Expand Up @@ -368,7 +365,7 @@ private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
if (manifests == null) {
snapshot =
specifiedSnapshot == null
? snapshotManager.latestSnapshot(branchName)
? snapshotManager.latestSnapshot()
: specifiedSnapshot;
if (snapshot == null) {
manifests = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public AppendOnlyFileStoreScan(
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
String branchName,
boolean fileIndexReadEnabled) {
super(
partitionType,
Expand All @@ -74,8 +73,7 @@ public AppendOnlyFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism,
branchName);
scanManifestParallelism);
this.fieldStatsConverters =
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id());
this.fileIndexReadEnabled = fileIndexReadEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public void commit(
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot(branchName);
latestSnapshot = snapshotManager.latestSnapshot();
if (latestSnapshot != null && checkAppendFiles) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
Expand Down Expand Up @@ -654,7 +654,7 @@ private int tryCommit(
@Nullable String statsFileName) {
int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName);
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
cnt++;
if (tryCommitOnce(
tableFiles,
Expand Down Expand Up @@ -754,7 +754,7 @@ public boolean tryCommitOnce(
Path newSnapshotPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotManager.snapshotPath(newSnapshotId)
: snapshotManager.branchSnapshotPath(branchName, newSnapshotId);
: snapshotManager.snapshotPath(branchName, newSnapshotId);

if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
Expand Down Expand Up @@ -836,7 +836,7 @@ public boolean tryCommitOnce(
newIndexManifest = indexManifest;
}

long latestSchemaId = schemaManager.latest(branchName).get().id();
long latestSchemaId = schemaManager.latest().get().id();

// write new stats or inherit from the previous snapshot
String statsFileName = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public KeyValueFileStoreScan(
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
String branchName,
boolean deletionVectorsEnabled,
MergeEngine mergeEngine) {
super(
Expand All @@ -75,8 +74,7 @@ public KeyValueFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism,
branchName);
scanManifestParallelism);
this.fieldKeyStatsConverters =
new FieldStatsConverters(
sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)),
Expand Down
Loading

0 comments on commit 541c591

Please sign in to comment.