[core] Support create table column in-sensitive in HiveCatalog (#3822)
xuzifu666 authored Jul 30, 2024
1 parent 405779a commit fcd5261
Showing 7 changed files with 141 additions and 5 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>assert-upper-case</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to allow upper-case column names when creating a table. In Spark, this option is derived from spark.sql.caseSensitive: creating a table with upper-case column names is rejected when spark.sql.caseSensitive=true, so set spark.sql.caseSensitive=false to allow them.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
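For illustration, a catalog with this option enabled can be created from Flink SQL, mirroring the test added in this commit; a minimal sketch, assuming a Flink TableEnvironment named tEnv, with a placeholder catalog name and warehouse path:

tEnv.executeSql(
        "CREATE CATALOG my_catalog WITH (\n"
                + "  'type' = 'paimon',\n"
                + "  'metastore' = 'hive',\n"
                + "  'warehouse' = 'file:/tmp/paimon',\n"
                + "  'assert-upper-case' = 'true'\n"
                + ")");
tEnv.executeSql("USE CATALOG my_catalog");
// with 'assert-upper-case' = 'true', the upper-case column name 'Bb' is accepted
tEnv.executeSql("CREATE TABLE t (aa INT, Bb STRING)");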
@@ -110,4 +110,11 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());

public static final ConfigOption<Boolean> ASSERT_UPPER_CASE =
ConfigOptions.key("assert-upper-case")
.booleanType()
.defaultValue(false)
.withDescription(
"If column name contains upper-case during create table would not support when spark.sql.caseSensitive=true, you can set spark.sql.caseSensitive=false to support upper-case condition. ");
}
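Equivalently, a sketch of enabling the option when constructing a catalog programmatically, assuming the Options, CatalogContext, and CatalogFactory APIs used elsewhere in this diff (the warehouse path is a placeholder):

Options options = new Options();
options.set(CatalogOptions.WAREHOUSE, "file:/tmp/paimon"); // placeholder path
options.set(CatalogOptions.ASSERT_UPPER_CASE, true); // allow upper-case column names
Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));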
@@ -91,6 +91,7 @@
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -567,7 +568,7 @@ private void alterTableToHms(Table table, Identifier identifier, TableSchema new

@Override
public boolean caseSensitive() {
return false;
return catalogOptions.get(ASSERT_UPPER_CASE);
}

@Override
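For context, caseSensitive() is consulted when the catalog validates field names during table creation; a simplified, illustrative sketch of that check (the method name and error message are approximations, not the exact Paimon code):

private void validateFieldNames(List<String> fieldNames) {
    if (caseSensitive()) {
        return; // upper-case field names are allowed
    }
    for (String fieldName : fieldNames) {
        checkArgument(
                fieldName.equals(fieldName.toLowerCase()),
                "Field name [%s] cannot contain upper case in a case-insensitive catalog.",
                fieldName);
    }
}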
@@ -363,6 +363,59 @@ public void testCreateExternalTable() throws Exception {
assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();
}

@Test
public void testCreateInsensitiveTable() throws Exception {
tEnv.executeSql(
String.join(
"\n",
"CREATE CATALOG paimon_catalog_01 WITH (",
" 'type' = 'paimon',",
" 'metastore' = 'hive',",
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'lock.enabled' = 'true',",
" 'table.type' = 'EXTERNAL',",
" 'assert-upper-case' = 'true'",
")"))
.await();
tEnv.executeSql("USE CATALOG paimon_catalog_01").await();
tEnv.executeSql("USE test_db").await();
tEnv.executeSql("CREATE TABLE t ( aa INT, Bb STRING ) WITH ( 'file.format' = 'avro' )")
.await();
assertThat(
hiveShell
.executeQuery("DESC FORMATTED t")
.contains("Table Type: \tEXTERNAL_TABLE \tNULL"))
.isTrue();
tEnv.executeSql("DROP TABLE t").await();
Path tablePath = new Path(path, "test_db.db/t");
assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();

tEnv.executeSql(
String.join(
"\n",
"CREATE CATALOG paimon_catalog_02 WITH (",
" 'type' = 'paimon',",
" 'metastore' = 'hive',",
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'lock.enabled' = 'true',",
" 'table.type' = 'EXTERNAL',",
" 'assert-upper-case' = 'false'",
")"))
.await();
tEnv.executeSql("USE CATALOG paimon_catalog_02").await();
tEnv.executeSql("USE test_db").await();

// with 'assert-upper-case' = 'false', creating a table with an upper-case column name throws an exception
assertThrows(
RuntimeException.class,
() ->
tEnv.executeSql(
"CREATE TABLE t1 ( aa INT, Bb STRING ) WITH ( 'file.format' = 'avro' )")
.await());
}

@Test
public void testFlinkWriteAndHiveRead() throws Exception {
tEnv.executeSql(
@@ -40,6 +40,7 @@
import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.internal.SessionState;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -53,6 +54,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -71,10 +73,18 @@ public class SparkCatalog extends SparkBaseCatalog {
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
Map<String, String> newOptions = new HashMap<>(options.asCaseSensitiveMap());
SessionState sessionState = SparkSession.active().sessionState();

CatalogContext catalogContext =
CatalogContext.create(
Options.fromMap(options),
SparkSession.active().sessionState().newHadoopConf());
CatalogContext.create(Options.fromMap(options), sessionState.newHadoopConf());

// derive assert-upper-case from the Spark SQL case-sensitivity setting:
// spark.sql.caseSensitive=false -> assert-upper-case=true (upper-case column names allowed)
newOptions.put(
ASSERT_UPPER_CASE.key(),
Boolean.toString(!sessionState.conf().caseSensitiveAnalysis()));
options = new CaseInsensitiveStringMap(newOptions);

this.catalog = CatalogFactory.createCatalog(catalogContext);
this.defaultDatabase =
options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
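The practical effect, as the tests below exercise: with spark.sql.caseSensitive=false (Spark's default), the Paimon catalog receives assert-upper-case=true and accepts upper-case column names. A minimal sketch, with an illustrative catalog name and placeholder warehouse path:

SparkSession spark =
        SparkSession.builder()
                .master("local[2]")
                .config("spark.sql.caseSensitive", "false") // -> assert-upper-case = true
                .config("spark.sql.catalog.paimon", SparkCatalog.class.getName())
                .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon")
                .getOrCreate();
// the upper-case column name 'Bb' is accepted; with spark.sql.caseSensitive=true it would throw
spark.sql("CREATE TABLE paimon.default.t (aa INT, Bb STRING) USING paimon");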
@@ -61,6 +61,7 @@
import java.util.Map;
import java.util.concurrent.Callable;

import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.apache.paimon.spark.SparkCatalogOptions.CREATE_UNDERLYING_SESSION_CATALOG;
@@ -284,6 +285,10 @@ private CaseInsensitiveStringMap autoFillConfigurations(
Map<String, String> newOptions = new HashMap<>(options.asCaseSensitiveMap());
fillAliyunConfigurations(newOptions, hadoopConf);
fillCommonConfigurations(newOptions, sqlConf);

// derive assert-upper-case from the Spark SQL case-sensitivity setting
newOptions.put(ASSERT_UPPER_CASE.key(), Boolean.toString(!sqlConf.caseSensitiveAnalysis()));

return new CaseInsensitiveStringMap(newOptions);
}

@@ -28,6 +28,7 @@
import org.junit.jupiter.api.io.TempDir;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

/** Base tests for spark read. */
public class SparkGenericCatalogWithHiveTest {
@@ -44,6 +45,59 @@ public static void closeMetastore() throws Exception {
testHiveMetastore.stop();
}

@Test
public void testCreateTableCaseSensitive(@TempDir java.nio.file.Path tempDir) {
// First, create a table through the Hive metastore with spark.sql.caseSensitive=false and check the result.
Path warehousePath = new Path("file:" + tempDir.toString());
SparkSession spark =
SparkSession.builder()
.config("spark.sql.warehouse.dir", warehousePath.toString())
// with case-sensitive false
.config("spark.sql.caseSensitive", "false")
// with hive metastore
.config("spark.sql.catalogImplementation", "hive")
.config(
"spark.sql.catalog.spark_catalog",
SparkGenericCatalog.class.getName())
.master("local[2]")
.getOrCreate();

spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
spark.sql("USE my_db1");
spark.sql(
"CREATE TABLE IF NOT EXISTS t2 (a INT, Bb INT, c STRING) USING paimon TBLPROPERTIES"
+ " ('file.format'='avro')");

assertThat(
spark.sql("SHOW TABLES").collectAsList().stream()
.map(s -> s.get(1))
.map(Object::toString))
.containsExactlyInAnyOrder("t2");
spark.close();

SparkSession spark1 =
SparkSession.builder()
.config("spark.sql.warehouse.dir", warehousePath.toString())
// with case-sensitive true
.config("spark.sql.caseSensitive", "true")
// with hive metastore
.config("spark.sql.catalogImplementation", "hive")
.config(
"spark.sql.catalog.spark_catalog",
SparkGenericCatalog.class.getName())
.master("local[2]")
.getOrCreate();

spark1.sql("USE my_db1");
assertThrows(
RuntimeException.class,
() ->
spark1.sql(
"CREATE TABLE IF NOT EXISTS t3 (a INT, Bb INT, c STRING) USING paimon TBLPROPERTIES"
+ " ('file.format'='avro')"));
spark1.close();
}

@Test
public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) {
// First, we use the Hive metastore to create a table and check the result.
@@ -66,7 +120,7 @@ public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) {
+ " ('file.format'='avro')");

assertThat(spark.sql("SHOW NAMESPACES").collectAsList().stream().map(Object::toString))
.containsExactlyInAnyOrder("[default]", "[my_db]");
.containsExactlyInAnyOrder("[default]", "[my_db]", "[my_db1]");

assertThat(
spark.sql("SHOW TABLES").collectAsList().stream()
