Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
hzjhjjyy authored Jun 14, 2024
2 parents cbd29a4 + 8b4f3df commit 748fed3
Show file tree
Hide file tree
Showing 28 changed files with 899 additions and 274 deletions.
20 changes: 20 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ default void deleteQuietly(Path file) {
}
}

/**
 * Deletes every path in {@code files}, in iteration order, by handing each one to
 * {@link #deleteQuietly(Path)}.
 *
 * @param files the paths to delete
 */
default void deleteFilesQuietly(List<Path> files) {
    files.forEach(this::deleteQuietly);
}

default void deleteDirectoryQuietly(Path directory) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete " + directory.toString());
Expand Down Expand Up @@ -272,6 +278,20 @@ default boolean copyFileUtf8(Path sourcePath, Path targetPath) throws IOExceptio
return writeFileUtf8(targetPath, content);
}

/**
 * Copies every entry listed directly under {@code sourceDirectory} into {@code targetDirectory},
 * keeping each entry's name.
 *
 * <p>Each entry is copied with {@code copyFileUtf8}. The boolean that {@code copyFileUtf8}
 * returns is not inspected here, so this method only reports failures that surface as an
 * {@link IOException}.
 *
 * @param sourceDirectory directory whose entries are obtained from {@code listStatus}
 * @param targetDirectory directory that receives the copies
 * @throws IOException if listing the source directory or copying an entry fails
 */
default void copyFilesUtf8(Path sourceDirectory, Path targetDirectory) throws IOException {
    for (FileStatus entry : listStatus(sourceDirectory)) {
        Path source = entry.getPath();
        // The target keeps the source's file name, placed under the target directory.
        Path destination = new Path(targetDirectory.toString() + "/" + source.getName());
        copyFileUtf8(source, destination);
    }
}

/** Read file from {@link #overwriteFileUtf8} file. */
default Optional<String> readOverwrittenFileUtf8(Path path) throws IOException {
int retryNumber = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,15 @@ public void restore(List<State<T>> states) {
}
}

/**
 * Returns the buckets that currently have an entry in {@code writers}, grouped by partition.
 *
 * @return a newly created map from each partition key of {@code writers} to a newly created list
 *     holding that partition's bucket ids
 */
public Map<BinaryRow, List<Integer>> getActiveBuckets() {
    Map<BinaryRow, List<Integer>> activeBuckets = new HashMap<>();
    writers.forEach(
            (partition, bucketWriters) ->
                    // Copy the key set so the returned list is detached from the writer map.
                    activeBuckets.put(partition, new ArrayList<>(bucketWriters.keySet())));
    return activeBuckets;
}

private WriterContainer<T> getWriterWrapper(BinaryRow partition, int bucket) {
Map<Integer, WriterContainer<T>> buckets = writers.get(partition);
if (buckets == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ public void deleteBranch(String branchName) {
wrapped.deleteBranch(branchName);
}

/**
 * Merges {@code branchName} through the wrapped table.
 *
 * <p>The insert-privilege assertion on {@code identifier} runs first; the call is forwarded to
 * {@code wrapped} only after that assertion returns.
 *
 * @param branchName name of the branch to merge
 */
@Override
public void mergeBranch(String branchName) {
// NOTE(review): presumably throws when the privilege is missing -- confirm in the checker.
privilegeChecker.assertCanInsert(identifier);
wrapped.mergeBranch(branchName);
}

@Override
public void replaceBranch(String fromBranch) {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.schema;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
Expand All @@ -26,6 +28,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -286,6 +289,15 @@ public static TableSchema fromJson(String json) {
return JsonSerdeUtil.fromJson(json, TableSchema.class);
}

/**
 * Reads the UTF-8 file at {@code path} and parses its content as a JSON-serialized
 * {@link TableSchema}.
 *
 * @param fileIO file system accessor used to read the file
 * @param path location of the schema file
 * @return the schema parsed from the file content
 * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
 */
public static TableSchema fromPath(FileIO fileIO, Path path) {
    final String content;
    try {
        content = fileIO.readFileUtf8(path);
    } catch (IOException e) {
        throw new RuntimeException("Fails to read schema from path " + path, e);
    }
    return fromJson(content);
}

@Override
public String toString() {
return JsonSerdeUtil.toJson(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,11 @@ public void deleteBranch(String branchName) {
branchManager().deleteBranch(branchName);
}

/**
 * Merges {@code branchName} by delegating to the manager returned from {@code branchManager()}.
 *
 * @param branchName name of the branch to merge
 */
@Override
public void mergeBranch(String branchName) {
branchManager().mergeBranch(branchName);
}

@Override
public void replaceBranch(String fromBranch) {
branchManager().replaceBranch(fromBranch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ default void deleteBranch(String branchName) {
this.getClass().getSimpleName()));
}

/**
 * Always rejects the call: this default implementation does not support merging branches.
 *
 * @param branchName ignored
 * @throws UnsupportedOperationException always, naming the concrete class of this table
 */
@Override
default void mergeBranch(String branchName) {
    String message =
            String.format(
                    "Readonly Table %s does not support mergeBranch.",
                    this.getClass().getSimpleName());
    throw new UnsupportedOperationException(message);
}

@Override
default void replaceBranch(String fromBranch) {
throw new UnsupportedOperationException(
Expand Down
4 changes: 4 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ public interface Table extends Serializable {
@Experimental
void deleteBranch(String branchName);

/**
 * Merges the branch named {@code branchName} into the main branch.
 *
 * @param branchName name of the branch to merge into the main branch
 */
@Experimental
void mergeBranch(String branchName);

// NOTE(review): this method has no Javadoc. Its exact semantics are not visible from this
// declaration -- document it after confirming the behavior in the implementing classes.
@Experimental
void replaceBranch(String fromBranch);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
Expand All @@ -41,8 +42,10 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Manager for {@code Branch}. */
Expand Down Expand Up @@ -353,6 +356,78 @@ public boolean fileExists(Path path) {
}
}

/**
 * Merges branch {@code branchName} into the main branch by replacing main-branch metadata files
 * with the branch's files.
 *
 * <p>The steps, in order:
 *
 * <ol>
 *   <li>Look up the branch's earliest snapshot and the schema id that snapshot refers to.
 *   <li>Collect the main-branch {@code snapshot-} and {@code tag-} files whose snapshot id is at
 *       or after the branch's earliest snapshot id, and the {@code schema-} files whose id is at
 *       or after that snapshot's schema id.
 *   <li>Delete the main branch's latest-snapshot hint, then delete the collected files.
 *   <li>Copy the branch's snapshot, schema and tag directories into the main branch.
 * </ol>
 *
 * <p>NOTE(review): deletion and copying are separate steps with no rollback, so a failure while
 * copying leaves the main branch without the files that were already deleted.
 *
 * @param branchName name of an existing branch other than the main branch
 * @throws IllegalArgumentException if the name is blank, names the main branch, does not exist,
 *     or the branch has no snapshot
 * @throws RuntimeException wrapping the {@link IOException} raised while listing, deleting or
 *     copying files
 */
public void mergeBranch(String branchName) {
    // Validate blankness first and compare with the constant on the left, so that a null name
    // is not dereferenced by the main-branch check.
    // NOTE(review): assumes StringUtils.isBlank treats null as blank -- confirm.
    checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
    checkArgument(
            !DEFAULT_MAIN_BRANCH.equals(branchName),
            "Branch name '%s' do not use in merge branch.",
            branchName);
    checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName);

    Long earliestSnapshotId = snapshotManager.copyWithBranch(branchName).earliestSnapshotId();
    // The id is a boxed Long; fail with a clear message instead of an unboxing NPE below.
    checkArgument(
            earliestSnapshotId != null, "Branch '%s' has no snapshot to merge.", branchName);
    final long firstBranchSnapshotId = earliestSnapshotId;
    final long firstBranchSchemaId =
            snapshotManager
                    .copyWithBranch(branchName)
                    .snapshot(firstBranchSnapshotId)
                    .schemaId();

    try {
        // Main-branch snapshots, schemas and tags at or after the branch's starting point are
        // superseded by the branch's files. All three lists are computed before anything is
        // deleted.
        List<Path> deleteSnapshotPaths =
                listVersionedFileStatus(
                                fileIO, snapshotManager.snapshotDirectory(), "snapshot-")
                        .map(FileStatus::getPath)
                        .filter(
                                path ->
                                        Snapshot.fromPath(fileIO, path).id()
                                                >= firstBranchSnapshotId)
                        .collect(Collectors.toList());
        List<Path> deleteSchemaPaths =
                listVersionedFileStatus(fileIO, schemaManager.schemaDirectory(), "schema-")
                        .map(FileStatus::getPath)
                        .filter(
                                path ->
                                        TableSchema.fromPath(fileIO, path).id()
                                                >= firstBranchSchemaId)
                        .collect(Collectors.toList());
        // Tag files are read as snapshots and compared by snapshot id.
        List<Path> deleteTagPaths =
                listVersionedFileStatus(fileIO, tagManager.tagDirectory(), "tag-")
                        .map(FileStatus::getPath)
                        .filter(
                                path ->
                                        Snapshot.fromPath(fileIO, path).id()
                                                >= firstBranchSnapshotId)
                        .collect(Collectors.toList());

        List<Path> deletePaths =
                Stream.of(deleteSnapshotPaths, deleteSchemaPaths, deleteTagPaths)
                        .flatMap(List::stream)
                        .collect(Collectors.toList());

        // Delete latest snapshot hint
        snapshotManager.deleteLatestHint();

        fileIO.deleteFilesQuietly(deletePaths);
        fileIO.copyFilesUtf8(
                snapshotManager.copyWithBranch(branchName).snapshotDirectory(),
                snapshotManager.snapshotDirectory());
        fileIO.copyFilesUtf8(
                schemaManager.copyWithBranch(branchName).schemaDirectory(),
                schemaManager.schemaDirectory());
        fileIO.copyFilesUtf8(
                tagManager.copyWithBranch(branchName).tagDirectory(),
                tagManager.tagDirectory());
    } catch (IOException e) {
        throw new RuntimeException(
                String.format(
                        "Exception occurs when merge branch '%s' (directory in %s).",
                        branchName, getBranchPath(fileIO, tablePath, branchName)),
                e);
    }
}

/** Check if a branch exists. */
public boolean branchExists(String branchName) {
Path branchPath = branchPath(branchName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,12 @@ private Long findByListFiles(BinaryOperator<Long> reducer, Path dir, String pref
return listVersionedFiles(fileIO, dir, prefix).reduce(reducer).orElse(null);
}

/**
 * Deletes the {@code LATEST} hint file located in the snapshot directory.
 *
 * @throws IOException if the underlying {@code fileIO.delete} call fails
 */
public void deleteLatestHint() throws IOException {
    // NOTE(review): the second argument is presumably the "recursive" flag -- confirm in FileIO.
    fileIO.delete(new Path(snapshotDirectory(), LATEST), false);
}

/**
 * Commits {@code snapshotId} as the {@code LATEST} hint of the snapshot directory by delegating to
 * {@code commitHint}.
 *
 * @param snapshotId snapshot id to record in the hint
 * @throws IOException if {@code commitHint} fails
 */
public void commitLatestHint(long snapshotId) throws IOException {
commitHint(snapshotId, LATEST, snapshotDirectory());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,136 @@ public void testDeleteBranch() throws Exception {
"Branch name 'branch1' doesn't exist."));
}

/**
 * Exercises {@code mergeBranch} end to end: rejects a missing branch and the main branch, then
 * merges the test branch into main twice and checks after each merge that main reads the same
 * rows as the branch and that the copied snapshot and schema files are equal.
 */
@Test
public void testMergeBranch() throws Exception {
    FileStoreTable table = createFileStoreTable();
    generateBranch(table);
    FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);

    // Verify branch1 and the main branch have the same data
    assertThat(
                    getResult(
                            tableBranch.newRead(),
                            toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
                            BATCH_ROW_TO_STRING))
            .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");

    // Merging a branch that does not exist is rejected
    assertThatThrownBy(() -> table.mergeBranch("test-branch"))
            .satisfies(
                    anyCauseMatches(
                            IllegalArgumentException.class,
                            "Branch name 'test-branch' doesn't exist."));

    // Merging the main branch into itself is rejected
    assertThatThrownBy(() -> table.mergeBranch("main"))
            .satisfies(
                    anyCauseMatches(
                            IllegalArgumentException.class,
                            "Branch name 'main' do not use in merge branch."));

    // Write data to branch1
    try (StreamTableWrite write = tableBranch.newWrite(commitUser);
            StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
        write.write(rowData(2, 20, 200L));
        commit.commit(1, write.prepareCommit(false, 2));
    }

    // Validate data in branch1
    assertThat(
                    getResult(
                            tableBranch.newRead(),
                            toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
                            BATCH_ROW_TO_STRING))
            .containsExactlyInAnyOrder(
                    "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
                    "2|20|200|binary|varbinary|mapKey:mapVal|multiset");

    // Validate data in main branch not changed
    assertThat(
                    getResult(
                            table.newRead(),
                            toSplits(table.newSnapshotReader().read().dataSplits()),
                            BATCH_ROW_TO_STRING))
            .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");

    // Merge branch1 to main branch
    table.mergeBranch(BRANCH_NAME);

    // After merge branch1, verify branch1 and the main branch have the same data
    assertThat(
                    getResult(
                            table.newRead(),
                            toSplits(table.newSnapshotReader().read().dataSplits()),
                            BATCH_ROW_TO_STRING))
            .containsExactlyInAnyOrder(
                    "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
                    "2|20|200|binary|varbinary|mapKey:mapVal|multiset");

    // verify snapshot in branch1 and main branch is same
    SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath);
    Snapshot branchSnapshot =
            Snapshot.fromPath(
                    new TraceableFileIO(),
                    snapshotManager.copyWithBranch(BRANCH_NAME).snapshotPath(2));
    Snapshot snapshot =
            Snapshot.fromPath(new TraceableFileIO(), snapshotManager.snapshotPath(2));
    // isEqualTo reports both values on failure, unlike asserting on a bare boolean.
    assertThat(branchSnapshot).isEqualTo(snapshot);

    // verify schema in branch1 and main branch is same
    SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath);
    TableSchema branchSchema =
            SchemaManager.fromPath(
                    new TraceableFileIO(),
                    schemaManager.copyWithBranch(BRANCH_NAME).toSchemaPath(0));
    TableSchema schema0 = schemaManager.schema(0);
    assertThat(branchSchema).isEqualTo(schema0);

    // Write two rows data to branch1 again
    try (StreamTableWrite write = tableBranch.newWrite(commitUser);
            StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
        write.write(rowData(3, 30, 300L));
        write.write(rowData(4, 40, 400L));
        commit.commit(2, write.prepareCommit(false, 3));
    }

    // Verify data in branch1
    assertThat(
                    getResult(
                            tableBranch.newRead(),
                            toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
                            BATCH_ROW_TO_STRING))
            .containsExactlyInAnyOrder(
                    "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
                    "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
                    "3|30|300|binary|varbinary|mapKey:mapVal|multiset",
                    "4|40|400|binary|varbinary|mapKey:mapVal|multiset");

    // Verify data in main branch not changed
    assertThat(
                    getResult(
                            table.newRead(),
                            toSplits(table.newSnapshotReader().read().dataSplits()),
                            BATCH_ROW_TO_STRING))
            .containsExactlyInAnyOrder(
                    "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
                    "2|20|200|binary|varbinary|mapKey:mapVal|multiset");

    // Merge branch1 to main branch again (same constant as the first merge)
    table.mergeBranch(BRANCH_NAME);

    // Verify data in main branch is same to branch1
    assertThat(
                    getResult(
                            table.newRead(),
                            toSplits(table.newSnapshotReader().read().dataSplits()),
                            BATCH_ROW_TO_STRING))
            .containsExactlyInAnyOrder(
                    "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
                    "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
                    "3|30|300|binary|varbinary|mapKey:mapVal|multiset",
                    "4|40|400|binary|varbinary|mapKey:mapVal|multiset");
}

@Test
public void testUnsupportedTagName() throws Exception {
FileStoreTable table = createFileStoreTable();
Expand Down Expand Up @@ -1636,7 +1766,6 @@ protected void generateBranch(FileStoreTable table) throws Exception {
table.createBranch(BRANCH_NAME, "tag1");

// verify that branch1 file exist
TraceableFileIO fileIO = new TraceableFileIO();
BranchManager branchManager = table.branchManager();
assertThat(branchManager.branchExists(BRANCH_NAME)).isTrue();

Expand Down
Loading

0 comments on commit 748fed3

Please sign in to comment.