Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 6, 2024
1 parent a5de1b2 commit 7100dc0
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/**
* Default implementation of {@link FileStoreCommit}.
*
Expand Down Expand Up @@ -707,10 +705,7 @@ public boolean tryCommitOnce(
@Nullable String newStatsFileName) {
long newSnapshotId =
latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
Path newSnapshotPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotManager.snapshotPath(newSnapshotId)
: snapshotManager.branchSnapshotPath(branchName, newSnapshotId);
Path newSnapshotPath = snapshotManager.snapshotPath(branchName, newSnapshotId);

if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ public Path branchDirectory() {

/** Return the path string of a branch. */
public static String getBranchPath(FileIO fileIO, Path tablePath, String branchName) {
if (StringUtils.isBlank(branchName)) {
return tablePath.toString();
}
if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
branchName = forwardBranchName(fileIO, tablePath, branchName);
}
Expand Down Expand Up @@ -122,10 +119,10 @@ public void createBranch(String branchName, String tagName) {
try {
// Copy the corresponding tag, snapshot and schema files into the branch directory
fileIO.copyFileUtf8(
tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName));
tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName));
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshot.id()),
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
snapshotManager.snapshotPath(branchName, snapshot.id()));
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(snapshot.schemaId()),
schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
Expand Down Expand Up @@ -201,7 +198,7 @@ private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOE
if (tag.getCreateTime() < fromTag.getCreateTime()) {
fileIO.copyFileUtf8(
tagManager.tagPath(tag.getTagName()),
tagManager.branchTagPath(branchName, tag.getTagName()));
tagManager.tagPath(branchName, tag.getTagName()));
}
}
// Copy snapshots.
Expand All @@ -216,7 +213,7 @@ private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOE
if (snapshot.id() < fromSnapshot.id()) {
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshot.id()),
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
snapshotManager.snapshotPath(branchName, snapshot.id()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,48 +75,36 @@ public Path tablePath() {
}

public Path snapshotDirectory() {
return branchSnapshotDirectory(DEFAULT_MAIN_BRANCH);
return snapshotDirectory(DEFAULT_MAIN_BRANCH);
}

public Path snapshotPath(long snapshotId) {
return branchSnapshotPath(DEFAULT_MAIN_BRANCH, snapshotId);
public Path snapshotDirectory(String branchName) {
return new Path(getBranchPath(fileIO, tablePath, branchName) + "/snapshot");
}

public Path branchSnapshotDirectory(String branchName) {
return new Path(getBranchPath(fileIO, tablePath, branchName) + "/snapshot");
public Path snapshotPath(long snapshotId) {
return snapshotPath(DEFAULT_MAIN_BRANCH, snapshotId);
}

public Path branchSnapshotPath(String branchName, long snapshotId) {
public Path snapshotPath(String branchName, long snapshotId) {
return new Path(
getBranchPath(fileIO, tablePath, branchName)
+ "/snapshot/"
+ SNAPSHOT_PREFIX
+ snapshotId);
}

public Path snapshotPathByBranch(String branchName, long snapshotId) {
return branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotPath(snapshotId)
: branchSnapshotPath(branchName, snapshotId);
}

public Path snapshotDirByBranch(String branchName) {
return branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotDirectory()
: branchSnapshotDirectory(branchName);
}

public Snapshot snapshot(long snapshotId) {
return snapshot(DEFAULT_MAIN_BRANCH, snapshotId);
}

public Snapshot snapshot(String branchName, long snapshotId) {
Path snapshotPath = snapshotPathByBranch(branchName, snapshotId);
Path snapshotPath = snapshotPath(branchName, snapshotId);
return Snapshot.fromPath(fileIO, snapshotPath);
}

public boolean branchSnapshotExists(String branchName, long snapshotId) {
Path path = snapshotPathByBranch(branchName, snapshotId);
Path path = snapshotPath(branchName, snapshotId);
try {
return fileIO.exists(path);
} catch (IOException e) {
Expand Down Expand Up @@ -265,7 +253,7 @@ public Iterator<Snapshot> snapshots() throws IOException {
}

public Iterator<Snapshot> snapshotsWithBranch(String branchName) throws IOException {
return listVersionedFiles(fileIO, snapshotDirByBranch(branchName), SNAPSHOT_PREFIX)
return listVersionedFiles(fileIO, snapshotDirectory(branchName), SNAPSHOT_PREFIX)
.map(snapshotId -> snapshot(branchName, snapshotId))
.sorted(Comparator.comparingLong(Snapshot::id))
.iterator();
Expand Down Expand Up @@ -415,7 +403,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
}

private @Nullable Long findLatest(String branchName) throws IOException {
Path snapshotDir = snapshotDirByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
if (!fileIO.exists(snapshotDir)) {
return null;
}
Expand All @@ -433,7 +421,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
}

private @Nullable Long findEarliest(String branchName) throws IOException {
Path snapshotDir = snapshotDirByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
if (!fileIO.exists(snapshotDir)) {
return null;
}
Expand All @@ -452,7 +440,7 @@ public Long readHint(String fileName) {
}

public Long readHint(String fileName, String branchName) {
Path snapshotDir = snapshotDirByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
Path path = new Path(snapshotDir, fileName);
int retryNumber = 0;
while (retryNumber++ < READ_HINT_RETRY_NUM) {
Expand All @@ -472,14 +460,14 @@ public Long readHint(String fileName, String branchName) {

private Long findByListFiles(BinaryOperator<Long> reducer, String branchName)
throws IOException {
Path snapshotDir = snapshotDirByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
return listVersionedFiles(fileIO, snapshotDir, SNAPSHOT_PREFIX)
.reduce(reducer)
.orElse(null);
}

public void deleteLatestHint(String branchName) throws IOException {
Path snapshotDir = snapshotDirByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
Path hintFile = new Path(snapshotDir, LATEST);
fileIO.delete(hintFile, false);
}
Expand All @@ -502,7 +490,7 @@ public void commitEarliestHint(long snapshotId, String branchName) throws IOExce

private void commitHint(long snapshotId, String fileName, String branchName)
throws IOException {
Path snapshotDir = snapshotDirByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
Path hintFile = new Path(snapshotDir, fileName);
fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,20 @@ public TagManager(FileIO fileIO, Path tablePath) {

/** Return the root Directory of tags. */
public Path tagDirectory() {
return branchTagDirectory(DEFAULT_MAIN_BRANCH);
return tagDirectory(DEFAULT_MAIN_BRANCH);
}

public Path branchTagDirectory(String branchName) {
public Path tagDirectory(String branchName) {
return new Path(getBranchPath(fileIO, tablePath, branchName) + "/tag");
}

/** Return the path of a tag. */
public Path tagPath(String tagName) {
return branchTagPath(DEFAULT_MAIN_BRANCH, tagName);
return tagPath(DEFAULT_MAIN_BRANCH, tagName);
}

/** Return the path of a tag in branch. */
public Path branchTagPath(String branchName, String tagName) {
public Path tagPath(String branchName, String tagName) {
return new Path(
getBranchPath(fileIO, tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
}
Expand Down Expand Up @@ -201,7 +201,7 @@ private void doClean(

/** Check if a tag exists. */
public boolean branchTagExists(String branchName, String tagName) {
Path path = branchTagPath(branchName, tagName);
Path path = tagPath(branchName, tagName);
try {
return fileIO.exists(path);
} catch (IOException e) {
Expand Down Expand Up @@ -276,9 +276,7 @@ public SortedMap<Snapshot, List<String>> tagsWithBranch(
try {

Path tagDirectory =
StringUtils.isBlank(branchName)
? tagDirectory()
: branchTagDirectory(branchName);
StringUtils.isBlank(branchName) ? tagDirectory() : tagDirectory(branchName);
List<Path> paths =
listVersionedFileStatus(fileIO, tagDirectory, TAG_PREFIX)
.map(FileStatus::getPath)
Expand Down Expand Up @@ -335,7 +333,7 @@ public List<TableTag> branchTableTags(String branchName) {
Path tagDirectory =
branchName.equals(DEFAULT_MAIN_BRANCH)
? tagDirectory()
: branchTagDirectory(branchName);
: tagDirectory(branchName);

List<Pair<Path, Long>> paths =
listVersionedFileStatus(fileIO, tagDirectory, TAG_PREFIX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -949,15 +949,14 @@ public void testCreateBranch() throws Exception {
// verify test-tag in test-branch is equal to snapshot 2
Snapshot branchTag =
Snapshot.fromPath(
new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag"));
new TraceableFileIO(), tagManager.tagPath("test-branch", "test-tag"));
assertThat(branchTag.equals(snapshot2)).isTrue();

// verify snapshot in test-branch is equal to snapshot 2
SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath);
Snapshot branchSnapshot =
Snapshot.fromPath(
new TraceableFileIO(),
snapshotManager.branchSnapshotPath("test-branch", 2));
new TraceableFileIO(), snapshotManager.snapshotPath("test-branch", 2));
assertThat(branchSnapshot.equals(snapshot2)).isTrue();

// verify schema in test-branch is equal to schema 0
Expand Down

0 comments on commit 7100dc0

Please sign in to comment.