From fcd52611db7b85708d186c41a590610cccf2f66b Mon Sep 17 00:00:00 2001
From: xuzifu666 <1206332514@qq.com>
Date: Tue, 30 Jul 2024 11:06:42 +0800
Subject: [PATCH 1/7] [core] Support case-insensitive table column creation in
 HiveCatalog (#3822)
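
HiveCatalog previously rejected upper-case column names unconditionally. This change adds
the `assert-upper-case` catalog option so the behaviour can be chosen per catalog. A minimal
Flink SQL sketch of the intended usage, mirroring the new integration test (catalog name and
warehouse path are illustrative):

    CREATE CATALOG my_hive_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'hive',
        'warehouse' = '/path/to/warehouse',
        'assert-upper-case' = 'true'
    );
    USE CATALOG my_hive_catalog;
    -- upper-case column names are accepted because 'assert-upper-case' = 'true'
    CREATE TABLE t (aa INT, Bb STRING) WITH ('file.format' = 'avro');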
---
.../generated/catalog_configuration.html | 6 ++
.../apache/paimon/options/CatalogOptions.java | 7 +++
.../org/apache/paimon/hive/HiveCatalog.java | 3 +-
.../paimon/hive/HiveCatalogITCaseBase.java | 53 ++++++++++++++++++
.../org/apache/paimon/spark/SparkCatalog.java | 16 +++++-
.../paimon/spark/SparkGenericCatalog.java | 5 ++
.../SparkGenericCatalogWithHiveTest.java | 56 ++++++++++++++++++-
7 files changed, 141 insertions(+), 5 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index cab6e731e851..53ba7d0ceab5 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,6 +26,12 @@
+
+ assert-upper-case |
+ false |
+ Boolean |
+ If a column name contains upper case, creating the table is not supported when spark.sql.caseSensitive=true; you can set spark.sql.caseSensitive=false to allow upper-case column names. |
+
client-pool-size |
2 |
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index f00a35a75094..91c024a34cfc 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -110,4 +110,11 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());
+
+ public static final ConfigOption<Boolean> ASSERT_UPPER_CASE =
+ ConfigOptions.key("assert-upper-case")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If column name contains upper-case during create table would not support when spark.sql.caseSensitive=true, you can set spark.sql.caseSensitive=false to support upper-case condition. ");
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 7edb49f5867a..cd78ad8210db 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -91,6 +91,7 @@
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
+import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -567,7 +568,7 @@ private void alterTableToHms(Table table, Identifier identifier, TableSchema new
@Override
public boolean caseSensitive() {
- return false;
+ return catalogOptions.get(ASSERT_UPPER_CASE);
}
@Override
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index e42d48c8b08d..39571005b553 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -363,6 +363,59 @@ public void testCreateExternalTable() throws Exception {
assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();
}
+ @Test
+ public void testCreateInsensitiveTable() throws Exception {
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE CATALOG paimon_catalog_01 WITH (",
+ " 'type' = 'paimon',",
+ " 'metastore' = 'hive',",
+ " 'uri' = '',",
+ " 'warehouse' = '" + path + "',",
+ " 'lock.enabled' = 'true',",
+ " 'table.type' = 'EXTERNAL',",
+ " 'assert-upper-case' = 'true'",
+ ")"))
+ .await();
+ tEnv.executeSql("USE CATALOG paimon_catalog_01").await();
+ tEnv.executeSql("USE test_db").await();
+ tEnv.executeSql("CREATE TABLE t ( aa INT, Bb STRING ) WITH ( 'file.format' = 'avro' )")
+ .await();
+ assertThat(
+ hiveShell
+ .executeQuery("DESC FORMATTED t")
+ .contains("Table Type: \tEXTERNAL_TABLE \tNULL"))
+ .isTrue();
+ tEnv.executeSql("DROP TABLE t").await();
+ Path tablePath = new Path(path, "test_db.db/t");
+ assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();
+
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE CATALOG paimon_catalog_02 WITH (",
+ " 'type' = 'paimon',",
+ " 'metastore' = 'hive',",
+ " 'uri' = '',",
+ " 'warehouse' = '" + path + "',",
+ " 'lock.enabled' = 'true',",
+ " 'table.type' = 'EXTERNAL',",
+ " 'assert-upper-case' = 'false'",
+ ")"))
+ .await();
+ tEnv.executeSql("USE CATALOG paimon_catalog_02").await();
+ tEnv.executeSql("USE test_db").await();
+
+ // with 'assert-upper-case' = 'false', creating a table with upper-case column names should throw an exception
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ tEnv.executeSql(
+ "CREATE TABLE t1 ( aa INT, Bb STRING ) WITH ( 'file.format' = 'avro' )")
+ .await());
+ }
+
@Test
public void testFlinkWriteAndHiveRead() throws Exception {
tEnv.executeSql(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 4e8d8eaf7be7..c28a65cc8cbb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -40,6 +40,7 @@
import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.internal.SessionState;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -53,6 +54,7 @@
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -71,10 +73,18 @@ public class SparkCatalog extends SparkBaseCatalog {
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
+ Map<String, String> newOptions = new HashMap<>(options.asCaseSensitiveMap());
+ SessionState sessionState = SparkSession.active().sessionState();
+
CatalogContext catalogContext =
- CatalogContext.create(
- Options.fromMap(options),
- SparkSession.active().sessionState().newHadoopConf());
+ CatalogContext.create(Options.fromMap(options), sessionState.newHadoopConf());
+
+ // derive the catalog's case sensitivity from the Spark SQL conf
+ newOptions.put(
+ ASSERT_UPPER_CASE.key(),
+ Boolean.toString(!sessionState.conf().caseSensitiveAnalysis()));
+ options = new CaseInsensitiveStringMap(newOptions);
+
this.catalog = CatalogFactory.createCatalog(catalogContext);
this.defaultDatabase =
options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 96b87701b90f..4860301af4f0 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -61,6 +61,7 @@
import java.util.Map;
import java.util.concurrent.Callable;
+import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.apache.paimon.spark.SparkCatalogOptions.CREATE_UNDERLYING_SESSION_CATALOG;
@@ -284,6 +285,10 @@ private CaseInsensitiveStringMap autoFillConfigurations(
Map<String, String> newOptions = new HashMap<>(options.asCaseSensitiveMap());
fillAliyunConfigurations(newOptions, hadoopConf);
fillCommonConfigurations(newOptions, sqlConf);
+
+ // derive the catalog's case sensitivity from the Spark SQL conf
+ newOptions.put(ASSERT_UPPER_CASE.key(), Boolean.toString(!sqlConf.caseSensitiveAnalysis()));
+
return new CaseInsensitiveStringMap(newOptions);
}
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
index 4377bc94a716..b0f1749dfeb3 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
@@ -28,6 +28,7 @@
import org.junit.jupiter.api.io.TempDir;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/** Base tests for spark read. */
public class SparkGenericCatalogWithHiveTest {
@@ -44,6 +45,59 @@ public static void closeMetastore() throws Exception {
testHiveMetastore.stop();
}
+ @Test
+ public void testCreateTableCaseSensitive(@TempDir java.nio.file.Path tempDir) {
+ // firstly, we use the hive metastore to create a table, and check the result.
+ Path warehousePath = new Path("file:" + tempDir.toString());
+ SparkSession spark =
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir", warehousePath.toString())
+ // with case-sensitive false
+ .config("spark.sql.caseSensitive", "false")
+ // with hive metastore
+ .config("spark.sql.catalogImplementation", "hive")
+ .config(
+ "spark.sql.catalog.spark_catalog",
+ SparkGenericCatalog.class.getName())
+ .master("local[2]")
+ .getOrCreate();
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+ spark.sql("USE my_db1");
+ spark.sql(
+ "CREATE TABLE IF NOT EXISTS t2 (a INT, Bb INT, c STRING) USING paimon TBLPROPERTIES"
+ + " ('file.format'='avro')");
+
+ assertThat(
+ spark.sql("SHOW TABLES").collectAsList().stream()
+ .map(s -> s.get(1))
+ .map(Object::toString))
+ .containsExactlyInAnyOrder("t2");
+ spark.close();
+
+ SparkSession spark1 =
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir", warehousePath.toString())
+ // with case-sensitive true
+ .config("spark.sql.caseSensitive", "true")
+ // with hive metastore
+ .config("spark.sql.catalogImplementation", "hive")
+ .config(
+ "spark.sql.catalog.spark_catalog",
+ SparkGenericCatalog.class.getName())
+ .master("local[2]")
+ .getOrCreate();
+
+ spark1.sql("USE my_db1");
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ spark1.sql(
+ "CREATE TABLE IF NOT EXISTS t3 (a INT, Bb INT, c STRING) USING paimon TBLPROPERTIES"
+ + " ('file.format'='avro')"));
+ spark1.close();
+ }
+
@Test
public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) {
// firstly, we use the hive metastore to create a table, and check the result.
@@ -66,7 +120,7 @@ public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) {
+ " ('file.format'='avro')");
assertThat(spark.sql("SHOW NAMESPACES").collectAsList().stream().map(Object::toString))
- .containsExactlyInAnyOrder("[default]", "[my_db]");
+ .containsExactlyInAnyOrder("[default]", "[my_db]", "[my_db1]");
assertThat(
spark.sql("SHOW TABLES").collectAsList().stream()
From 4189a23f1b139821359b529a0bd55fb54520bcf7 Mon Sep 17 00:00:00 2001
From: Zouxxyy
Date: Tue, 30 Jul 2024 12:04:31 +0800
Subject: [PATCH 2/7] [spark] Support global options via SQL conf (#3825)
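
Options prefixed with `spark.paimon.` in the Spark SQL conf are merged into the table
options when tables are created or loaded. A hedged sketch of the intended usage in Spark
SQL, following the new PaimonOptionTest (option values are illustrative):

    -- picked up when creating a table
    SET spark.paimon.file.block-size=512M;
    CREATE TABLE T (id INT) USING paimon;

    -- picked up when querying a table
    SET spark.paimon.scan.snapshot-id=1;
    SELECT * FROM T ORDER BY id;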
---
.../org/apache/paimon/spark/SparkCatalog.java | 45 +++++------
.../org/apache/paimon/spark/SparkSource.scala | 5 +-
.../paimon/spark/util/OptionUtils.scala | 54 +++++++++++++
.../paimon/spark/sql/PaimonOptionTest.scala | 79 +++++++++++++++++++
4 files changed, 154 insertions(+), 29 deletions(-)
create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index c28a65cc8cbb..d4576fa8d38d 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -26,7 +26,6 @@
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.spark.catalog.SparkBaseCatalog;
-import org.apache.paimon.table.Table;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
@@ -57,6 +56,8 @@
import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
+import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
+import static org.apache.paimon.spark.util.OptionUtils.mergeSQLConf;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Spark {@link TableCatalog} for paimon. */
@@ -220,21 +221,16 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti
@Override
public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
- try {
- return new SparkTable(load(ident));
- } catch (Catalog.TableNotExistException e) {
- throw new NoSuchTableException(ident);
- }
+ return loadSparkTable(ident, Collections.emptyMap());
}
/**
* Do not annotate with @override here to maintain compatibility with Spark 3.2-.
*/
public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException {
- Table table = loadPaimonTable(ident);
LOG.info("Time travel to version '{}'.", version);
- return new SparkTable(
- table.copy(Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), version)));
+ return loadSparkTable(
+ ident, Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), version));
}
/**
@@ -244,22 +240,13 @@ public SparkTable loadTable(Identifier ident, String version) throws NoSuchTable
* TableCatalog#loadTable(Identifier, long)}). But in SQL you should use seconds.
*/
public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
- Table table = loadPaimonTable(ident);
// Paimon's timestamp use millisecond
timestamp = timestamp / 1000;
-
LOG.info("Time travel target timestamp is {} milliseconds.", timestamp);
-
- Options option = new Options().set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp);
- return new SparkTable(table.copy(option.toMap()));
- }
-
- private Table loadPaimonTable(Identifier ident) throws NoSuchTableException {
- try {
- return load(ident);
- } catch (Catalog.TableNotExistException e) {
- throw new NoSuchTableException(ident);
- }
+ return loadSparkTable(
+ ident,
+ Collections.singletonMap(
+ CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(timestamp)));
}
@Override
@@ -400,7 +387,7 @@ private Schema toInitialSchema(
return references.length == 1
&& references[0] instanceof FieldReference;
}));
- Map<String, String> normalizedProperties = new HashMap<>(properties);
+ Map<String, String> normalizedProperties = mergeSQLConf(properties);
normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
normalizedProperties.remove(TableCatalog.PROP_COMMENT);
String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
@@ -469,10 +456,14 @@ protected org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident)
return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name());
}
- /** Load a Table Store table. */
- protected org.apache.paimon.table.Table load(Identifier ident)
- throws Catalog.TableNotExistException, NoSuchTableException {
- return catalog.getTable(toIdentifier(ident));
+ protected SparkTable loadSparkTable(Identifier ident, Map<String, String> extraOptions)
+ throws NoSuchTableException {
+ try {
+ return new SparkTable(
+ copyWithSQLConf(catalog.getTable(toIdentifier(ident)), extraOptions));
+ } catch (Catalog.TableNotExistException e) {
+ throw new NoSuchTableException(ident);
+ }
}
// --------------------- unsupported methods ----------------------------
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index e7e744f37c8a..8ea2c31bc8f6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.options.Options
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.spark.sources.PaimonSink
+import org.apache.paimon.spark.util.OptionUtils.mergeSQLConf
import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory}
import org.apache.paimon.table.system.AuditLogTable
@@ -64,7 +65,7 @@ class SparkSource
schema: StructType,
partitioning: Array[Transform],
properties: JMap[String, String]): Table = {
- new SparkTable(loadTable(properties))
+ SparkTable(loadTable(properties))
}
override def createRelation(
@@ -80,7 +81,7 @@ class SparkSource
private def loadTable(options: JMap[String, String]): DataTable = {
val catalogContext = CatalogContext.create(
- Options.fromMap(options),
+ Options.fromMap(mergeSQLConf(options)),
SparkSession.active.sessionState.newHadoopConf())
val table = FileStoreTableFactory.create(catalogContext)
if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
new file mode 100644
index 000000000000..af7ff7204cda
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util
+
+import org.apache.paimon.table.Table
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+object OptionUtils extends SQLConfHelper {
+
+ private val PAIMON_OPTION_PREFIX = "spark.paimon."
+
+ def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = {
+ val mergedOptions = new JHashMap[String, String](
+ conf.getAllConfs
+ .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
+ .map {
+ case (key, value) =>
+ key.stripPrefix(PAIMON_OPTION_PREFIX) -> value
+ }
+ .asJava)
+ mergedOptions.putAll(extraOptions)
+ mergedOptions
+ }
+
+ def copyWithSQLConf[T <: Table](table: T, extraOptions: JMap[String, String]): T = {
+ val mergedOptions = mergeSQLConf(extraOptions)
+ if (mergedOptions.isEmpty) {
+ table
+ } else {
+ table.copy(mergedOptions).asInstanceOf[T]
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
new file mode 100644
index 000000000000..9fc571634645
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTableFactory
+
+import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions
+
+class PaimonOptionTest extends PaimonSparkTestBase {
+
+ import testImplicits._
+
+ test("Paimon Option: create table with sql conf") {
+ withSQLConf("spark.paimon.file.block-size" -> "512M") {
+ sql("CREATE TABLE T (id INT)")
+ val table = loadTable("T")
+ // check options in schema file directly
+ val fileStoreTable = FileStoreTableFactory.create(table.fileIO(), table.location())
+ Assertions.assertEquals("512M", fileStoreTable.options().get("file.block-size"))
+ }
+ }
+
+ test("Paimon Option: create table by dataframe with sql conf") {
+ withSQLConf("spark.paimon.file.block-size" -> "512M") {
+ Seq((1L, "x1"), (2L, "x2"))
+ .toDF("a", "b")
+ .write
+ .format("paimon")
+ .mode("append")
+ .saveAsTable("T")
+ val table = loadTable("T")
+ // check options in schema file directly
+ val fileStoreTable = FileStoreTableFactory.create(table.fileIO(), table.location())
+ Assertions.assertEquals("512M", fileStoreTable.options().get("file.block-size"))
+ }
+ }
+
+ test("Paimon Option: query table with sql conf") {
+ sql("CREATE TABLE T (id INT)")
+ sql("INSERT INTO T VALUES 1")
+ sql("INSERT INTO T VALUES 2")
+ checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil)
+ val table = loadTable("T")
+
+ // query with mutable option
+ withSQLConf("spark.paimon.scan.snapshot-id" -> "1") {
+ checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1))
+ checkAnswer(spark.read.format("paimon").load(table.location().toString), Row(1))
+ }
+
+ // query with immutable option
+ withSQLConf("spark.paimon.bucket" -> "1") {
+ assertThrows[UnsupportedOperationException] {
+ sql("SELECT * FROM T ORDER BY id")
+ }
+ assertThrows[UnsupportedOperationException] {
+ spark.read.format("paimon").load(table.location().toString)
+ }
+ }
+ }
+}
From 76d25200b28ef32eca85449371b3c4f2e39ac856 Mon Sep 17 00:00:00 2001
From: Jingsong
Date: Tue, 30 Jul 2024 11:27:38 +0800
Subject: [PATCH 3/7] [core] Rename catalog option to allow-upper-case
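
The option introduced in the first patch is renamed to `allow-upper-case` and becomes an
optional Boolean whose default depends on the catalog implementation. A sketch of the
renamed option in a Flink SQL catalog definition, matching the updated tests (warehouse
path is illustrative):

    CREATE CATALOG my_hive_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'hive',
        'warehouse' = '/path/to/warehouse',
        'allow-upper-case' = 'true'
    );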
---
.../generated/catalog_configuration.html | 6 +++---
.../org/apache/paimon/options/CatalogOptions.java | 9 +++++----
.../java/org/apache/paimon/utils/StringUtils.java | 4 ++--
.../org/apache/paimon/catalog/AbstractCatalog.java | 12 +++++++++---
.../java/org/apache/paimon/catalog/Catalog.java | 6 ++----
.../org/apache/paimon/catalog/DelegateCatalog.java | 4 ++--
.../apache/paimon/catalog/FileSystemCatalog.java | 2 +-
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 2 +-
.../cdc/MessageQueueSyncTableActionBase.java | 2 +-
.../flink/action/cdc/SyncDatabaseActionBase.java | 14 +++++++++-----
.../flink/action/cdc/SyncTableActionBase.java | 10 +++++-----
.../action/cdc/SynchronizationActionBase.java | 4 ++--
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 4 ++--
.../paimon/flink/sink/cdc/CaseSensitiveUtils.java | 8 ++++----
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 6 +++---
.../java/org/apache/paimon/hive/HiveCatalog.java | 6 +++---
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 4 ++--
.../java/org/apache/paimon/spark/SparkCatalog.java | 10 +++++-----
.../apache/paimon/spark/SparkGenericCatalog.java | 8 +++++---
19 files changed, 66 insertions(+), 55 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 53ba7d0ceab5..a583c74714ba 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -27,10 +27,10 @@
- assert-upper-case |
- false |
+ allow-upper-case |
+ (none) |
Boolean |
- If a column name contains upper case, creating the table is not supported when spark.sql.caseSensitive=true; you can set spark.sql.caseSensitive=false to allow upper-case column names. |
+ Indicates whether this catalog allows upper case; its default value depends on the implementation of the specific catalog. |
client-pool-size |
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 91c024a34cfc..9ef681809eab 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -111,10 +111,11 @@ public class CatalogOptions {
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());
- public static final ConfigOption<Boolean> ASSERT_UPPER_CASE =
- ConfigOptions.key("assert-upper-case")
+ public static final ConfigOption<Boolean> ALLOW_UPPER_CASE =
+ ConfigOptions.key("allow-upper-case")
.booleanType()
- .defaultValue(false)
+ .noDefaultValue()
.withDescription(
- "If column name contains upper-case during create table would not support when spark.sql.caseSensitive=true, you can set spark.sql.caseSensitive=false to support upper-case condition. ");
+ "Indicates whether this catalog allow upper case, "
+ + "its default value depends on the implementation of the specific catalog.");
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index 94ce975aa5de..aeea97ffd9f6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -551,8 +551,8 @@ public static boolean isBlank(String str) {
return true;
}
- public static String caseSensitiveConversion(String str, boolean caseSensitive) {
- return caseSensitive ? str : str.toLowerCase();
+ public static String caseSensitiveConversion(String str, boolean allowUpperCase) {
+ return allowUpperCase ? str : str.toLowerCase();
}
public static boolean isNumeric(final CharSequence cs) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index bccf2a82b825..afe64666a875 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -54,6 +54,7 @@
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.createCommitUser;
+import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
@@ -129,6 +130,11 @@ protected boolean lockEnabled() {
return catalogOptions.get(LOCK_ENABLED);
}
+ @Override
+ public boolean allowUpperCase() {
+ return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
+ }
+
@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
@@ -520,8 +526,8 @@ public static void validateCaseInsensitive(
}
protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
- validateCaseInsensitive(caseSensitive(), "Database", identifier.getDatabaseName());
- validateCaseInsensitive(caseSensitive(), "Table", identifier.getObjectName());
+ validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName());
+ validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName());
}
private void validateFieldNameCaseInsensitiveInSchemaChange(List changes) {
@@ -539,7 +545,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List c
}
protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
- validateCaseInsensitive(caseSensitive(), "Field", fieldNames);
+ validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
}
private void validateAutoCreateClose(Map<String, String> options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 2d3f52901e9c..fe8e0b68b813 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -262,10 +262,8 @@ default void alterTable(Identifier identifier, SchemaChange change, boolean igno
alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists);
}
- /** Return a boolean that indicates whether this catalog is case-sensitive. */
- default boolean caseSensitive() {
- return true;
- }
+ /** Return a boolean that indicates whether this catalog allows upper case. */
+ boolean allowUpperCase();
default void repairCatalog() {
throw new UnsupportedOperationException();
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 5f24d5cf8535..20bb99f1aa31 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -42,8 +42,8 @@ public Catalog wrapped() {
}
@Override
- public boolean caseSensitive() {
- return wrapped.caseSensitive();
+ public boolean allowUpperCase() {
+ return wrapped.allowUpperCase();
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 20a7f8b2c355..c2ff376019bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -176,7 +176,7 @@ public String warehouse() {
}
@Override
- public boolean caseSensitive() {
+ public boolean allowUpperCase() {
return catalogOptions.get(CASE_SENSITIVE);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 54fd6a0cea06..556c071f2e75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -324,7 +324,7 @@ protected TableSchema getDataTableSchema(Identifier identifier, String branchNam
}
@Override
- public boolean caseSensitive() {
+ public boolean allowUpperCase() {
return false;
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
index ffc05aec09bb..28ad5b52aec5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
@@ -79,7 +79,7 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) {
tableConfig,
retrievedSchema,
metadataConverters,
- caseSensitive,
+ allowUpperCase,
true,
false);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index bddebd229eaa..d6c4c12ecdd3 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -120,9 +120,9 @@ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) {
@Override
protected void validateCaseSensitivity() {
- AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", database);
- AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table prefix", tablePrefix);
- AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table suffix", tableSuffix);
+ AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database", database);
+ AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table prefix", tablePrefix);
+ AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table suffix", tableSuffix);
}
@Override
@@ -135,12 +135,16 @@ protected FlatMapFunction recordParse()
protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory() {
NewTableSchemaBuilder schemaBuilder =
new NewTableSchemaBuilder(
- tableConfig, caseSensitive, partitionKeys, primaryKeys, metadataConverters);
+ tableConfig,
+ allowUpperCase,
+ partitionKeys,
+ primaryKeys,
+ metadataConverters);
Pattern includingPattern = Pattern.compile(includingTables);
Pattern excludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
TableNameConverter tableNameConverter =
- new TableNameConverter(caseSensitive, mergeShards, tablePrefix, tableSuffix);
+ new TableNameConverter(allowUpperCase, mergeShards, tablePrefix, tableSuffix);
Set<String> createdTables;
try {
createdTables = new HashSet<>(catalog.listTables(database));
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index e335fc2be348..be8cedf0baf6 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -107,15 +107,15 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) {
tableConfig,
retrievedSchema,
metadataConverters,
- caseSensitive,
+ allowUpperCase,
true,
true);
}
@Override
protected void validateCaseSensitivity() {
- AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", database);
- AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table", table);
+ AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database", database);
+ AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table", table);
}
@Override
@@ -142,7 +142,7 @@ protected void beforeBuildingSourceSink() throws Exception {
buildComputedColumns(
computedColumnArgs,
fileStoreTable.schema().fields(),
- caseSensitive);
+ allowUpperCase);
// check partition keys and primary keys in case that user specified them
checkConstraints();
}
@@ -162,7 +162,7 @@ protected FlatMapFunction recordParse()
@Override
protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory() {
- boolean caseSensitive = this.caseSensitive;
+ boolean caseSensitive = this.allowUpperCase;
return () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 944185347c35..dc2562d91470 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -62,7 +62,7 @@ public abstract class SynchronizationActionBase extends ActionBase {
protected final String database;
protected final Configuration cdcSourceConfig;
protected final SyncJobHandler syncJobHandler;
- protected final boolean caseSensitive;
+ protected final boolean allowUpperCase;
protected Map<String, String> tableConfig = new HashMap<>();
protected TypeMapping typeMapping = TypeMapping.defaultMapping();
@@ -78,7 +78,7 @@ public SynchronizationActionBase(
this.database = database;
this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
this.syncJobHandler = syncJobHandler;
- this.caseSensitive = catalog.caseSensitive();
+ this.allowUpperCase = catalog.allowUpperCase();
this.syncJobHandler.registerJdbcDriver();
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index a33f2c978321..d27fa32d0233 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -137,7 +137,7 @@ protected void beforeBuildingSourceSink() throws Exception {
+ ", or MySQL database does not exist.");
TableNameConverter tableNameConverter =
- new TableNameConverter(caseSensitive, mergeShards, tablePrefix, tableSuffix);
+ new TableNameConverter(allowUpperCase, mergeShards, tablePrefix, tableSuffix);
for (JdbcTableInfo tableInfo : jdbcTableInfos) {
Identifier identifier =
Identifier.create(
@@ -152,7 +152,7 @@ protected void beforeBuildingSourceSink() throws Exception {
tableConfig,
tableInfo.schema(),
metadataConverters,
- caseSensitive,
+ allowUpperCase,
false,
true);
try {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
index 49890a11ccbc..4892aee03024 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
@@ -29,7 +29,7 @@ public class CaseSensitiveUtils {
public static DataStream<CdcRecord> cdcRecordConvert(
Catalog.Loader catalogLoader, DataStream<CdcRecord> input) {
- if (caseSensitive(catalogLoader)) {
+ if (allowUpperCase(catalogLoader)) {
return input;
}
@@ -47,7 +47,7 @@ public void processElement(
public static DataStream<CdcMultiplexRecord> cdcMultiplexRecordConvert(
Catalog.Loader catalogLoader, DataStream<CdcMultiplexRecord> input) {
- if (caseSensitive(catalogLoader)) {
+ if (allowUpperCase(catalogLoader)) {
return input;
}
@@ -65,9 +65,9 @@ public void processElement(
.name("Case-insensitive Convert");
}
- private static boolean caseSensitive(Catalog.Loader catalogLoader) {
+ private static boolean allowUpperCase(Catalog.Loader catalogLoader) {
try (Catalog catalog = catalogLoader.load()) {
- return catalog.caseSensitive();
+ return catalog.allowUpperCase();
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 3d832d33949b..77c49e8f3da2 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -49,7 +49,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase extends Process
protected final Catalog.Loader catalogLoader;
protected Catalog catalog;
- private boolean caseSensitive;
+ private boolean allowUpperCase;
private static final List<DataTypeRoot> STRING_TYPES =
Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
@@ -76,7 +76,7 @@ protected UpdatedDataFieldsProcessFunctionBase(Catalog.Loader catalogLoader) {
@Override
public void open(Configuration parameters) {
this.catalog = catalogLoader.load();
- this.caseSensitive = this.catalog.caseSensitive();
+ this.allowUpperCase = this.catalog.allowUpperCase();
}
protected void applySchemaChange(
@@ -203,7 +203,7 @@ protected List extractSchemaChanges(
List<SchemaChange> result = new ArrayList<>();
for (DataField newField : updatedDataFields) {
String newFieldName =
- StringUtils.caseSensitiveConversion(newField.name(), caseSensitive);
+ StringUtils.caseSensitiveConversion(newField.name(), allowUpperCase);
if (oldFields.containsKey(newFieldName)) {
DataField oldField = oldFields.get(newFieldName);
// we compare by ignoring nullable, because partition keys and primary keys might be
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index cd78ad8210db..1bbf4b3871e8 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -91,7 +91,7 @@
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
-import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
+import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -567,8 +567,8 @@ private void alterTableToHms(Table table, Identifier identifier, TableSchema new
}
@Override
- public boolean caseSensitive() {
- return catalogOptions.get(ASSERT_UPPER_CASE);
+ public boolean allowUpperCase() {
+ return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(false);
}
@Override
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 39571005b553..d400ec40b323 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -375,7 +375,7 @@ public void testCreateInsensitiveTable() throws Exception {
" 'warehouse' = '" + path + "',",
" 'lock.enabled' = 'true',",
" 'table.type' = 'EXTERNAL',",
- " 'assert-upper-case' = 'true'",
+ " 'allow-upper-case' = 'true'",
")"))
.await();
tEnv.executeSql("USE CATALOG paimon_catalog_01").await();
@@ -401,7 +401,7 @@ public void testCreateInsensitiveTable() throws Exception {
" 'warehouse' = '" + path + "',",
" 'lock.enabled' = 'true',",
" 'table.type' = 'EXTERNAL',",
- " 'assert-upper-case' = 'false'",
+ " 'allow-upper-case' = 'false'",
")"))
.await();
tEnv.executeSql("USE CATALOG paimon_catalog_02").await();
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index d4576fa8d38d..b04dce2fa482 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -53,7 +53,7 @@
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
+import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
@@ -80,10 +80,10 @@ public void initialize(String name, CaseInsensitiveStringMap options) {
CatalogContext catalogContext =
CatalogContext.create(Options.fromMap(options), sessionState.newHadoopConf());
- // derive the catalog's case sensitivity from the Spark SQL conf
- newOptions.put(
- ASSERT_UPPER_CASE.key(),
- Boolean.toString(!sessionState.conf().caseSensitiveAnalysis()));
+ // if Spark is case-insensitive, allow upper case in the catalog
+ if (!sessionState.conf().caseSensitiveAnalysis()) {
+ newOptions.put(ALLOW_UPPER_CASE.key(), "true");
+ }
options = new CaseInsensitiveStringMap(newOptions);
this.catalog = CatalogFactory.createCatalog(catalogContext);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 4860301af4f0..6c7f4252433e 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -61,7 +61,7 @@
import java.util.Map;
import java.util.concurrent.Callable;
-import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
+import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.apache.paimon.spark.SparkCatalogOptions.CREATE_UNDERLYING_SESSION_CATALOG;
@@ -286,8 +286,10 @@ private CaseInsensitiveStringMap autoFillConfigurations(
fillAliyunConfigurations(newOptions, hadoopConf);
fillCommonConfigurations(newOptions, sqlConf);
- // derive the catalog's case sensitivity from the Spark SQL conf
- newOptions.put(ASSERT_UPPER_CASE.key(), Boolean.toString(!sqlConf.caseSensitiveAnalysis()));
+ // if Spark is case-insensitive, allow upper case in the catalog
+ if (!sqlConf.caseSensitiveAnalysis()) {
+ newOptions.put(ALLOW_UPPER_CASE.key(), "true");
+ }
return new CaseInsensitiveStringMap(newOptions);
}
From 1e98ba88d3f3542e4e997bf5a4510ac5f43ee544 Mon Sep 17 00:00:00 2001
From: chun ji <1138552873@qq.com>
Date: Tue, 30 Jul 2024 17:17:25 +0800
Subject: [PATCH 4/7] [cdc] Fix `float` type conversion error in
 DebeziumSchemaUtils#fromDebeziumType (#3847)
---
.../flink/action/cdc/format/debezium/DebeziumSchemaUtils.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
index 1aab6653d4d4..e61b33d0ed1e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
@@ -225,6 +225,7 @@ private static DataType fromDebeziumType(String dbzType) {
return DataTypes.INT();
case "int64":
return DataTypes.BIGINT();
+ case "float":
case "float32":
case "float64":
return DataTypes.FLOAT();
From 36823449f28f3053ed5412760637aed0e17a2076 Mon Sep 17 00:00:00 2001
From: tsreaper
Date: Tue, 30 Jul 2024 17:46:00 +0800
Subject: [PATCH 5/7] [core] Immutable table options can now be changed on an
empty table (#3845)
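
Immutable option checks are now skipped while a table has no snapshots, so immutable
options (including primary keys and partition keys expressed as options) can be set right
after table creation. A hedged Flink SQL sketch of what becomes possible on an empty table,
based on the new tests (column names and option values are illustrative):

    CREATE TABLE t (f0 INT, f1 BIGINT, f2 STRING);
    -- allowed while the table has no snapshots
    ALTER TABLE t SET ('primary-key' = 'f0, f1');
    ALTER TABLE t SET ('partition' = 'f0');
    ALTER TABLE t SET ('bucket' = '2');
    ALTER TABLE t SET ('merge-engine' = 'first-row');
    -- once data has been written, changing these options fails again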
---
.../java/org/apache/paimon/schema/Schema.java | 12 ++-
.../apache/paimon/schema/SchemaManager.java | 59 ++++++++-----
.../paimon/schema/SchemaManagerTest.java | 86 +++++++++++++++++++
.../paimon/flink/SchemaChangeITCase.java | 26 +++++-
.../paimon/flink/SchemaChangeITCase.java | 25 +++++-
5 files changed, 183 insertions(+), 25 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index b7575837471c..c6c79f4d4afd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -161,7 +161,11 @@ private List normalizePrimaryKeys(List primaryKeys) {
"Cannot define primary key on DDL and table options at the same time.");
}
String pk = options.get(CoreOptions.PRIMARY_KEY.key());
- primaryKeys = Arrays.asList(pk.split(","));
+ primaryKeys =
+ Arrays.stream(pk.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
options.remove(CoreOptions.PRIMARY_KEY.key());
}
return primaryKeys;
@@ -174,7 +178,11 @@ private List normalizePartitionKeys(List partitionKeys) {
"Cannot define partition on DDL and table options at the same time.");
}
String partitions = options.get(CoreOptions.PARTITION.key());
- partitionKeys = Arrays.asList(partitions.split(","));
+ partitionKeys =
+ Arrays.stream(partitions.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
options.remove(CoreOptions.PARTITION.key());
}
return partitionKeys;
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 4f70ac725e48..684adfe9da72 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -45,6 +45,7 @@
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
import javax.annotation.Nullable;
@@ -108,14 +109,14 @@ public Optional latest() {
try {
return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
.reduce(Math::max)
- .map(id -> schema(id));
+ .map(this::schema);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public List<TableSchema> listAll() {
- return listAllIds().stream().map(id -> schema(id)).collect(Collectors.toList());
+ return listAllIds().stream().map(this::schema).collect(Collectors.toList());
}
/** List all schema IDs. */
@@ -184,24 +185,31 @@ public TableSchema commitChanges(SchemaChange... changes) throws Exception {
public TableSchema commitChanges(List<SchemaChange> changes)
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
+ SnapshotManager snapshotManager = new SnapshotManager(fileIO, tableRoot, branch);
+ boolean hasSnapshots = (snapshotManager.latestSnapshotId() != null);
+
while (true) {
- TableSchema schema =
+ TableSchema oldTableSchema =
latest().orElseThrow(
() ->
new Catalog.TableNotExistException(
fromPath(branchPath(), true)));
- Map<String, String> newOptions = new HashMap<>(schema.options());
- List<DataField> newFields = new ArrayList<>(schema.fields());
- AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId());
- String newComment = schema.comment();
+ Map<String, String> newOptions = new HashMap<>(oldTableSchema.options());
+ List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
+ AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());
+ String newComment = oldTableSchema.comment();
for (SchemaChange change : changes) {
if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
- checkAlterTableOption(setOption.key());
+ if (hasSnapshots) {
+ checkAlterTableOption(setOption.key());
+ }
newOptions.put(setOption.key(), setOption.value());
} else if (change instanceof RemoveOption) {
RemoveOption removeOption = (RemoveOption) change;
- checkAlterTableOption(removeOption.key());
+ if (hasSnapshots) {
+ checkAlterTableOption(removeOption.key());
+ }
newOptions.remove(removeOption.key());
} else if (change instanceof UpdateComment) {
UpdateComment updateComment = (UpdateComment) change;
@@ -245,7 +253,7 @@ public TableSchema commitChanges(List changes)
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
- validateNotPrimaryAndPartitionKey(schema, rename.fieldName());
+ validateNotPrimaryAndPartitionKey(oldTableSchema, rename.fieldName());
if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
fromPath(branchPath(), true), rename.fieldName());
@@ -263,7 +271,7 @@ public TableSchema commitChanges(List changes)
field.description()));
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
- validateNotPrimaryAndPartitionKey(schema, drop.fieldName());
+ validateNotPrimaryAndPartitionKey(oldTableSchema, drop.fieldName());
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn) change).fieldName()))) {
throw new Catalog.ColumnNotExistException(
@@ -274,7 +282,7 @@ public TableSchema commitChanges(List changes)
}
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
- if (schema.partitionKeys().contains(update.fieldName())) {
+ if (oldTableSchema.partitionKeys().contains(update.fieldName())) {
throw new IllegalArgumentException(
String.format(
"Cannot update partition column [%s] type in the table[%s].",
@@ -310,7 +318,7 @@ public TableSchema commitChanges(List changes)
UpdateColumnNullability update = (UpdateColumnNullability) change;
if (update.fieldNames().length == 1
&& update.newNullability()
- && schema.primaryKeys().contains(update.fieldNames()[0])) {
+ && oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
throw new UnsupportedOperationException(
"Cannot change nullability of primary key");
}
@@ -346,20 +354,29 @@ public TableSchema commitChanges(List changes)
}
}
- TableSchema newSchema =
- new TableSchema(
- schema.id() + 1,
+ // convert TableSchema to Schema, because primary-key and partition defined in the
+ // options need to be normalized into keys
+ Schema newSchema =
+ new Schema(
newFields,
- highestFieldId.get(),
- schema.partitionKeys(),
- schema.primaryKeys(),
+ oldTableSchema.partitionKeys(),
+ oldTableSchema.primaryKeys(),
newOptions,
newComment);
+ TableSchema newTableSchema =
+ new TableSchema(
+ oldTableSchema.id() + 1,
+ newSchema.fields(),
+ highestFieldId.get(),
+ newSchema.partitionKeys(),
+ newSchema.primaryKeys(),
+ newSchema.options(),
+ newSchema.comment());
try {
- boolean success = commit(newSchema);
+ boolean success = commit(newTableSchema);
if (success) {
- return newSchema;
+ return newTableSchema;
}
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 742e2188f2b0..4bd965268f00 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -19,9 +19,18 @@
package org.apache.paimon.schema;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -441,4 +450,81 @@ public void testMoveBefore() {
Assertions.assertEquals(
2, fields.get(0).id(), "The field id should remain as 2 after moving f2 before f0");
}
+
+ @Test
+ public void testAlterImmutableOptionsOnEmptyTable() throws Exception {
+ // create table without primary keys
+ Schema schema =
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options,
+ "");
+ Path tableRoot = new Path(tempDir.toString(), "table");
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(), tableRoot);
+ manager.createTable(schema);
+
+ // set immutable options and set primary keys
+ manager.commitChanges(
+ SchemaChange.setOption("primary-key", "f0, f1"),
+ SchemaChange.setOption("partition", "f0"),
+ SchemaChange.setOption("bucket", "2"),
+ SchemaChange.setOption("merge-engine", "first-row"));
+
+ FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+ assertThat(table.schema().partitionKeys()).containsExactly("f0");
+ assertThat(table.schema().primaryKeys()).containsExactly("f0", "f1");
+
+ // read and write data to check that table is really a primary key table with first-row
+ // merge engine
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write =
+ table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io"));
+ TableCommitImpl commit = table.newCommit(commitUser);
+ write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple")));
+ write.write(GenericRow.of(1, 20L, BinaryString.fromString("banana")));
+ write.write(GenericRow.of(2, 10L, BinaryString.fromString("cat")));
+ write.write(GenericRow.of(2, 20L, BinaryString.fromString("dog")));
+ commit.commit(1, write.prepareCommit(false, 1));
+ write.write(GenericRow.of(1, 20L, BinaryString.fromString("peach")));
+ write.write(GenericRow.of(1, 30L, BinaryString.fromString("mango")));
+ write.write(GenericRow.of(2, 20L, BinaryString.fromString("tiger")));
+ write.write(GenericRow.of(2, 30L, BinaryString.fromString("wolf")));
+ commit.commit(2, write.prepareCommit(false, 2));
+ write.close();
+ commit.close();
+
+ List<String> actual = new ArrayList<>();
+ try (RecordReaderIterator<InternalRow> it =
+ new RecordReaderIterator<>(
+ table.newRead().createReader(table.newSnapshotReader().read()))) {
+ while (it.hasNext()) {
+ InternalRow row = it.next();
+ actual.add(
+ String.format(
+ "%s %d %d %s",
+ row.getRowKind().shortString(),
+ row.getInt(0),
+ row.getLong(1),
+ row.getString(2)));
+ }
+ }
+ assertThat(actual)
+ .containsExactlyInAnyOrder(
+ "+I 1 10 apple",
+ "+I 1 20 banana",
+ "+I 1 30 mango",
+ "+I 2 10 cat",
+ "+I 2 20 dog",
+ "+I 2 30 wolf");
+
+ // now that table is not empty, we cannot change immutable options
+ assertThatThrownBy(
+ () ->
+ manager.commitChanges(
+ SchemaChange.setOption("merge-engine", "deduplicate")))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Change 'merge-engine' is not supported yet.");
+ }
}
diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index adaa5b28c4e8..19ad41cae5ac 100644
--- a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink;
+import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
import java.util.Map;
@@ -44,9 +45,29 @@ public void testSetAndRemoveOption() throws Exception {
}
@Test
- public void testSetAndResetImmutableOptions() {
+ public void testSetAndResetImmutableOptionsOnEmptyTables() {
+ sql("CREATE TABLE T1 (a INT, b INT)");
+ sql(
+ "ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1', 'merge-engine' = 'first-row')");
+ sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
+ assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1, 10), Row.of(2, 20));
+ assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' = 'deduplicate')"))
+ .rootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Change 'merge-engine' is not supported yet.");
+
+ sql(
+ "CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED) WITH ('bucket' = '1', 'merge-engine' = 'first-row')");
+ sql("ALTER TABLE T2 RESET ('merge-engine')");
+ sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
+ assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1, 11), Row.of(2, 21));
+ }
+
+ @Test
+ public void testSetAndResetImmutableOptionsOnNonEmptyTables() {
// bucket-key is immutable
sql("CREATE TABLE T1 (a STRING, b STRING, c STRING)");
+ sql("INSERT INTO T1 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' = 'c')"))
.rootCause()
@@ -55,6 +76,7 @@ public void testSetAndResetImmutableOptions() {
sql(
"CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'c')");
+ sql("INSERT INTO T2 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
@@ -63,6 +85,7 @@ public void testSetAndResetImmutableOptions() {
// merge-engine is immutable
sql(
"CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH ('merge-engine' = 'partial-update')");
+ sql("INSERT INTO T4 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
@@ -70,6 +93,7 @@ public void testSetAndResetImmutableOptions() {
// sequence.field is immutable
sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH ('sequence.field' = 'b')");
+ sql("INSERT INTO T5 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' = 'c')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index fc5a3dbe0545..81f07b224ca7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -846,10 +846,30 @@ public void testSetAndRemoveOption() throws Exception {
}
@Test
- public void testSetAndResetImmutableOptions() throws Exception {
+ public void testSetAndResetImmutableOptionsOnEmptyTables() {
+ sql("CREATE TABLE T1 (a INT, b INT)");
+ sql(
+ "ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1', 'merge-engine' = 'first-row')");
+ sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
+ assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1, 10), Row.of(2, 20));
+ assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' = 'deduplicate')"))
+ .rootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Change 'merge-engine' is not supported yet.");
+
+ sql(
+ "CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED) WITH ('bucket' = '1', 'merge-engine' = 'first-row')");
+ sql("ALTER TABLE T2 RESET ('merge-engine')");
+ sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
+ assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1, 11), Row.of(2, 21));
+ }
+
+ @Test
+ public void testSetAndResetImmutableOptionsOnNonEmptyTables() {
// bucket-key is immutable
sql(
"CREATE TABLE T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'a')");
+ sql("INSERT INTO T1 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' = 'c')"))
.rootCause()
@@ -858,6 +878,7 @@ public void testSetAndResetImmutableOptions() throws Exception {
sql(
"CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'c')");
+ sql("INSERT INTO T2 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
@@ -866,6 +887,7 @@ public void testSetAndResetImmutableOptions() throws Exception {
// merge-engine is immutable
sql(
"CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH ('merge-engine' = 'partial-update')");
+ sql("INSERT INTO T4 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
@@ -873,6 +895,7 @@ public void testSetAndResetImmutableOptions() throws Exception {
// sequence.field is immutable
sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH ('sequence.field' = 'b')");
+ sql("INSERT INTO T5 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' = 'c')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
From 6435dd2053f7b094a33c0358615d7a3ddf1c4e92 Mon Sep 17 00:00:00 2001
From: Yann Byron
Date: Tue, 30 Jul 2024 19:39:05 +0800
Subject: [PATCH 6/7] [core][spark] check column nullability when write (#3842)
---
.../table/AppendOnlyFileStoreTable.java | 1 +
.../table/PrimaryKeyFileStoreTable.java | 1 +
.../paimon/table/sink/TableWriteImpl.java | 27 ++++++++
.../catalyst/analysis/PaimonAnalysis.scala | 20 ++----
.../apache/paimon/spark/sql/DDLTestBase.scala | 64 +++++++++++++++----
5 files changed, 84 insertions(+), 29 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 40eeb4d28789..0af78a5dac8b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -139,6 +139,7 @@ public TableWriteImpl<InternalRow> newWrite(
AppendOnlyFileStoreWrite writer =
store().newWrite(commitUser, manifestFilter).withBucketMode(bucketMode());
return new TableWriteImpl<>(
+ rowType(),
writer,
createRowKeyExtractor(),
(record, rowKind) -> {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 6ac2763ace66..b1e5b5366c3d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -160,6 +160,7 @@ public TableWriteImpl<KeyValue> newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
+ rowType(),
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(),
(record, rowKind) ->
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 6e2194646d2a..580d7f4c4f6e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -30,13 +30,16 @@
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite.State;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Restorable;
import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -47,6 +50,7 @@
*/
public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State<T>>> {
+ private final RowType rowType;
private final FileStoreWrite<T> write;
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
private final RecordExtractor<T> recordExtractor;
@@ -56,17 +60,28 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State<T>>>
+ private final int[] notNullFieldIndex;
+
public TableWriteImpl(
+ RowType rowType,
FileStoreWrite<T> write,
KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
RecordExtractor<T> recordExtractor,
@Nullable RowKindGenerator rowKindGenerator,
boolean ignoreDelete) {
+ this.rowType = rowType;
this.write = write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
this.rowKindGenerator = rowKindGenerator;
this.ignoreDelete = ignoreDelete;
+
+ List<String> notNullColumnNames =
+ rowType.getFields().stream()
+ .filter(field -> !field.type().isNullable())
+ .map(DataField::name)
+ .collect(Collectors.toList());
+ this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
}
@Override
@@ -137,6 +152,7 @@ public void write(InternalRow row, int bucket) throws Exception {
@Nullable
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
+ checkNullability(row);
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
return null;
@@ -148,6 +164,7 @@ public SinkRecord writeAndReturn(InternalRow row) throws Exception {
@Nullable
public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
+ checkNullability(row);
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
return null;
@@ -157,6 +174,16 @@ public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
return record;
}
+ private void checkNullability(InternalRow row) {
+ for (int idx : notNullFieldIndex) {
+ if (row.isNullAt(idx)) {
+ String columnName = rowType.getFields().get(idx).name();
+ throw new RuntimeException(
+ String.format("Cannot write null to non-null column(%s)", columnName));
+ }
+ }
+ }
+
private SinkRecord toSinkRecord(InternalRow row) {
keyAndBucketExtractor.setRecord(row);
return new SinkRecord(
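With this change, the Spark-side analyzer check removed below is replaced by a runtime check: a null value in a NOT NULL column now fails inside writeAndReturn with the "Cannot write null to non-null column(...)" message asserted in the tests. A minimal, self-contained sketch of the same index-based check (NullabilityCheckSketch is an illustrative name; the real code derives the indices from the table's RowType):

public class NullabilityCheckSketch {
    public static void main(String[] args) {
        int[] notNullFieldIndex = {0}; // e.g. column "id" is declared NOT NULL
        Object[] row = {null, "a"};    // incoming row is null at index 0

        // Reject the row as soon as any NOT NULL position holds a null.
        for (int idx : notNullFieldIndex) {
            if (row[idx] == null) {
                throw new RuntimeException(
                        String.format("Cannot write null to non-null column(%s)", "id"));
            }
        }
    }
}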
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index 67685612664d..3dc0e40c9eff 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -26,11 +26,11 @@ import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import scala.collection.JavaConverters._
@@ -58,8 +58,8 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
}
private def schemaCompatible(
- tableSchema: StructType,
dataSchema: StructType,
+ tableSchema: StructType,
partitionCols: Seq[String],
parent: Array[String] = Array.empty): Boolean = {
@@ -82,9 +82,8 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
}
}
- tableSchema.zip(dataSchema).forall {
+ dataSchema.zip(tableSchema).forall {
case (f1, f2) =>
- checkNullability(f1, f2, partitionCols, parent)
f1.name == f2.name && dataTypeCompatible(f1.name, f1.dataType, f2.dataType)
}
}
@@ -115,17 +114,6 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
cast.setTagValue(Compatibility.castByTableInsertionTag, ())
cast
}
-
- private def checkNullability(
- input: StructField,
- expected: StructField,
- partitionCols: Seq[String],
- parent: Array[String] = Array.empty): Unit = {
- val fullColumnName = (parent ++ Array(input.name)).mkString(".")
- if (!partitionCols.contains(fullColumnName) && input.nullable && !expected.nullable) {
- throw new RuntimeException("Cannot write nullable values to non-null column")
- }
- }
}
case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[LogicalPlan] {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index da40171042a1..db749a63619b 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.schema.Schema
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.types.DataTypes
+import org.apache.spark.SparkException
import org.apache.spark.sql.Row
import org.junit.jupiter.api.Assertions
@@ -33,33 +34,70 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
import testImplicits._
- test("Paimon DDL: create table with not null") {
+ test("Paimon DDL: create append table with not null") {
withTable("T") {
- sql("""
- |CREATE TABLE T (id INT NOT NULL, name STRING)
- |""".stripMargin)
+ sql("CREATE TABLE T (id INT NOT NULL, name STRING)")
- val exception = intercept[RuntimeException] {
- sql("""
- |INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c")
- |""".stripMargin)
+ val e1 = intercept[SparkException] {
+ sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c")""")
}
- Assertions.assertTrue(
- exception.getMessage().contains("Cannot write nullable values to non-null column"))
+ Assertions.assertTrue(e1.getMessage().contains("Cannot write null to non-null column"))
+
+ sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null)""")
+ checkAnswer(
+ sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a"), (2, "b"), (3, null)).toDF()
+ )
+ val schema = spark.table("T").schema
+ Assertions.assertEquals(schema.size, 2)
+ Assertions.assertFalse(schema("id").nullable)
+ Assertions.assertTrue(schema("name").nullable)
+ }
+ }
+ test("Paimon DDL: create primary-key table with not null") {
+ withTable("T") {
sql("""
- |INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null)
+ |CREATE TABLE T (id INT, name STRING, pt STRING)
+ |TBLPROPERTIES ('primary-key' = 'id,pt')
|""".stripMargin)
+ val e1 = intercept[SparkException] {
+ sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", null)""")
+ }
+ Assertions.assertTrue(e1.getMessage().contains("Cannot write null to non-null column"))
+
+ val e2 = intercept[SparkException] {
+ sql("""INSERT INTO T VALUES (1, "a", "pt1"), (null, "b", "pt2")""")
+ }
+ Assertions.assertTrue(e2.getMessage().contains("Cannot write null to non-null column"))
+
+ sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", "pt1"), (3, null, "pt2")""")
checkAnswer(
sql("SELECT * FROM T ORDER BY id"),
- Seq((1, "a"), (2, "b"), (3, null)).toDF()
+ Seq((1, "a", "pt1"), (2, "b", "pt1"), (3, null, "pt2")).toDF()
)
val schema = spark.table("T").schema
- Assertions.assertEquals(schema.size, 2)
+ Assertions.assertEquals(schema.size, 3)
Assertions.assertFalse(schema("id").nullable)
Assertions.assertTrue(schema("name").nullable)
+ Assertions.assertFalse(schema("pt").nullable)
+ }
+ }
+
+ test("Paimon DDL: write nullable expression to non-null column") {
+ withTable("T") {
+ sql("""
+ |CREATE TABLE T (id INT NOT NULL, ts TIMESTAMP NOT NULL)
+ |""".stripMargin)
+
+ sql("INSERT INTO T SELECT 1, TO_TIMESTAMP('2024-07-01 16:00:00')")
+
+ checkAnswer(
+ sql("SELECT * FROM T ORDER BY id"),
+ Row(1, Timestamp.valueOf("2024-07-01 16:00:00")) :: Nil
+ )
}
}
From 63cb0f908a16215809720e4b167294f800cf01c9 Mon Sep 17 00:00:00 2001
From: xuzifu666 <1206332514@qq.com>
Date: Tue, 30 Jul 2024 19:46:44 +0800
Subject: [PATCH 7/7] [core] Support NumericPrimitiveToTimestamp in casting
(#3832)
---
.../apache/paimon/casting/CastExecutors.java | 1 +
.../casting/NumericPrimitiveToTimestamp.java | 62 +++++++++++++++++++
.../paimon/casting/CastExecutorTest.java | 17 +++++
3 files changed, 80 insertions(+)
create mode 100644 paimon-core/src/main/java/org/apache/paimon/casting/NumericPrimitiveToTimestamp.java
diff --git a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java
index 10ad114e60a7..054d6fca17c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java
@@ -45,6 +45,7 @@ public class CastExecutors {
.addRule(NumericPrimitiveToDecimalCastRule.INSTANCE)
.addRule(DecimalToNumericPrimitiveCastRule.INSTANCE)
.addRule(NumericPrimitiveCastRule.INSTANCE)
+ .addRule(NumericPrimitiveToTimestamp.INSTANCE)
// Boolean <-> numeric rules
.addRule(BooleanToNumericCastRule.INSTANCE)
.addRule(NumericToBooleanCastRule.INSTANCE)
diff --git a/paimon-core/src/main/java/org/apache/paimon/casting/NumericPrimitiveToTimestamp.java b/paimon-core/src/main/java/org/apache/paimon/casting/NumericPrimitiveToTimestamp.java
new file mode 100644
index 000000000000..b8c667cee13f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/NumericPrimitiveToTimestamp.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.utils.DateTimeUtils;
+
+import java.time.ZoneId;
+
+/**
+ * {@link DataTypeFamily#INTEGER_NUMERIC} to {@link DataTypeRoot#TIMESTAMP_WITHOUT_TIME_ZONE}/{@link
+ * DataTypeRoot#TIMESTAMP_WITH_LOCAL_TIME_ZONE}.
+ */
+public class NumericPrimitiveToTimestamp extends AbstractCastRule<Number, Timestamp> {
+
+ static final NumericPrimitiveToTimestamp INSTANCE = new NumericPrimitiveToTimestamp();
+
+ private NumericPrimitiveToTimestamp() {
+ super(
+ CastRulePredicate.builder()
+ .input(DataTypeFamily.NUMERIC)
+ .target(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+ .target(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+ .build());
+ }
+
+ @Override
+ public CastExecutor<Number, Timestamp> create(DataType inputType, DataType targetType) {
+ ZoneId zoneId =
+ targetType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+ ? ZoneId.systemDefault()
+ : DateTimeUtils.UTC_ZONE.toZoneId();
+ switch (inputType.getTypeRoot()) {
+ case INTEGER:
+ case BIGINT:
+ return value ->
+ Timestamp.fromLocalDateTime(
+ DateTimeUtils.toLocalDateTime(value.longValue() * 1000, zoneId));
+ default:
+ return null;
+ }
+ }
+}
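The rule treats INTEGER/BIGINT input as epoch seconds and scales to milliseconds before building the timestamp. A self-contained sketch of that conversion in plain Java (EpochSecondsToTimestampSketch is an illustrative name; it uses java.time instead of Paimon's DateTimeUtils):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class EpochSecondsToTimestampSketch {
    public static void main(String[] args) {
        long epochSeconds = 1721898748L; // same value as the test below

        // Scale seconds to milliseconds, then render in UTC, mirroring the
        // TIMESTAMP_WITHOUT_TIME_ZONE branch of the cast rule.
        LocalDateTime ts =
                LocalDateTime.ofInstant(
                        Instant.ofEpochMilli(epochSeconds * 1000), ZoneOffset.UTC);

        System.out.println(ts); // 2024-07-25T09:12:28
    }
}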
diff --git a/paimon-core/src/test/java/org/apache/paimon/casting/CastExecutorTest.java b/paimon-core/src/test/java/org/apache/paimon/casting/CastExecutorTest.java
index d80cb1a6c7bd..ff805993c5bd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/casting/CastExecutorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/casting/CastExecutorTest.java
@@ -101,6 +101,23 @@ public void testNumericToNumeric() {
CastExecutors.resolve(new FloatType(false), new DoubleType(false)), 1F, 1D);
}
+ @Test
+ public void testNumericToTimestamp() {
+ compareCastResult(
+ CastExecutors.resolve(new BigIntType(false), new TimestampType(3)),
+ 1721898748,
+ DateTimeUtils.parseTimestampData("2024-07-25 09:12:28.000", 3));
+
+ Timestamp timestamp = Timestamp.fromEpochMillis(1721898748000L);
+ String tsString = DateTimeUtils.formatTimestamp(timestamp, TimeZone.getDefault(), 3);
+ Timestamp timestamp1 = DateTimeUtils.parseTimestampData(tsString, 3);
+
+ compareCastResult(
+ CastExecutors.resolve(new BigIntType(false), new LocalZonedTimestampType(3)),
+ 1721898748L,
+ timestamp1);
+ }
+
@Test
public void testNumericToDecimal() {
compareCastResult(