From 6435dd2053f7b094a33c0358615d7a3ddf1c4e92 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Tue, 30 Jul 2024 19:39:05 +0800 Subject: [PATCH] [core][spark] check column nullability when write (#3842) --- .../table/AppendOnlyFileStoreTable.java | 1 + .../table/PrimaryKeyFileStoreTable.java | 1 + .../paimon/table/sink/TableWriteImpl.java | 27 ++++++++ .../catalyst/analysis/PaimonAnalysis.scala | 20 ++---- .../apache/paimon/spark/sql/DDLTestBase.scala | 64 +++++++++++++++---- 5 files changed, 84 insertions(+), 29 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 40eeb4d28789..0af78a5dac8b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -139,6 +139,7 @@ public TableWriteImpl newWrite( AppendOnlyFileStoreWrite writer = store().newWrite(commitUser, manifestFilter).withBucketMode(bucketMode()); return new TableWriteImpl<>( + rowType(), writer, createRowKeyExtractor(), (record, rowKind) -> { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 6ac2763ace66..b1e5b5366c3d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -160,6 +160,7 @@ public TableWriteImpl newWrite( String commitUser, ManifestCacheFilter manifestFilter) { KeyValue kv = new KeyValue(); return new TableWriteImpl<>( + rowType(), store().newWrite(commitUser, manifestFilter), createRowKeyExtractor(), (record, rowKind) -> diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index 6e2194646d2a..580d7f4c4f6e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -30,13 +30,16 @@ import org.apache.paimon.operation.FileStoreWrite; import org.apache.paimon.operation.FileStoreWrite.State; import org.apache.paimon.table.BucketMode; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Restorable; import javax.annotation.Nullable; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkState; @@ -47,6 +50,7 @@ */ public class TableWriteImpl implements InnerTableWrite, Restorable>> { + private final RowType rowType; private final FileStoreWrite write; private final KeyAndBucketExtractor keyAndBucketExtractor; private final RecordExtractor recordExtractor; @@ -56,17 +60,28 @@ public class TableWriteImpl implements InnerTableWrite, Restorable write, KeyAndBucketExtractor keyAndBucketExtractor, RecordExtractor recordExtractor, @Nullable RowKindGenerator rowKindGenerator, boolean ignoreDelete) { + this.rowType = rowType; this.write = write; this.keyAndBucketExtractor = keyAndBucketExtractor; this.recordExtractor = recordExtractor; this.rowKindGenerator = rowKindGenerator; this.ignoreDelete = ignoreDelete; + + List notNullColumnNames = + rowType.getFields().stream() + .filter(field -> !field.type().isNullable()) + .map(DataField::name) + .collect(Collectors.toList()); + this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames); } @Override @@ -137,6 +152,7 @@ public void write(InternalRow row, int bucket) throws Exception { @Nullable public SinkRecord writeAndReturn(InternalRow row) throws Exception { + checkNullability(row); RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); if (ignoreDelete && rowKind.isRetract()) { return null; @@ -148,6 +164,7 @@ public SinkRecord writeAndReturn(InternalRow row) throws Exception { @Nullable public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception { + checkNullability(row); RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); if (ignoreDelete && rowKind.isRetract()) { return null; @@ -157,6 +174,16 @@ public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception { return record; } + private void checkNullability(InternalRow row) { + for (int idx : notNullFieldIndex) { + if (row.isNullAt(idx)) { + String columnName = rowType.getFields().get(idx).name(); + throw new RuntimeException( + String.format("Cannot write null to non-null column(%s)", columnName)); + } + } + } + private SinkRecord toSinkRecord(InternalRow row) { keyAndBucketExtractor.setRecord(row); return new SinkRecord( diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala index 67685612664d..3dc0e40c9eff 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala @@ -26,11 +26,11 @@ import org.apache.paimon.table.FileStoreTable import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.ResolvedTable -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} import scala.collection.JavaConverters._ @@ -58,8 +58,8 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] { } private def schemaCompatible( - tableSchema: StructType, dataSchema: StructType, + tableSchema: StructType, partitionCols: Seq[String], parent: Array[String] = Array.empty): Boolean = { @@ -82,9 +82,8 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] { } } - tableSchema.zip(dataSchema).forall { + dataSchema.zip(tableSchema).forall { case (f1, f2) => - checkNullability(f1, f2, partitionCols, parent) f1.name == f2.name && dataTypeCompatible(f1.name, f1.dataType, f2.dataType) } } @@ -115,17 +114,6 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] { cast.setTagValue(Compatibility.castByTableInsertionTag, ()) cast } - - private def checkNullability( - input: StructField, - expected: StructField, - partitionCols: Seq[String], - parent: Array[String] = Array.empty): Unit = { - val fullColumnName = (parent ++ Array(input.name)).mkString(".") - if (!partitionCols.contains(fullColumnName) && input.nullable && !expected.nullable) { - throw new RuntimeException("Cannot write nullable values to non-null column") - } - } } case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[LogicalPlan] { diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala index da40171042a1..db749a63619b 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala @@ -23,6 +23,7 @@ import org.apache.paimon.schema.Schema import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.paimon.types.DataTypes +import org.apache.spark.SparkException import org.apache.spark.sql.Row import org.junit.jupiter.api.Assertions @@ -33,33 +34,70 @@ abstract class DDLTestBase extends PaimonSparkTestBase { import testImplicits._ - test("Paimon DDL: create table with not null") { + test("Paimon DDL: create append table with not null") { withTable("T") { - sql(""" - |CREATE TABLE T (id INT NOT NULL, name STRING) - |""".stripMargin) + sql("CREATE TABLE T (id INT NOT NULL, name STRING)") - val exception = intercept[RuntimeException] { - sql(""" - |INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c") - |""".stripMargin) + val e1 = intercept[SparkException] { + sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c")""") } - Assertions.assertTrue( - exception.getMessage().contains("Cannot write nullable values to non-null column")) + Assertions.assertTrue(e1.getMessage().contains("Cannot write null to non-null column")) + + sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null)""") + checkAnswer( + sql("SELECT * FROM T ORDER BY id"), + Seq((1, "a"), (2, "b"), (3, null)).toDF() + ) + val schema = spark.table("T").schema + Assertions.assertEquals(schema.size, 2) + Assertions.assertFalse(schema("id").nullable) + Assertions.assertTrue(schema("name").nullable) + } + } + test("Paimon DDL: create primary-key table with not null") { + withTable("T") { sql(""" - |INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null) + |CREATE TABLE T (id INT, name STRING, pt STRING) + |TBLPROPERTIES ('primary-key' = 'id,pt') |""".stripMargin) + val e1 = intercept[SparkException] { + sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", null)""") + } + Assertions.assertTrue(e1.getMessage().contains("Cannot write null to non-null column")) + + val e2 = intercept[SparkException] { + sql("""INSERT INTO T VALUES (1, "a", "pt1"), (null, "b", "pt2")""") + } + Assertions.assertTrue(e2.getMessage().contains("Cannot write null to non-null column")) + + sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", "pt1"), (3, null, "pt2")""") checkAnswer( sql("SELECT * FROM T ORDER BY id"), - Seq((1, "a"), (2, "b"), (3, null)).toDF() + Seq((1, "a", "pt1"), (2, "b", "pt1"), (3, null, "pt2")).toDF() ) val schema = spark.table("T").schema - Assertions.assertEquals(schema.size, 2) + Assertions.assertEquals(schema.size, 3) Assertions.assertFalse(schema("id").nullable) Assertions.assertTrue(schema("name").nullable) + Assertions.assertFalse(schema("pt").nullable) + } + } + + test("Paimon DDL: write nullable expression to non-null column") { + withTable("T") { + sql(""" + |CREATE TABLE T (id INT NOT NULL, ts TIMESTAMP NOT NULL) + |""".stripMargin) + + sql("INSERT INTO T SELECT 1, TO_TIMESTAMP('2024-07-01 16:00:00')") + + checkAnswer( + sql("SELECT * FROM T ORDER BY id"), + Row(1, Timestamp.valueOf("2024-07-01 16:00:00")) :: Nil + ) } }