support flink write/read branch apache#2861
sunxiaojian committed Mar 15, 2024
1 parent 35acc4c commit 14747ac
Showing 125 changed files with 2,587 additions and 585 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -92,5 +92,11 @@
<td>String</td>
<td>The warehouse root path of catalog.</td>
</tr>
<tr>
<td><h5>branch</h5></td>
<td style="word-wrap: break-word;">main</td>
<td>String</td>
<td>Specify the branch to which the schema is committed.</td>
</tr>
</tbody>
</table>
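
For context, the table above documents the new catalog-level `branch` option (default `main`). Below is a minimal sketch of how a user might set it when building a catalog programmatically; the `CatalogContext`/`CatalogFactory` calls, the warehouse path, and the branch name are illustrative assumptions rather than part of this commit.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

public class BranchCatalogSketch {
    public static void main(String[] args) {
        Options options = new Options();
        // Hypothetical warehouse location; any supported filesystem or object-store URI would do.
        options.set(CatalogOptions.WAREHOUSE, "file:///tmp/paimon-warehouse");
        // New option introduced by this commit: schema reads and commits go through "dev"
        // instead of the default "main" branch.
        options.set(CatalogOptions.BRANCH, "dev");

        // Assumed creation path; the commit itself only adds the option and threads it
        // through AbstractCatalog.
        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
    }
}
```
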
@@ -110,4 +110,10 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());

public static final ConfigOption<String> BRANCH =
key("branch")
.stringType()
.defaultValue("main")
.withDescription("Specify the branch submitted by the schema.");
}
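
The option itself is a plain `ConfigOption<String>` with default `main`, which is how `AbstractCatalog` later resolves it via `options.get(BRANCH)`. A tiny sketch of that defaulting behavior, assuming `Options#set`/`Options#get` follow the usual Flink-style configuration semantics (explicit value if present, declared default otherwise); the branch name is a made-up example.

```java
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

public class BranchOptionDefaultSketch {
    public static void main(String[] args) {
        Options unset = new Options();
        // Nothing configured: the declared default is returned.
        System.out.println(unset.get(CatalogOptions.BRANCH)); // prints "main"

        Options configured = new Options();
        configured.set(CatalogOptions.BRANCH, "dev"); // hypothetical branch
        System.out.println(configured.get(CatalogOptions.BRANCH)); // prints "dev"
    }
}
```
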
@@ -188,7 +188,7 @@ public FileStoreCommitImpl newCommit(String commitUser, String branchName) {
manifestFileFactory(),
manifestListFactory(),
indexManifestFileFactory(),
newScan(),
newScan(branchName),
options.bucket(),
options.manifestTargetSize(),
options.manifestFullCompactionThresholdSize(),
@@ -25,6 +25,7 @@
import org.apache.paimon.operation.AppendOnlyFileStoreRead;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
@@ -88,14 +89,37 @@ public AppendOnlyFileStoreRead newRead() {
pathFactory());
}

@Override
public AppendOnlyFileStoreRead newRead(String branchName) {
return new AppendOnlyFileStoreRead(
fileIO,
schemaManager,
schemaId,
rowType,
FileFormatDiscover.of(options),
pathFactory(),
branchName);
}

@Override
public AppendOnlyFileStoreWrite newWrite(String commitUser) {
return newWrite(commitUser, null);
return newWrite(commitUser, null, DEFAULT_MAIN_BRANCH);
}

@Override
public FileStoreWrite<InternalRow> newWrite(String commitUser, String branch) {
return newWrite(commitUser, null, branch);
}

@Override
public AppendOnlyFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH);
}

@Override
public AppendOnlyFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName) {
return new AppendOnlyFileStoreWrite(
fileIO,
newRead(),
@@ -104,9 +128,10 @@ public AppendOnlyFileStoreWrite newWrite(
rowType,
pathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true, branchName).withManifestCacheFilter(manifestFilter),
options,
tableName);
tableName,
branchName);
}

private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) {
7 changes: 7 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -75,10 +75,17 @@ public interface FileStore<T> extends Serializable {

FileStoreRead<T> newRead();

FileStoreRead<T> newRead(String branchName);

FileStoreWrite<T> newWrite(String commitUser);

FileStoreWrite<T> newWrite(String commitUser, String branch);

FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter manifestFilter);

FileStoreWrite<T> newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName);

FileStoreCommit newCommit(String commitUser);

FileStoreCommit newCommit(String commitUser, String branchName);
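
The interface now pairs each of the original factory methods with a branch-aware overload. A hedged sketch of how a caller might choose between them; obtaining the `FileStore` instance and actually driving the writer, reader, and committer are outside this diff, and the branch name is a placeholder.

```java
import org.apache.paimon.FileStore;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.operation.FileStoreWrite;

public class BranchAwareFileStoreSketch {

    static void openOnBranch(FileStore<InternalRow> store, String commitUser) {
        // Original overloads keep operating against the default "main" branch.
        FileStoreWrite<InternalRow> mainWrite = store.newWrite(commitUser);

        // New overloads added in this commit target an explicit branch.
        String branch = "feature-x"; // placeholder branch name
        FileStoreWrite<InternalRow> branchWrite = store.newWrite(commitUser, branch);
        FileStoreRead<InternalRow> branchRead = store.newRead(branch);
        FileStoreCommit branchCommit = store.newCommit(commitUser, branch);

        // Writing records and committing snapshots is unchanged and omitted here.
    }
}
```
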
46 changes: 43 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -130,6 +130,20 @@ public KeyValueFileStoreRead newRead() {
newReaderFactoryBuilder());
}

@Override
public KeyValueFileStoreRead newRead(String branchName) {
return new KeyValueFileStoreRead(
options,
schemaManager,
schemaId,
keyType,
valueType,
newKeyComparator(),
mfFactory,
newReaderFactoryBuilder(branchName),
branchName);
}

public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
return KeyValueFileReaderFactory.builder(
fileIO,
@@ -143,13 +157,38 @@ public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
options);
}

public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder(String branch) {
return KeyValueFileReaderFactory.builder(
fileIO,
schemaManager,
schemaId,
keyType,
valueType,
FileFormatDiscover.of(options),
pathFactory(),
keyValueFieldsExtractor,
options,
branch);
}

@Override
public KeyValueFileStoreWrite newWrite(String commitUser) {
return newWrite(commitUser, null);
return newWrite(commitUser, null, DEFAULT_MAIN_BRANCH);
}

@Override
public KeyValueFileStoreWrite newWrite(String commitUser, String branch) {
return newWrite(commitUser, null, branch);
}

@Override
public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH);
}

@Override
public KeyValueFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter, String branchName) {
IndexMaintainer.Factory<KeyValue> indexFactory = null;
if (bucketMode() == BucketMode.DYNAMIC) {
indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
@@ -173,12 +212,13 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
pathFactory(),
format2PathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true, branchName).withManifestCacheFilter(manifestFilter),
indexFactory,
deletionVectorsMaintainerFactory,
options,
keyValueFieldsExtractor,
tableName);
tableName,
branchName);
}

private Map<String, FileStorePathFactory> format2PathFactory() {
@@ -36,6 +36,7 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;
@@ -50,6 +51,7 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.options.CatalogOptions.BRANCH;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
@@ -66,6 +68,7 @@ public abstract class AbstractCatalog implements Catalog {
protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
protected final Options catalogOptions;
protected final String branchName;

@Nullable protected final LineageMetaFactory lineageMetaFactory;

@@ -74,6 +77,7 @@ protected AbstractCatalog(FileIO fileIO) {
this.lineageMetaFactory = null;
this.tableDefaultOptions = new HashMap<>();
this.catalogOptions = new Options();
branchName = BranchManager.DEFAULT_MAIN_BRANCH;
}

protected AbstractCatalog(FileIO fileIO, Options options) {
@@ -83,6 +87,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;
this.branchName = options.get(BRANCH);

if (lockEnabled()) {
checkArgument(options.contains(LOCK_TYPE), "No lock type when lock is enabled.");
@@ -118,13 +118,13 @@ public boolean tableExists(Identifier identifier) {
}

private boolean tableExists(Path tablePath) {
return new SchemaManager(fileIO, tablePath).listAllIds().size() > 0;
return new SchemaManager(fileIO, tablePath).listAllIds(branchName).size() > 0;
}

@Override
public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
return schemaManager(identifier)
.latest()
.latest(branchName)
.orElseThrow(() -> new TableNotExistException(identifier));
}

@@ -166,7 +166,7 @@ public void renameTableImpl(Identifier fromTable, Identifier toTable) {
@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
schemaManager(identifier).commitChanges(changes);
schemaManager(identifier).commitChanges(branchName, changes);
}

private static <T> T uncheck(Callable<T> callable) {
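
Stepping back from the truncated hunk above: the catalog now scopes every schema operation to its configured branch through the branch-aware `SchemaManager` calls (`listAllIds(branch)`, `latest(branch)`, `commitChanges(branch, changes)`). A minimal sketch of that pattern in isolation; the `FileIO`, table path, example schema change, and branch value are placeholders.

```java
import java.util.Collections;
import java.util.Optional;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;

public class BranchSchemaSketch {

    static Optional<TableSchema> latestOnBranch(FileIO fileIO, Path tablePath, String branch) {
        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
        // Existence check and latest-schema lookup are scoped to the branch,
        // mirroring the catalog's tableExists and getDataTableSchema above.
        return schemaManager.listAllIds(branch).isEmpty()
                ? Optional.empty()
                : schemaManager.latest(branch);
    }

    static void renameColumnOnBranch(FileIO fileIO, Path tablePath, String branch) throws Exception {
        // renameColumn is only an example; any SchemaChange is committed against the given branch,
        // as alterTableImpl does via commitChanges(branchName, changes).
        new SchemaManager(fileIO, tablePath)
                .commitChanges(branch, Collections.singletonList(SchemaChange.renameColumn("a", "b")));
    }
}
```
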
@@ -33,6 +33,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.AsyncRecordReader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.BulkFormatMapping;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Projection;
@@ -63,6 +64,7 @@ public class KeyValueFileReaderFactory {
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final BinaryRow partition;
private final DeletionVector.Factory dvFactory;
private final String branch;

private KeyValueFileReaderFactory(
FileIO fileIO,
@@ -74,7 +76,8 @@ private KeyValueFileReaderFactory(
DataFilePathFactory pathFactory,
long asyncThreshold,
BinaryRow partition,
DeletionVector.Factory dvFactory) {
DeletionVector.Factory dvFactory,
String branch) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
@@ -86,6 +89,7 @@ private KeyValueFileReaderFactory(
this.partition = partition;
this.bulkFormatMappings = new HashMap<>();
this.dvFactory = dvFactory;
this.branch = branch;
}

public RecordReader<KeyValue> createRecordReader(
@@ -110,8 +114,8 @@ private RecordReader<KeyValue> createRecordReader(
() ->
bulkFormatMappingBuilder.build(
formatIdentifier,
schemaManager.schema(this.schemaId),
schemaManager.schema(schemaId));
schemaManager.schema(branch, this.schemaId),
schemaManager.schema(branch, schemaId));

BulkFormatMapping bulkFormatMapping =
reuseFormat
@@ -148,6 +152,30 @@ public static Builder builder(
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor,
CoreOptions options) {
return builder(
fileIO,
schemaManager,
schemaId,
keyType,
valueType,
formatDiscover,
pathFactory,
extractor,
options,
BranchManager.DEFAULT_MAIN_BRANCH);
}

public static Builder builder(
FileIO fileIO,
SchemaManager schemaManager,
long schemaId,
RowType keyType,
RowType valueType,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor,
CoreOptions options,
String branch) {
return new Builder(
fileIO,
schemaManager,
@@ -157,7 +185,8 @@ public static Builder builder(
formatDiscover,
pathFactory,
extractor,
options);
options,
branch);
}

/** Builder for {@link KeyValueFileReaderFactory}. */
@@ -178,6 +207,7 @@ public static class Builder {
private int[][] valueProjection;
private RowType projectedKeyType;
private RowType projectedValueType;
private String branch;

private Builder(
FileIO fileIO,
@@ -188,7 +218,8 @@ private Builder(
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
KeyValueFieldsExtractor extractor,
CoreOptions options) {
CoreOptions options,
String branch) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
Expand All @@ -202,6 +233,7 @@ private Builder(
this.options = options;
this.keyProjection = fullKeyProjection;
this.valueProjection = Projection.range(0, valueType.getFieldCount()).toNestedIndexes();
this.branch = branch;
applyProjection();
}

@@ -215,7 +247,8 @@ public Builder copyWithoutProjection() {
formatDiscover,
pathFactory,
extractor,
options);
options,
branch);
}

public Builder withKeyProjection(int[][] projection) {
@@ -263,7 +296,8 @@ public KeyValueFileReaderFactory build(
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes(),
partition,
dvFactory);
dvFactory,
branch);
}

private void applyProjection() {
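
Finally, the reader factory gains a branch-aware `builder(...)` overload, while the original overload now simply defaults to `BranchManager.DEFAULT_MAIN_BRANCH`. A hedged sketch of calling the new overload; the import packages and all inputs are assumptions supplied by the caller, and the final build step (partition/bucket specific) is omitted because its full signature is not shown in this excerpt.

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;

public class ReaderFactoryBranchSketch {

    static KeyValueFileReaderFactory.Builder forBranch(
            FileIO fileIO,
            SchemaManager schemaManager,
            long schemaId,
            RowType keyType,
            RowType valueType,
            FileStorePathFactory pathFactory,
            KeyValueFieldsExtractor extractor,
            CoreOptions options,
            String branch) {
        // New overload from this commit: the branch is threaded into every schema lookup
        // the factory performs, i.e. schemaManager.schema(branch, schemaId).
        return KeyValueFileReaderFactory.builder(
                fileIO,
                schemaManager,
                schemaId,
                keyType,
                valueType,
                FileFormatDiscover.of(options),
                pathFactory,
                extractor,
                options,
                branch);
    }
}
```
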