Skip to content

Commit

Permalink
support flink write branch apache#2861
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 16, 2024
1 parent 35acc4c commit a4c2c4c
Show file tree
Hide file tree
Showing 19 changed files with 265 additions and 82 deletions.
17 changes: 17 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The file path of this table in the filesystem.");

/**
 * Table option {@code branch-name}: a string option declared without a default value. Callers
 * that need a fallback when the key is absent must supply it themselves.
 */
public static final ConfigOption<String> BRANCH_NAME =
key("branch-name")
.stringType()
.noDefaultValue()
.withDescription("Specify branch name.");

public static final ConfigOption<FileFormatType> FILE_FORMAT =
key("file.format")
.enumType(FileFormatType.class)
Expand Down Expand Up @@ -1115,6 +1121,17 @@ public Path path() {
return path(options.toMap());
}

/**
 * Resolves the branch name from the options held by this instance.
 *
 * @return the result of {@link #branch(Map)} applied to this instance's option map
 */
public String branch() {
    final Map<String, String> optionMap = options.toMap();
    return branch(optionMap);
}

/**
 * Resolves the branch name from a raw option map.
 *
 * @param options option map to inspect
 * @return the value stored under the {@code BRANCH_NAME} key when that key is present,
 *     otherwise {@code "main"}
 */
public static String branch(Map<String, String> options) {
    final String branchKey = BRANCH_NAME.key();
    // Presence of the key decides, not the value: a present key is returned as stored.
    return options.containsKey(branchKey) ? options.get(branchKey) : "main";
}

/**
 * Builds the table path from a raw option map.
 *
 * @param options option map to read the {@code PATH} key from
 * @return a {@code Path} constructed from the value stored under the {@code PATH} key
 */
public static Path path(Map<String, String> options) {
    final String rawPath = options.get(PATH.key());
    return new Path(rawPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,7 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());

/** Catalog option {@code branch-name}: a string option whose default value is {@code "main"}. */
public static final ConfigOption<String> BRANCH_NAME =
key("branch-name").stringType().defaultValue("main").withDescription("branch name");
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;
Expand All @@ -50,6 +51,7 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.options.CatalogOptions.BRANCH_NAME;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
Expand All @@ -66,6 +68,7 @@ public abstract class AbstractCatalog implements Catalog {
protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
protected final Options catalogOptions;
protected final String branchName;

@Nullable protected final LineageMetaFactory lineageMetaFactory;

Expand All @@ -74,6 +77,7 @@ protected AbstractCatalog(FileIO fileIO) {
this.lineageMetaFactory = null;
this.tableDefaultOptions = new HashMap<>();
this.catalogOptions = new Options();
branchName = BranchManager.DEFAULT_MAIN_BRANCH;
}

protected AbstractCatalog(FileIO fileIO, Options options) {
Expand All @@ -83,6 +87,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;
this.branchName = options.get(BRANCH_NAME);

if (lockEnabled()) {
checkArgument(options.contains(LOCK_TYPE), "No lock type when lock is enabled.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public boolean tableExists(Identifier identifier) {
}

/**
 * Checks whether a table exists at the given path by looking for schema files on this
 * catalog's branch.
 *
 * @param tablePath root path of the table
 * @return {@code true} if at least one schema id is listed for {@code branchName}
 */
private boolean tableExists(Path tablePath) {
    // Only the branch-aware lookup is kept: the earlier branch-less lookup made this statement
    // unreachable and ignored the catalog's configured branch.
    return !new SchemaManager(fileIO, tablePath).listAllIds(branchName).isEmpty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,8 @@ public Optional<TableSchema> latest() {
}

public Optional<TableSchema> latest(String branchName) {
Path directoryPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? schemaDirectory()
: branchSchemaDirectory(branchName);
try {
return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX)
return listVersionedFiles(fileIO, schemaDirectory(branchName), SCHEMA_PREFIX)
.reduce(Math::max)
.map(this::schema);
} catch (IOException e) {
Expand All @@ -111,23 +107,36 @@ public Optional<TableSchema> latest(String branchName) {

/**
 * Lists all schemas of the default main branch.
 *
 * @return the result of {@link #listAll(String)} for {@code DEFAULT_MAIN_BRANCH}
 */
public List<TableSchema> listAll() {
    // Single delegation point, so the branch-less and branch-aware variants cannot diverge.
    return listAll(DEFAULT_MAIN_BRANCH);
}

/**
 * Lists all schemas of the given branch.
 *
 * @param branchName branch whose schema ids are listed
 * @return one {@code TableSchema} per id returned by {@link #listAllIds(String)}
 */
public List<TableSchema> listAll(String branchName) {
    List<Long> schemaIds = listAllIds(branchName);
    // NOTE(review): schema(id) is called without the branch name; confirm it resolves schema
    // files of non-main branches.
    return schemaIds.stream().map(id -> schema(id)).collect(Collectors.toList());
}

/**
 * Lists all schema ids of the default main branch.
 *
 * @return the result of {@link #listAllIds(String)} for {@code DEFAULT_MAIN_BRANCH}
 */
public List<Long> listAllIds() {
    final List<Long> mainBranchIds = listAllIds(DEFAULT_MAIN_BRANCH);
    return mainBranchIds;
}

/** List all schema IDs. */
public List<Long> listAllIds(String branchName) {
try {
return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
return listVersionedFiles(fileIO, schemaDirectory(branchName), SCHEMA_PREFIX)
.collect(Collectors.toList());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/**
 * Creates a new table schema from {@link Schema} on the default main branch.
 *
 * @param schema schema definition to create the table from
 * @return the result of {@link #createTable(Schema, String)} for {@code DEFAULT_MAIN_BRANCH}
 * @throws Exception propagated from the branch-aware overload
 */
public TableSchema createTable(Schema schema) throws Exception {
    final TableSchema created = createTable(schema, DEFAULT_MAIN_BRANCH);
    return created;
}

/** Create a new schema from {@link Schema} on the given branch. */
public TableSchema createTable(Schema schema, String branchName) throws Exception {
while (true) {
latest().ifPresent(
latest(branchName)
.ifPresent(
latest -> {
throw new IllegalStateException(
"Schema in filesystem exists, please use updating,"
Expand All @@ -151,20 +160,24 @@ public TableSchema createTable(Schema schema) throws Exception {
options,
schema.comment());

boolean success = commit(newSchema);
boolean success = commit(branchName, newSchema);
if (success) {
return newSchema;
}
}
}

/**
 * Applies the given {@link SchemaChange}s on the default main branch.
 *
 * @param changes schema changes to apply
 * @return the result of the branch-aware varargs overload for {@code DEFAULT_MAIN_BRANCH}
 * @throws Exception propagated from the branch-aware overload
 */
public TableSchema commitChanges(SchemaChange... changes) throws Exception {
    // Delegates to commitChanges(String, SchemaChange...); the single-argument List overload
    // this method used to call now takes a branch name as well.
    return commitChanges(DEFAULT_MAIN_BRANCH, changes);
}

/**
 * Applies the given {@link SchemaChange}s on the given branch.
 *
 * @param branchName branch passed through to the list-based overload
 * @param changes schema changes to apply, wrapped as a list
 * @return the result of the list-based overload
 * @throws Exception declared for callers; the call is forwarded to the list-based overload
 */
public TableSchema commitChanges(String branchName, SchemaChange... changes) throws Exception {
    List<SchemaChange> changeList = Arrays.asList(changes);
    return commitChanges(branchName, changeList);
}

/** Apply the given {@link SchemaChange}s to the schema of the given branch. */
public TableSchema commitChanges(List<SchemaChange> changes)
public TableSchema commitChanges(String branchName, List<SchemaChange> changes)
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
while (true) {
Expand Down Expand Up @@ -361,7 +374,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
newComment);

try {
boolean success = commit(newSchema);
boolean success = commit(branchName, newSchema);
if (success) {
return newSchema;
}
Expand Down Expand Up @@ -455,9 +468,13 @@ private void updateColumn(

/**
 * Commits a new schema on the default main branch.
 *
 * @param newSchema schema to commit
 * @return the result of {@code commit(DEFAULT_MAIN_BRANCH, newSchema)}
 * @throws Exception propagated from the branch-aware overload
 */
@VisibleForTesting
boolean commit(TableSchema newSchema) throws Exception {
    // The branch-aware overload validates the schema itself, so validating here as well would
    // only run the same check twice.
    return commit(DEFAULT_MAIN_BRANCH, newSchema);
}

Path schemaPath = toSchemaPath(newSchema.id());
@VisibleForTesting
boolean commit(String branchName, TableSchema newSchema) throws Exception {
SchemaValidation.validateTableSchema(newSchema);
Path schemaPath = branchSchemaPath(branchName, newSchema.id());
Callable<Boolean> callable = () -> fileIO.writeFileUtf8(schemaPath, newSchema.toString());
if (lock == null) {
return callable.call();
Expand Down Expand Up @@ -486,20 +503,35 @@ private Path schemaDirectory() {
return new Path(tableRoot + "/schema");
}

/**
 * Returns the schema directory for the given branch.
 *
 * @param branchName branch to resolve
 * @return {@link #schemaDirectory()} when {@code branchName} equals {@code
 *     DEFAULT_MAIN_BRANCH}, otherwise the {@code /schema} directory under the branch path
 */
public Path schemaDirectory(String branchName) {
    if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
        return schemaDirectory();
    }
    String branchRoot = String.valueOf(getBranchPath(tableRoot, branchName));
    return new Path(branchRoot + "/schema");
}

/**
 * Returns the path of the schema file with the given id under the table root.
 *
 * @param id schema id
 * @return {@code <tableRoot>/schema/<SCHEMA_PREFIX><id>}
 */
@VisibleForTesting
public Path toSchemaPath(long id) {
    final String schemaFile = tableRoot + "/schema/" + SCHEMA_PREFIX + id;
    return new Path(schemaFile);
}

/**
 * Returns the {@code /schema} directory under the branch path of the given branch.
 *
 * <p>Unlike {@link #schemaDirectory(String)}, this method does not special-case {@code
 * DEFAULT_MAIN_BRANCH}; it always goes through {@code getBranchPath}.
 *
 * @param branchName branch to resolve
 * @return {@code <branch path>/schema}
 */
public Path branchSchemaDirectory(String branchName) {
return new Path(getBranchPath(tableRoot, branchName) + "/schema");
}

/**
 * Returns the path of the schema file with the given id on the given branch.
 *
 * @param branchName branch to resolve
 * @param schemaId schema id
 * @return {@link #toSchemaPath(long)} when {@code branchName} equals {@code
 *     DEFAULT_MAIN_BRANCH}, otherwise {@code <branch path>/schema/<SCHEMA_PREFIX><schemaId>}
 */
public Path branchSchemaPath(String branchName, long schemaId) {
    // Main-branch schemas live directly under the table root; building their path from
    // getBranchPath unconditionally would point at the branch directory instead.
    if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
        return toSchemaPath(schemaId);
    }
    return new Path(
            getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId);
}

/**
 * Delete the schema file with a specific id on the given branch.
 *
 * <p>The file is located with {@link #branchSchemaPath(String, long)} and removed through
 * {@code fileIO.deleteQuietly}.
 *
 * @param branchName the branch whose schema file is deleted.
 * @param schemaId the schema id to delete.
 */
public void deleteSchema(String branchName, long schemaId) {
fileIO.deleteQuietly(branchSchemaPath(branchName, schemaId));
}
/**
* Delete schema with specific id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.BranchManager;

import java.io.IOException;
import java.io.UncheckedIOException;

import static org.apache.paimon.CoreOptions.BRANCH_NAME;
import static org.apache.paimon.CoreOptions.PATH;

/** Factory to create {@link FileStoreTable}. */
Expand All @@ -53,9 +55,13 @@ public static FileStoreTable create(FileIO fileIO, Path path) {

public static FileStoreTable create(FileIO fileIO, Options options) {
Path tablePath = CoreOptions.path(options);
String branchName = BranchManager.DEFAULT_MAIN_BRANCH;
if (options.contains(BRANCH_NAME)) {
branchName = options.get(BRANCH_NAME);
}
TableSchema tableSchema =
new SchemaManager(fileIO, tablePath)
.latest()
.latest(branchName)
.orElseThrow(
() ->
new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void createBranch(String branchName, String tagName) {
try {
// Copy the corresponding tag, snapshot and schema files into the branch directory
fileIO.copyFileUtf8(
tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName));
tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName));
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshot.id()),
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ public Snapshot snapshot(String branchName, long snapshotId) {
}

/**
 * Checks whether the snapshot with the given id exists on the default main branch.
 *
 * @param snapshotId snapshot id to look up
 * @return the result of {@link #snapshotExists(String, long)} for {@code DEFAULT_MAIN_BRANCH}
 */
public boolean snapshotExists(long snapshotId) {
    // Path resolution happens in the branch-aware overload; no local path is needed here.
    return snapshotExists(DEFAULT_MAIN_BRANCH, snapshotId);
}

public boolean snapshotExists(String branchName, long snapshotId) {
Path path = snapshotPathByBranch(branchName, snapshotId);
try {
return fileIO.exists(path);
} catch (IOException e) {
Expand Down Expand Up @@ -149,6 +153,11 @@ public boolean snapshotExists(long snapshotId) {
return snapshotId == null ? null : snapshot(snapshotId);
}

/**
 * Returns the earliest snapshot of the given branch.
 *
 * @param branchName branch to look up
 * @return the snapshot whose id is {@code earliestSnapshotId(branchName)}, or {@code null}
 *     when that id is {@code null}
 */
public @Nullable Snapshot earliestSnapshot(String branchName) {
    Long snapshotId = earliestSnapshotId(branchName);
    // Load the snapshot from the same branch the id was resolved on; the branch-less
    // snapshot(long) would read a different branch's snapshot with that id.
    return snapshotId == null ? null : snapshot(branchName, snapshotId);
}

/**
 * Returns the earliest snapshot id of the default main branch.
 *
 * @return the result of {@code earliestSnapshotId(DEFAULT_MAIN_BRANCH)}, which may be {@code
 *     null}
 */
public @Nullable Long earliestSnapshotId() {
    final Long earliestOnMain = earliestSnapshotId(DEFAULT_MAIN_BRANCH);
    return earliestOnMain;
}
Expand Down Expand Up @@ -318,13 +327,17 @@ public Optional<Snapshot> latestSnapshotOfUser(String user) {
return Optional.empty();
}

/**
 * Finds the snapshots on the default main branch that were written by the given user with the
 * given commit identifiers.
 *
 * @param user commit user to match
 * @param identifiers commit identifiers to look for
 * @return the result of the branch-aware overload for {@code DEFAULT_MAIN_BRANCH}
 */
public List<Snapshot> findSnapshotsForIdentifiers(
        @Nonnull String user, List<Long> identifiers) {
    final List<Snapshot> matched =
            findSnapshotsForIdentifiers(user, identifiers, DEFAULT_MAIN_BRANCH);
    return matched;
}
/** Find the snapshots of the specified identifiers written by the specified user on the given branch. */
public List<Snapshot> findSnapshotsForIdentifiers(
@Nonnull String user, List<Long> identifiers, String branchName) {
if (identifiers.isEmpty()) {
return Collections.emptyList();
}
Long latestId = latestSnapshotId();
Long latestId = latestSnapshotId(branchName);
if (latestId == null) {
return Collections.emptyList();
}
Expand All @@ -338,7 +351,7 @@ public List<Snapshot> findSnapshotsForIdentifiers(
List<Snapshot> matchedSnapshots = new ArrayList<>();
Set<Long> remainingIdentifiers = new HashSet<>(identifiers);
for (long id = latestId; id >= earliestId && !remainingIdentifiers.isEmpty(); id--) {
Snapshot snapshot = snapshot(id);
Snapshot snapshot = snapshot(branchName, id);
if (user.equals(snapshot.commitUser())) {
if (remainingIdentifiers.remove(snapshot.commitIdentifier())) {
matchedSnapshots.add(snapshot);
Expand Down
Loading

0 comments on commit a4c2c4c

Please sign in to comment.