[fix] Test fallback branch read on HiveCatalog
tsreaper committed Aug 1, 2024
1 parent e470bfb · commit 0bc4e72
Showing 4 changed files with 58 additions and 70 deletions.
FallbackReadFileStoreTable.java
@@ -34,6 +34,7 @@
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Preconditions;
@@ -68,7 +69,7 @@ public FallbackReadFileStoreTable(FileStoreTable main, FileStoreTable fallback)
         RowType mainRowType = main.schema().logicalRowType();
         RowType fallbackRowType = fallback.schema().logicalRowType();
         Preconditions.checkArgument(
-                mainRowType.equals(fallbackRowType),
+                sameRowTypeIgnoreNullable(mainRowType, fallbackRowType),
                 "Branch %s and %s does not have the same row type.\n"
                         + "Row type of branch %s is %s.\n"
                         + "Row type of branch %s is %s.",

@@ -104,6 +105,20 @@ public FallbackReadFileStoreTable(FileStoreTable main, FileStoreTable fallback)
         }
     }
 
+    private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType fallbackRowType) {
+        if (mainRowType.getFieldCount() != fallbackRowType.getFieldCount()) {
+            return false;
+        }
+        for (int i = 0; i < mainRowType.getFieldCount(); i++) {
+            DataType mainType = mainRowType.getFields().get(i).type();
+            DataType fallbackType = fallbackRowType.getFields().get(i).type();
+            if (!mainType.equalsIgnoreNullable(fallbackType)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     @Override
     public FileStoreTable copy(Map<String, String> dynamicOptions) {
         return new FallbackReadFileStoreTable(
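The relaxed check above lets the main and fallback branches disagree on nullability as long as field counts and underlying types match. A minimal sketch of that behavior; DataTypes.INT(), the DataField constructor, and DataType.copy(boolean) do not appear in this diff and are assumed from Paimon's type API:

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import java.util.Collections;

public class NullabilityCheckSketch {
    public static void main(String[] args) {
        // Main branch declares "a" as NOT NULL; the fallback branch leaves it nullable.
        RowType main =
                new RowType(
                        Collections.singletonList(
                                new DataField(0, "a", DataTypes.INT().copy(false))));
        RowType fallback =
                new RowType(Collections.singletonList(new DataField(0, "a", DataTypes.INT())));

        // Strict equality fails on the nullability flag alone ...
        System.out.println(main.equals(fallback)); // false

        // ... while the per-field comparison introduced above accepts the pair.
        DataType mainType = main.getFields().get(0).type();
        DataType fallbackType = fallback.getFields().get(0).type();
        System.out.println(mainType.equalsIgnoreNullable(fallbackType)); // true
    }
}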
HiveCatalog.java
@@ -778,7 +778,10 @@ private FieldSchema convertToFieldSchema(DataField dataField) {
     }
 
     private SchemaManager schemaManager(Identifier identifier) {
-        return new SchemaManager(fileIO, getDataTableLocation(identifier))
+        return new SchemaManager(
+                        fileIO,
+                        getDataTableLocation(identifier),
+                        identifier.getBranchNameOrDefault())
                 .withLock(lock(identifier));
     }
 
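The catalog now takes the branch from the identifier itself rather than from a session option, so every schema lookup it performs resolves against that branch's schema files. A hedged sketch of the resolution; only getBranchNameOrDefault appears in this diff, and Identifier.fromString parsing the $branch_ suffix is an assumption:

import org.apache.paimon.catalog.Identifier;

public class BranchResolutionSketch {
    public static void main(String[] args) {
        // No suffix: schema reads target the default branch ("main").
        Identifier plain = Identifier.fromString("test_db.t");
        System.out.println(plain.getBranchNameOrDefault()); // expected: main

        // Branch-qualified identifier, as used by the tests further down.
        Identifier branched = Identifier.fromString("test_db.t$branch_test");
        System.out.println(branched.getBranchNameOrDefault()); // expected: test
    }
}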
HiveSchema.java
@@ -200,17 +200,9 @@ private static Optional<TableSchema> getExistingSchema(
         Path path = new Path(location);
         Options options = HiveUtils.extractCatalogConfig(configuration);
         options.set(CoreOptions.PATH, location);
-
         CatalogContext context = CatalogContext.create(options, configuration);
         try {
-            if (!options.contains(CoreOptions.BRANCH)) {
-                return new SchemaManager(FileIO.get(path, context), path).latest();
-            } else {
-                return new SchemaManager(
-                                FileIO.get(path, context), path, options.get(CoreOptions.BRANCH))
-                        .latest();
-            }
-
+            return new SchemaManager(FileIO.get(path, context), path).latest();
         } catch (IOException e) {
             LOG.warn(
                     "Failed to fetch Paimon table schema from path "
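With the CoreOptions.BRANCH conditional removed, this Hive-side validation always reads the main-branch schema at the table path; the branch-specific lookup moved into HiveCatalog's schemaManager (previous file). A standalone sketch of the same read: the SchemaManager call is taken from the diff, while the warehouse path is hypothetical and CatalogContext.create(Path) plus the package names are assumptions:

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;

import java.util.Optional;

public class SchemaReadSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical table location; the connector derives this from the Hive table property.
        Path path = new Path("/tmp/warehouse/test_db.db/t");
        CatalogContext context = CatalogContext.create(path);

        // Same call as the new code above; latest() yields the newest schema, if one exists.
        Optional<TableSchema> schema =
                new SchemaManager(FileIO.get(path, context), path).latest();
        schema.ifPresent(s -> System.out.println(s.logicalRowType()));
    }
}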
HiveCatalogITCaseBase.java
@@ -465,78 +465,56 @@ public void testFlinkWriteAndHiveRead() throws Exception {
 
     @Test
     public void testFlinkCreateBranchAndHiveRead() throws Exception {
-        tEnv.executeSql(
-                        "CREATE TABLE t ( "
-                                + "a INT, "
-                                + "b STRING"
-                                + ") WITH ( 'file.format' = 'avro' )")
+        tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
                 .await();
-        tEnv.executeSql("Call sys.create_branch('test_db.t','b1')").await();
-        tEnv.executeSql("INSERT INTO t$branch_b1 VALUES (1,'x1'), (2,'x2')").await();
-        tEnv.executeSql("INSERT INTO t VALUES (3,'x3')").await();
-        hiveShell.execute("SET paimon.branch=b1");
-        assertThat(hiveShell.executeQuery("SELECT * FROM t"))
-                .isEqualTo(Arrays.asList("1\tx1", "2\tx2"));
+        tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
+        tEnv.executeSql("ALTER TABLE `t$branch_test` SET ( 'primary-key' = 'a', 'bucket' = '1' )")
+                .await();
+        tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (c INT)").await();
+
+        tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'x1', 10), (2, 'x2', 20)").await();
+        tEnv.executeSql("INSERT INTO t VALUES (3, 'x3'), (4, 'x4')").await();
+        tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'x11', 11)").await();
+        tEnv.executeSql("INSERT INTO t VALUES (3, 'x33')").await();
 
-        tEnv.executeSql("Call sys.create_branch('test_db.t','b2')").await();
-        tEnv.executeSql("INSERT INTO t$branch_b2 VALUES (4,'x1'), (5,'x2')").await();
-        hiveShell.execute("SET paimon.branch=b2");
+        assertThat(collect("SELECT * FROM t"))
+                .containsExactlyInAnyOrder(Row.of(3, "x3"), Row.of(3, "x33"), Row.of(4, "x4"));
+        assertThat(collect("SELECT * FROM `t$branch_test`"))
+                .containsExactlyInAnyOrder(Row.of(1, "x11", 11), Row.of(2, "x2", 20));
         assertThat(hiveShell.executeQuery("SELECT * FROM t"))
-                .isEqualTo(Arrays.asList("4\tx1", "5\tx2"));
-        hiveShell.execute("SET paimon.branch=null");
+                .containsExactlyInAnyOrder("3\tx3", "3\tx33", "4\tx4");
     }
 
     @Test
-    public void testFlinkAlterBranchAndHiveRead() throws Exception {
+    public void testFallbackBranchRead() throws Exception {
         tEnv.executeSql(
-                        "CREATE TABLE t ( "
-                                + "a INT, "
-                                + "b STRING"
-                                + ") WITH ( 'file.format' = 'avro' )")
+                        "CREATE TABLE t ( pt INT, a INT, b STRING ) PARTITIONED BY (pt) "
+                                + "WITH ( 'file.format' = 'avro' )")
                 .await();
-        tEnv.executeSql("Call sys.create_branch('test_db.t','b1')").await();
-        tEnv.executeSql("INSERT INTO t$branch_b1 VALUES (1,'x1'), (2,'x2')").await();
-        tEnv.executeSql("INSERT INTO t VALUES (3,'x3')").await();
-        hiveShell.execute("SET paimon.branch=b1");
-        assertThat(hiveShell.executeQuery("SELECT * FROM t"))
-                .isEqualTo(Arrays.asList("1\tx1", "2\tx2"));
-        hiveShell.execute("SET paimon.branch=null");
-        tEnv.executeSql("ALTER TABLE `t$branch_b1` ADD (c INT)").await();
+        tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
         tEnv.executeSql(
-                        "INSERT INTO `t$branch_b1` VALUES "
-                                + "(1, 'x10', 100), (2, 'x11', 200), (2, 'x12', 400)")
+                        "ALTER TABLE `t$branch_test` SET ( 'primary-key' = 'pt, a', 'bucket' = '1' )")
                 .await();
-        assertThat(collectString("SELECT * FROM t$branch_b1"))
+        tEnv.executeSql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'test' )").await();
+
+        tEnv.executeSql(
+                        "INSERT INTO `t$branch_test` VALUES "
+                                + "(1, 20, 'cat'), (1, 30, 'dog'), (2, 10, 'tiger'), (2, 20, 'wolf')")
+                .await();
+        tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')").await();
+        tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (2, 10, 'lion')").await();
+
+        assertThat(collect("SELECT * FROM t"))
                 .containsExactlyInAnyOrder(
-                        "+I[1, x10, 100]",
-                        "+I[1, x1, null]",
-                        "+I[2, x2, null]",
-                        "+I[2, x11, 200]",
-                        "+I[2, x12, 400]");
-        assertThat(collectString("SELECT * FROM t")).containsExactlyInAnyOrder("+I[3, x3]");
+                        Row.of(1, 10, "apple"),
+                        Row.of(1, 20, "banana"),
+                        Row.of(2, 10, "lion"),
+                        Row.of(2, 20, "wolf"));
+        assertThat(hiveShell.executeQuery("SELECT * FROM t"))
+                .containsExactlyInAnyOrder(
+                        "1\t10\tapple", "1\t20\tbanana", "2\t10\tlion", "2\t20\twolf");
     }
 
-    @Test
-    public void testHiveCreateAndFlinkCreateBranchAndHiveRead() throws Exception {
-        hiveShell.execute("SET hive.metastore.warehouse.dir=" + path);
-        hiveShell.execute(
-                "CREATE TABLE hive_test_table ( a INT, b STRING ) "
-                        + "STORED BY '"
-                        + PaimonStorageHandler.class.getName()
-                        + "'"
-                        + "TBLPROPERTIES ("
-                        + " 'primary-key'='a'"
-                        + ")");
-        tEnv.executeSql("Call sys.create_branch('test_db.hive_test_table','b1')").await();
-        tEnv.executeSql("INSERT INTO hive_test_table$branch_b1 VALUES (1,'x1'), (2,'x2')").await();
-        tEnv.executeSql("INSERT INTO hive_test_table VALUES (3,'x3')").await();
-        hiveShell.execute("SET paimon.branch=b1");
-        assertThat(hiveShell.executeQuery("SELECT * FROM hive_test_table"))
-                .isEqualTo(Arrays.asList("1\tx1", "2\tx2"));
-        hiveShell.execute("SET paimon.branch=null");
-        assertThat(hiveShell.executeQuery("SELECT * FROM hive_test_table"))
-                .isEqualTo(Collections.singletonList("3\tx3"));
-    }
     /**
      * Test flink writing and hive reading to compare partitions and non-partitions table results.
      */
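For readers tracing the assertions in testFallbackBranchRead: with 'scan.fallback-branch' = 'test', each partition is served from the main branch when it has data there and from the fallback branch otherwise. Partition 1 exists in the main branch (apple, banana), so the fallback rows (cat, dog) are ignored; partition 2 exists only in branch test, where the pt,a primary key lets the later lion row replace tiger, leaving lion and wolf. A toy model of that per-partition rule in plain Java (a simplified illustration, not Paimon's actual scan code):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class FallbackScanSketch {
    public static void main(String[] args) {
        // Final per-partition contents of each branch, mirroring the test data
        // (branch "test" already reflects the lion-over-tiger upsert).
        Map<Integer, List<String>> main = Map.of(1, List.of("apple", "banana"));
        Map<Integer, List<String>> fallback =
                Map.of(1, List.of("cat", "dog"), 2, List.of("lion", "wolf"));

        // A partition is read entirely from main when present there, else from fallback.
        TreeSet<Integer> partitions = new TreeSet<>(main.keySet());
        partitions.addAll(fallback.keySet());
        List<String> result = new ArrayList<>();
        for (int pt : partitions) {
            result.addAll(main.getOrDefault(pt, fallback.get(pt)));
        }
        System.out.println(result); // [apple, banana, lion, wolf]
    }
}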
