diff --git a/docs/content/flink/quick-start.md b/docs/content/flink/quick-start.md
index 62559065ec9a..e50acfe484e1 100644
--- a/docs/content/flink/quick-start.md
+++ b/docs/content/flink/quick-start.md
@@ -269,11 +269,16 @@ SELECT * FROM ....;
## Setting dynamic options
When interacting with the Paimon table, table options can be tuned without changing the options in the catalog. Paimon will extract job-level dynamic options and take effect in the current session.
-The dynamic option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which means matching all the specific parts.
+The dynamic table option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which matches any value for that part.
+The dynamic global option's key format is `${config_key}`. Global options will take effect for all tables. Table options will override global options if there are conflicts.
For example:
```sql
+-- set scan.timestamp-millis=1697018249001 for all tables
+SET 'scan.timestamp-millis' = '1697018249001';
+SELECT * FROM T;
+
-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;
@@ -281,4 +286,10 @@ SELECT * FROM T;
-- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;
+
+-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T1
+-- set scan.timestamp-millis=1697018249001 for other tables
+SET 'paimon.mycatalog.default.T1.scan.timestamp-millis' = '1697018249000';
+SET 'scan.timestamp-millis' = '1697018249001';
+SELECT * FROM T1 JOIN T2 ON xxxx;
```
diff --git a/docs/content/spark/auxiliary.md b/docs/content/spark/auxiliary.md
index b822d1b898de..6330ca27ce31 100644
--- a/docs/content/spark/auxiliary.md
+++ b/docs/content/spark/auxiliary.md
@@ -29,17 +29,35 @@ under the License.
## Set / Reset
The SET command sets a property, returns the value of an existing property or returns all SQLConf properties with value and meaning. The RESET command resets runtime configurations specific to the current session which were set via the SET command to their default values.
-To set paimon configs specifically, you need add the `spark.paimon.` prefix.
+To set dynamic options globally, you need to add the `spark.paimon.` prefix. You can also set dynamic table options in this format:
+`spark.paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which matches any
+value for that part. Dynamic table options will override global options if there are conflicts.
```sql -- set spark conf SET spark.sql.sources.partitionOverwriteMode=dynamic; - + -- set paimon conf SET spark.paimon.file.block-size=512M; -- reset conf RESET spark.paimon.file.block-size; + +-- set catalog +USE paimon; + +-- set scan.snapshot-id=1 for the table default.T in any catalogs +SET spark.paimon.*.default.T.scan.snapshot-id=1; +SELECT * FROM default.T; + +-- set scan.snapshot-id=1 for the table T in any databases and catalogs +SET spark.paimon.*.*.T.scan.snapshot-id=1; +SELECT * FROM default.T; + +-- set scan.snapshot-id=2 for the table default.T1 in any catalogs and scan.snapshot-id=1 on other tables +SET spark.paimon.scan.snapshot-id=1; +SET spark.paimon.*.default.T1.scan.snapshot-id=2; +SELECT * FROM default.T1 JOIN default.T2 ON xxxx; ``` ## Describe table diff --git a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java index 47eb45007197..a625454f3996 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java @@ -27,6 +27,8 @@ import java.util.Locale; import java.util.Map; import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.paimon.options.StructuredOptionsSplitter.escapeWithSingleQuote; @@ -302,6 +304,34 @@ public static Map convertToPropertiesPrefixKey( return properties; } + public static Map convertToDynamicTableProperties( + Map confData, + String globalOptionKeyPrefix, + Pattern tableOptionKeyPattern, + int keyGroup) { + Map globalOptions = new HashMap<>(); + Map tableOptions = new HashMap<>(); + + confData.keySet().stream() + .filter(k -> k.startsWith(globalOptionKeyPrefix)) + .forEach( + k -> { + Matcher matcher = tableOptionKeyPattern.matcher(k); + if (matcher.find()) { + tableOptions.put( + matcher.group(keyGroup), convertToString(confData.get(k))); + } else { + globalOptions.put( + k.substring(globalOptionKeyPrefix.length()), + convertToString(confData.get(k))); + } + }); + + // table options should override global options for the same key + globalOptions.putAll(tableOptions); + return globalOptions; + } + static boolean containsPrefixMap(Map confData, String key) { return confData.keySet().stream().anyMatch(candidate -> filterPrefixMapKey(key, candidate)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java index 27eef48b227b..9f90a2cd0130 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java @@ -35,6 +35,7 @@ import org.apache.paimon.lineage.TableLineageEntity; import org.apache.paimon.lineage.TableLineageEntityImpl; import org.apache.paimon.options.Options; +import org.apache.paimon.options.OptionsUtils; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; @@ -71,7 +72,6 @@ import java.util.Optional; import java.util.Set; import java.util.function.BiConsumer; -import java.util.regex.Matcher; import java.util.regex.Pattern; import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE; @@ -239,7 +239,7 @@ Table 
buildPaimonTable(DynamicTableFactory.Context context) { CatalogTable origin = context.getCatalogTable().getOrigin(); Table table; - Map dynamicOptions = getDynamicTableConfigOptions(context); + Map dynamicOptions = getDynamicConfigOptions(context); dynamicOptions.forEach( (key, newValue) -> { String oldValue = origin.getOptions().get(key); @@ -249,6 +249,7 @@ Table buildPaimonTable(DynamicTableFactory.Context context) { }); Map newOptions = new HashMap<>(); newOptions.putAll(origin.getOptions()); + // dynamic options should override origin options newOptions.putAll(dynamicOptions); FileStoreTable fileStoreTable; @@ -324,16 +325,19 @@ static boolean schemaEquals(RowType rowType1, RowType rowType2) { /** * The dynamic option's format is: * - *
<p>
{@link - * FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}.${catalog}.${database}.${tableName}.key = - * value. These job level configs will be extracted and injected into the target table option. + *
<p>
Global Options: key = value. + + *
<p>
Table Options: {@link + * FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}${catalog}.${database}.${tableName}.key = + * value. + * + *
<p>
These job level options will be extracted and injected into the target table option. Table + * options will override global options if there are conflicts. * * @param context The table factory context. * @return The dynamic options of this target table. */ - static Map getDynamicTableConfigOptions(DynamicTableFactory.Context context) { - - Map optionsFromTableConfig = new HashMap<>(); + static Map getDynamicConfigOptions(DynamicTableFactory.Context context) { ReadableConfig config = context.getConfiguration(); @@ -349,23 +353,14 @@ static Map getDynamicTableConfigOptions(DynamicTableFactory.Cont String template = String.format( - "(%s)\\.(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)", + "(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)", FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX, context.getObjectIdentifier().getCatalogName(), context.getObjectIdentifier().getDatabaseName(), context.getObjectIdentifier().getObjectName()); Pattern pattern = Pattern.compile(template); - - conf.keySet() - .forEach( - (key) -> { - if (key.startsWith(FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX)) { - Matcher matcher = pattern.matcher(key); - if (matcher.find()) { - optionsFromTableConfig.put(matcher.group(5), conf.get(key)); - } - } - }); + Map optionsFromTableConfig = + OptionsUtils.convertToDynamicTableProperties(conf, "", pattern, 5); if (!optionsFromTableConfig.isEmpty()) { LOG.info( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 55e21a35470e..e7bc6d23df8c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -44,7 +44,7 @@ public class FlinkConnectorOptions { public static final String NONE = "none"; - public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon"; + public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon."; public static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java index aeb46da8d1b7..38d48fa21d2d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java @@ -63,8 +63,7 @@ public void testSchemaEquals() { @Test public void testGetDynamicOptions() { Configuration configuration = new Configuration(); - configuration.setString("paimon.catalog1.db.T.k1", "v1"); - configuration.setString("paimon.*.db.*.k2", "v2"); + configuration.setString("k1", "v2"); ObjectIdentifier identifier = ObjectIdentifier.of("catalog1", "db", "T"); DynamicTableFactory.Context context = new FactoryUtil.DefaultDynamicTableContext( @@ -74,9 +73,25 @@ public void testGetDynamicOptions() { configuration, AbstractFlinkTableFactoryTest.class.getClassLoader(), false); - Map options = - AbstractFlinkTableFactory.getDynamicTableConfigOptions(context); - assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2")); + Map options = AbstractFlinkTableFactory.getDynamicConfigOptions(context); + assertThat(options).isEqualTo(ImmutableMap.of("k1", "v2")); + + configuration = new 
Configuration(); + configuration.setString("k1", "v2"); + configuration.setString("k3", "v3"); + configuration.setString("paimon.catalog1.db.T.k1", "v1"); + configuration.setString("paimon.*.db.*.k2", "v2"); + identifier = ObjectIdentifier.of("catalog1", "db", "T"); + context = + new FactoryUtil.DefaultDynamicTableContext( + identifier, + null, + new HashMap<>(), + configuration, + AbstractFlinkTableFactoryTest.class.getClassLoader(), + false); + options = AbstractFlinkTableFactory.getDynamicConfigOptions(context); + assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2", "k3", "v3")); } private void innerTest(RowType r1, RowType r2, boolean expectEquals) { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index bf19d1ec700c..ae5ab8b6e96b 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -503,7 +503,9 @@ protected org.apache.spark.sql.connector.catalog.Table loadSparkTable( if (paimonTable instanceof FormatTable) { return convertToFileTable(ident, (FormatTable) paimonTable); } else { - return new SparkTable(copyWithSQLConf(paimonTable, extraOptions)); + return new SparkTable( + copyWithSQLConf( + paimonTable, catalogName, toIdentifier(ident), extraOptions)); } } catch (Catalog.TableNotExistException e) { throw new NoSuchTableException(ident); diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala index 67ab1312fa4e..0170a29f68d3 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala @@ -18,11 +18,13 @@ package org.apache.paimon.spark -import org.apache.paimon.catalog.CatalogContext +import org.apache.paimon.CoreOptions +import org.apache.paimon.catalog.{CatalogContext, CatalogUtils, Identifier} import org.apache.paimon.options.Options +import org.apache.paimon.spark.SparkSource.NAME import org.apache.paimon.spark.commands.WriteIntoPaimonTable import org.apache.paimon.spark.sources.PaimonSink -import org.apache.paimon.spark.util.OptionUtils.mergeSQLConf +import org.apache.paimon.spark.util.OptionUtils.{extractCatalogName, mergeSQLConfWithIdentifier} import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory} import org.apache.paimon.table.system.AuditLogTable @@ -80,9 +82,15 @@ class SparkSource } private def loadTable(options: JMap[String, String]): DataTable = { + val path = CoreOptions.path(options) val catalogContext = CatalogContext.create( - Options.fromMap(mergeSQLConf(options)), - SparkSession.active.sessionState.newHadoopConf()) + Options.fromMap( + mergeSQLConfWithIdentifier( + options, + extractCatalogName().getOrElse(NAME), + Identifier.create(CatalogUtils.database(path), CatalogUtils.table(path)))), + SparkSession.active.sessionState.newHadoopConf() + ) val table = FileStoreTableFactory.create(catalogContext) if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) { new AuditLogTable(table) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala index 5b762ffb49de..b60dd1fb2173 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala @@ -18,34 +18,61 @@ package org.apache.paimon.spark.util +import org.apache.paimon.catalog.Identifier import org.apache.paimon.table.Table import org.apache.spark.sql.catalyst.SQLConfHelper -import java.util.{HashMap => JHashMap, Map => JMap} +import java.util.{Map => JMap} +import java.util.regex.Pattern import scala.collection.JavaConverters._ object OptionUtils extends SQLConfHelper { private val PAIMON_OPTION_PREFIX = "spark.paimon." + private val SPARK_CATALOG_PREFIX = "spark.sql.catalog." - def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = { - val mergedOptions = new JHashMap[String, String]( - conf.getAllConfs - .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX)) - .map { - case (key, value) => - key.stripPrefix(PAIMON_OPTION_PREFIX) -> value - } - .toMap - .asJava) + def extractCatalogName(): Option[String] = { + val sparkCatalogTemplate = String.format("%s([^.]*)$", SPARK_CATALOG_PREFIX) + val sparkCatalogPattern = Pattern.compile(sparkCatalogTemplate) + conf.getAllConfs.filterKeys(_.startsWith(SPARK_CATALOG_PREFIX)).foreach { + case (key, _) => + val matcher = sparkCatalogPattern.matcher(key) + if (matcher.find()) + return Option(matcher.group(1)) + } + Option.empty + } + + def mergeSQLConfWithIdentifier( + extraOptions: JMap[String, String], + catalogName: String, + ident: Identifier): JMap[String, String] = { + val tableOptionsTemplate = String.format( + "(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)", + PAIMON_OPTION_PREFIX, + catalogName, + ident.getDatabaseName, + ident.getObjectName) + val tableOptionsPattern = Pattern.compile(tableOptionsTemplate) + val mergedOptions = org.apache.paimon.options.OptionsUtils + .convertToDynamicTableProperties( + conf.getAllConfs.asJava, + PAIMON_OPTION_PREFIX, + tableOptionsPattern, + 5) mergedOptions.putAll(extraOptions) mergedOptions } - def copyWithSQLConf[T <: Table](table: T, extraOptions: JMap[String, String]): T = { - val mergedOptions = mergeSQLConf(extraOptions) + def copyWithSQLConf[T <: Table]( + table: T, + catalogName: String, + ident: Identifier, + extraOptions: JMap[String, String]): T = { + val mergedOptions: JMap[String, String] = + mergeSQLConfWithIdentifier(extraOptions, catalogName, ident) if (mergedOptions.isEmpty) { table } else { diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala index d35ac1d709c3..f74d6959b9f1 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala @@ -76,4 +76,130 @@ class PaimonOptionTest extends PaimonSparkTestBase { } } } + + test("Paimon Table Options: query one table with sql conf and table options") { + sql("CREATE TABLE T (id INT)") + sql("INSERT INTO T VALUES 1") + sql("INSERT INTO T VALUES 2") + checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil) + val table = loadTable("T") + + // query with global options + withSQLConf("spark.paimon.scan.snapshot-id" -> "1") { + checkAnswer(sql("SELECT * FROM T 
ORDER BY id"), Row(1)) + checkAnswer(spark.read.format("paimon").load(table.location().toString), Row(1)) + } + + // query with table options + withSQLConf("spark.paimon.*.*.T.scan.snapshot-id" -> "1") { + checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1)) + checkAnswer(spark.read.format("paimon").load(table.location().toString), Row(1)) + } + + // query with both global and table options + withSQLConf( + "spark.paimon.scan.snapshot-id" -> "1", + "spark.paimon.*.*.T.scan.snapshot-id" -> "2") { + checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil) + checkAnswer( + spark.read.format("paimon").load(table.location().toString), + Row(1) :: Row(2) :: Nil) + } + } + + test("Paimon Table Options: query multiple tables with sql conf and table options") { + sql("CREATE TABLE T1 (id INT)") + sql("INSERT INTO T1 VALUES 1") + sql("INSERT INTO T1 VALUES 2") + + sql("CREATE TABLE T2 (id INT)") + sql("INSERT INTO T2 VALUES 1") + sql("INSERT INTO T2 VALUES 2") + checkAnswer( + sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), + Row(1, 1) :: Row(2, 2) :: Nil) + val table1 = loadTable("T1") + val table2 = loadTable("T1") + + // query with global options + withSQLConf("spark.paimon.scan.snapshot-id" -> "1") { + checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1)) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) + ) + } + + // query with table options + withSQLConf("spark.paimon.*.*.*.scan.snapshot-id" -> "1") { + checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1)) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) + ) + } + + // query with both global and table options + withSQLConf( + "spark.paimon.scan.snapshot-id" -> "1", + "spark.paimon.*.*.*.scan.snapshot-id" -> "2") { + checkAnswer( + sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), + Row(1, 1) :: Row(2, 2) :: Nil) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) :: Row(2) :: Nil + ) + } + + withSQLConf( + "spark.paimon.*.*.T1.scan.snapshot-id" -> "1", + "spark.paimon.*.*.T2.scan.snapshot-id" -> "1") { + checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1)) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) + ) + } + + withSQLConf( + "spark.paimon.*.*.T1.scan.snapshot-id" -> "1", + "spark.paimon.*.*.T2.scan.snapshot-id" -> "2") { + checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1)) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) + ) + } + + withSQLConf( + "spark.paimon.*.*.T1.scan.snapshot-id" -> "2", + "spark.paimon.*.*.T2.scan.snapshot-id" -> "2") { + checkAnswer( + sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), + Row(1, 1) :: Row(2, 2) :: Nil) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) :: Row(2) :: Nil + ) + } + } }
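
The precedence rule exercised by these tests (a table-scoped key such as `spark.paimon.*.default.T1.scan.snapshot-id` beats a plain global key such as `spark.paimon.scan.snapshot-id`) can be illustrated with a small, self-contained sketch. It mirrors the merging logic of `OptionsUtils.convertToDynamicTableProperties`, but the class and method names below are illustrative only, not the actual Paimon API, and `Pattern.quote` on the prefix is an addition for the sketch rather than something the patch does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper mirroring the precedence rule of the patch:
// table-scoped keys override global keys that share the same config key.
public class DynamicOptionSketch {

    static Map<String, String> resolveDynamicOptions(
            Map<String, String> conf, String prefix, String catalog, String db, String table) {
        // Same shape as the pattern built for the target table:
        // prefix + (catalog|*) . (db|*) . (table|*) . configKey
        Pattern pattern =
                Pattern.compile(
                        String.format(
                                "(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
                                Pattern.quote(prefix), catalog, db, table));

        Map<String, String> globalOptions = new HashMap<>();
        Map<String, String> tableOptions = new HashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                continue;
            }
            Matcher matcher = pattern.matcher(entry.getKey());
            if (matcher.find()) {
                // group 5 is the trailing config key, e.g. scan.snapshot-id
                tableOptions.put(matcher.group(5), entry.getValue());
            } else {
                globalOptions.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        // table options override global options for the same key
        globalOptions.putAll(tableOptions);
        return globalOptions;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("paimon.scan.snapshot-id", "1"); // global option
        conf.put("paimon.*.default.T1.scan.snapshot-id", "2"); // table-scoped option for T1

        // T1: the table-scoped key wins.
        System.out.println(
                resolveDynamicOptions(conf, "paimon.", "mycatalog", "default", "T1")
                        .get("scan.snapshot-id")); // prints 2

        // T2: no table-scoped key matches, so the global value applies.
        System.out.println(
                resolveDynamicOptions(conf, "paimon.", "mycatalog", "default", "T2")
                        .get("scan.snapshot-id")); // prints 1
    }
}
```

Note that in the Flink integration the prefix argument passed to the shared helper is the empty string, since the `paimon.` prefix is already folded into the pattern itself, while the Spark integration passes `spark.paimon.`; the sketch keeps a single prefix parameter only for brevity.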