Commit

[core] Unify table and global dynamic options for both Flink and Spark engine
xiangyuf committed Nov 2, 2024
1 parent fa99bea commit 774f85c
Showing 7 changed files with 96 additions and 56 deletions.
13 changes: 12 additions & 1 deletion docs/content/flink/quick-start.md
Original file line number Diff line number Diff line change
@@ -269,16 +269,27 @@ SELECT * FROM ....;
## Setting dynamic options

When interacting with a Paimon table, table options can be tuned without changing the options in the catalog. Paimon will extract job-level dynamic options, which take effect in the current session.
The dynamic option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which means matching all the specific parts.
The dynamic table option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. Each of catalogName/dbName/tableName can be `*`, which matches any value for that part.
The dynamic global option's key format is `${config_key}`. Global options take effect for all tables. Table options will override global options if there are conflicts.

For example:

```sql
-- set scan.timestamp-millis=1697018249001 for all tables
SET 'scan.timestamp-millis' = '1697018249001';
SELECT * FROM T;

-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;

-- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;

-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T1
-- set scan.timestamp-millis=1697018249001 for all other tables
SET 'paimon.mycatalog.default.T1.scan.timestamp-millis' = '1697018249000';
SET 'scan.timestamp-millis' = '1697018249001';
SELECT * FROM T1 JOIN T2 ON xxxx;
```
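The precedence rules above (a table-scoped key beats the global key for its table, while other tables keep the global value) can be sketched as a small standalone Java program. This is a simplified illustration only: `DynamicOptionDemo`, `resolve`, and the exact regex are hypothetical stand-ins, not Paimon's internal API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DynamicOptionDemo {
    // Simplified sketch: keys shaped like paimon.<catalog>.<db>.<table>.<key>
    // (any part may be "*") are table-scoped; other keys are global options.
    static Map<String, String> resolve(
            Map<String, String> session, String catalog, String db, String table) {
        Pattern p = Pattern.compile(
                String.format("paimon\\.(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
                        Pattern.quote(catalog), Pattern.quote(db), Pattern.quote(table)));
        Map<String, String> global = new HashMap<>();
        Map<String, String> scoped = new HashMap<>();
        session.forEach((k, v) -> {
            Matcher m = p.matcher(k);
            if (m.matches()) {
                scoped.put(m.group(4), v); // group 4 is the config key itself
            } else if (!k.startsWith("paimon.")) {
                global.put(k, v);
            }
        });
        global.putAll(scoped); // table-scoped options win on conflict
        return global;
    }

    public static void main(String[] args) {
        Map<String, String> session = new HashMap<>();
        session.put("scan.timestamp-millis", "1697018249001");
        session.put("paimon.mycatalog.default.T1.scan.timestamp-millis", "1697018249000");
        // For mycatalog.default.T1 the table-scoped value wins:
        System.out.println(resolve(session, "mycatalog", "default", "T1")
                .get("scan.timestamp-millis")); // 1697018249000
        // For mycatalog.default.T2 only the global value applies:
        System.out.println(resolve(session, "mycatalog", "default", "T2")
                .get("scan.timestamp-millis")); // 1697018249001
    }
}
```
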
14 changes: 7 additions & 7 deletions docs/content/spark/auxiliary.md
@@ -29,9 +29,9 @@ under the License.
## Set / Reset
The SET command sets a property, returns the value of an existing property, or returns all SQLConf properties with their values and meanings.
The RESET command resets runtime configurations specific to the current session which were set via the SET command to their default values.
To set paimon configs specifically, you need add the `spark.paimon.` prefix. You can also set table specific options at this format:
`paimon.spark.${dbName}.${tableName}.${config_key}`. The dbName/tableName can be `*`, which means matching all the specific parts. Table
options will override global options if there are conflicts.
To set dynamic options globally, you need to add the `spark.paimon.` prefix. You can also set dynamic table options in this format:
`spark.paimon.${dbName}.${tableName}.${config_key}`. The dbName/tableName can be `*`, which matches any value for that part. Dynamic
table options will override global options if there are conflicts.

```sql
-- set spark conf
@@ -44,16 +44,16 @@ SET spark.paimon.file.block-size=512M;
RESET spark.paimon.file.block-size;

-- set scan.snapshot-id=1 for the table default.T
SET paimon.spark.default.T.scan.snapshot-id=1;
SET spark.paimon.default.T.scan.snapshot-id=1;
SELECT * FROM default.T;

-- set scan.snapshot-id=1 for the table T in any database
SET paimon.spark.*.T.scan.snapshot-id=1;
SET spark.paimon.*.T.scan.snapshot-id=1;
SELECT * FROM default.T;

-- set scan.snapshot-id=2 for the table default.T1 and scan.snapshot-id=1 on default.T2
SET paimon.spark.scan.snapshot-id=1;
SET paimon.spark.default.T1.scan.snapshot-id=2;
SET spark.paimon.scan.snapshot-id=1;
SET spark.paimon.default.T1.scan.snapshot-id=2;
SELECT * FROM default.T1 JOIN default.T2 ON xxxx;
```

@@ -27,6 +27,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.paimon.options.StructuredOptionsSplitter.escapeWithSingleQuote;
@@ -302,6 +304,34 @@ public static Map<String, String> convertToPropertiesPrefixKey(
return properties;
}

public static Map<String, String> convertToDynamicTableProperties(
Map<String, String> confData,
String globalOptionKeyPrefix,
Pattern tableOptionKeyPattern,
int keyGroup) {
Map<String, String> globalOptions = new HashMap<>();
Map<String, String> tableOptions = new HashMap<>();

confData.keySet().stream()
.filter(k -> k.startsWith(globalOptionKeyPrefix))
.forEach(
k -> {
Matcher matcher = tableOptionKeyPattern.matcher(k);
if (matcher.find()) {
tableOptions.put(
matcher.group(keyGroup), convertToString(confData.get(k)));
} else {
globalOptions.put(
k.substring(globalOptionKeyPrefix.length()),
convertToString(confData.get(k)));
}
});

// table options should override global options for the same key
globalOptions.putAll(tableOptions);
return globalOptions;
}
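The `keyGroup` parameter of the new helper exists because the Flink and Spark key patterns capture the trailing config key in different regex groups: the Flink pattern has a catalog segment (key in group 5), the Spark pattern does not (key in group 4). A tiny sketch of the group numbering; the class name and identifiers below are made up for illustration:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KeyGroupDemo {
    // Group numbering in a Flink-style table-option pattern: group 1 is the
    // "paimon." prefix, groups 2-4 match catalog/db/table (or "*"), and
    // group 5 captures the config key itself. The Spark-side pattern has no
    // catalog segment, so its config key lands in group 4 instead.
    static String extractKey(String sessionKey) {
        Pattern p = Pattern.compile("(paimon\\.)(mycat|\\*)\\.(mydb|\\*)\\.(T|\\*)\\.(.+)");
        Matcher m = p.matcher(sessionKey);
        return m.matches() ? m.group(5) : null;
    }

    public static void main(String[] args) {
        System.out.println(extractKey("paimon.*.mydb.T.scan.snapshot-id")); // scan.snapshot-id
        System.out.println(extractKey("scan.snapshot-id")); // null, i.e. a global option
    }
}
```
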

static boolean containsPrefixMap(Map<String, String> confData, String key) {
return confData.keySet().stream().anyMatch(candidate -> filterPrefixMapKey(key, candidate));
}
@@ -33,6 +33,7 @@
import org.apache.paimon.lineage.TableLineageEntity;
import org.apache.paimon.lineage.TableLineageEntityImpl;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
@@ -69,7 +70,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
Expand Down Expand Up @@ -231,7 +231,7 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {
CatalogTable origin = context.getCatalogTable().getOrigin();
Table table;

Map<String, String> dynamicOptions = getDynamicTableConfigOptions(context);
Map<String, String> dynamicOptions = getDynamicConfigOptions(context);
dynamicOptions.forEach(
(key, newValue) -> {
String oldValue = origin.getOptions().get(key);
@@ -241,6 +241,7 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {
});
Map<String, String> newOptions = new HashMap<>();
newOptions.putAll(origin.getOptions());
// dynamic options should override origin options
newOptions.putAll(dynamicOptions);

// notice that the Paimon table schema must be the same with the Flink's
@@ -304,16 +305,19 @@ static boolean schemaEquals(RowType rowType1, RowType rowType2) {
/**
* The dynamic option's format is:
*
* <p>{@link
* FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}.${catalog}.${database}.${tableName}.key =
* value. These job level configs will be extracted and injected into the target table option.
* <p>Global Options: key = value.
*
* <p>Table Options: {@link
* FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}${catalog}.${database}.${tableName}.key =
* value.
*
* <p>These job level options will be extracted and injected into the target table option. Table
* options will override global options if there are conflicts.
*
* @param context The table factory context.
* @return The dynamic options of this target table.
*/
static Map<String, String> getDynamicTableConfigOptions(DynamicTableFactory.Context context) {

Map<String, String> optionsFromTableConfig = new HashMap<>();
static Map<String, String> getDynamicConfigOptions(DynamicTableFactory.Context context) {

ReadableConfig config = context.getConfiguration();

@@ -329,23 +333,14 @@ static Map<String, String> getDynamicTableConfigOptions(DynamicTableFactory.Cont

String template =
String.format(
"(%s)\\.(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
"(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX,
context.getObjectIdentifier().getCatalogName(),
context.getObjectIdentifier().getDatabaseName(),
context.getObjectIdentifier().getObjectName());
Pattern pattern = Pattern.compile(template);

conf.keySet()
.forEach(
(key) -> {
if (key.startsWith(FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX)) {
Matcher matcher = pattern.matcher(key);
if (matcher.find()) {
optionsFromTableConfig.put(matcher.group(5), conf.get(key));
}
}
});
Map<String, String> optionsFromTableConfig =
OptionsUtils.convertToDynamicTableProperties(conf, "", pattern, 5);

if (!optionsFromTableConfig.isEmpty()) {
LOG.info(
@@ -44,7 +44,7 @@ public class FlinkConnectorOptions {

public static final String NONE = "none";

public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";
public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon.";

public static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;

@@ -63,8 +63,7 @@ public void testSchemaEquals() {
@Test
public void testGetDynamicOptions() {
Configuration configuration = new Configuration();
configuration.setString("paimon.catalog1.db.T.k1", "v1");
configuration.setString("paimon.*.db.*.k2", "v2");
configuration.setString("k1", "v2");
ObjectIdentifier identifier = ObjectIdentifier.of("catalog1", "db", "T");
DynamicTableFactory.Context context =
new FactoryUtil.DefaultDynamicTableContext(
@@ -74,9 +73,25 @@ public void testGetDynamicOptions() {
configuration,
AbstractFlinkTableFactoryTest.class.getClassLoader(),
false);
Map<String, String> options =
AbstractFlinkTableFactory.getDynamicTableConfigOptions(context);
assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2"));
Map<String, String> options = AbstractFlinkTableFactory.getDynamicConfigOptions(context);
assertThat(options).isEqualTo(ImmutableMap.of("k1", "v2"));

configuration = new Configuration();
configuration.setString("k1", "v2");
configuration.setString("k3", "v3");
configuration.setString("paimon.catalog1.db.T.k1", "v1");
configuration.setString("paimon.*.db.*.k2", "v2");
identifier = ObjectIdentifier.of("catalog1", "db", "T");
context =
new FactoryUtil.DefaultDynamicTableContext(
identifier,
null,
new HashMap<>(),
configuration,
AbstractFlinkTableFactoryTest.class.getClassLoader(),
false);
options = AbstractFlinkTableFactory.getDynamicConfigOptions(context);
assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2", "k3", "v3"));
}

private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
@@ -23,42 +23,32 @@ import org.apache.paimon.table.Table

import org.apache.spark.sql.catalyst.SQLConfHelper

import java.util.{HashMap => JHashMap, Map => JMap}
import java.util.{Map => JMap}
import java.util.regex.Pattern

import scala.collection.JavaConverters._

object OptionUtils extends SQLConfHelper {

private val PAIMON_OPTION_PREFIX = "spark.paimon."

def mergeSQLConfWithIdentifier(
extraOptions: JMap[String, String],
ident: Identifier): JMap[String, String] = {

val tableOptionsTemplate = String.format(
"(%s)(%s|\\*)\\.(%s|\\*)\\.(.+)",
PAIMON_OPTION_PREFIX,
ident.getDatabaseName,
ident.getObjectName)
val tableOptionsPattern = Pattern.compile(tableOptionsTemplate)
val globalOptions = new JHashMap[String, String]
val tableOptions = new JHashMap[String, String]

conf.getAllConfs
.filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
.foreach {
case (key, value) =>
val matcher = tableOptionsPattern.matcher(key)
if (matcher.find) {
tableOptions.put(matcher.group(4), value)
} else {
globalOptions.put(key.stripPrefix(PAIMON_OPTION_PREFIX), value)
}
}

// table options should override global options for the same key
globalOptions.putAll(tableOptions)
globalOptions.putAll(extraOptions)
globalOptions
val mergedOptions = org.apache.paimon.options.OptionsUtils
.convertToDynamicTableProperties(
conf.getAllConfs.asJava,
PAIMON_OPTION_PREFIX,
tableOptionsPattern,
4)
mergedOptions.putAll(extraOptions)
mergedOptions
}

def copyWithSQLConf[T <: Table](
@@ -67,7 +57,6 @@ object OptionUtils extends SQLConfHelper {
extraOptions: JMap[String, String]): T = {
val mergedOptions: JMap[String, String] =
mergeSQLConfWithIdentifier(extraOptions, ident)

if (mergedOptions.isEmpty) {
table
} else {
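The merge order used by `mergeSQLConfWithIdentifier` (global options first, then table-scoped options, then per-query `extraOptions`) can be sketched with plain maps, since each successive `putAll` lets the later, more specific source win on conflicting keys. The class name and option values below are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class MergeOrderDemo {
    // Later putAll calls overwrite earlier entries for the same key,
    // so specificity increases left to right: global < table < extra.
    static Map<String, String> merge(
            Map<String, String> globalOptions,
            Map<String, String> tableOptions,
            Map<String, String> extraOptions) {
        Map<String, String> merged = new HashMap<>(globalOptions);
        merged.putAll(tableOptions); // table-scoped beats global
        merged.putAll(extraOptions); // per-query extras beat both
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> global = new HashMap<>();
        global.put("scan.snapshot-id", "1");
        Map<String, String> table = new HashMap<>();
        table.put("scan.snapshot-id", "2");
        Map<String, String> extra = new HashMap<>();
        extra.put("scan.snapshot-id", "3");
        System.out.println(merge(global, table, extra).get("scan.snapshot-id")); // 3
    }
}
```
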
