[spark] Support table dynamic options via SQL conf for Spark Engine
xiangyuf committed Oct 31, 2024
1 parent 74a7783 commit fa99bea
Showing 5 changed files with 192 additions and 22 deletions.
17 changes: 16 additions & 1 deletion docs/content/spark/auxiliary.md
@@ -29,7 +29,9 @@ under the License.
## Set / Reset
The SET command sets a property, returns the value of an existing property or returns all SQLConf properties with value and meaning.
The RESET command resets runtime configurations specific to the current session which were set via the SET command to their default values.
To set paimon configs specifically, you need to add the `spark.paimon.` prefix. You can also set table-specific options in this format:
`spark.paimon.${dbName}.${tableName}.${config_key}`. The dbName/tableName can be `*`, which matches all databases or all tables. Table
options override global options when the same key is set both ways.

```sql
-- set spark conf
@@ -40,6 +42,19 @@
SET spark.paimon.file.block-size=512M;

-- reset conf
RESET spark.paimon.file.block-size;

-- set scan.snapshot-id=1 for the table default.T
SET spark.paimon.default.T.scan.snapshot-id=1;
SELECT * FROM default.T;

-- set scan.snapshot-id=1 for the table T in any database
SET spark.paimon.*.T.scan.snapshot-id=1;
SELECT * FROM default.T;

-- set scan.snapshot-id=2 for the table default.T1 and scan.snapshot-id=1 for default.T2
SET spark.paimon.scan.snapshot-id=1;
SET spark.paimon.default.T1.scan.snapshot-id=2;
SELECT * FROM default.T1 JOIN default.T2 ON xxxx;
```
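
The same keys can also be set on the Spark session conf when you are not in a SQL shell; this should behave like the `SET` examples above. A minimal sketch, assuming a Paimon table `default.T` already exists and `spark` is the active `SparkSession`:

```scala
// Table-scoped option set via the session conf, then a read of default.T.
spark.conf.set("spark.paimon.default.T.scan.snapshot-id", "1")
spark.sql("SELECT * FROM default.T").show()
```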

## Describe table
@@ -538,7 +538,8 @@ protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
+ formatTable.format().name());
}
} else {
- return new SparkTable(copyWithSQLConf(paimonTable, extraOptions));
+ return new SparkTable(
+     copyWithSQLConf(paimonTable, toIdentifier(ident), extraOptions));
}
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
@@ -18,11 +18,12 @@

package org.apache.paimon.spark

- import org.apache.paimon.catalog.CatalogContext
+ import org.apache.paimon.CoreOptions
+ import org.apache.paimon.catalog.{CatalogContext, CatalogUtils, Identifier}
import org.apache.paimon.options.Options
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.spark.sources.PaimonSink
- import org.apache.paimon.spark.util.OptionUtils.mergeSQLConf
+ import org.apache.paimon.spark.util.OptionUtils.mergeSQLConfWithIdentifier
import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory}
import org.apache.paimon.table.system.AuditLogTable

@@ -80,9 +81,14 @@ class SparkSource
}

private def loadTable(options: JMap[String, String]): DataTable = {
+ val path = CoreOptions.path(options)
val catalogContext = CatalogContext.create(
-   Options.fromMap(mergeSQLConf(options)),
-   SparkSession.active.sessionState.newHadoopConf())
+   Options.fromMap(
+     mergeSQLConfWithIdentifier(
+       options,
+       Identifier.create(CatalogUtils.database(path), CatalogUtils.table(path)))),
+   SparkSession.active.sessionState.newHadoopConf()
+ )
val table = FileStoreTableFactory.create(catalogContext)
if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
new AuditLogTable(table)
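
With this change, a table loaded purely by path through `SparkSource` also resolves an `Identifier` from its location, so table-scoped keys can apply to path-based reads too. A hedged sketch of the intended effect; the path and the `<warehouse>/<db>.db/<table>` layout below are assumptions, not taken from the commit:

```scala
// Assumed layout: /warehouse/default.db/T resolves to the identifier default.T,
// so the table-scoped key below should apply even though the table is read by path.
spark.conf.set("spark.paimon.default.T.scan.snapshot-id", "1")
val df = spark.read.format("paimon").load("/warehouse/default.db/T")
df.show()
```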
@@ -18,34 +18,56 @@

package org.apache.paimon.spark.util

+ import org.apache.paimon.catalog.Identifier
import org.apache.paimon.table.Table

import org.apache.spark.sql.catalyst.SQLConfHelper

import java.util.{HashMap => JHashMap, Map => JMap}

- import scala.collection.JavaConverters._
+ import java.util.regex.Pattern

object OptionUtils extends SQLConfHelper {

private val PAIMON_OPTION_PREFIX = "spark.paimon."

- def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = {
-   val mergedOptions = new JHashMap[String, String](
-     conf.getAllConfs
-       .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
-       .map {
-         case (key, value) =>
-           key.stripPrefix(PAIMON_OPTION_PREFIX) -> value
-       }
-       .toMap
-       .asJava)
-   mergedOptions.putAll(extraOptions)
-   mergedOptions
+ def mergeSQLConfWithIdentifier(
+     extraOptions: JMap[String, String],
+     ident: Identifier): JMap[String, String] = {
+
+   val tableOptionsTemplate = String.format(
+     "(%s)(%s|\\*)\\.(%s|\\*)\\.(.+)",
+     PAIMON_OPTION_PREFIX,
+     ident.getDatabaseName,
+     ident.getObjectName)
+   val tableOptionsPattern = Pattern.compile(tableOptionsTemplate)
+   val globalOptions = new JHashMap[String, String]
+   val tableOptions = new JHashMap[String, String]
+
+   conf.getAllConfs
+     .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
+     .foreach {
+       case (key, value) =>
+         val matcher = tableOptionsPattern.matcher(key)
+         if (matcher.find) {
+           tableOptions.put(matcher.group(4), value)
+         } else {
+           globalOptions.put(key.stripPrefix(PAIMON_OPTION_PREFIX), value)
+         }
+     }
+
+   // table options should override global options for the same key
+   globalOptions.putAll(tableOptions)
+   globalOptions.putAll(extraOptions)
+   globalOptions
}

- def copyWithSQLConf[T <: Table](table: T, extraOptions: JMap[String, String]): T = {
-   val mergedOptions = mergeSQLConf(extraOptions)
+ def copyWithSQLConf[T <: Table](
+     table: T,
+     ident: Identifier,
+     extraOptions: JMap[String, String]): T = {
+   val mergedOptions: JMap[String, String] =
+     mergeSQLConfWithIdentifier(extraOptions, ident)

if (mergedOptions.isEmpty) {
table
} else {
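
For reference, a small standalone sketch of how the key pattern built in `mergeSQLConfWithIdentifier` classifies conf keys for the identifier `default`.`T`; the demo object and key names below are illustrative only:

```scala
import java.util.regex.Pattern

object TableOptionKeyDemo {
  def main(args: Array[String]): Unit = {
    // Same template as mergeSQLConfWithIdentifier, interpolated for db "default", table "T".
    val template =
      String.format("(%s)(%s|\\*)\\.(%s|\\*)\\.(.+)", "spark.paimon.", "default", "T")
    val pattern = Pattern.compile(template)

    // A table-scoped key matches; group(4) is the option name with the scope stripped.
    val matcher = pattern.matcher("spark.paimon.*.T.scan.snapshot-id")
    if (matcher.find) {
      println(matcher.group(4)) // prints: scan.snapshot-id
    }

    // A plain global key does not match the table pattern, so it stays a global option.
    println(pattern.matcher("spark.paimon.scan.snapshot-id").find) // prints: false
  }
}
```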
@@ -76,4 +76,130 @@ class PaimonOptionTest extends PaimonSparkTestBase {
}
}
}

test("Paimon Table Options: query one table with sql conf and table options") {
sql("CREATE TABLE T (id INT)")
sql("INSERT INTO T VALUES 1")
sql("INSERT INTO T VALUES 2")
checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil)
val table = loadTable("T")

// query with global options
withSQLConf("spark.paimon.scan.snapshot-id" -> "1") {
checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1))
checkAnswer(spark.read.format("paimon").load(table.location().toString), Row(1))
}

// query with table options
withSQLConf("spark.paimon.*.T.scan.snapshot-id" -> "1") {
checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1))
checkAnswer(spark.read.format("paimon").load(table.location().toString), Row(1))
}

// query with both global and table options
withSQLConf(
"spark.paimon.scan.snapshot-id" -> "1",
"spark.paimon.*.T.scan.snapshot-id" -> "2") {
checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil)
checkAnswer(
spark.read.format("paimon").load(table.location().toString),
Row(1) :: Row(2) :: Nil)
}
}

test("Paimon Table Options: query multiple tables with sql conf and table options") {
sql("CREATE TABLE T1 (id INT)")
sql("INSERT INTO T1 VALUES 1")
sql("INSERT INTO T1 VALUES 2")

sql("CREATE TABLE T2 (id INT)")
sql("INSERT INTO T2 VALUES 1")
sql("INSERT INTO T2 VALUES 2")
checkAnswer(
sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"),
Row(1, 1) :: Row(2, 2) :: Nil)
val table1 = loadTable("T1")
val table2 = loadTable("T1")

// query with global options
withSQLConf("spark.paimon.scan.snapshot-id" -> "1") {
checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1))
checkAnswer(
spark.read
.format("paimon")
.load(table1.location().toString)
.join(spark.read.format("paimon").load(table2.location().toString), "id"),
Row(1)
)
}

// query with table options
withSQLConf("spark.paimon.*.*.scan.snapshot-id" -> "1") {
checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1))
checkAnswer(
spark.read
.format("paimon")
.load(table1.location().toString)
.join(spark.read.format("paimon").load(table2.location().toString), "id"),
Row(1)
)
}

// query with both global and table options
withSQLConf(
"spark.paimon.scan.snapshot-id" -> "1",
"spark.paimon.*.*.scan.snapshot-id" -> "2") {
checkAnswer(
sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"),
Row(1, 1) :: Row(2, 2) :: Nil)
checkAnswer(
spark.read
.format("paimon")
.load(table1.location().toString)
.join(spark.read.format("paimon").load(table2.location().toString), "id"),
Row(1) :: Row(2) :: Nil
)
}

withSQLConf(
"spark.paimon.*.T1.scan.snapshot-id" -> "1",
"spark.paimon.*.T2.scan.snapshot-id" -> "1") {
checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1))
checkAnswer(
spark.read
.format("paimon")
.load(table1.location().toString)
.join(spark.read.format("paimon").load(table2.location().toString), "id"),
Row(1)
)
}

withSQLConf(
"spark.paimon.*.T1.scan.snapshot-id" -> "1",
"spark.paimon.*.T2.scan.snapshot-id" -> "2") {
checkAnswer(sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"), Row(1, 1))
checkAnswer(
spark.read
.format("paimon")
.load(table1.location().toString)
.join(spark.read.format("paimon").load(table2.location().toString), "id"),
Row(1)
)
}

withSQLConf(
"spark.paimon.*.T1.scan.snapshot-id" -> "2",
"spark.paimon.*.T2.scan.snapshot-id" -> "2") {
checkAnswer(
sql("SELECT * FROM T1 join T2 on T1.id = T2.id ORDER BY T1.id"),
Row(1, 1) :: Row(2, 2) :: Nil)
checkAnswer(
spark.read
.format("paimon")
.load(table1.location().toString)
.join(spark.read.format("paimon").load(table2.location().toString), "id"),
Row(1) :: Row(2) :: Nil
)
}
}
}
