Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Dec 17, 2024
1 parent 0fb6076 commit 1c10fd0
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
</tr>
<tr>
<td><h5>case-sensitive</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Indicates whether this catalog is case-sensitive.</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public class CatalogOptions {
public static final ConfigOption<Boolean> CASE_SENSITIVE =
ConfigOptions.key("case-sensitive")
.booleanType()
.noDefaultValue()
.defaultValue(true)
.withFallbackKeys("allow-upper-case")
.withDescription("Indicates whether this catalog is case-sensitive.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
Expand Down Expand Up @@ -114,6 +115,11 @@ public Optional<CatalogLockFactory> defaultLockFactory() {
return Optional.empty();
}

@Override
public boolean caseSensitive() {
// Case sensitivity comes straight from the catalog options; CASE_SENSITIVE
// carries a default (true per its ConfigOption definition), so get() cannot
// return null here.
return catalogOptions.get(CASE_SENSITIVE);
}

public Optional<CatalogLockContext> lockContext() {
return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import java.util.Map;
import java.util.concurrent.Callable;

import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;

/** A catalog implementation for {@link FileIO}. */
public class FileSystemCatalog extends AbstractCatalog {

Expand Down Expand Up @@ -161,9 +159,4 @@ public void close() throws Exception {}
public String warehouse() {
return warehouse.toString();
}

@Override
public boolean caseSensitive() {
// Treat an unset CASE_SENSITIVE option as case-sensitive (true) for this
// file-system catalog; only an explicit "case-sensitive=false" disables it.
return catalogOptions.getOptional(CASE_SENSITIVE).orElse(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public List<PartitionEntry> listPartitions(Identifier identifier)

@Override
public boolean caseSensitive() {
return options.getOptional(CASE_SENSITIVE).orElse(true);
return options.get(CASE_SENSITIVE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.hive.HiveTableUtils.convertToFormatTable;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
Expand Down Expand Up @@ -899,11 +898,6 @@ private void alterTableToHms(Table table, Identifier identifier, TableSchema new
clients.execute(client -> HiveAlterTableUtils.alterTable(client, identifier, table));
}

@Override
public boolean caseSensitive() {
// Default to case-INSENSITIVE when the option is unset — presumably because
// the Hive metastore lower-cases identifiers; note this differs from the
// file-system catalog's default of true. TODO confirm intent.
return catalogOptions.getOptional(CASE_SENSITIVE).orElse(false);
}

@Override
protected boolean allowCustomTablePath() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,19 @@ public void testListDatabasesWhenNoDatabases() {

@Test
public void testCheckIdentifierUpperCase() throws Exception {
catalog.createDatabase("test_db", false);
HiveConf hiveConf = new HiveConf();
String jdoConnectionURL = "jdbc:derby:memory:" + UUID.randomUUID();
hiveConf.setVar(METASTORECONNECTURLKEY, jdoConnectionURL + ";create=true");
String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
Options catalogOptions = new Options();
catalogOptions.set(CatalogOptions.CASE_SENSITIVE, false);
HiveCatalog hiveCatalog =
new HiveCatalog(fileIO, hiveConf, metastoreClientClass, catalogOptions, warehouse);

hiveCatalog.createDatabase("test_db", false);
assertThatThrownBy(
() ->
catalog.createTable(
hiveCatalog.createTable(
Identifier.create("TEST_DB", "new_table"),
DEFAULT_TABLE_SCHEMA,
false))
Expand All @@ -96,12 +105,13 @@ public void testCheckIdentifierUpperCase() throws Exception {

assertThatThrownBy(
() ->
catalog.createTable(
hiveCatalog.createTable(
Identifier.create("test_db", "NEW_TABLE"),
DEFAULT_TABLE_SCHEMA,
false))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Table name [NEW_TABLE] cannot contain upper case in the catalog.");
hiveCatalog.close();
}

private static final String HADOOP_CONF_DIR =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,18 +991,23 @@ public void testCreateTableAs() throws Exception {

@Test
public void testRenameTable() throws Exception {
Map<String, String> props = new HashMap<>();
props.put("case-sensitive", "false");
registerHiveCatalog("case_insensitive_hive", props);
tEnv.executeSql("USE CATALOG case_insensitive_hive");
tEnv.executeSql("USE test_db");
tEnv.executeSql("CREATE TABLE t1 (a INT)").await();
tEnv.executeSql("CREATE TABLE t2 (a INT)").await();
tEnv.executeSql("INSERT INTO t1 SELECT 1").await();
// the source table does not exist.
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t3 RENAME TO t4"))
.hasMessage(
"Table `my_hive`.`test_db`.`t3` doesn't exist or is a temporary table.");
"Table `case_insensitive_hive`.`test_db`.`t3` doesn't exist or is a temporary table.");

// the target table already exists.
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t1 RENAME TO t2"))
.hasMessage(
"Could not execute ALTER TABLE my_hive.test_db.t1 RENAME TO my_hive.test_db.t2");
"Could not execute ALTER TABLE case_insensitive_hive.test_db.t1 RENAME TO my_hive.test_db.t2");

// the target table name has upper case.
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t1 RENAME TO T1"))
Expand Down Expand Up @@ -1159,7 +1164,12 @@ public void testHiveLock() throws InterruptedException, Catalog.TableNotExistExc
}

@Test
public void testUpperCase() {
public void testUpperCase() throws Exception {
Map<String, String> props = new HashMap<>();
props.put("case-sensitive", "false");
registerHiveCatalog("case_insensitive_hive", props);
tEnv.executeSql("USE CATALOG case_insensitive_hive");

assertThatThrownBy(
() ->
tEnv.executeSql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable;
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable;
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable;
import org.apache.spark.sql.internal.SessionState;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
Expand All @@ -70,7 +69,6 @@
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
Expand All @@ -91,18 +89,10 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
Map<String, String> newOptions = new HashMap<>(options.asCaseSensitiveMap());
SessionState sessionState = SparkSession.active().sessionState();

CatalogContext catalogContext =
CatalogContext.create(Options.fromMap(options), sessionState.newHadoopConf());

// if spark is case-insensitive, set case-sensitive to catalog
if (!sessionState.conf().caseSensitiveAnalysis()) {
newOptions.put(CASE_SENSITIVE.key(), "true");
}
options = new CaseInsensitiveStringMap(newOptions);

CatalogContext.create(
Options.fromMap(options),
SparkSession.active().sessionState().newHadoopConf());
this.catalog = CatalogFactory.createCatalog(catalogContext);
this.defaultDatabase =
options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import java.util.Map;
import java.util.concurrent.Callable;

import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.apache.paimon.spark.SparkCatalogOptions.CREATE_UNDERLYING_SESSION_CATALOG;
Expand Down Expand Up @@ -242,7 +241,6 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
SparkSession sparkSession = SparkSession.active();
SessionState sessionState = sparkSession.sessionState();
Configuration hadoopConf = sessionState.newHadoopConf();
SparkConf sparkConf = new SparkConf();
if (options.containsKey(METASTORE.key())
&& options.get(METASTORE.key()).equalsIgnoreCase("hive")) {
String uri = options.get(CatalogOptions.URI.key());
Expand All @@ -257,11 +255,6 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
}
}
}
if ("in-memory"
.equals(sparkSession.conf().get(StaticSQLConf.CATALOG_IMPLEMENTATION().key()))) {
LOG.warn("InMemoryCatalog here may cause bad effect.");
}

this.catalogName = name;
this.sparkCatalog = new SparkCatalog();

Expand All @@ -273,6 +266,7 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
CREATE_UNDERLYING_SESSION_CATALOG.key(),
CREATE_UNDERLYING_SESSION_CATALOG.defaultValue())) {
this.underlyingSessionCatalogEnabled = true;
SparkConf sparkConf = new SparkConf();
for (Map.Entry<String, String> entry : options.entrySet()) {
sparkConf.set("spark.hadoop." + entry.getKey(), entry.getValue());
hadoopConf.set(entry.getKey(), entry.getValue());
Expand Down Expand Up @@ -330,11 +324,6 @@ private void fillCommonConfigurations(Map<String, String> options, SQLConf sqlCo
} else {
options.put(DEFAULT_DATABASE.key(), sessionCatalogDefaultDatabase);
}

// if spark is case-insensitive, set case-sensitive to catalog
if (!sqlConf.caseSensitiveAnalysis()) {
options.put(CASE_SENSITIVE.key(), "true");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,17 @@ public static void closeMetastore() throws Exception {

@Test
public void testCreateTableCaseSensitive(@TempDir java.nio.file.Path tempDir) {
// firstly, we use hive metastore to creata table, and check the result.
// firstly, we use hive metastore to create table, and check the result.
Path warehousePath = new Path("file:" + tempDir.toString());
SparkSession spark =
SparkSession.builder()
.config("spark.sql.warehouse.dir", warehousePath.toString())
// with case-sensitive false
.config("spark.sql.caseSensitive", "false")
// with hive metastore
.config("spark.sql.catalogImplementation", "hive")
.config(
"spark.sql.catalog.spark_catalog",
SparkGenericCatalog.class.getName())
.config("spark.sql.catalog.spark_catalog.case-sensitive", "true")
.master("local[2]")
.getOrCreate();

Expand All @@ -81,13 +80,12 @@ public void testCreateTableCaseSensitive(@TempDir java.nio.file.Path tempDir) {
SparkSession spark1 =
SparkSession.builder()
.config("spark.sql.warehouse.dir", warehousePath.toString())
// with case-sensitive true
.config("spark.sql.caseSensitive", "true")
// with hive metastore
.config("spark.sql.catalogImplementation", "hive")
.config(
"spark.sql.catalog.spark_catalog",
SparkGenericCatalog.class.getName())
.config("spark.sql.catalog.spark_catalog.case-sensitive", "false")
.master("local[2]")
.getOrCreate();

Expand Down

0 comments on commit 1c10fd0

Please sign in to comment.