From fa99beadad9d8f7333f1077c4283f08778c44a3e Mon Sep 17 00:00:00 2001
From: Xiangyu Feng
Date: Mon, 28 Oct 2024 19:49:08 +0800
Subject: [PATCH] [spark] Support table dynamic options via SQL conf for Spark
 Engine

---
 docs/content/spark/auxiliary.md               |  17 ++-
 .../org/apache/paimon/spark/SparkCatalog.java |   3 +-
 .../org/apache/paimon/spark/SparkSource.scala |  14 +-
 .../paimon/spark/util/OptionUtils.scala       |  54 +++++---
 .../paimon/spark/sql/PaimonOptionTest.scala   | 126 ++++++++++++++++++
 5 files changed, 192 insertions(+), 22 deletions(-)

diff --git a/docs/content/spark/auxiliary.md b/docs/content/spark/auxiliary.md
index b822d1b898de5..5227c272cc20f 100644
--- a/docs/content/spark/auxiliary.md
+++ b/docs/content/spark/auxiliary.md
@@ -29,7 +29,9 @@ under the License.
 ## Set / Reset
 The SET command sets a property, returns the value of an existing property or returns all SQLConf properties with value and meaning.
 The RESET command resets runtime configurations specific to the current session which were set via the SET command to their default values.
-To set paimon configs specifically, you need add the `spark.paimon.` prefix.
+To set paimon configs specifically, you need to add the `spark.paimon.` prefix. You can also set table-specific options in the following format:
+`spark.paimon.${dbName}.${tableName}.${config_key}`. The dbName/tableName can be `*`, which matches any database or table name. Table-specific
+options will override global options if there are conflicts.
 
 ```sql
 -- set spark conf
@@ -40,6 +42,19 @@ SET spark.paimon.file.block-size=512M;
 
 -- reset conf
 RESET spark.paimon.file.block-size;
+
+-- set scan.snapshot-id=1 for the table default.T
+SET spark.paimon.default.T.scan.snapshot-id=1;
+SELECT * FROM default.T;
+
+-- set scan.snapshot-id=1 for the table T in any database
+SET spark.paimon.*.T.scan.snapshot-id=1;
+SELECT * FROM default.T;
+
+-- set scan.snapshot-id=2 for the table default.T1 and scan.snapshot-id=1 for default.T2
+SET spark.paimon.scan.snapshot-id=1;
+SET spark.paimon.default.T1.scan.snapshot-id=2;
+SELECT * FROM default.T1 JOIN default.T2 ON xxxx;
 ```
 
 ## Describe table
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 0c827714a4e29..980c0f97a7358 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -538,7 +538,8 @@ protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
                                         + formatTable.format().name());
                 }
             } else {
-                return new SparkTable(copyWithSQLConf(paimonTable, extraOptions));
+                return new SparkTable(
+                        copyWithSQLConf(paimonTable, toIdentifier(ident), extraOptions));
             }
         } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index 67ab1312fa4e3..37d4961caeaed 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -18,11 +18,12 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.catalog.CatalogContext
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.catalog.{CatalogContext, CatalogUtils, Identifier}
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable
 import org.apache.paimon.spark.sources.PaimonSink
-import org.apache.paimon.spark.util.OptionUtils.mergeSQLConf
+import org.apache.paimon.spark.util.OptionUtils.mergeSQLConfWithIdentifier
 import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory}
 import org.apache.paimon.table.system.AuditLogTable
 
@@ -80,9 +81,14 @@ class SparkSource
   }
 
   private def loadTable(options: JMap[String, String]): DataTable = {
+    val path = CoreOptions.path(options)
     val catalogContext = CatalogContext.create(
-      Options.fromMap(mergeSQLConf(options)),
-      SparkSession.active.sessionState.newHadoopConf())
+      Options.fromMap(
+        mergeSQLConfWithIdentifier(
+          options,
+          Identifier.create(CatalogUtils.database(path), CatalogUtils.table(path)))),
+      SparkSession.active.sessionState.newHadoopConf()
+    )
     val table = FileStoreTableFactory.create(catalogContext)
     if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
       new AuditLogTable(table)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 5b762ffb49dea..3e841fb625fe9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -18,34 +18,56 @@
 
 package org.apache.paimon.spark.util
 
+import org.apache.paimon.catalog.Identifier
 import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 
 import java.util.{HashMap => JHashMap, Map => JMap}
-
-import scala.collection.JavaConverters._
+import java.util.regex.Pattern
 
 object OptionUtils extends SQLConfHelper {
 
   private val PAIMON_OPTION_PREFIX = "spark.paimon."
 
-  def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = {
-    val mergedOptions = new JHashMap[String, String](
-      conf.getAllConfs
-        .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
-        .map {
-          case (key, value) =>
-            key.stripPrefix(PAIMON_OPTION_PREFIX) -> value
-        }
-        .toMap
-        .asJava)
-    mergedOptions.putAll(extraOptions)
-    mergedOptions
+  def mergeSQLConfWithIdentifier(
+      extraOptions: JMap[String, String],
+      ident: Identifier): JMap[String, String] = {
+
+    val tableOptionsTemplate = String.format(
+      "(%s)(%s|\\*)\\.(%s|\\*)\\.(.+)",
+      PAIMON_OPTION_PREFIX,
+      ident.getDatabaseName,
+      ident.getObjectName)
+    val tableOptionsPattern = Pattern.compile(tableOptionsTemplate)
+    val globalOptions = new JHashMap[String, String]
+    val tableOptions = new JHashMap[String, String]
+
+    conf.getAllConfs
+      .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
+      .foreach {
+        case (key, value) =>
+          val matcher = tableOptionsPattern.matcher(key)
+          if (matcher.find) {
+            tableOptions.put(matcher.group(4), value)
+          } else {
+            globalOptions.put(key.stripPrefix(PAIMON_OPTION_PREFIX), value)
+          }
+      }
+
+    // table options should override global options for the same key
+    globalOptions.putAll(tableOptions)
+    globalOptions.putAll(extraOptions)
+    globalOptions
   }
 
-  def copyWithSQLConf[T <: Table](table: T, extraOptions: JMap[String, String]): T = {
-    val mergedOptions = mergeSQLConf(extraOptions)
+  def copyWithSQLConf[T <: Table](
+      table: T,
+      ident: Identifier,
+      extraOptions: JMap[String, String]): T = {
+    val mergedOptions: JMap[String, String] =
+      mergeSQLConfWithIdentifier(extraOptions, ident)
+
     if (mergedOptions.isEmpty) {
       table
     } else {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
index d35ac1d709c31..26beafd7609ba 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
@@ -76,4 +76,130 @@ class PaimonOptionTest extends PaimonSparkTestBase {
       }
     }
   }
+
+  test("Paimon Table Options: query one table with sql conf and table options") {
+    sql("CREATE TABLE T (id INT)")
+    sql("INSERT INTO T VALUES 1")
+    sql("INSERT INTO T VALUES 2")
+    checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil)
+    val table = loadTable("T")
+
+    // query with global options
+    withSQLConf("spark.paimon.scan.snapshot-id" -> "1") {
+      checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1))
+      checkAnswer(spark.read.format("paimon").load(table.location().toString), Row(1))
+    }
+
+    // query with table options
+    withSQLConf("spark.paimon.*.T.scan.snapshot-id" -> "1") {
+      checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1))
+      checkAnswer(spark.read.format("paimon").load(table.location().toString), Row(1))
+    }
+
+    // query with both global and table options
+    withSQLConf(
+      "spark.paimon.scan.snapshot-id" -> "1",
+      "spark.paimon.*.T.scan.snapshot-id" -> "2") {
+      checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil)
+      checkAnswer(
+        spark.read.format("paimon").load(table.location().toString),
+        Row(1) :: Row(2) :: Nil)
+    }
+  }
+
+  test("Paimon Table Options: query multiple tables with sql conf and table options") {
+    sql("CREATE TABLE T1 (id INT)")
+    sql("INSERT INTO T1 VALUES 1")
+    sql("INSERT INTO T1 VALUES 2")
+
+    sql("CREATE TABLE T2 (id INT)")
+    sql("INSERT INTO T2 VALUES 1")
1") + sql("INSERT INTO T2 VALUES 2") + checkAnswer( + sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), + Row(1, 1) :: Row(2, 2) :: Nil) + val table1 = loadTable("T1") + val table2 = loadTable("T1") + + // query with global options + withSQLConf("spark.paimon.scan.snapshot-id" -> "1") { + checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1)) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) + ) + } + + // query with table options + withSQLConf("spark.paimon.*.*.scan.snapshot-id" -> "1") { + checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1)) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) + ) + } + + // query with both global and table options + withSQLConf( + "spark.paimon.scan.snapshot-id" -> "1", + "spark.paimon.*.*.scan.snapshot-id" -> "2") { + checkAnswer( + sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), + Row(1, 1) :: Row(2, 2) :: Nil) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) :: Row(2) :: Nil + ) + } + + withSQLConf( + "spark.paimon.*.T1.scan.snapshot-id" -> "1", + "spark.paimon.*.T2.scan.snapshot-id" -> "1") { + checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1)) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) + ) + } + + withSQLConf( + "spark.paimon.*.T1.scan.snapshot-id" -> "1", + "spark.paimon.*.T2.scan.snapshot-id" -> "2") { + checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1)) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) + ) + } + + withSQLConf( + "spark.paimon.*.T1.scan.snapshot-id" -> "2", + "spark.paimon.*.T2.scan.snapshot-id" -> "2") { + checkAnswer( + sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), + Row(1, 1) :: Row(2, 2) :: Nil) + checkAnswer( + spark.read + .format("paimon") + .load(table1.location().toString) + .join(spark.read.format("paimon").load(table2.location().toString), "id"), + Row(1) :: Row(2) :: Nil + ) + } + } }