[spark] Support table options via SQL conf for Spark Engine #4393

Merged (1 commit) on Nov 5, 2024
13 changes: 12 additions & 1 deletion docs/content/flink/quick-start.md
@@ -269,16 +269,27 @@ SELECT * FROM ....;
## Setting dynamic options

When interacting with a Paimon table, table options can be tuned without changing the options stored in the catalog. Paimon will extract job-level dynamic options, which take effect in the current session.
The dynamic option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. The catalogName/dbName/tableName can be `*`, which means matching all the specific parts.
The dynamic table option's key format is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. Any of catalogName/dbName/tableName can be `*`, which matches all values for that part.
The dynamic global option's key format is `${config_key}`. Global options take effect for all tables. Table options override global options if there are conflicts.

For example:

```sql
-- set scan.timestamp-millis=1697018249001 for all tables
SET 'scan.timestamp-millis' = '1697018249001';
SELECT * FROM T;

-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;

-- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;

-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T1
-- set scan.timestamp-millis=1697018249001 for other tables
SET 'paimon.mycatalog.default.T1.scan.timestamp-millis' = '1697018249000';
SET 'scan.timestamp-millis' = '1697018249001';
SELECT * FROM T1 JOIN T2 ON xxxx;
```
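The same dynamic options can be supplied from a Flink job's configuration instead of the SQL CLI. Below is a minimal sketch, assuming a `TableEnvironment` with a Paimon catalog named `mycatalog` already registered; the catalog, database, and table names are illustrative only:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkDynamicOptionsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Equivalent to: SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
        tEnv.getConfig()
                .getConfiguration()
                .setString("paimon.mycatalog.default.T.scan.timestamp-millis", "1697018249000");

        // The option is applied only when reading mycatalog.default.T.
        tEnv.executeSql("SELECT * FROM mycatalog.`default`.T").print();
    }
}
```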
22 changes: 20 additions & 2 deletions docs/content/spark/auxiliary.md
@@ -29,17 +29,35 @@ under the License.
## Set / Reset
The SET command sets a property, returns the value of an existing property, or returns all SQLConf properties with their values and meanings.
The RESET command resets runtime configurations specific to the current session, which were set via the SET command, back to their default values.
To set paimon configs specifically, you need add the `spark.paimon.` prefix.
To set dynamic options globally, you need to add the `spark.paimon.` prefix. You can also set dynamic table options in this format:
`spark.paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. Any of catalogName/dbName/tableName can be `*`, which matches all
values for that part. Dynamic table options will override global options if there are conflicts.

```sql
-- set spark conf
SET spark.sql.sources.partitionOverwriteMode=dynamic;

-- set paimon conf
SET spark.paimon.file.block-size=512M;

-- reset conf
RESET spark.paimon.file.block-size;

-- set catalog
USE paimon;

-- set scan.snapshot-id=1 for the table default.T in any catalog
SET spark.paimon.*.default.T.scan.snapshot-id=1;
SELECT * FROM default.T;

-- set scan.snapshot-id=1 for the table T in any database and catalog
SET spark.paimon.*.*.T.scan.snapshot-id=1;
SELECT * FROM default.T;

-- set scan.snapshot-id=2 for the table default.T1 in any catalog, and scan.snapshot-id=1 for other tables
SET spark.paimon.scan.snapshot-id=1;
SET spark.paimon.*.default.T1.scan.snapshot-id=2;
SELECT * FROM default.T1 JOIN default.T2 ON xxxx;
```
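The same precedence rules can be exercised from code. A minimal sketch, assuming that session confs set through `spark.conf()` are picked up like SQL `SET`, and that a Paimon catalog with tables `default.T1` and `default.T2` (each with an `id` column) exists; all names are illustrative:

```java
import org.apache.spark.sql.SparkSession;

public class SparkDynamicOptionsSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("paimon-dynamic-options")
                .getOrCreate();

        // Global option: scan.snapshot-id=1 for every Paimon table.
        spark.conf().set("spark.paimon.scan.snapshot-id", "1");
        // Table-scoped option: scan.snapshot-id=2 only for default.T1 (any catalog).
        spark.conf().set("spark.paimon.*.default.T1.scan.snapshot-id", "2");

        // T1 should be read at snapshot 2, while T2 falls back to the global snapshot 1.
        spark.sql("SELECT * FROM default.T1 JOIN default.T2 ON T1.id = T2.id").show();
    }
}
```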

## Describe table
@@ -27,6 +27,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.paimon.options.StructuredOptionsSplitter.escapeWithSingleQuote;
@@ -302,6 +304,34 @@ public static Map<String, String> convertToPropertiesPrefixKey(
return properties;
}

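/**
* Splits {@code confData} into table options (keys matching {@code tableOptionKeyPattern}, with
* the config key taken from capture group {@code keyGroup}) and global options (remaining keys
* starting with {@code globalOptionKeyPrefix}, with the prefix stripped). Table options override
* global options for the same key.
*/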
public static Map<String, String> convertToDynamicTableProperties(
Map<String, String> confData,
String globalOptionKeyPrefix,
Pattern tableOptionKeyPattern,
int keyGroup) {
Map<String, String> globalOptions = new HashMap<>();
Map<String, String> tableOptions = new HashMap<>();

confData.keySet().stream()
.filter(k -> k.startsWith(globalOptionKeyPrefix))
.forEach(
k -> {
Matcher matcher = tableOptionKeyPattern.matcher(k);
if (matcher.find()) {
tableOptions.put(
matcher.group(keyGroup), convertToString(confData.get(k)));
} else {
globalOptions.put(
k.substring(globalOptionKeyPrefix.length()),
convertToString(confData.get(k)));
}
});

// table options should override global options for the same key
globalOptions.putAll(tableOptions);
return globalOptions;
}

static boolean containsPrefixMap(Map<String, String> confData, String key) {
return confData.keySet().stream().anyMatch(candidate -> filterPrefixMapKey(key, candidate));
}
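A minimal sketch (not part of the change set) of how this new helper resolves keys, using the Spark-style `spark.paimon.` prefix and a pattern shaped like the ones the callers build; the catalog, database, and table names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.paimon.options.OptionsUtils;

public class ConvertToDynamicTablePropertiesSketch {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.paimon.scan.snapshot-id", "1");                  // global option
        conf.put("spark.paimon.*.default.T.scan.snapshot-id", "2");      // wildcard catalog, table T
        conf.put("spark.paimon.mycatalog.default.T.write-only", "true"); // fully qualified

        // Same template shape as the callers: (prefix)(catalog|*)\.(db|*)\.(table|*)\.(key),
        // with the config key captured in group 5.
        Pattern pattern = Pattern.compile(
                "(spark\\.paimon\\.)(mycatalog|\\*)\\.(default|\\*)\\.(T|\\*)\\.(.+)");

        Map<String, String> resolved =
                OptionsUtils.convertToDynamicTableProperties(conf, "spark.paimon.", pattern, 5);

        // Table-scoped values override the global value for the same key:
        // {scan.snapshot-id=2, write-only=true}
        System.out.println(resolved);
    }
}
```

The Flink caller passes an empty global prefix instead, so an un-prefixed job option such as `scan.timestamp-millis` falls through as a global option.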
@@ -35,6 +35,7 @@
import org.apache.paimon.lineage.TableLineageEntity;
import org.apache.paimon.lineage.TableLineageEntityImpl;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
@@ -71,7 +72,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
@@ -239,7 +239,7 @@ Table buildPaimonTable(DynamicTableFactory.Context context) {
CatalogTable origin = context.getCatalogTable().getOrigin();
Table table;

Map<String, String> dynamicOptions = getDynamicTableConfigOptions(context);
Map<String, String> dynamicOptions = getDynamicConfigOptions(context);
dynamicOptions.forEach(
(key, newValue) -> {
String oldValue = origin.getOptions().get(key);
@@ -249,6 +249,7 @@ Table buildPaimonTable(DynamicTableFactory.Context context) {
});
Map<String, String> newOptions = new HashMap<>();
newOptions.putAll(origin.getOptions());
// dynamic options should override origin options
newOptions.putAll(dynamicOptions);

FileStoreTable fileStoreTable;
@@ -324,16 +325,19 @@ static boolean schemaEquals(RowType rowType1, RowType rowType2) {
/**
* The dynamic option's format is:
*
* <p>{@link
* FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}.${catalog}.${database}.${tableName}.key =
* value. These job level configs will be extracted and injected into the target table option.
* <p>Global options: key = value.
*
* <p>Table Options: {@link
* FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}${catalog}.${database}.${tableName}.key =
* value.
*
* <p>These job-level options will be extracted and injected into the target table's options. Table
* options will override global options if there are conflicts.
*
* @param context The table factory context.
* @return The dynamic options of this target table.
*/
static Map<String, String> getDynamicTableConfigOptions(DynamicTableFactory.Context context) {

Map<String, String> optionsFromTableConfig = new HashMap<>();
static Map<String, String> getDynamicConfigOptions(DynamicTableFactory.Context context) {

ReadableConfig config = context.getConfiguration();

@@ -349,23 +353,14 @@ static Map<String, String> getDynamicTableConfigOptions(DynamicTableFactory.Cont

String template =
String.format(
"(%s)\\.(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
"(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX,
context.getObjectIdentifier().getCatalogName(),
context.getObjectIdentifier().getDatabaseName(),
context.getObjectIdentifier().getObjectName());
Pattern pattern = Pattern.compile(template);

conf.keySet()
.forEach(
(key) -> {
if (key.startsWith(FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX)) {
Matcher matcher = pattern.matcher(key);
if (matcher.find()) {
optionsFromTableConfig.put(matcher.group(5), conf.get(key));
}
}
});
Map<String, String> optionsFromTableConfig =
OptionsUtils.convertToDynamicTableProperties(conf, "", pattern, 5);

if (!optionsFromTableConfig.isEmpty()) {
LOG.info(
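For a concrete identifier such as `mycatalog.default.T`, the template above compiles into a pattern like the one below. This is an illustrative sketch of which keys it captures (group 5 is the config key) and which fall through as global options; note that `TABLE_DYNAMIC_OPTION_PREFIX` now ends with a dot, so the template no longer inserts an extra `\.` after the prefix:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlinkDynamicOptionPatternSketch {
    public static void main(String[] args) {
        Pattern pattern = Pattern.compile(
                "(paimon\\.)(mycatalog|\\*)\\.(default|\\*)\\.(T|\\*)\\.(.+)");

        // A key scoped to this table (with a wildcard catalog) matches; group 5 is the config key.
        Matcher m = pattern.matcher("paimon.*.default.T.scan.timestamp-millis");
        System.out.println(m.find() + " -> " + m.group(5)); // true -> scan.timestamp-millis

        // A key scoped to another table does not match and is ignored for this table.
        System.out.println(pattern.matcher("paimon.mycatalog.default.OTHER.k1").find()); // false

        // An un-prefixed key does not match either; it is kept as a global option.
        System.out.println(pattern.matcher("scan.timestamp-millis").find()); // false
    }
}
```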
@@ -44,7 +44,7 @@ public class FlinkConnectorOptions {

public static final String NONE = "none";

public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";
public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon.";

public static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;

@@ -63,8 +63,7 @@ public void testSchemaEquals() {
@Test
public void testGetDynamicOptions() {
Configuration configuration = new Configuration();
configuration.setString("paimon.catalog1.db.T.k1", "v1");
configuration.setString("paimon.*.db.*.k2", "v2");
configuration.setString("k1", "v2");
ObjectIdentifier identifier = ObjectIdentifier.of("catalog1", "db", "T");
DynamicTableFactory.Context context =
new FactoryUtil.DefaultDynamicTableContext(
@@ -74,9 +73,25 @@ public void testGetDynamicOptions() {
configuration,
AbstractFlinkTableFactoryTest.class.getClassLoader(),
false);
Map<String, String> options =
AbstractFlinkTableFactory.getDynamicTableConfigOptions(context);
assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2"));
Map<String, String> options = AbstractFlinkTableFactory.getDynamicConfigOptions(context);
assertThat(options).isEqualTo(ImmutableMap.of("k1", "v2"));

configuration = new Configuration();
configuration.setString("k1", "v2");
configuration.setString("k3", "v3");
configuration.setString("paimon.catalog1.db.T.k1", "v1");
configuration.setString("paimon.*.db.*.k2", "v2");
identifier = ObjectIdentifier.of("catalog1", "db", "T");
context =
new FactoryUtil.DefaultDynamicTableContext(
identifier,
null,
new HashMap<>(),
configuration,
AbstractFlinkTableFactoryTest.class.getClassLoader(),
false);
options = AbstractFlinkTableFactory.getDynamicConfigOptions(context);
assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2", "k3", "v3"));
}

private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
@@ -503,7 +503,9 @@ protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
if (paimonTable instanceof FormatTable) {
return convertToFileTable(ident, (FormatTable) paimonTable);
} else {
return new SparkTable(copyWithSQLConf(paimonTable, extraOptions));
return new SparkTable(
copyWithSQLConf(
paimonTable, catalogName, toIdentifier(ident), extraOptions));
}
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
@@ -18,11 +18,13 @@

package org.apache.paimon.spark

import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.CoreOptions
import org.apache.paimon.catalog.{CatalogContext, CatalogUtils, Identifier}
import org.apache.paimon.options.Options
import org.apache.paimon.spark.SparkSource.NAME
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.spark.sources.PaimonSink
import org.apache.paimon.spark.util.OptionUtils.mergeSQLConf
import org.apache.paimon.spark.util.OptionUtils.{extractCatalogName, mergeSQLConfWithIdentifier}
import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory}
import org.apache.paimon.table.system.AuditLogTable

@@ -80,9 +82,15 @@ class SparkSource
}

private def loadTable(options: JMap[String, String]): DataTable = {
val path = CoreOptions.path(options)
val catalogContext = CatalogContext.create(
Options.fromMap(mergeSQLConf(options)),
SparkSession.active.sessionState.newHadoopConf())
Options.fromMap(
(Review comment from a contributor) For SparkSource loadTable, maybe we can just keep the original way, because it is simpler and more general through `spark.read.format("paimon").options()` to set table level config. And it is difficult to get the catalog name and db name here.

mergeSQLConfWithIdentifier(
options,
extractCatalogName().getOrElse(NAME),
Identifier.create(CatalogUtils.database(path), CatalogUtils.table(path)))),
SparkSession.active.sessionState.newHadoopConf()
)
val table = FileStoreTableFactory.create(catalogContext)
if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
new AuditLogTable(table)
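For reference, the per-read route the review comment mentions is the existing `spark.read.format("paimon")` path, roughly as sketched below; the warehouse path and option are illustrative:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PaimonReadOptionsSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("paimon-read-options")
                .getOrCreate();

        // Table-level options can be passed directly on the read, without any SQL conf.
        Dataset<Row> df = spark.read()
                .format("paimon")
                .option("scan.snapshot-id", "1")
                .load("/tmp/paimon/default.db/T"); // illustrative table path

        df.show();
    }
}
```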
@@ -18,34 +18,61 @@

package org.apache.paimon.spark.util

import org.apache.paimon.catalog.Identifier
import org.apache.paimon.table.Table

import org.apache.spark.sql.catalyst.SQLConfHelper

import java.util.{HashMap => JHashMap, Map => JMap}
import java.util.{Map => JMap}
import java.util.regex.Pattern

import scala.collection.JavaConverters._

object OptionUtils extends SQLConfHelper {

private val PAIMON_OPTION_PREFIX = "spark.paimon."
private val SPARK_CATALOG_PREFIX = "spark.sql.catalog."

def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = {
val mergedOptions = new JHashMap[String, String](
conf.getAllConfs
.filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
.map {
case (key, value) =>
key.stripPrefix(PAIMON_OPTION_PREFIX) -> value
}
.toMap
.asJava)
def extractCatalogName(): Option[String] = {
val sparkCatalogTemplate = String.format("%s([^.]*)$", SPARK_CATALOG_PREFIX)
val sparkCatalogPattern = Pattern.compile(sparkCatalogTemplate)
conf.getAllConfs.filterKeys(_.startsWith(SPARK_CATALOG_PREFIX)).foreach {
case (key, _) =>
val matcher = sparkCatalogPattern.matcher(key)
if (matcher.find())
return Option(matcher.group(1))
}
Option.empty
}

def mergeSQLConfWithIdentifier(
extraOptions: JMap[String, String],
catalogName: String,
ident: Identifier): JMap[String, String] = {
val tableOptionsTemplate = String.format(
"(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
PAIMON_OPTION_PREFIX,
catalogName,
ident.getDatabaseName,
ident.getObjectName)
val tableOptionsPattern = Pattern.compile(tableOptionsTemplate)
val mergedOptions = org.apache.paimon.options.OptionsUtils
.convertToDynamicTableProperties(
conf.getAllConfs.asJava,
PAIMON_OPTION_PREFIX,
tableOptionsPattern,
5)
mergedOptions.putAll(extraOptions)
mergedOptions
}

def copyWithSQLConf[T <: Table](table: T, extraOptions: JMap[String, String]): T = {
val mergedOptions = mergeSQLConf(extraOptions)
def copyWithSQLConf[T <: Table](
table: T,
catalogName: String,
ident: Identifier,
extraOptions: JMap[String, String]): T = {
val mergedOptions: JMap[String, String] =
mergeSQLConfWithIdentifier(extraOptions, catalogName, ident)
if (mergedOptions.isEmpty) {
table
} else {