Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Skip case checking in catalog #4730

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -282,8 +281,6 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
throws TableAlreadyExistException, DatabaseNotExistException {
checkNotBranch(identifier, "createTable");
checkNotSystemTable(identifier, "createTable");
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
validateAutoCreateClose(schema.options());
validateCustomTablePath(schema.options());

Expand Down Expand Up @@ -339,7 +336,6 @@ public void renameTable(Identifier fromTable, Identifier toTable, boolean ignore
checkNotBranch(toTable, "renameTable");
checkNotSystemTable(fromTable, "renameTable");
checkNotSystemTable(toTable, "renameTable");
validateIdentifierNameCaseInsensitive(toTable);

try {
getTable(fromTable);
Expand All @@ -366,8 +362,6 @@ public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
checkNotSystemTable(identifier, "alterTable");
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitiveInSchemaChange(changes);

try {
getTable(identifier);
Expand Down Expand Up @@ -571,30 +565,6 @@ protected void checkNotSystemDatabase(String database) {
}
}

/**
 * Ensures the database and table names of {@code identifier} contain no upper-case
 * characters when the catalog is case-insensitive; no-op otherwise.
 */
protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
    String databaseName = identifier.getDatabaseName();
    String tableName = identifier.getObjectName();
    CatalogUtils.validateCaseInsensitive(caseSensitive(), "Database", databaseName);
    CatalogUtils.validateCaseInsensitive(caseSensitive(), "Table", tableName);
}

/**
 * Collects every field name introduced by the given schema changes (added columns and
 * rename targets) and validates them for upper-case characters when the catalog is
 * case-insensitive. Other change kinds introduce no new names and are skipped.
 */
private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
    List<String> newFieldNames = new ArrayList<>();
    for (SchemaChange change : changes) {
        if (change instanceof SchemaChange.AddColumn) {
            newFieldNames.addAll(
                    Arrays.asList(((SchemaChange.AddColumn) change).fieldNames()));
        } else if (change instanceof SchemaChange.RenameColumn) {
            newFieldNames.add(((SchemaChange.RenameColumn) change).newName());
        }
    }
    validateFieldNameCaseInsensitive(newFieldNames);
}

/**
 * Validates that the given field names contain no upper-case characters when the catalog
 * is case-insensitive; no-op when {@code caseSensitive()} returns true.
 */
protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
    CatalogUtils.validateCaseInsensitive(caseSensitive(), "Field", fieldNames);
}

private void validateAutoCreateClose(Map<String, String> options) {
checkArgument(
!Boolean.parseBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,10 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Utils for {@link Catalog}. */
public class CatalogUtils {
Expand Down Expand Up @@ -64,25 +60,4 @@ public static String table(String path) {
/**
 * Extracts the table-default options from catalog options by filtering on
 * {@link Catalog#TABLE_DEFAULT_OPTION_PREFIX}.
 *
 * @param options the full catalog option map
 * @return the entries selected by the prefix
 *     (presumably with the prefix stripped by {@code convertToPropertiesPrefixKey} —
 *     TODO confirm against OptionsUtils)
 */
public static Map<String, String> tableDefaultOptions(Map<String, String> options) {
    return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX);
}

/**
 * Validate database, table and field names must be lowercase when not case-sensitive.
 *
 * @param caseSensitive when {@code true} the catalog is case-sensitive and no check is done
 * @param type human-readable kind of the names ("Database", "Table", "Field", ...), used in
 *     the error message
 * @param names the names to validate
 */
public static void validateCaseInsensitive(
        boolean caseSensitive, String type, String... names) {
    // Delegate to the List overload, which holds the actual validation logic.
    validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
}

/**
 * Validate database, table and field names must be lowercase when not case-sensitive.
 *
 * @param caseSensitive when {@code true} the catalog is case-sensitive and no check is done
 * @param type human-readable kind of the names ("Database", "Table", "Field", ...), used in
 *     the error message
 * @param names the names to validate
 * @throws IllegalArgumentException (via {@code checkArgument}) listing every offending name
 *     when any name contains upper-case characters
 */
public static void validateCaseInsensitive(
        boolean caseSensitive, String type, List<String> names) {
    if (caseSensitive) {
        // Case-sensitive catalogs accept any casing; nothing to check.
        return;
    }
    // Use Locale.ROOT so the lower-case comparison is locale-independent: under the JVM
    // default locale (e.g. Turkish dotted/dotless i) the bare toLowerCase() could change
    // which names are reported as illegal.
    List<String> illegalNames =
            names.stream()
                    .filter(f -> !f.equals(f.toLowerCase(Locale.ROOT)))
                    .collect(Collectors.toList());
    checkArgument(
            illegalNames.isEmpty(),
            String.format(
                    "%s name %s cannot contain upper case in the catalog.",
                    type, illegalNames));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.catalog;

import org.apache.paimon.fs.Path;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
Expand All @@ -29,7 +28,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link FileSystemCatalog}. */
Expand All @@ -39,14 +37,13 @@ public class FileSystemCatalogTest extends CatalogTestBase {
public void setUp() throws Exception {
super.setUp();
Options catalogOptions = new Options();
catalogOptions.set(CatalogOptions.CASE_SENSITIVE, false);
catalog = new FileSystemCatalog(fileIO, new Path(warehouse), catalogOptions);
}

@Test
public void testCreateTableCaseSensitive() throws Exception {
catalog.createDatabase("test_db", false);
Identifier identifier = Identifier.create("test_db", "new_table");
Identifier identifier = Identifier.create("test_db", "new_TABLE");
Schema schema =
Schema.newBuilder()
.column("Pk1", DataTypes.INT())
Expand All @@ -64,11 +61,7 @@ public void testCreateTableCaseSensitive() throws Exception {
.partitionKeys("Pk1", "pk2")
.primaryKey("Pk1", "pk2", "pk3")
.build();

// Create table throws Exception if using uppercase when 'allow-upper-case' is false
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> catalog.createTable(identifier, schema, false))
.withMessage("Field name [Pk1, Col1] cannot contain upper case in the catalog.");
catalog.createTable(identifier, schema, false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
Expand Down Expand Up @@ -87,25 +88,18 @@ public void testCleanTimeoutLockAndAcquireLock() throws SQLException, Interrupte
}

@Test
public void testCheckIdentifierUpperCase() throws Exception {
public void testUpperCase() throws Exception {
catalog.createDatabase("test_db", false);
assertThatThrownBy(
() ->
catalog.createTable(
Identifier.create("TEST_DB", "new_table"),
DEFAULT_TABLE_SCHEMA,
false))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Database name [TEST_DB] cannot contain upper case in the catalog.");
.isInstanceOf(Catalog.DatabaseNotExistException.class)
.hasMessage("Database TEST_DB does not exist.");

assertThatThrownBy(
() ->
catalog.createTable(
Identifier.create("test_db", "NEW_TABLE"),
DEFAULT_TABLE_SCHEMA,
false))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Table name [NEW_TABLE] cannot contain upper case in the catalog.");
catalog.createTable(Identifier.create("test_db", "new_TABLE"), DEFAULT_TABLE_SCHEMA, false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.cdc.EventParser;
Expand Down Expand Up @@ -154,13 +153,6 @@ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) {
return this;
}

@Override
protected void validateCaseSensitivity() {
    // Checked in order: database, then table prefix, then table suffix. The first
    // offending value fails the whole validation (checkArgument in CatalogUtils —
    // presumably throws IllegalArgumentException; the catalog tests expect that type).
    CatalogUtils.validateCaseInsensitive(caseSensitive, "Database", database);
    CatalogUtils.validateCaseInsensitive(caseSensitive, "Table prefix", tablePrefix);
    CatalogUtils.validateCaseInsensitive(caseSensitive, "Table suffix", tableSuffix);
}

@Override
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
return syncJobHandler.provideRecordParser(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
Expand Down Expand Up @@ -113,12 +112,6 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) {
true);
}

@Override
protected void validateCaseSensitivity() {
    // Checked in order: database first, then table. The first offending value fails the
    // whole validation (checkArgument in CatalogUtils — presumably throws
    // IllegalArgumentException; the catalog tests expect that type).
    CatalogUtils.validateCaseInsensitive(caseSensitive, "Database", database);
    CatalogUtils.validateCaseInsensitive(caseSensitive, "Table", table);
}

@Override
protected void beforeBuildingSourceSink() throws Exception {
Identifier identifier = new Identifier(database, table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ public void build() throws Exception {

catalog.createDatabase(database, true);

validateCaseSensitivity();

beforeBuildingSourceSink();

DataStream<RichCdcMultiplexRecord> input =
Expand All @@ -126,8 +124,6 @@ public void build() throws Exception {
buildSink(input, parserFactory);
}

protected abstract void validateCaseSensitivity();

protected void beforeBuildingSourceSink() throws Exception {}

protected Source<CdcSourceRecord, ?, ?> buildSource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,18 @@ public void testListDatabasesWhenNoDatabases() {
@Test
public void testCheckIdentifierUpperCase() throws Exception {
catalog.createDatabase("test_db", false);
assertThatThrownBy(
() ->
catalog.createTable(
Identifier.create("TEST_DB", "new_table"),
DEFAULT_TABLE_SCHEMA,
false))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Database name [TEST_DB] cannot contain upper case in the catalog.");

assertThatThrownBy(() -> catalog.createDatabase("TEST_DB", false))
.isInstanceOf(Catalog.DatabaseAlreadyExistException.class)
.hasMessage("Database TEST_DB already exists.");
catalog.createTable(Identifier.create("TEST_DB", "new_table"), DEFAULT_TABLE_SCHEMA, false);
assertThatThrownBy(
() ->
catalog.createTable(
Identifier.create("test_db", "NEW_TABLE"),
DEFAULT_TABLE_SCHEMA,
false))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Table name [NEW_TABLE] cannot contain upper case in the catalog.");
.isInstanceOf(Catalog.TableAlreadyExistException.class)
.hasMessage("Table test_db.NEW_TABLE already exists.");
}

private static final String HADOOP_CONF_DIR =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,8 +577,7 @@ public void testCreateInsensitiveTable() throws Exception {
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'lock.enabled' = 'true',",
" 'table.type' = 'EXTERNAL',",
" 'allow-upper-case' = 'true'",
" 'table.type' = 'EXTERNAL'",
")"))
.await();
tEnv.executeSql("USE CATALOG paimon_catalog_01").await();
Expand All @@ -593,30 +592,6 @@ public void testCreateInsensitiveTable() throws Exception {
tEnv.executeSql("DROP TABLE t").await();
Path tablePath = new Path(path, "test_db.db/t");
assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();

tEnv.executeSql(
String.join(
"\n",
"CREATE CATALOG paimon_catalog_02 WITH (",
" 'type' = 'paimon',",
" 'metastore' = 'hive',",
" 'uri' = '',",
" 'warehouse' = '" + path + "',",
" 'lock.enabled' = 'true',",
" 'table.type' = 'EXTERNAL',",
" 'allow-upper-case' = 'false'",
")"))
.await();
tEnv.executeSql("USE CATALOG paimon_catalog_02").await();
tEnv.executeSql("USE test_db").await();

// set case-sensitive = false would throw exception out
assertThatThrownBy(
() ->
tEnv.executeSql(
"CREATE TABLE t1 ( aa INT, Bb STRING ) WITH ( 'file.format' = 'avro' )")
.await())
.isInstanceOf(RuntimeException.class);
}

@Test
Expand Down Expand Up @@ -1006,7 +981,8 @@ public void testRenameTable() throws Exception {

// the target table name has upper case.
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t1 RENAME TO T1"))
.hasMessage("Table name [T1] cannot contain upper case in the catalog.");
.hasMessage(
"Could not execute ALTER TABLE my_hive.test_db.t1 RENAME TO my_hive.test_db.T1");

tEnv.executeSql("ALTER TABLE t1 RENAME TO t3").await();

Expand Down Expand Up @@ -1160,24 +1136,16 @@ public void testHiveLock() throws InterruptedException, Catalog.TableNotExistExc

@Test
public void testUpperCase() {
tEnv.executeSql("CREATE TABLE T (a INT, b STRING ) WITH ( 'file.format' = 'avro' )");
tEnv.executeSql(
"CREATE TABLE tT (A INT, b STRING, C STRING) WITH ( 'file.format' = 'avro')");
assertThatThrownBy(
() ->
tEnv.executeSql(
"CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
.await())
.hasRootCauseMessage(
String.format(
"Table name [%s] cannot contain upper case in the catalog.", "T"));

assertThatThrownBy(
() ->
tEnv.executeSql(
"CREATE TABLE t (A INT, b STRING, C STRING) WITH ( 'file.format' = 'avro')")
"CREATE TABLE tt ( A INT, b STRING, C STRING) WITH ( 'file.format' = 'avro' )")
.await())
.hasRootCauseMessage(
String.format(
"Field name %s cannot contain upper case in the catalog.",
"[A, C]"));
"Table (or view) test_db.tt already exists in Catalog my_hive.");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable;
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable;
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable;
import org.apache.spark.sql.internal.SessionState;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
Expand All @@ -70,7 +69,6 @@
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
Expand All @@ -91,18 +89,10 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
Map<String, String> newOptions = new HashMap<>(options.asCaseSensitiveMap());
SessionState sessionState = SparkSession.active().sessionState();

CatalogContext catalogContext =
CatalogContext.create(Options.fromMap(options), sessionState.newHadoopConf());

// if spark is case-insensitive, set case-sensitive to catalog
if (!sessionState.conf().caseSensitiveAnalysis()) {
newOptions.put(CASE_SENSITIVE.key(), "true");
}
options = new CaseInsensitiveStringMap(newOptions);

CatalogContext.create(
Options.fromMap(options),
SparkSession.active().sessionState().newHadoopConf());
this.catalog = CatalogFactory.createCatalog(catalogContext);
this.defaultDatabase =
options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
Expand Down
Loading
Loading