Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hotfix] hotfix branch manager gets schema, tag and snapshot paths method to reduce constant string usage. #4136

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.paimon.catalog.Catalog.DB_SUFFIX;
Expand Down Expand Up @@ -588,6 +589,13 @@ public Path toSchemaPath(long schemaId) {
return new Path(branchPath() + "/schema/" + SCHEMA_PREFIX + schemaId);
}

public List<Path> schemaPaths(Predicate<Long> predicate) throws IOException {
return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
.filter(predicate)
.map(this::toSchemaPath)
.collect(Collectors.toList());
}

/**
* Delete schema with specific id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
Expand All @@ -29,12 +28,12 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Manager for {@code Branch}. */
Expand Down Expand Up @@ -87,20 +86,7 @@ public Path branchPath(String branchName) {

/** Create empty branch. */
public void createBranch(String branchName) {
checkArgument(
!isMainBranch(branchName),
String.format(
"Branch name '%s' is the default branch and cannot be used.",
DEFAULT_MAIN_BRANCH));
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(branchName),
"Branch name '%s' is blank.",
branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
checkArgument(
!branchName.chars().allMatch(Character::isDigit),
"Branch name cannot be pure numeric string but is '%s'.",
branchName);
validateBranch(branchName);

try {
TableSchema latestSchema = schemaManager.latest().get();
Expand All @@ -118,21 +104,8 @@ branchName, branchPath(tablePath, branchName)),
}

public void createBranch(String branchName, String tagName) {
checkArgument(
!isMainBranch(branchName),
String.format(
"Branch name '%s' is the default branch and cannot be created.",
DEFAULT_MAIN_BRANCH));
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(branchName),
"Branch name '%s' is blank.",
branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
validateBranch(branchName);
checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName);
checkArgument(
!branchName.chars().allMatch(Character::isDigit),
"Branch name cannot be pure numeric string but is '%s'.",
branchName);

Snapshot snapshot = tagManager.taggedSnapshot(tagName);

Expand Down Expand Up @@ -176,10 +149,7 @@ public void deleteBranch(String branchName) {
/** Check if path exists. */
public boolean fileExists(Path path) {
try {
if (fileIO.exists(path)) {
return true;
}
return false;
return fileIO.exists(path);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to determine if path '%s' exists.", path), e);
Expand All @@ -206,37 +176,15 @@ public void fastForward(String branchName) {
// Delete snapshot, schema, and tag from the main branch which occurs after
// earliestSnapshotId
List<Path> deleteSnapshotPaths =
listVersionedFileStatus(
fileIO, snapshotManager.snapshotDirectory(), "snapshot-")
.map(FileStatus::getPath)
.filter(
path ->
Snapshot.fromPath(fileIO, path).id()
>= earliestSnapshotId)
.collect(Collectors.toList());
List<Path> deleteSchemaPaths =
listVersionedFileStatus(fileIO, schemaManager.schemaDirectory(), "schema-")
.map(FileStatus::getPath)
.filter(
path ->
TableSchema.fromPath(fileIO, path).id()
>= earliestSchemaId)
.collect(Collectors.toList());
snapshotManager.snapshotPaths(id -> id >= earliestSnapshotId);
List<Path> deleteSchemaPaths = schemaManager.schemaPaths(id -> id >= earliestSchemaId);
List<Path> deleteTagPaths =
listVersionedFileStatus(fileIO, tagManager.tagDirectory(), "tag-")
.map(FileStatus::getPath)
.filter(
path ->
Snapshot.fromPath(fileIO, path).id()
>= earliestSnapshotId)
.collect(Collectors.toList());
tagManager.tagPaths(
path -> Snapshot.fromPath(fileIO, path).id() >= earliestSnapshotId);

List<Path> deletePaths =
Stream.concat(
Stream.concat(
deleteSnapshotPaths.stream(),
deleteSchemaPaths.stream()),
deleteTagPaths.stream())
Stream.of(deleteSnapshotPaths, deleteSchemaPaths, deleteTagPaths)
.flatMap(Collection::stream)
.collect(Collectors.toList());

// Delete latest snapshot hint
Expand Down Expand Up @@ -280,4 +228,21 @@ public List<String> branches() {
throw new RuntimeException(e);
}
}

private void validateBranch(String branchName) {
checkArgument(
!isMainBranch(branchName),
String.format(
"Branch name '%s' is the default branch and cannot be used.",
DEFAULT_MAIN_BRANCH));
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(branchName),
"Branch name '%s' is blank.",
branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
checkArgument(
!branchName.chars().allMatch(Character::isDigit),
"Branch name cannot be pure numeric string but is '%s'.",
branchName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,18 @@ public long snapshotCount() throws IOException {

public Iterator<Snapshot> snapshots() throws IOException {
return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
.map(id -> snapshot(id))
.map(this::snapshot)
.sorted(Comparator.comparingLong(Snapshot::id))
.iterator();
}

public List<Path> snapshotPaths(Predicate<Long> predicate) throws IOException {
return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
.filter(predicate)
.map(this::snapshotPath)
.collect(Collectors.toList());
}

public Iterator<Snapshot> snapshotsWithinRange(
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ public Path tagPath(String tagName) {
return new Path(branchPath(tablePath, branch) + "/tag/" + TAG_PREFIX + tagName);
}

public List<Path> tagPaths(Predicate<Path> predicate) throws IOException {
return listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
.map(FileStatus::getPath)
.filter(predicate)
.collect(Collectors.toList());
}

/** Create a tag from given snapshot and save it in the storage. */
public void createTag(
Snapshot snapshot,
Expand Down Expand Up @@ -307,10 +314,7 @@ public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) {
TreeMap<Snapshot, List<String>> tags =
new TreeMap<>(Comparator.comparingLong(Snapshot::id));
try {
List<Path> paths =
listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
.map(FileStatus::getPath)
.collect(Collectors.toList());
List<Path> paths = tagPaths(path -> true);

for (Path path : paths) {
String tagName = path.getName().substring(TAG_PREFIX.length());
Expand All @@ -335,10 +339,7 @@ public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) {
/** Get all {@link Tag}s. */
public List<Pair<Tag, String>> tagObjects() {
try {
List<Path> paths =
listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
.map(FileStatus::getPath)
.collect(Collectors.toList());
List<Path> paths = tagPaths(path -> true);
List<Pair<Tag, String>> tags = new ArrayList<>();
for (Path path : paths) {
String tagName = path.getName().substring(TAG_PREFIX.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ public void testUnsupportedBranchName() throws Exception {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"Branch name 'main' is the default branch and cannot be created."));
"Branch name 'main' is the default branch and cannot be used."));

assertThatThrownBy(() -> table.createBranch("branch-1", "tag1"))
.satisfies(
Expand Down
Loading