[spark] check column nullability when write #3842

Merged · 3 commits · Jul 30, 2024
Changes from 1 commit
@@ -26,6 +26,24 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>write.merge-schema</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, merge the data schema and the table schema automatically before writing data.</td>
</tr>
<tr>
<td><h5>write.merge-schema.explicit-cast</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, allow merging data types if the two types meet the rules for explicit casting.</td>
</tr>
<tr>
<td><h5>write.check.ignoreNullability</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, skip checking whether the nullability of the incoming data is compatible with the table's.</td>
</tr>
<tr>
<td><h5>read.changelog</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -62,17 +80,5 @@
<td>Long</td>
<td>The minimum number of rows returned in a single batch, which is used to create MinRowsReadLimit together with read.stream.maxTriggerDelayMs.</td>
</tr>
<tr>
<td><h5>write.merge-schema</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, merge the data schema and the table schema automatically before writing data.</td>
</tr>
<tr>
<td><h5>write.merge-schema.explicit-cast</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, allow merging data types if the two types meet the rules for explicit casting.</td>
</tr>
</tbody>
</table>
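
As a quick usage sketch (mirroring the test added later in this PR; it assumes a Paimon catalog is already configured as the session catalog), the option is a table property, so it can be set at table-creation time and then applies to subsequent writes:

```scala
// Sketch: enable the new option as a table property at creation time.
spark.sql("""
  CREATE TABLE T (id INT NOT NULL, ts TIMESTAMP NOT NULL)
  TBLPROPERTIES ('write.check.ignoreNullability' = 'true')
""")

// Subsequent writes to T skip the nullability compatibility check.
spark.sql("INSERT INTO T SELECT 1, TO_TIMESTAMP('2024-07-01 16:00:00')")
```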
@@ -39,6 +39,13 @@ public class SparkConnectorOptions {
.withDescription(
"If true, allow to merge data types if the two types meet the rules for explicit casting.");

public static final ConfigOption<Boolean> IGNORE_NULLABLE_CHECK =
key("write.check.ignoreNullability")
.booleanType()
.defaultValue(false)
Contributor:

Why not just make it true by default?

Contributor (Author):

I want to have this check on by default to guarantee nullability consistency.

Also, Delta has a constraint feature (e.g. NOT NULL) that can check incoming data at write time. Once Paimon has a similar feature, we can remove this config and stop checking nullability here.

Contributor:

Does this option affect compilation? I'm thinking about checking for nulls directly at runtime; checking at compile time is too difficult to use.

.withDescription(
"If true, skip the check whether the nullability of incoming data is compatible with the table's.");

public static final ConfigOption<Integer> MAX_FILES_PER_TRIGGER =
key("read.stream.maxFilesPerTrigger")
.intType()
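
To make the default behavior concrete, here is a hedged sketch (the table names are illustrative, not from the PR, and a Paimon catalog is assumed; the error message is the one thrown by PaimonAnalysis below). The check runs when the write plan is analyzed, not at runtime:

```scala
// Sketch: S has nullable columns, T declares NOT NULL columns.
spark.sql("CREATE TABLE S (id INT, ts TIMESTAMP)")
spark.sql("CREATE TABLE T (id INT NOT NULL, ts TIMESTAMP NOT NULL)")

// With the default (write.check.ignoreNullability = false) this is rejected during
// analysis with "Cannot write nullable values to non-null column".
spark.sql("INSERT INTO T SELECT * FROM S")

// Opting out per table lets the same write pass the analysis-time check.
spark.sql("ALTER TABLE T SET TBLPROPERTIES ('write.check.ignoreNullability' = 'true')")
spark.sql("INSERT INTO T SELECT * FROM S")
```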
@@ -18,15 +18,15 @@

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.{SparkConnectorOptions, SparkTable}
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
import org.apache.paimon.spark.commands.{PaimonAnalyzeTableColumnCommand, PaimonDynamicPartitionOverwriteCommand, PaimonTruncateTableCommand}
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -42,7 +42,8 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
if !schemaCompatible(
a.query.output.toStructType,
table.output.toStructType,
paimonTable.partitionKeys().asScala) =>
paimonTable.partitionKeys().asScala,
ignoreNullabilityCheck(paimonTable)) =>
val newQuery = resolveQueryColumns(a.query, table.output)
if (newQuery != a.query) {
Compatibility.withNewQuery(a, newQuery)
@@ -58,9 +59,10 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
}

private def schemaCompatible(
tableSchema: StructType,
dataSchema: StructType,
tableSchema: StructType,
partitionCols: Seq[String],
ignoreNullabilityCheck: Boolean = false,
parent: Array[String] = Array.empty): Boolean = {

if (tableSchema.size != dataSchema.size) {
@@ -70,7 +72,7 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
def dataTypeCompatible(column: String, dt1: DataType, dt2: DataType): Boolean = {
(dt1, dt2) match {
case (s1: StructType, s2: StructType) =>
schemaCompatible(s1, s2, partitionCols, Array(column))
schemaCompatible(s1, s2, partitionCols, ignoreNullabilityCheck, Array(column))
case (a1: ArrayType, a2: ArrayType) =>
dataTypeCompatible(column, a1.elementType, a2.elementType)
case (m1: MapType, m2: MapType) =>
@@ -82,9 +84,11 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
}
}

tableSchema.zip(dataSchema).forall {
dataSchema.zip(tableSchema).forall {
case (f1, f2) =>
checkNullability(f1, f2, partitionCols, parent)
if (!ignoreNullabilityCheck) {
checkNullability(f1, f2, partitionCols, parent)
}
f1.name == f2.name && dataTypeCompatible(f1.name, f1.dataType, f2.dataType)
}
}
@@ -126,6 +130,15 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
throw new RuntimeException("Cannot write nullable values to non-null column")
}
}

private def ignoreNullabilityCheck(paimonTable: FileStoreTable): Boolean = {
paimonTable
.options()
.asScala
.get(SparkConnectorOptions.IGNORE_NULLABLE_CHECK.key)
.map(_.toBoolean)
.getOrElse(SparkConnectorOptions.IGNORE_NULLABLE_CHECK.defaultValue)
}
}

case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[LogicalPlan] {
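
For reference, a minimal self-contained sketch of the rule this diff enforces, written against plain Spark StructTypes rather than the actual PaimonAnalysis helpers (exempting partition columns is an assumption here; in the real code that decision lives in the existing checkNullability method, which throws instead of returning false):

```scala
import org.apache.spark.sql.types._

// Simplified sketch: a data field must not be nullable when the corresponding table
// field is NOT NULL, unless the column is a partition key (assumption) or the
// ignoreNullabilityCheck flag is set.
def nullabilityCompatible(
    dataSchema: StructType,
    tableSchema: StructType,
    partitionCols: Seq[String],
    ignoreNullabilityCheck: Boolean): Boolean = {
  dataSchema.zip(tableSchema).forall { case (dataField, tableField) =>
    ignoreNullabilityCheck ||
      partitionCols.contains(tableField.name) ||
      !(dataField.nullable && !tableField.nullable)
  }
}

val tableSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("ts", TimestampType, nullable = false)))
val dataSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("ts", TimestampType, nullable = true)))

assert(!nullabilityCompatible(dataSchema, tableSchema, Seq.empty, ignoreNullabilityCheck = false))
assert(nullabilityCompatible(dataSchema, tableSchema, Seq.empty, ignoreNullabilityCheck = true))
```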
@@ -63,6 +63,22 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
}
}

test("Paimon DDL: write.check.ignoreNullability") {
withTable("T") {
sql("""
|CREATE TABLE T (id INT NOT NULL, ts TIMESTAMP NOT NULL)
|TBLPROPERTIES ("write.check.ignoreNullability" = "true")
|""".stripMargin)

sql("INSERT INTO T SELECT 1, TO_TIMESTAMP('2024-07-01 16:00:00')")

checkAnswer(
sql("SELECT * FROM T ORDER BY id"),
Row(1, Timestamp.valueOf("2024-07-01 16:00:00")) :: Nil
)
}
}

test("Paimon DDL: Create Table As Select") {
withTable("source", "t1", "t2") {
Seq((1L, "x1", "2023"), (2L, "x2", "2023"))
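
A companion test for the default (check enabled) path could look roughly like the sketch below. This is not part of the PR: the test name, source table, and the exact exception type caught by intercept are assumptions; only the error message matches the one thrown in PaimonAnalysis.

```scala
// Sketch of a negative test for the default behavior (check not skipped).
test("Paimon DDL: nullability check enabled by default") {
  withTable("S", "T") {
    sql("CREATE TABLE S (id INT, ts TIMESTAMP)")
    sql("CREATE TABLE T (id INT NOT NULL, ts TIMESTAMP NOT NULL)")

    // Inserting from a nullable source into NOT NULL columns should be rejected.
    val e = intercept[RuntimeException] {
      sql("INSERT INTO T SELECT * FROM S")
    }
    assert(e.getMessage.contains("Cannot write nullable values to non-null column"))
  }
}
```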