From 774f85cae411dbe78e41e2a31465698d32753e41 Mon Sep 17 00:00:00 2001
From: Xiangyu Feng
Date: Fri, 1 Nov 2024 01:48:42 +0800
Subject: [PATCH] [core] Unify table and global dynamic options for both Flink and Spark engine

---
 docs/content/flink/quick-start.md             | 13 ++++++-
 docs/content/spark/auxiliary.md               | 14 ++++----
 .../apache/paimon/options/OptionsUtils.java   | 30 ++++++++++++++++
 .../flink/AbstractFlinkTableFactory.java      | 35 ++++++++-----------
 .../paimon/flink/FlinkConnectorOptions.java   |  2 +-
 .../flink/AbstractFlinkTableFactoryTest.java  | 25 ++++++++++---
 .../paimon/spark/util/OptionUtils.scala       | 33 ++++++-----------
 7 files changed, 96 insertions(+), 56 deletions(-)

diff --git a/docs/content/flink/quick-start.md b/docs/content/flink/quick-start.md
index 62559065ec9a9..e50acfe484e1b 100644
--- a/docs/content/flink/quick-start.md
+++ b/docs/content/flink/quick-start.md
@@ -269,11 +269,16 @@ SELECT * FROM ....;
 ## Setting dynamic options
 
 When interacting with the Paimon table, table options can be tuned without changing the options in the catalog. Paimon will extract job-level dynamic options and take effect in the current session.
-The dynamic option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which means matching all the specific parts.
+The dynamic table option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which means matching all the specific parts.
+The dynamic global option's key format is `${config_key}`. Global options take effect for all tables. Table options will override global options if there are conflicts.
 
 For example:
 
 ```sql
+-- set scan.timestamp-millis=1697018249001 for all tables
+SET 'scan.timestamp-millis' = '1697018249001';
+SELECT * FROM T;
+
 -- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
 SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
 SELECT * FROM T;
@@ -281,4 +286,10 @@ SELECT * FROM T;
 -- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
 SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
 SELECT * FROM T;
+
+-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T1
+-- set scan.timestamp-millis=1697018249001 for other tables
+SET 'paimon.mycatalog.default.T1.scan.timestamp-millis' = '1697018249000';
+SET 'scan.timestamp-millis' = '1697018249001';
+SELECT * FROM T1 JOIN T2 ON xxxx;
 ```
diff --git a/docs/content/spark/auxiliary.md b/docs/content/spark/auxiliary.md
index 5227c272cc20f..f7c140799ce66 100644
--- a/docs/content/spark/auxiliary.md
+++ b/docs/content/spark/auxiliary.md
@@ -29,9 +29,9 @@ under the License.
 ## Set / Reset
 The SET command sets a property, returns the value of an existing property or returns all SQLConf properties with value and meaning. The RESET command resets runtime configurations specific to the current session which were set via the SET command to their default values.
-To set paimon configs specifically, you need add the `spark.paimon.` prefix. You can also set table specific options at this format:
-`paimon.spark.${dbName}.${tableName}.${config_key}`. The dbName/tableName can be `*`, which means matching all the specific parts. Table
-options will override global options if there are conflicts.
+To set dynamic options globally, you need to add the `spark.paimon.` prefix. You can also set dynamic table options in this format:
+`spark.paimon.${dbName}.${tableName}.${config_key}`. The dbName/tableName can be `*`, which means matching all the specific parts. Dynamic
+table options will override global options if there are conflicts.
 
 ```sql
 -- set spark conf
 SET spark.paimon.file.block-size=512M;
 
 -- reset spark conf
 RESET spark.paimon.file.block-size;
 
 -- set scan.snapshot-id=1 for the table default.T
-SET paimon.spark.default.T.scan.snapshot-id=1;
+SET spark.paimon.default.T.scan.snapshot-id=1;
 SELECT * FROM default.T;
 
 -- set scan.snapshot-id=1 for the table T in any databases
-SET paimon.spark.*.T.scan.snapshot-id=1;
+SET spark.paimon.*.T.scan.snapshot-id=1;
 SELECT * FROM default.T;
 
 -- set scan.snapshot-id=2 for the table default.T1 and scan.snapshot-id=1 on default.T2
-SET paimon.spark.scan.snapshot-id=1;
-SET paimon.spark.default.T1.scan.snapshot-id=2;
+SET spark.paimon.scan.snapshot-id=1;
+SET spark.paimon.default.T1.scan.snapshot-id=2;
 SELECT * FROM default.T1 JOIN default.T2 ON xxxx;
 ```
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
index 47eb45007197f..a625454f3996e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
@@ -27,6 +27,8 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.options.StructuredOptionsSplitter.escapeWithSingleQuote;
@@ -302,6 +304,34 @@ public static Map<String, String> convertToPropertiesPrefixKey(
         return properties;
     }
 
+    public static Map<String, String> convertToDynamicTableProperties(
+            Map<String, ?> confData,
+            String globalOptionKeyPrefix,
+            Pattern tableOptionKeyPattern,
+            int keyGroup) {
+        Map<String, String> globalOptions = new HashMap<>();
+        Map<String, String> tableOptions = new HashMap<>();
+
+        confData.keySet().stream()
+                .filter(k -> k.startsWith(globalOptionKeyPrefix))
+                .forEach(
+                        k -> {
+                            Matcher matcher = tableOptionKeyPattern.matcher(k);
+                            if (matcher.find()) {
+                                tableOptions.put(
+                                        matcher.group(keyGroup), convertToString(confData.get(k)));
+                            } else {
+                                globalOptions.put(
+                                        k.substring(globalOptionKeyPrefix.length()),
+                                        convertToString(confData.get(k)));
+                            }
+                        });
+
+        // table options should override global options for the same key
+        globalOptions.putAll(tableOptions);
+        return globalOptions;
+    }
+
     static boolean containsPrefixMap(Map<String, String> confData, String key) {
         return confData.keySet().stream().anyMatch(candidate -> filterPrefixMapKey(key, candidate));
     }
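For reviewers, a minimal standalone sketch (not part of the patch) of how the new `OptionsUtils.convertToDynamicTableProperties` helper is expected to split a session configuration into global and table-level dynamic options. The class name, keys, and values below are made up for illustration; the pattern mirrors the template the engine-side callers build later in this patch, where group 4 is the trailing config key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.paimon.options.OptionsUtils;

public class DynamicOptionsSketch {
    public static void main(String[] args) {
        // Session configuration as it might look after a few SET statements.
        Map<String, String> sessionConf = new HashMap<>();
        sessionConf.put("spark.paimon.scan.snapshot-id", "1");           // global dynamic option
        sessionConf.put("spark.paimon.default.T.scan.snapshot-id", "2"); // table option for default.T
        sessionConf.put("spark.paimon.write-only", "true");              // another global option

        // Same template shape as in the patch: (prefix)(db|*).(table|*).(config key).
        Pattern tableOptionsPattern =
                Pattern.compile(
                        String.format(
                                "(%s)(%s|\\*)\\.(%s|\\*)\\.(.+)", "spark.paimon.", "default", "T"));

        Map<String, String> merged =
                OptionsUtils.convertToDynamicTableProperties(
                        sessionConf, "spark.paimon.", tableOptionsPattern, 4);

        // Global keys are kept with the prefix stripped; the table-level value (2)
        // overrides the global value (1) for the same key.
        // merged now contains: scan.snapshot-id=2 and write-only=true
        System.out.println(merged);
    }
}
```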
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index e469044f5f4fb..e2dbb99e282a7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -33,6 +33,7 @@ import org.apache.paimon.lineage.TableLineageEntity;
 import org.apache.paimon.lineage.TableLineageEntityImpl;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.options.OptionsUtils;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
@@ -69,7 +70,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
@@ -231,7 +231,7 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {
         CatalogTable origin = context.getCatalogTable().getOrigin();
         Table table;
 
-        Map<String, String> dynamicOptions = getDynamicTableConfigOptions(context);
+        Map<String, String> dynamicOptions = getDynamicConfigOptions(context);
         dynamicOptions.forEach(
                 (key, newValue) -> {
                     String oldValue = origin.getOptions().get(key);
@@ -241,6 +241,7 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {
                 });
         Map<String, String> newOptions = new HashMap<>();
         newOptions.putAll(origin.getOptions());
+        // dynamic options should override origin options
        newOptions.putAll(dynamicOptions);
 
         // notice that the Paimon table schema must be the same with the Flink's
@@ -304,16 +305,19 @@ static boolean schemaEquals(RowType rowType1, RowType rowType2) {
     /**
      * The dynamic option's format is:
      *
-     * <p>{@link
-     * FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}.${catalog}.${database}.${tableName}.key =
-     * value. These job level configs will be extracted and injected into the target table option.
+     * <p>Global Options: key = value.
+     *
+     * <p>Table Options: {@link
+     * FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}${catalog}.${database}.${tableName}.key =
+     * value.
+     *
+     * <p>These job level options will be extracted and injected into the target table option. Table
+     * options will override global options if there are conflicts.
      *
      * @param context The table factory context.
      * @return The dynamic options of this target table.
      */
-    static Map<String, String> getDynamicTableConfigOptions(DynamicTableFactory.Context context) {
-
-        Map<String, String> optionsFromTableConfig = new HashMap<>();
+    static Map<String, String> getDynamicConfigOptions(DynamicTableFactory.Context context) {
 
         ReadableConfig config = context.getConfiguration();
 
@@ -329,23 +333,14 @@ static Map<String, String> getDynamicTableConfigOptions(DynamicTableFactory.Cont
         String template =
                 String.format(
-                        "(%s)\\.(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
+                        "(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
                         FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX,
                         context.getObjectIdentifier().getCatalogName(),
                         context.getObjectIdentifier().getDatabaseName(),
                         context.getObjectIdentifier().getObjectName());
         Pattern pattern = Pattern.compile(template);
-
-        conf.keySet()
-                .forEach(
-                        (key) -> {
-                            if (key.startsWith(FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX)) {
-                                Matcher matcher = pattern.matcher(key);
-                                if (matcher.find()) {
-                                    optionsFromTableConfig.put(matcher.group(5), conf.get(key));
-                                }
-                            }
-                        });
+        Map<String, String> optionsFromTableConfig =
+                OptionsUtils.convertToDynamicTableProperties(conf, "", pattern, 5);
 
         if (!optionsFromTableConfig.isEmpty()) {
             LOG.info(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 2c12b70a2493e..4bcf11cf25c8c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -44,7 +44,7 @@ public class FlinkConnectorOptions {
 
     public static final String NONE = "none";
 
-    public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";
+    public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon.";
 
     public static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
index aeb46da8d1b79..38d48fa21d2d6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
@@ -63,8 +63,7 @@ public void testSchemaEquals() {
     @Test
     public void testGetDynamicOptions() {
         Configuration configuration = new Configuration();
-        configuration.setString("paimon.catalog1.db.T.k1", "v1");
-        configuration.setString("paimon.*.db.*.k2", "v2");
+        configuration.setString("k1", "v2");
         ObjectIdentifier identifier = ObjectIdentifier.of("catalog1", "db", "T");
         DynamicTableFactory.Context context =
                 new FactoryUtil.DefaultDynamicTableContext(
@@ -74,9 +73,25 @@ public void testGetDynamicOptions() {
                         configuration,
                         AbstractFlinkTableFactoryTest.class.getClassLoader(),
                         false);
-        Map<String, String> options =
-                AbstractFlinkTableFactory.getDynamicTableConfigOptions(context);
-        assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2"));
+        Map<String, String> options = AbstractFlinkTableFactory.getDynamicConfigOptions(context);
+        assertThat(options).isEqualTo(ImmutableMap.of("k1", "v2"));
+
+        configuration = new Configuration();
+        configuration.setString("k1", "v2");
+        configuration.setString("k3", "v3");
+        configuration.setString("paimon.catalog1.db.T.k1", "v1");
+        configuration.setString("paimon.*.db.*.k2", "v2");
+        identifier = ObjectIdentifier.of("catalog1", "db", "T");
+        context =
+                new FactoryUtil.DefaultDynamicTableContext(
+                        identifier,
+                        null,
+                        new HashMap<>(),
+                        configuration,
+                        AbstractFlinkTableFactoryTest.class.getClassLoader(),
+                        false);
+        options = AbstractFlinkTableFactory.getDynamicConfigOptions(context);
+        assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2", "k3", "v3"));
     }
 
     private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
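As a quick illustration (not part of the patch) of the Flink-side key matching above: with the new `TABLE_DYNAMIC_OPTION_PREFIX` value `paimon.` and the identifiers used in the test (`catalog1`.`db`.`T`), group 5 of the compiled pattern is the raw table option key, while a bare key falls through to the global branch. The option key used below is illustrative.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlinkDynamicKeySketch {
    public static void main(String[] args) {
        // Mirrors the template built in getDynamicConfigOptions, with values inlined:
        // prefix "paimon." (FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX) and catalog1.db.T.
        String template =
                String.format(
                        "(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
                        "paimon.", "catalog1", "db", "T");
        Pattern pattern = Pattern.compile(template);

        Matcher matcher = pattern.matcher("paimon.catalog1.db.T.scan.timestamp-millis");
        if (matcher.find()) {
            // Prints "scan.timestamp-millis": group 5 is the config key, hence keyGroup = 5.
            System.out.println(matcher.group(5));
        }

        // A bare key does not match the table pattern, so it is treated as a global option.
        System.out.println(pattern.matcher("scan.timestamp-millis").find()); // false
    }
}
```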
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 3e841fb625fe9..f16d88057e5e8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -23,9 +23,11 @@ import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{Map => JMap}
 import java.util.regex.Pattern
 
+import scala.collection.JavaConverters._
+
 object OptionUtils extends SQLConfHelper {
 
   private val PAIMON_OPTION_PREFIX = "spark.paimon."
@@ -33,32 +35,20 @@ object OptionUtils extends SQLConfHelper {
   def mergeSQLConfWithIdentifier(
       extraOptions: JMap[String, String],
       ident: Identifier): JMap[String, String] = {
-
     val tableOptionsTemplate = String.format(
       "(%s)(%s|\\*)\\.(%s|\\*)\\.(.+)",
       PAIMON_OPTION_PREFIX,
       ident.getDatabaseName,
       ident.getObjectName)
     val tableOptionsPattern = Pattern.compile(tableOptionsTemplate)
-    val globalOptions = new JHashMap[String, String]
-    val tableOptions = new JHashMap[String, String]
-
-    conf.getAllConfs
-      .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
-      .foreach {
-        case (key, value) =>
-          val matcher = tableOptionsPattern.matcher(key)
-          if (matcher.find) {
-            tableOptions.put(matcher.group(4), value)
-          } else {
-            globalOptions.put(key.stripPrefix(PAIMON_OPTION_PREFIX), value)
-          }
-      }
-
-    // table options should override global options for the same key
-    globalOptions.putAll(tableOptions)
-    globalOptions.putAll(extraOptions)
-    globalOptions
+    val mergedOptions = org.apache.paimon.options.OptionsUtils
+      .convertToDynamicTableProperties(
+        conf.getAllConfs.asJava,
+        PAIMON_OPTION_PREFIX,
+        tableOptionsPattern,
+        4)
+    mergedOptions.putAll(extraOptions)
+    mergedOptions
   }
 
   def copyWithSQLConf[T <: Table](
@@ -67,7 +57,6 @@ object OptionUtils extends SQLConfHelper {
       extraOptions: JMap[String, String]): T = {
     val mergedOptions: JMap[String, String] =
       mergeSQLConfWithIdentifier(extraOptions, ident)
-
     if (mergedOptions.isEmpty) {
       table
     } else {
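Finally, a hedged sketch (not part of the patch) of the merge order that the updated Spark `mergeSQLConfWithIdentifier` implies: globally set dynamic options are overridden by table-level dynamic options, and options passed directly to a read/write (`extraOptions`) win over both. The class name, keys, and values are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.paimon.options.OptionsUtils;

public class MergeOrderSketch {
    public static void main(String[] args) {
        Map<String, String> sessionConf = new HashMap<>();
        sessionConf.put("spark.paimon.scan.snapshot-id", "1");           // global
        sessionConf.put("spark.paimon.default.T.scan.snapshot-id", "2"); // table-level

        Map<String, String> extraOptions = new HashMap<>();
        extraOptions.put("scan.snapshot-id", "3");                       // per-query option

        Pattern tableOptionsPattern =
                Pattern.compile(
                        String.format(
                                "(%s)(%s|\\*)\\.(%s|\\*)\\.(.+)", "spark.paimon.", "default", "T"));

        // Same two steps as mergeSQLConfWithIdentifier: merge dynamic options, then extraOptions.
        Map<String, String> merged =
                OptionsUtils.convertToDynamicTableProperties(
                        sessionConf, "spark.paimon.", tableOptionsPattern, 4);
        merged.putAll(extraOptions);

        System.out.println(merged); // {scan.snapshot-id=3}
    }
}
```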