Skip to content

Commit

Permalink
1
Browse files: browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Nov 7, 2024
1 parent 6fb8dee commit 783afb8
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,8 @@ public MergeFileSplitRead withReadType(RowType readType) {
this.outerProjection = projection.outerProjection;
if (pushdownProjection != null) {
RowType pushdownRowType =
tableSchema
.logicalRowType()
.project(
Arrays.stream(pushdownProjection)
.mapToInt(arr -> arr[0])
.toArray());
readType.project(
Arrays.stream(pushdownProjection).mapToInt(arr -> arr[0]).toArray());
readerFactoryBuilder.withReadValueType(pushdownRowType);
mergeSorter.setProjectedValueType(pushdownRowType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,29 @@ public void testCreateAndDropTable() {
innerTest("MyTable6", false, true);
}

/**
 * Creates a single-bucket primary-key Parquet table whose value column is a two-level nested
 * struct, writes two rows with separate INSERT statements, and checks that a query selecting
 * the innermost string field together with the key returns both rows.
 *
 * <p>The SELECT lists the nested field {@code v.f2.f1} before the key {@code k}, so the
 * requested column order differs from the table's declared order.
 */
@Test
public void testReadNestedColumnTable() {
    // Fully qualified once so that every statement targets exactly the same table.
    String qualifiedTable = "paimon.default." + "testAddNestedColumnTable";

    String createDdl =
            String.format(
                    "CREATE TABLE %s"
                            + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: STRING, f2: INT>>) "
                            + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = 'parquet')",
                    qualifiedTable);
    spark.sql(createDdl);

    // One INSERT statement per row, issued in key order.
    String insertPrefix = "INSERT INTO " + qualifiedTable;
    spark.sql(insertPrefix + " VALUES (1, STRUCT(10, STRUCT('apple', 100)))");
    spark.sql(insertPrefix + " VALUES (2, STRUCT(20, STRUCT('banana', 200)))");

    String nestedFieldQuery = "SELECT v.f2.f1, k FROM " + qualifiedTable;
    // Rows are compared through their string form; result order is not asserted.
    assertThat(spark.sql(nestedFieldQuery).collectAsList().stream().map(Row::toString))
            .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
}

private void innerTest(String tableName, boolean hasPk, boolean partitioned) {
String ddlTemplate =
"CREATE TABLE default.%s (\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,108 +219,113 @@ class PaimonQueryTest extends PaimonSparkTestBase {
}

test("Paimon Query: query nested cols") {
fileFormats.foreach {
fileFormat =>
bucketModes.foreach {
bucketMode =>
val bucketProp = if (bucketMode != -1) {
s", 'bucket-key'='name', 'bucket' = '$bucketMode' "
} else {
""
}
withTable("students") {
sql(s"""
|CREATE TABLE students (
| name STRING,
| course STRUCT<course_name: STRING, grade: DOUBLE>,
| teacher STRUCT<name: STRING, address: STRUCT<street: STRING, city: STRING>>,
| m MAP<STRING, STRUCT<s:STRING, i INT, d: DOUBLE>>,
| l ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>,
| s STRUCT<s1: STRING, s2: MAP<STRING, STRUCT<s:STRING, i INT, a: ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>>>>,
| m2 MAP<STRUCT<s:STRING, i INT, d: DOUBLE>, STRUCT<s:STRING, i INT, d: DOUBLE>>
|) USING paimon
|TBLPROPERTIES ('file.format'='$fileFormat' $bucketProp)
|""".stripMargin)

sql(s"""
|INSERT INTO students VALUES (
| 'Alice',
| STRUCT('Math', 85.0),
| STRUCT('John', STRUCT('Street 1', 'City 1')),
| MAP('k1', STRUCT('s1', 1, 1.0), 'k2', STRUCT('s11', 11, 11.0)),
| ARRAY(STRUCT('s1', 1, 1.0), STRUCT('s11', 11, 11.0)),
| STRUCT('a', MAP('k1', STRUCT('s1', 1, ARRAY(STRUCT('s1', 1, 1.0))), 'k3', STRUCT('s11', 11, ARRAY(STRUCT('s11', 11, 11.0))))),
| MAP(STRUCT('k1', 1, 1.0), STRUCT('s1', 1, 1.0), STRUCT('k2', 1, 1.0), STRUCT('s11', 11, 11.0)))
|""".stripMargin)

sql(s"""
|INSERT INTO students VALUES (
| 'Bob',
| STRUCT('Biology', 92.0),
| STRUCT('Jane', STRUCT('Street 2', 'City 2')),
| MAP('k2', STRUCT('s2', 2, 2.0)),
| ARRAY(STRUCT('s2', 2, 2.0), STRUCT('s22', 22, 22.0)),
| STRUCT('b', MAP('k2', STRUCT('s22', 22, ARRAY(STRUCT('s22', 22, 22.0))))),
| MAP(STRUCT('k2', 2, 2.0), STRUCT('s22', 22, 22.0)))
|""".stripMargin)

sql(s"""
|INSERT INTO students VALUES (
| 'Cathy',
| STRUCT('History', 95.0),
| STRUCT('Jane', STRUCT('Street 3', 'City 3')),
| MAP('k1', STRUCT('s3', 3, 3.0), 'k2', STRUCT('s33', 33, 33.0)),
| ARRAY(STRUCT('s3', 3, 3.0)),
| STRUCT('c', MAP('k1', STRUCT('s3', 3, ARRAY(STRUCT('s3', 3, 3.0))), 'k2', STRUCT('s33', 33, ARRAY(STRUCT('s33', 33, 33.0))))),
| MAP(STRUCT('k1', 3, 3.0), STRUCT('s3', 3, 3.0), STRUCT('k2', 3, 3.0), STRUCT('s33', 33, 33.0)))
|""".stripMargin)

checkAnswer(
sql(s"""
|SELECT
| course.grade, name, teacher.address, course.course_name,
| m['k1'].d, m['k1'].s,
| l[1].d, l[1].s,
| s.s2['k2'].a[0].i,
| map_keys(m2).i
|FROM students ORDER BY name
|""".stripMargin),
Seq(
Row(
85.0,
"Alice",
Row("Street 1", "City 1"),
"Math",
1.0,
"s1",
11.0,
"s11",
null,
Seq(1, 1)),
Row(
92.0,
"Bob",
Row("Street 2", "City 2"),
"Biology",
null,
null,
22.0,
"s22",
22,
Seq(2)),
Row(
95.0,
"Cathy",
Row("Street 3", "City 3"),
"History",
3.0,
"s3",
null,
null,
33,
Seq(3, 3))
)
)
withPk.foreach {
hasPk =>
fileFormats.foreach {
fileFormat =>
bucketModes.foreach {
bucketMode =>
val key = if (hasPk) "primary-key" else "bucket-key"
val props = if (bucketMode != -1) {
s", '$key'='name', 'bucket' = '$bucketMode' "
} else {
""
}
withTable("students") {
sql(s"""
|CREATE TABLE students (
| name STRING,
| course STRUCT<course_name: STRING, grade: DOUBLE>,
| teacher STRUCT<name: STRING, address: STRUCT<street: STRING, city: STRING>>,
| m MAP<STRING, STRUCT<s:STRING, i INT, d: DOUBLE>>,
| l ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>,
| s STRUCT<s1: STRING, s2: MAP<STRING, STRUCT<s:STRING, i INT, a: ARRAY<STRUCT<s:STRING, i INT, d: DOUBLE>>>>>,
| m2 MAP<STRUCT<s:STRING, i INT, d: DOUBLE>, STRUCT<s:STRING, i INT, d: DOUBLE>>
|) USING paimon
|TBLPROPERTIES ('file.format'='$fileFormat' $props)
|""".stripMargin)

sql(s"""
|INSERT INTO students VALUES (
| 'Alice',
| STRUCT('Math', 85.0),
| STRUCT('John', STRUCT('Street 1', 'City 1')),
| MAP('k1', STRUCT('s1', 1, 1.0), 'k2', STRUCT('s11', 11, 11.0)),
| ARRAY(STRUCT('s1', 1, 1.0), STRUCT('s11', 11, 11.0)),
| STRUCT('a', MAP('k1', STRUCT('s1', 1, ARRAY(STRUCT('s1', 1, 1.0))), 'k3', STRUCT('s11', 11, ARRAY(STRUCT('s11', 11, 11.0))))),
| MAP(STRUCT('k1', 1, 1.0), STRUCT('s1', 1, 1.0), STRUCT('k2', 1, 1.0), STRUCT('s11', 11, 11.0)))
|""".stripMargin)

sql(
s"""
|INSERT INTO students VALUES (
| 'Bob',
| STRUCT('Biology', 92.0),
| STRUCT('Jane', STRUCT('Street 2', 'City 2')),
| MAP('k2', STRUCT('s2', 2, 2.0)),
| ARRAY(STRUCT('s2', 2, 2.0), STRUCT('s22', 22, 22.0)),
| STRUCT('b', MAP('k2', STRUCT('s22', 22, ARRAY(STRUCT('s22', 22, 22.0))))),
| MAP(STRUCT('k2', 2, 2.0), STRUCT('s22', 22, 22.0)))
|""".stripMargin)

sql(s"""
|INSERT INTO students VALUES (
| 'Cathy',
| STRUCT('History', 95.0),
| STRUCT('Jane', STRUCT('Street 3', 'City 3')),
| MAP('k1', STRUCT('s3', 3, 3.0), 'k2', STRUCT('s33', 33, 33.0)),
| ARRAY(STRUCT('s3', 3, 3.0)),
| STRUCT('c', MAP('k1', STRUCT('s3', 3, ARRAY(STRUCT('s3', 3, 3.0))), 'k2', STRUCT('s33', 33, ARRAY(STRUCT('s33', 33, 33.0))))),
| MAP(STRUCT('k1', 3, 3.0), STRUCT('s3', 3, 3.0), STRUCT('k2', 3, 3.0), STRUCT('s33', 33, 33.0)))
|""".stripMargin)

checkAnswer(
sql(s"""
|SELECT
| course.grade, name, teacher.address, course.course_name,
| m['k1'].d, m['k1'].s,
| l[1].d, l[1].s,
| s.s2['k2'].a[0].i,
| map_keys(m2).i
|FROM students ORDER BY name
|""".stripMargin),
Seq(
Row(
85.0,
"Alice",
Row("Street 1", "City 1"),
"Math",
1.0,
"s1",
11.0,
"s11",
null,
Seq(1, 1)),
Row(
92.0,
"Bob",
Row("Street 2", "City 2"),
"Biology",
null,
null,
22.0,
"s22",
22,
Seq(2)),
Row(
95.0,
"Cathy",
Row("Street 3", "City 3"),
"History",
3.0,
"s3",
null,
null,
33,
Seq(3, 3))
)
)
}
}
}
}
Expand Down

0 comments on commit 783afb8

Please sign in to comment.