Skip to content

Commit

Permalink
[hive] HiveCatalog support branch (#3820)
Browse files Browse the repository at this point in the history
This closes #3820.

---------

Co-authored-by: tsreaper <[email protected]>
  • Loading branch information
discivigour and tsreaper authored Aug 1, 2024
1 parent 4154c2b commit 59143c1
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;
Expand Down Expand Up @@ -68,7 +69,7 @@ public FallbackReadFileStoreTable(FileStoreTable main, FileStoreTable fallback)
RowType mainRowType = main.schema().logicalRowType();
RowType fallbackRowType = fallback.schema().logicalRowType();
Preconditions.checkArgument(
mainRowType.equals(fallbackRowType),
sameRowTypeIgnoreNullable(mainRowType, fallbackRowType),
"Branch %s and %s does not have the same row type.\n"
+ "Row type of branch %s is %s.\n"
+ "Row type of branch %s is %s.",
Expand Down Expand Up @@ -104,6 +105,20 @@ public FallbackReadFileStoreTable(FileStoreTable main, FileStoreTable fallback)
}
}

/**
 * Checks whether the two row types are field-by-field compatible, comparing each field's data
 * type with {@code equalsIgnoreNullable} so that a nullability difference between the main and
 * fallback branches does not reject the pair.
 *
 * @param mainRowType row type of the main branch
 * @param fallbackRowType row type of the fallback branch
 * @return true if both row types have the same field count and pairwise equal types (ignoring
 *     nullability)
 */
private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType fallbackRowType) {
    int fieldCount = mainRowType.getFieldCount();
    if (fieldCount != fallbackRowType.getFieldCount()) {
        return false;
    }
    int index = 0;
    while (index < fieldCount) {
        DataType mainFieldType = mainRowType.getFields().get(index).type();
        DataType fallbackFieldType = fallbackRowType.getFields().get(index).type();
        if (!mainFieldType.equalsIgnoreNullable(fallbackFieldType)) {
            return false;
        }
        index++;
    }
    return true;
}

@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
return new FallbackReadFileStoreTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;

Expand Down Expand Up @@ -392,8 +393,6 @@ private static boolean isPaimonTable(Table table) {

@Override
public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
assertMainBranch(identifier);

if (!tableExists(identifier)) {
throw new TableNotExistException(identifier);
}
Expand Down Expand Up @@ -529,12 +528,14 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
assertMainBranch(identifier);

final SchemaManager schemaManager = schemaManager(identifier);
// first commit changes to underlying files
TableSchema schema = schemaManager.commitChanges(changes);

// currently only changes to main branch affects metastore
if (!DEFAULT_MAIN_BRANCH.equals(identifier.getBranchNameOrDefault())) {
return;
}
try {
Table table =
clients.run(
Expand Down Expand Up @@ -777,7 +778,10 @@ private FieldSchema convertToFieldSchema(DataField dataField) {
}

private SchemaManager schemaManager(Identifier identifier) {
return new SchemaManager(fileIO, getDataTableLocation(identifier))
return new SchemaManager(
fileIO,
getDataTableLocation(identifier),
identifier.getBranchNameOrDefault())
.withLock(lock(identifier));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,58 @@ public void testFlinkWriteAndHiveRead() throws Exception {
"Cannot find table '`my_hive`.`test_db`.`hive_table`' in any of the catalogs [default_catalog, my_hive], nor as a temporary table.");
}

// Verifies branch isolation between Flink and Hive: a branch created via Flink can evolve its
// own schema (primary key, bucket, extra column) without affecting the main branch, and Hive
// continues to read only the main branch's data.
@Test
public void testFlinkCreateBranchAndHiveRead() throws Exception {
tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
.await();
// Create branch "test" and diverge its schema from main: add a primary key, a fixed
// bucket count, and a third column c.
tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
tEnv.executeSql("ALTER TABLE `t$branch_test` SET ( 'primary-key' = 'a', 'bucket' = '1' )")
.await();
tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (c INT)").await();

// Interleave writes to the branch and to main. The branch has primary key a, so later
// writes with the same key overwrite earlier ones; main is append-only here.
tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'x1', 10), (2, 'x2', 20)").await();
tEnv.executeSql("INSERT INTO t VALUES (3, 'x3'), (4, 'x4')").await();
tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'x11', 11)").await();
tEnv.executeSql("INSERT INTO t VALUES (3, 'x33')").await();

// Main branch keeps all appended rows, untouched by branch writes.
assertThat(collect("SELECT * FROM t"))
.containsExactlyInAnyOrder(Row.of(3, "x3"), Row.of(3, "x33"), Row.of(4, "x4"));
// On the branch, key 1 was overwritten by (1, 'x11', 11) per the primary key.
assertThat(collect("SELECT * FROM `t$branch_test`"))
.containsExactlyInAnyOrder(Row.of(1, "x11", 11), Row.of(2, "x2", 20));
// Hive reads the main branch: same rows as the Flink main-branch query above.
assertThat(hiveShell.executeQuery("SELECT * FROM t"))
.containsExactlyInAnyOrder("3\tx3", "3\tx33", "4\tx4");
}

// Verifies 'scan.fallback-branch': reading the main table serves a partition from the main
// branch when it has data there, and falls back to branch "test" for partitions that exist
// only on the branch. Hive reads observe the same merged view.
@Test
public void testFallbackBranchRead() throws Exception {
tEnv.executeSql(
"CREATE TABLE t ( pt INT, a INT, b STRING ) PARTITIONED BY (pt) "
+ "WITH ( 'file.format' = 'avro' )")
.await();
// Branch "test" gets a primary key so later inserts with the same (pt, a) overwrite.
tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
tEnv.executeSql(
"ALTER TABLE `t$branch_test` SET ( 'primary-key' = 'pt, a', 'bucket' = '1' )")
.await();
// Point the main table's fallback reads at the branch.
tEnv.executeSql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'test' )").await();

// Branch holds partitions pt=1 and pt=2; main holds only pt=1.
tEnv.executeSql(
"INSERT INTO `t$branch_test` VALUES "
+ "(1, 20, 'cat'), (1, 30, 'dog'), (2, 10, 'tiger'), (2, 20, 'wolf')")
.await();
tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')").await();
// Overwrites (2, 10, 'tiger') on the branch via the primary key.
tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (2, 10, 'lion')").await();

// Expected merged view: pt=1 comes from the main branch (its data wins over the
// branch's pt=1 rows), pt=2 falls back to the branch.
assertThat(collect("SELECT * FROM t"))
.containsExactlyInAnyOrder(
Row.of(1, 10, "apple"),
Row.of(1, 20, "banana"),
Row.of(2, 10, "lion"),
Row.of(2, 20, "wolf"));
// Hive sees the same fallback-merged result.
assertThat(hiveShell.executeQuery("SELECT * FROM t"))
.containsExactlyInAnyOrder(
"1\t10\tapple", "1\t20\tbanana", "2\t10\tlion", "2\t20\twolf");
}

/**
* Test flink writing and hive reading to compare partitions and non-partitions table results.
*/
Expand Down Expand Up @@ -1467,4 +1519,14 @@ protected List<Row> collect(String sql) throws Exception {
}
return result;
}

/**
 * Executes the given SQL through the Flink table environment and returns every result row
 * rendered as a string (via {@code Row#toString()}). The result iterator is closed before
 * returning.
 *
 * @param sql the statement to execute
 * @return one string per result row, in iteration order
 * @throws Exception if execution or closing the iterator fails
 */
private List<String> collectString(String sql) throws Exception {
    List<String> rows = new ArrayList<>();
    try (CloseableIterator<Row> iterator = tEnv.executeSql(sql).collect()) {
        iterator.forEachRemaining(row -> rows.add(row.toString()));
    }
    return rows;
}
}

0 comments on commit 59143c1

Please sign in to comment.