From 32d05f31e1aae54e7d7504fc5f66cbde4a111a91 Mon Sep 17 00:00:00 2001 From: Zouxxyy Date: Fri, 8 Nov 2024 10:37:15 +0800 Subject: [PATCH] [core][spark] Fix read nested column with pk table (#4476) This closes #4476. --- .../paimon/operation/MergeFileSplitRead.java | 26 ++- .../apache/paimon/spark/SparkReadITCase.java | 27 +++ .../paimon/spark/sql/PaimonQueryTest.scala | 209 +++++++++--------- 3 files changed, 149 insertions(+), 113 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java index c21c3683ce08..23a3a576e4a6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java @@ -44,6 +44,7 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DeletionFile; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.Projection; @@ -131,11 +132,9 @@ public MergeFileSplitRead withReadKeyType(RowType readKeyType) { @Override public MergeFileSplitRead withReadType(RowType readType) { // todo: replace projectedFields with readType + RowType tableRowType = tableSchema.logicalRowType(); int[][] projectedFields = - Arrays.stream( - tableSchema - .logicalRowType() - .getFieldIndices(readType.getFieldNames())) + Arrays.stream(tableRowType.getFieldIndices(readType.getFieldNames())) .mapToObj(i -> new int[] {i}) .toArray(int[][]::new); int[][] newProjectedFields = projectedFields; @@ -161,13 +160,18 @@ public MergeFileSplitRead withReadType(RowType readType) { this.pushdownProjection = projection.pushdownProjection; this.outerProjection = projection.outerProjection; if (pushdownProjection != null) { - RowType pushdownRowType = - tableSchema - 
.logicalRowType() - .project( - Arrays.stream(pushdownProjection) - .mapToInt(arr -> arr[0]) - .toArray()); + List<DataField> tableFields = tableRowType.getFields(); + List<DataField> readFields = readType.getFields(); + List<DataField> finalReadFields = new ArrayList<>(); + for (int i : Arrays.stream(pushdownProjection).mapToInt(arr -> arr[0]).toArray()) { + DataField requiredField = tableFields.get(i); + finalReadFields.add( + readFields.stream() + .filter(x -> x.name().equals(requiredField.name())) + .findFirst() + .orElse(requiredField)); + } + RowType pushdownRowType = new RowType(finalReadFields); readerFactoryBuilder.withReadValueType(pushdownRowType); mergeSorter.setProjectedValueType(pushdownRowType); } diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java index be6264f7b2d0..32c3498a7cc9 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java @@ -440,6 +440,33 @@ public void testCreateAndDropTable() { innerTest("MyTable6", false, true); } + @Test + public void testReadNestedColumnTable() { + String tableName = "testAddNestedColumnTable"; + spark.sql( + "CREATE TABLE paimon.default." + + tableName + + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: STRING, f2: INT>>) " + + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = 'parquet')"); + spark.sql( + "INSERT INTO paimon.default." + + tableName + + " VALUES (1, STRUCT(10, STRUCT('apple', 100)))"); + spark.sql( + "INSERT INTO paimon.default." + + tableName + + " VALUES (2, STRUCT(20, STRUCT('banana', 200)))"); + spark.sql( + "INSERT INTO paimon.default." + + tableName + + " VALUES (1, STRUCT(30, STRUCT('cat', 100)))"); + assertThat( + spark.sql("SELECT v.f2.f1, k FROM paimon.default."
+ tableName) + .collectAsList().stream() + .map(Row::toString)) + .containsExactlyInAnyOrder("[cat,1]", "[banana,2]"); + } + private void innerTest(String tableName, boolean hasPk, boolean partitioned) { String ddlTemplate = "CREATE TABLE default.%s (\n" diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala index b08b342ca503..fc2f9ac0c7d0 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala @@ -219,108 +219,113 @@ class PaimonQueryTest extends PaimonSparkTestBase { } test("Paimon Query: query nested cols") { - fileFormats.foreach { - fileFormat => - bucketModes.foreach { - bucketMode => - val bucketProp = if (bucketMode != -1) { - s", 'bucket-key'='name', 'bucket' = '$bucketMode' " - } else { - "" - } - withTable("students") { - sql(s""" - |CREATE TABLE students ( - | name STRING, - | course STRUCT, - | teacher STRUCT>, - | m MAP>, - | l ARRAY>, - | s STRUCT>>>>, - | m2 MAP, STRUCT> - |) USING paimon - |TBLPROPERTIES ('file.format'='$fileFormat' $bucketProp) - |""".stripMargin) - - sql(s""" - |INSERT INTO students VALUES ( - | 'Alice', - | STRUCT('Math', 85.0), - | STRUCT('John', STRUCT('Street 1', 'City 1')), - | MAP('k1', STRUCT('s1', 1, 1.0), 'k2', STRUCT('s11', 11, 11.0)), - | ARRAY(STRUCT('s1', 1, 1.0), STRUCT('s11', 11, 11.0)), - | STRUCT('a', MAP('k1', STRUCT('s1', 1, ARRAY(STRUCT('s1', 1, 1.0))), 'k3', STRUCT('s11', 11, ARRAY(STRUCT('s11', 11, 11.0))))), - | MAP(STRUCT('k1', 1, 1.0), STRUCT('s1', 1, 1.0), STRUCT('k2', 1, 1.0), STRUCT('s11', 11, 11.0))) - |""".stripMargin) - - sql(s""" - |INSERT INTO students VALUES ( - | 'Bob', - | STRUCT('Biology', 92.0), - | STRUCT('Jane', STRUCT('Street 2', 'City 2')), - | MAP('k2', STRUCT('s2', 2, 2.0)), - 
| ARRAY(STRUCT('s2', 2, 2.0), STRUCT('s22', 22, 22.0)), - | STRUCT('b', MAP('k2', STRUCT('s22', 22, ARRAY(STRUCT('s22', 22, 22.0))))), - | MAP(STRUCT('k2', 2, 2.0), STRUCT('s22', 22, 22.0))) - |""".stripMargin) - - sql(s""" - |INSERT INTO students VALUES ( - | 'Cathy', - | STRUCT('History', 95.0), - | STRUCT('Jane', STRUCT('Street 3', 'City 3')), - | MAP('k1', STRUCT('s3', 3, 3.0), 'k2', STRUCT('s33', 33, 33.0)), - | ARRAY(STRUCT('s3', 3, 3.0)), - | STRUCT('c', MAP('k1', STRUCT('s3', 3, ARRAY(STRUCT('s3', 3, 3.0))), 'k2', STRUCT('s33', 33, ARRAY(STRUCT('s33', 33, 33.0))))), - | MAP(STRUCT('k1', 3, 3.0), STRUCT('s3', 3, 3.0), STRUCT('k2', 3, 3.0), STRUCT('s33', 33, 33.0))) - |""".stripMargin) - - checkAnswer( - sql(s""" - |SELECT - | course.grade, name, teacher.address, course.course_name, - | m['k1'].d, m['k1'].s, - | l[1].d, l[1].s, - | s.s2['k2'].a[0].i, - | map_keys(m2).i - |FROM students ORDER BY name - |""".stripMargin), - Seq( - Row( - 85.0, - "Alice", - Row("Street 1", "City 1"), - "Math", - 1.0, - "s1", - 11.0, - "s11", - null, - Seq(1, 1)), - Row( - 92.0, - "Bob", - Row("Street 2", "City 2"), - "Biology", - null, - null, - 22.0, - "s22", - 22, - Seq(2)), - Row( - 95.0, - "Cathy", - Row("Street 3", "City 3"), - "History", - 3.0, - "s3", - null, - null, - 33, - Seq(3, 3)) - ) - ) + withPk.foreach { + hasPk => + fileFormats.foreach { + fileFormat => + bucketModes.foreach { + bucketMode => + val key = if (hasPk) "primary-key" else "bucket-key" + val props = if (bucketMode != -1) { + s", '$key'='name', 'bucket' = '$bucketMode' " + } else { + "" + } + withTable("students") { + sql(s""" + |CREATE TABLE students ( + | name STRING, + | course STRUCT, + | teacher STRUCT>, + | m MAP>, + | l ARRAY>, + | s STRUCT>>>>, + | m2 MAP, STRUCT> + |) USING paimon + |TBLPROPERTIES ('file.format'='$fileFormat' $props) + |""".stripMargin) + + sql(s""" + |INSERT INTO students VALUES ( + | 'Alice', + | STRUCT('Math', 85.0), + | STRUCT('John', STRUCT('Street 1', 'City 1')), + | 
MAP('k1', STRUCT('s1', 1, 1.0), 'k2', STRUCT('s11', 11, 11.0)), + | ARRAY(STRUCT('s1', 1, 1.0), STRUCT('s11', 11, 11.0)), + | STRUCT('a', MAP('k1', STRUCT('s1', 1, ARRAY(STRUCT('s1', 1, 1.0))), 'k3', STRUCT('s11', 11, ARRAY(STRUCT('s11', 11, 11.0))))), + | MAP(STRUCT('k1', 1, 1.0), STRUCT('s1', 1, 1.0), STRUCT('k2', 1, 1.0), STRUCT('s11', 11, 11.0))) + |""".stripMargin) + + sql( + s""" + |INSERT INTO students VALUES ( + | 'Bob', + | STRUCT('Biology', 92.0), + | STRUCT('Jane', STRUCT('Street 2', 'City 2')), + | MAP('k2', STRUCT('s2', 2, 2.0)), + | ARRAY(STRUCT('s2', 2, 2.0), STRUCT('s22', 22, 22.0)), + | STRUCT('b', MAP('k2', STRUCT('s22', 22, ARRAY(STRUCT('s22', 22, 22.0))))), + | MAP(STRUCT('k2', 2, 2.0), STRUCT('s22', 22, 22.0))) + |""".stripMargin) + + sql(s""" + |INSERT INTO students VALUES ( + | 'Cathy', + | STRUCT('History', 95.0), + | STRUCT('Jane', STRUCT('Street 3', 'City 3')), + | MAP('k1', STRUCT('s3', 3, 3.0), 'k2', STRUCT('s33', 33, 33.0)), + | ARRAY(STRUCT('s3', 3, 3.0)), + | STRUCT('c', MAP('k1', STRUCT('s3', 3, ARRAY(STRUCT('s3', 3, 3.0))), 'k2', STRUCT('s33', 33, ARRAY(STRUCT('s33', 33, 33.0))))), + | MAP(STRUCT('k1', 3, 3.0), STRUCT('s3', 3, 3.0), STRUCT('k2', 3, 3.0), STRUCT('s33', 33, 33.0))) + |""".stripMargin) + + checkAnswer( + sql(s""" + |SELECT + | course.grade, name, teacher.address, course.course_name, + | m['k1'].d, m['k1'].s, + | l[1].d, l[1].s, + | s.s2['k2'].a[0].i, + | map_keys(m2).i + |FROM students ORDER BY name + |""".stripMargin), + Seq( + Row( + 85.0, + "Alice", + Row("Street 1", "City 1"), + "Math", + 1.0, + "s1", + 11.0, + "s11", + null, + Seq(1, 1)), + Row( + 92.0, + "Bob", + Row("Street 2", "City 2"), + "Biology", + null, + null, + 22.0, + "s22", + 22, + Seq(2)), + Row( + 95.0, + "Cathy", + Row("Street 3", "City 3"), + "History", + 3.0, + "s3", + null, + null, + 33, + Seq(3, 3)) + ) + ) + } } } }