Skip to content

Commit

Permalink
fix conflicts.
Browse files Browse the repository at this point in the history
  • Loading branch information
discivigour committed Jul 31, 2024
1 parent 5bd3c6f commit e550821
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -294,7 +296,7 @@ public void alterTable(
if (ignoreIfNotExists) {
return;
}
throw new TableNotExistException(identifier);
throw new TableNotExistException(identifier, branchName);
}

alterTableImpl(identifier, branchName, changes);
Expand Down Expand Up @@ -399,7 +401,21 @@ protected abstract TableSchema getDataTableSchema(Identifier identifier, String

@VisibleForTesting
public Path getDataTableLocation(Identifier identifier) {
return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getObjectName());
Optional<Pair<Identifier, String>> optionalBranchName =
getOriginalIdentifierAndBranch(identifier);
String branch = DEFAULT_MAIN_BRANCH;
if (optionalBranchName.isPresent()) {
identifier = optionalBranchName.get().getLeft();
branch = optionalBranchName.get().getRight();
}
Path databasePath = newDatabasePath(identifier.getDatabaseName());
if (branch.equals(DEFAULT_MAIN_BRANCH)) {
return new Path(databasePath, identifier.getObjectName());
} else {
return new Path(
databasePath + File.separator + identifier.getObjectName(),
"branch/" + BranchManager.BRANCH_PREFIX + branch);
}
}

private static Optional<Pair<Identifier, String>> getOriginalIdentifierAndBranch(
Expand Down Expand Up @@ -477,9 +493,21 @@ public static Path newTableLocation(String warehouse, Identifier identifier) {
"Table name[%s] cannot contain '%s' separator",
identifier.getObjectName(), SYSTEM_TABLE_SPLITTER));
}
return new Path(
newDatabasePath(warehouse, identifier.getDatabaseName()),
identifier.getObjectName());
Optional<Pair<Identifier, String>> optionalBranchName =
getOriginalIdentifierAndBranch(identifier);
String branch = DEFAULT_MAIN_BRANCH;
if (optionalBranchName.isPresent()) {
identifier = optionalBranchName.get().getLeft();
branch = optionalBranchName.get().getRight();
}
Path databasePath = newDatabasePath(warehouse, identifier.getDatabaseName());
if (branch.equals(DEFAULT_MAIN_BRANCH)) {
return new Path(databasePath, identifier.getObjectName());
} else {
return new Path(
databasePath + File.separator + identifier.getObjectName(),
"branch/" + BranchManager.BRANCH_PREFIX + branch);
}
}

public static Path newDatabasePath(String warehouse, String database) {
Expand Down
22 changes: 17 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/**
* This interface is responsible for reading and writing metadata such as database/table from a
* paimon catalog.
Expand Down Expand Up @@ -411,16 +413,26 @@ public Identifier identifier() {
/** Exception for trying to operate on a table that doesn't exist. */
class TableNotExistException extends Exception {

private static final String MSG = "Table %s does not exist.";

private static final String MSG_TABLE = "Table %s does not exist.";
private static final String MSG_BRANCH = "Branch %s does not exist.";
private final Identifier identifier;

public TableNotExistException(Identifier identifier) {
this(identifier, null);
this(identifier, null, DEFAULT_MAIN_BRANCH);
}

public TableNotExistException(Identifier identifier, Throwable cause) {
super(String.format(MSG, identifier.getFullName()), cause);
public TableNotExistException(Identifier identifier, String branchName) {
this(identifier, null, branchName);
}

public TableNotExistException(Identifier identifier, Throwable cause, String branchName) {
super(
branchName.equals(DEFAULT_MAIN_BRANCH)
? String.format(MSG_TABLE, identifier.getFullName())
: String.format(
MSG_BRANCH,
identifier.getFullName() + BRANCH_PREFIX + branchName),
cause);
this.identifier = identifier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public TableSchema getDataTableSchema(Identifier identifier, String branchName)
return s;
}
})
.orElseThrow(() -> new TableNotExistException(identifier));
.orElseThrow(() -> new TableNotExistException(identifier, branchName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.jdbc.JdbcCatalog;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
Expand All @@ -29,6 +30,7 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
Expand All @@ -38,6 +40,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -349,6 +353,49 @@ public void testGetTable() throws Exception {
.withMessage("Table non_existing_db.test_table does not exist.");
}

@Test
public void testGetDataTableLocation() {
Path path =
((JdbcCatalog) catalog)
.getDataTableLocation(Identifier.create("test_db", "test_table$branch_a"));
assertThat(path.toString())
.isEqualTo(
new File(
"file:/" + tempFile,
"test_db"
+ ".db"
+ File.separator
+ "test_table"
+ File.separator
+ "branch"
+ File.separator
+ BranchManager.BRANCH_PREFIX
+ "a")
.toString());
}

@Test
public void testNewTableLocation() {
Path path =
AbstractCatalog.newTableLocation(
String.valueOf(tempFile),
Identifier.create("test_db", "test_table$branch_a"));
assertThat(path.toString())
.isEqualTo(
new File(
String.valueOf(tempFile),
"test_db"
+ ".db"
+ File.separator
+ "test_table"
+ File.separator
+ "branch"
+ File.separator
+ BranchManager.BRANCH_PREFIX
+ "a")
.toString());
}

@Test
public void testDropTable() throws Exception {
catalog.createDatabase("test_db", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.utils.BlockingIterator;

import org.apache.flink.table.planner.factories.TestValuesTableFactory;
Expand Down Expand Up @@ -278,6 +279,22 @@ public void testReadWriteBranch() throws Exception {
assertThat(rows).containsExactlyInAnyOrder(Row.of(2), Row.of(1));
}

@Test
public void testBranchNotExist() throws Exception {
// create table
sql("CREATE TABLE T (id INT)");
// insert data
batchSql("INSERT INTO T VALUES (1)");
// create tag
paimonTable("T").createTag("tag1", 1);
// create branch
paimonTable("T").createBranch("branch1", "tag1");
// call the FileSystemCatalog.getDataTableSchema() function
assertThatThrownBy(() -> paimonTable("T$branch_branch2"))
.isInstanceOf(Catalog.TableNotExistException.class)
.hasMessage("Branch %s does not exist.", "default.T$branch_branch2");
}

@Override
protected List<String> ddl() {
return Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.procedure

import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest

class AlterBranchProcedureTest extends PaimonSparkTestBase with StreamTest {

import testImplicits._
test("Paimon Procedure: alter schema structure and test $branch syntax.") {
withTempDir {
checkpointDir =>
// define a change-log table and test `forEachBatch` api
spark.sql(s"""
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
val location = loadTable("T").location().toString

val inputData = MemoryStream[(Int, String)]
val stream = inputData
.toDS()
.toDF("a", "b")
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.foreachBatch {
(batch: Dataset[Row], _: Long) =>
batch.write.format("paimon").mode("append").save(location)
}
.start()

val query = () => spark.sql("SELECT * FROM T ORDER BY a")
try {
// snapshot-1
inputData.addData((1, "a"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Nil)

// snapshot-2
inputData.addData((2, "b"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)

// snapshot-3
inputData.addData((2, "b2"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)

val table = loadTable("T")
val branchManager = table.branchManager()

// create branch with snapshot
checkAnswer(
spark.sql(
"CALL paimon.sys.create_branch(table => 'test.T', branch => 'snapshot_branch', snapshot => 2)"),
Row(true) :: Nil)
assert(branchManager.branchExists("snapshot_branch"))

spark.sql("INSERT INTO T VALUES (1, 'APPLE'), (2,'DOG'), (2, 'horse')")
spark.sql("ALTER TABLE `T$branch_snapshot_branch` ADD COLUMNS(c INT)")
spark.sql(
"INSERT INTO `T$branch_snapshot_branch` VALUES " + "(1,'cherry', 100), (2,'bird', 200), (3, 'wolf', 400)")

checkAnswer(
spark.sql("SELECT * FROM T ORDER BY a, b"),
Row(1, "APPLE") :: Row(2, "horse") :: Nil)
checkAnswer(
spark.sql("SELECT * FROM `T$branch_snapshot_branch` ORDER BY a, b,c"),
Row(1, "cherry", 100) :: Row(2, "bird", 200) :: Row(3, "wolf", 400) :: Nil)
assert(branchManager.branchExists("snapshot_branch"))
}
}
}
}

0 comments on commit e550821

Please sign in to comment.