Skip to content

Commit

Permalink
HiveCatalog support upper case
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Jun 6, 2024
1 parent 3b1810b commit 3eadd8b
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -231,10 +232,13 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
checkNotSystemTable(identifier, "createTable");
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
validateAutoCreateClose(schema.options());

if (!caseSensitive()) {
identifier = formatIdentifier(identifier);
}

if (!databaseExists(identifier.getDatabaseName())) {
throw new DatabaseNotExistException(identifier.getDatabaseName());
}
Expand All @@ -251,14 +255,25 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
createTableImpl(identifier, schema);
}

/** If identifier is case insensitive, format it to lower case. */
protected Identifier formatIdentifier(Identifier identifier) {
return new Identifier(
identifier.getDatabaseName().toLowerCase(Locale.ROOT),
identifier.getObjectName().toLowerCase(Locale.ROOT));
}

protected abstract void createTableImpl(Identifier identifier, Schema schema);

@Override
public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException {
checkNotSystemTable(fromTable, "renameTable");
checkNotSystemTable(toTable, "renameTable");
validateIdentifierNameCaseInsensitive(toTable);

if (!caseSensitive()) {
fromTable = formatIdentifier(fromTable);
toTable = formatIdentifier(toTable);
}

if (!tableExists(fromTable)) {
if (ignoreIfNotExists) {
Expand All @@ -281,9 +296,12 @@ public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
checkNotSystemTable(identifier, "alterTable");
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitiveInSchemaChange(changes);

if (!caseSensitive()) {
identifier = formatIdentifier(identifier);
}

if (!tableExists(identifier)) {
if (ignoreIfNotExists) {
return;
Expand Down Expand Up @@ -466,11 +484,6 @@ public static void validateCaseInsensitive(
type, illegalNames));
}

protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
validateCaseInsensitive(caseSensitive(), "Database", identifier.getDatabaseName());
validateCaseInsensitive(caseSensitive(), "Table", identifier.getObjectName());
}

private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
List<String> fieldNames = new ArrayList<>();
for (SchemaChange change : changes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,22 +545,29 @@ public void repairDatabase(String databaseName) {
@Override
public void repairTable(Identifier identifier) throws TableNotExistException {
checkNotSystemTable(identifier, "repairTable");
validateIdentifierNameCaseInsensitive(identifier);

if (!caseSensitive()) {
identifier = formatIdentifier(identifier);
}

// Variable should be final in lambda expression
Identifier newIdentifier = identifier;

TableSchema tableSchema =
tableSchemaInFileSystem(getDataTableLocation(identifier))
.orElseThrow(() -> new TableNotExistException(identifier));
Table newTable = createHiveTable(identifier, tableSchema);
tableSchemaInFileSystem(getDataTableLocation(newIdentifier))
.orElseThrow(() -> new TableNotExistException(newIdentifier));
Table newTable = createHiveTable(newIdentifier, tableSchema);
try {
try {
Table table =
client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
client.getTable(
newIdentifier.getDatabaseName(), newIdentifier.getObjectName());
checkArgument(
isPaimonTable(table),
"Table %s is not a paimon table in hive metastore.",
identifier.getFullName());
newIdentifier.getFullName());
if (!newTable.getSd().getCols().equals(table.getSd().getCols())) {
alterTableToHms(table, identifier, tableSchema);
alterTableToHms(table, newIdentifier, tableSchema);
}
} catch (NoSuchObjectException e) {
// hive table does not exist.
Expand All @@ -572,9 +579,9 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
// Do not close client, it is for HiveCatalog
@SuppressWarnings("resource")
HiveMetastoreClient metastoreClient =
new HiveMetastoreClient(identifier, tableSchema, client);
new HiveMetastoreClient(newIdentifier, tableSchema, client);
List<BinaryRow> partitions =
getTable(identifier).newReadBuilder().newScan().listPartitions();
getTable(newIdentifier).newReadBuilder().newScan().listPartitions();
for (BinaryRow partition : partitions) {
metastoreClient.addPartition(partition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import static org.apache.paimon.hive.HiveCatalog.PAIMON_TABLE_TYPE_VALUE;
import static org.apache.paimon.hive.HiveCatalog.TABLE_TYPE_PROP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/** Tests for {@link HiveCatalog}. */
Expand Down Expand Up @@ -82,25 +82,31 @@ public void testListDatabasesWhenNoDatabases() {
}

@Test
public void testCheckIdentifierUpperCase() throws Exception {
catalog.createDatabase("test_db", false);
assertThatThrownBy(
() ->
catalog.createTable(
Identifier.create("TEST_DB", "new_table"),
DEFAULT_TABLE_SCHEMA,
false))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Database name [TEST_DB] cannot contain upper case in the catalog.");

assertThatThrownBy(
() ->
catalog.createTable(
Identifier.create("test_db", "NEW_TABLE"),
DEFAULT_TABLE_SCHEMA,
false))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Table name [NEW_TABLE] cannot contain upper case in the catalog.");
public void testSupportIdentifierUpperCase() throws Exception {
// create database with upper case
catalog.createDatabase("TEST_DB", false);
assertThat(catalog.listDatabases()).contains("test_db");

// create table with upper case
catalog.createTable(
Identifier.create("test_db", "INIT_TABLE"), DEFAULT_TABLE_SCHEMA, false);
assertTrue(catalog.tableExists(new Identifier("test_db", "init_table")));
assertThat(catalog.listTables("test_db")).containsExactly("init_table");

// alter table with upper-case
SchemaChange schemaChange = SchemaChange.setOption("hive.table.owner", "Hms");
catalog.alterTable(
Identifier.create("test_db", "INIT_TABLE"), Arrays.asList(schemaChange), false);
Table table = ((HiveCatalog) catalog).getHmsClient().getTable("test_db", "init_table");
Map<String, String> tableProperties = table.getParameters();
assertThat(tableProperties).containsEntry("table.owner", "Hms");

// rename table name with upper case
catalog.renameTable(
new Identifier("test_db", "init_table"),
new Identifier("test_db", "NEW_TABLE"),
false);
assertThat(catalog.listTables("test_db")).containsExactly("new_table");
}

private static final String HADOOP_CONF_DIR =
Expand Down Expand Up @@ -172,6 +178,12 @@ private void testHiveConfDirFromEnvImpl() {
assertThat(hiveConf.get("hive.metastore.uris")).isEqualTo("dummy-hms");
}

@Test
public void testCreateDbUpperCase() throws Exception {
catalog.createDatabase("DBB", false);
assertThat(catalog.listDatabases()).contains("dbb");
}

@Test
public void testAddHiveTableParameters() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,10 +650,7 @@ public void testRenameTable() throws Exception {
"Could not execute ALTER TABLE my_hive.test_db.t1 RENAME TO my_hive.test_db.t2");

// the target table name has upper case.
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t1 RENAME TO T1"))
.hasMessage("Table name [T1] cannot contain upper case in the catalog.");

tEnv.executeSql("ALTER TABLE t1 RENAME TO t3").await();
tEnv.executeSql("ALTER TABLE t1 RENAME TO T3").await();

// hive read
List<String> tables = hiveShell.executeQuery("SHOW TABLES");
Expand Down Expand Up @@ -804,15 +801,11 @@ public void testHiveLock() throws InterruptedException {
}

@Test
public void testUpperCase() {
assertThatThrownBy(
() ->
tEnv.executeSql(
"CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
.await())
.hasRootCauseMessage(
String.format(
"Table name [%s] cannot contain upper case in the catalog.", "T"));
public void testUpperCase() throws Exception {
tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
.await();
List<String> tables = hiveShell.executeQuery("SHOW TABLES");
assertThat(tables.contains("t")).isTrue();

assertThatThrownBy(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.spark.PaimonHiveTestBase

import org.apache.spark.sql.{Row, SparkSession}
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions

import scala.collection.JavaConverters.mapAsJavaMapConverter

abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {

test("Paimon DDL with hive catalog: create database with location and comment") {
Expand Down Expand Up @@ -125,6 +128,36 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
}
}

test("Paimon DDL with hive catalog: create/alter/rename table using upper case") {
Seq(paimonHiveCatalogName).foreach {
catalogName =>
spark.sql(s"USE $catalogName")
spark.sql(s"CREATE DATABASE paimon_db")
spark.sql(s"USE paimon_db")

// create table with upper case
spark.sql(
s"CREATE TABLE PAIMON_TBL (id int, name string, dt string) using paimon TBLPROPERTIES ('file.format' = 'parquet')")
checkAnswer(spark.sql("show tables").select("tableName"), Row("paimon_tbl") :: Nil)

// alter table with upper case
spark.sql(s"ALTER TABLE PAIMON_TBL SET TBLPROPERTIES ('write-only' = 'false')")
val options = rowsToMap(spark.sql("SELECT * FROM `paimon_tbl$options`").collect())
assertThat(options).containsEntry("write-only", "false");

// rename table with upper case
spark.sql(s"ALTER TABLE paimon_tbl RENAME TO NEW_PAIMON_TBL")
checkAnswer(spark.sql("show tables").select("tableName"), Row("new_paimon_tbl") :: Nil)

spark.sql(s"drop table new_paimon_tbl")
spark.sql(s"drop database paimon_db")
}
}

def rowsToMap(rows: Array[Row]): java.util.Map[String, String] = {
rows.map(row => (row.getString(0), row.getString(1))).toMap.asJava
}

def supportDefaultDatabaseWithSessionCatalog = true

def getDatabaseLocation(dbName: String): String = {
Expand Down

0 comments on commit 3eadd8b

Please sign in to comment.