diff --git a/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java b/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
index 0b5512c4cba6..00cf0c072886 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
@@ -57,6 +57,10 @@ public DataType getElementType() {
         return elementType;
     }
 
+    public DataType newElementType(DataType newElementType) {
+        return new ArrayType(isNullable(), newElementType);
+    }
+
     @Override
     public int defaultSize() {
         return elementType.defaultSize();
@@ -96,6 +100,21 @@ public boolean equals(Object o) {
         return elementType.equals(arrayType.elementType);
     }
 
+    @Override
+    public boolean isPrunedFrom(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        ArrayType arrayType = (ArrayType) o;
+        return elementType.isPrunedFrom(arrayType.elementType);
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(super.hashCode(), elementType);
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/MapType.java b/paimon-common/src/main/java/org/apache/paimon/types/MapType.java
index 238f681ee47d..57cb1a9724b3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/MapType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/MapType.java
@@ -74,6 +74,10 @@ public DataType copy(boolean isNullable) {
         return new MapType(isNullable, keyType.copy(), valueType.copy());
     }
 
+    public DataType newKeyValueType(DataType newKeyType, DataType newValueType) {
+        return new MapType(isNullable(), newKeyType, newValueType);
+    }
+
     @Override
     public String asSQLString() {
         return withNullability(FORMAT, keyType.asSQLString(), valueType.asSQLString());
@@ -105,6 +109,21 @@ public boolean equals(Object o) {
         return keyType.equals(mapType.keyType) && valueType.equals(mapType.valueType);
     }
 
+    @Override
+    public boolean isPrunedFrom(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        MapType mapType = (MapType) o;
+        return keyType.isPrunedFrom(mapType.keyType) && valueType.isPrunedFrom(mapType.valueType);
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(super.hashCode(), keyType, valueType);
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index af63401b6d37..b31937f90658 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -122,6 +122,15 @@ public boolean containsField(String fieldName) {
         return false;
     }
 
+    public boolean containsField(int fieldId) {
+        for (DataField field : fields) {
+            if (field.id() == fieldId) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public boolean notContainsField(String fieldName) {
         return !containsField(fieldName);
     }
@@ -136,6 +145,15 @@ public DataField getField(String fieldName) {
         throw new RuntimeException("Cannot find field: " + fieldName);
     }
 
+    public DataField getField(int fieldId) {
+        for (DataField field : fields) {
+            if (field.id() == fieldId) {
+                return field;
+            }
+        }
+        throw new RuntimeException("Cannot find field by field id: " + fieldId);
+    }
+
     public int getFieldIndexByFieldId(int fieldId) {
         for (int i = 0; i < fields.size(); i++) {
             if (fields.get(i).id() == fieldId) {
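Note on the contract the three type classes above now share: a type is "pruned from" another when it keeps a subset of that type's fields, matched by field id, with recursively pruned children; newElementType / newKeyValueType rebuild a container type around pruned children while preserving nullability. A minimal sketch of the intended behavior, assuming the pre-existing RowType.isPrunedFrom matches fields by id (the field ids below are illustrative):

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.paimon.types.ArrayType;
    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    public class PruneContractSketch {
        public static void main(String[] args) {
            DataField a = new DataField(0, "a", DataTypes.INT());
            DataField b = new DataField(1, "b", DataTypes.STRING());
            RowType full = new RowType(Arrays.asList(a, b));
            // Keep only "b", retaining its original field id (1).
            RowType pruned = new RowType(Collections.singletonList(b));
            System.out.println(pruned.isPrunedFrom(full)); // expected: true

            // The new ArrayType override delegates to the element types.
            ArrayType fullArray = new ArrayType(true, full);
            DataType prunedArray = fullArray.newElementType(pruned);
            System.out.println(prunedArray.isPrunedFrom(fullArray)); // expected: true
        }
    }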
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java
index e0894a23c09a..037622f95f1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java
@@ -26,7 +26,10 @@
 import org.apache.paimon.schema.IndexCastMapping;
 import org.apache.paimon.schema.SchemaEvolutionUtil;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
@@ -37,7 +40,6 @@
 import java.util.function.Function;
 
 import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
-import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** Class with index mapping and bulk format. */
 public class BulkFormatMapping {
@@ -152,27 +154,48 @@ private List<DataField> readDataFields(TableSchema dataSchema) {
                             .filter(f -> f.id() == dataField.id())
                             .findFirst()
                             .ifPresent(
-                                    f -> {
-                                        if (f.type() instanceof RowType) {
-                                            RowType tableFieldType = (RowType) f.type();
-                                            RowType dataFieldType = (RowType) dataField.type();
-                                            checkState(tableFieldType.isPrunedFrom(dataFieldType));
-                                            // Since the nested type schema evolution is not supported,
-                                            // directly copy the fields from tableField's type to
-                                            // dataField's type.
-                                            // todo: support nested type schema evolutions.
-                                            readDataFields.add(
-                                                    dataField.newType(
-                                                            dataFieldType.copy(
-                                                                    tableFieldType.getFields())));
-                                        } else {
-                                            readDataFields.add(dataField);
-                                        }
-                                    });
+                                    field ->
+                                            readDataFields.add(
+                                                    dataField.newType(
+                                                            pruneDataType(
+                                                                    field.type(), dataField.type()))));
         }
         return readDataFields;
     }
 
+    private DataType pruneDataType(DataType readType, DataType dataType) {
+        switch (readType.getTypeRoot()) {
+            case ROW:
+                RowType r = (RowType) readType;
+                RowType d = (RowType) dataType;
+                ArrayList<DataField> newFields = new ArrayList<>();
+                for (DataField rf : r.getFields()) {
+                    if (d.containsField(rf.id())) {
+                        DataField df = d.getField(rf.id());
+                        newFields.add(df.newType(pruneDataType(rf.type(), df.type())));
+                    }
+                }
+                return d.copy(newFields);
+            case MAP:
+                return ((MapType) dataType)
+                        .newKeyValueType(
+                                pruneDataType(
+                                        ((MapType) readType).getKeyType(),
+                                        ((MapType) dataType).getKeyType()),
+                                pruneDataType(
+                                        ((MapType) readType).getValueType(),
+                                        ((MapType) dataType).getValueType()));
+            case ARRAY:
+                return ((ArrayType) dataType)
+                        .newElementType(
+                                pruneDataType(
+                                        ((ArrayType) readType).getElementType(),
+                                        ((ArrayType) dataType).getElementType()));
+            default:
+                return dataType;
+        }
+    }
+
     private List<Predicate> readFilters(
             List<Predicate> filters, TableSchema tableSchema, TableSchema dataSchema) {
         List<Predicate> dataFilters =
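pruneDataType intersects the requested read type with the data file's type by field id, recursing through rows, maps, and arrays, so nested fields absent from the read type drop out of the shape the reader materializes. A sketch of the ROW branch's effect (the method is private to BulkFormatMapping; the field ids are hypothetical):

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    public class PruneDataTypeSketch {
        public static void main(String[] args) {
            // Data file schema: ROW<id INT (id 0), user ROW<name STRING (id 2), age INT (id 3)> (id 1)>
            DataField name = new DataField(2, "name", DataTypes.STRING());
            DataField age = new DataField(3, "age", DataTypes.INT());
            RowType dataType =
                    new RowType(
                            Arrays.asList(
                                    new DataField(0, "id", DataTypes.INT()),
                                    new DataField(1, "user", new RowType(Arrays.asList(name, age)))));
            // Read type requests only user.name, keeping the original ids.
            RowType readType =
                    new RowType(
                            Collections.singletonList(
                                    new DataField(
                                            1, "user", new RowType(Collections.singletonList(name)))));
            // pruneDataType(readType, dataType) would return ROW<user ROW<name STRING>>:
            // the ROW case keeps only the requested field ids and recurses into nested types,
            // while MAP and ARRAY recurse into key/value and element types respectively.
        }
    }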
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index b0715bb5389d..2feebb321222 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -33,7 +33,10 @@
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pool;
 
@@ -45,6 +48,7 @@
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.io.ColumnIOFactory;
 import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.schema.ConversionPatterns;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
@@ -55,11 +59,17 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_ELEMENT_NAME;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_NAME;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_KEY_NAME;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_REPEATED_NAME;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_VALUE_NAME;
 import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.buildFieldsList;
 import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createWritableColumnVector;
@@ -155,13 +165,59 @@ private MessageType clipParquetSchema(GroupType parquetSchema) {
                         ParquetSchemaConverter.convertToParquetType(fieldName, projectedTypes[i]);
                 unknownFieldsIndices.add(i);
             } else {
-                types[i] = parquetSchema.getType(fieldName);
+                Type parquetType = parquetSchema.getType(fieldName);
+                types[i] = clipParquetType(projectedTypes[i], parquetType);
             }
         }
 
         return Types.buildMessage().addFields(types).named("paimon-parquet");
    }
 
+    /** Clips {@code parquetType} by {@code readType}. */
+    private Type clipParquetType(DataType readType, Type parquetType) {
+        switch (readType.getTypeRoot()) {
+            case ROW:
+                RowType rowType = (RowType) readType;
+                GroupType rowGroup = (GroupType) parquetType;
+                List<Type> rowGroupFields = new ArrayList<>();
+                for (DataField field : rowType.getFields()) {
+                    String fieldName = field.name();
+                    if (rowGroup.containsField(fieldName)) {
+                        Type type = rowGroup.getType(fieldName);
+                        rowGroupFields.add(clipParquetType(field.type(), type));
+                    } else {
+                        // todo: support nested field missing
+                        throw new RuntimeException("field " + fieldName + " is missing");
+                    }
+                }
+                return rowGroup.withNewFields(rowGroupFields);
+            case MAP:
+                MapType mapType = (MapType) readType;
+                GroupType mapGroup = (GroupType) parquetType;
+                GroupType keyValue = mapGroup.getType(MAP_REPEATED_NAME).asGroupType();
+                return ConversionPatterns.mapType(
+                        mapGroup.getRepetition(),
+                        mapGroup.getName(),
+                        MAP_REPEATED_NAME,
+                        clipParquetType(mapType.getKeyType(), keyValue.getType(MAP_KEY_NAME)),
+                        keyValue.containsField(MAP_VALUE_NAME)
+                                ? clipParquetType(
+                                        mapType.getValueType(), keyValue.getType(MAP_VALUE_NAME))
+                                : null);
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) readType;
+                GroupType arrayGroup = (GroupType) parquetType;
+                GroupType list = arrayGroup.getType(LIST_NAME).asGroupType();
+                return ConversionPatterns.listOfElements(
+                        arrayGroup.getRepetition(),
+                        arrayGroup.getName(),
+                        clipParquetType(
+                                arrayType.getElementType(), list.getType(LIST_ELEMENT_NAME)));
+            default:
+                return parquetType;
+        }
+    }
+
     private void checkSchema(MessageType fileSchema, MessageType requestedSchema)
             throws IOException, UnsupportedOperationException {
         if (projectedFields.length != requestedSchema.getFieldCount()) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index 345c9944c9ec..43489f9d7f9f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -47,6 +47,9 @@ public class ParquetSchemaConverter {
 
     static final String MAP_REPEATED_NAME = "key_value";
+    static final String MAP_KEY_NAME = "key";
+    static final String MAP_VALUE_NAME = "value";
+    static final String LIST_NAME = "list";
     static final String LIST_ELEMENT_NAME = "element";
 
     public static MessageType convertToParquetMessageType(String name, RowType rowType) {
@@ -149,8 +152,8 @@ private static Type convertToParquetType(
                         repetition,
                         name,
                         MAP_REPEATED_NAME,
-                        convertToParquetType("key", keyType),
-                        convertToParquetType("value", mapType.getValueType()));
+                        convertToParquetType(MAP_KEY_NAME, keyType),
+                        convertToParquetType(MAP_VALUE_NAME, mapType.getValueType()));
             case MULTISET:
                 MultisetType multisetType = (MultisetType) type;
                 DataType elementType = multisetType.getElementType();
@@ -163,8 +166,8 @@ private static Type convertToParquetType(
                         repetition,
                         name,
                         MAP_REPEATED_NAME,
-                        convertToParquetType("key", elementType),
-                        convertToParquetType("value", new IntType(false)));
+                        convertToParquetType(MAP_KEY_NAME, elementType),
+                        convertToParquetType(MAP_VALUE_NAME, new IntType(false)));
             case ROW:
                 RowType rowType = (RowType) type;
                 return new GroupType(repetition, name, convertToParquetTypes(rowType));
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index 90abaa992c17..860ec54fa88b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -370,12 +370,12 @@ private static List<ColumnDescriptor> getAllColumnDescriptorByType(
     }
 
     public static List<ParquetField> buildFieldsList(
-            List<DataField> childrens, List<String> fieldNames, MessageColumnIO columnIO) {
+            List<DataField> children, List<String> fieldNames, MessageColumnIO columnIO) {
         List<ParquetField> list = new ArrayList<>();
-        for (int i = 0; i < childrens.size(); i++) {
+        for (int i = 0; i < children.size(); i++) {
             list.add(
                     constructField(
-                            childrens.get(i), lookupColumnByName(columnIO, fieldNames.get(i))));
+                            children.get(i), lookupColumnByName(columnIO, fieldNames.get(i))));
         }
         return list;
     }
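clipParquetType never edits a Parquet group in place: for LIST and MAP it rebuilds the standard wrapper levels (list/element, key_value/key/value) around the clipped child via ConversionPatterns, which is why those repeated-group names are now shared constants. A sketch of what the ARRAY branch produces when only element field "d" survives pruning (names follow the constants above):

    import org.apache.parquet.schema.ConversionPatterns;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Type.Repetition;
    import org.apache.parquet.schema.Types;

    public class ClipListSketch {
        public static void main(String[] args) {
            // Clipped element: only "d" out of an original <s, i, d> struct survives.
            Type clippedElement =
                    Types.optionalGroup()
                            .addField(Types.optional(PrimitiveTypeName.DOUBLE).named("d"))
                            .named("element");
            // Re-wrap in the standard 3-level LIST layout:
            // optional group l (LIST) { repeated group list { element } }
            Type clipped =
                    ConversionPatterns.listOfElements(Repetition.OPTIONAL, "l", clippedElement);
            System.out.println(clipped);
        }
    }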
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
index 1d7f80f63a85..8bba676200ce 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
@@ -85,6 +85,42 @@ public static org.apache.paimon.types.DataType toPaimonType(DataType dataType) {
         return SparkToPaimonTypeVisitor.visit(dataType);
     }
 
+    /**
+     * Prune Paimon {@code RowType} by the required Spark {@code StructType}. Use this method
+     * instead of {@link #toPaimonType(DataType)} when the field ids need to be retained.
+     */
+    public static RowType prunePaimonRowType(StructType requiredStructType, RowType rowType) {
+        return (RowType) prunePaimonType(requiredStructType, rowType);
+    }
+
+    private static org.apache.paimon.types.DataType prunePaimonType(
+            DataType sparkDataType, org.apache.paimon.types.DataType paimonDataType) {
+        if (sparkDataType instanceof StructType) {
+            StructType s = (StructType) sparkDataType;
+            RowType p = (RowType) paimonDataType;
+            List<DataField> newFields = new ArrayList<>();
+            for (StructField field : s.fields()) {
+                DataField f = p.getField(field.name());
+                newFields.add(f.newType(prunePaimonType(field.dataType(), f.type())));
+            }
+            return p.copy(newFields);
+        } else if (sparkDataType instanceof org.apache.spark.sql.types.MapType) {
+            org.apache.spark.sql.types.MapType s =
+                    (org.apache.spark.sql.types.MapType) sparkDataType;
+            MapType p = (MapType) paimonDataType;
+            return p.newKeyValueType(
+                    prunePaimonType(s.keyType(), p.getKeyType()),
+                    prunePaimonType(s.valueType(), p.getValueType()));
+        } else if (sparkDataType instanceof org.apache.spark.sql.types.ArrayType) {
+            org.apache.spark.sql.types.ArrayType s =
+                    (org.apache.spark.sql.types.ArrayType) sparkDataType;
+            ArrayType r = (ArrayType) paimonDataType;
+            return r.newElementType(prunePaimonType(s.elementType(), r.getElementType()));
+        } else {
+            return paimonDataType;
+        }
+    }
+
     private static class PaimonToSparkTypeVisitor extends DataTypeDefaultVisitor<DataType> {
 
         private static final PaimonToSparkTypeVisitor INSTANCE = new PaimonToSparkTypeVisitor();
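prunePaimonRowType walks Spark's pushed-down StructType and the table's RowType in lockstep, so the pruned result keeps Paimon field ids; a plain toPaimonType round trip would rebuild the structure but lose them. Hypothetical usage, where tableRowType stands for the table's full schema:

    import org.apache.paimon.spark.SparkTypeUtils;
    import org.apache.paimon.types.RowType;
    import org.apache.spark.sql.types.StructType;

    public class PruneRowTypeSketch {
        // tableRowType is assumed to be the table's full Paimon schema.
        static RowType readType(RowType tableRowType) {
            // Spark pushes e.g. struct<user:struct<name:string>> after column pruning.
            StructType required = StructType.fromDDL("user STRUCT<name: STRING>");
            return SparkTypeUtils.prunePaimonRowType(required, tableRowType);
        }
    }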
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index c36c2fff2ca9..95c8f4b3a9a8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -23,38 +23,40 @@ import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.table.Table
 import org.apache.paimon.table.source.ReadBuilder
 import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.Preconditions.checkState
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.types.StructType
 
-trait ColumnPruningAndPushDown extends Scan {
+trait ColumnPruningAndPushDown extends Scan with Logging {
   def table: Table
   def requiredSchema: StructType
   def filters: Seq[Predicate]
   def pushDownLimit: Option[Int] = None
 
-  val tableRowType: RowType = table.rowType
-  val tableSchema: StructType = SparkTypeUtils.fromPaimonRowType(tableRowType)
+  lazy val tableRowType: RowType = table.rowType
+  lazy val tableSchema: StructType = SparkTypeUtils.fromPaimonRowType(tableRowType)
 
   final def partitionType: StructType = {
     SparkTypeUtils.toSparkPartitionType(table)
   }
 
-  private[paimon] val (requiredTableFields, metadataFields) = {
-    val nameToField = tableSchema.map(field => (field.name, field)).toMap
-    val _tableFields = requiredSchema.flatMap(field => nameToField.get(field.name))
-    val _metadataFields =
-      requiredSchema
-        .filterNot(field => tableSchema.fieldNames.contains(field.name))
-        .filter(field => PaimonMetadataColumn.SUPPORTED_METADATA_COLUMNS.contains(field.name))
-    (_tableFields, _metadataFields)
+  private[paimon] val (readTableRowType, metadataFields) = {
+    checkState(
+      requiredSchema.fields.forall(
+        field =>
+          tableRowType.containsField(field.name) ||
+            PaimonMetadataColumn.SUPPORTED_METADATA_COLUMNS.contains(field.name)))
+    val (_requiredTableFields, _metadataFields) =
+      requiredSchema.fields.partition(field => tableRowType.containsField(field.name))
+    val _readTableRowType =
+      SparkTypeUtils.prunePaimonRowType(StructType(_requiredTableFields), tableRowType)
+    (_readTableRowType, _metadataFields)
   }
 
   lazy val readBuilder: ReadBuilder = {
-    val _readBuilder = table.newReadBuilder()
-    val projection =
-      requiredTableFields.map(field => tableSchema.fieldNames.indexOf(field.name)).toArray
-    _readBuilder.withProjection(projection)
+    val _readBuilder = table.newReadBuilder().withReadType(readTableRowType)
     if (filters.nonEmpty) {
       val pushedPredicate = PredicateBuilder.and(filters: _*)
       _readBuilder.withFilter(pushedPredicate)
@@ -68,6 +70,12 @@ trait ColumnPruningAndPushDown extends Scan {
   }
 
   override def readSchema(): StructType = {
-    StructType(requiredTableFields ++ metadataFields)
+    val _readSchema = StructType(
+      SparkTypeUtils.fromPaimonRowType(readTableRowType).fields ++ metadataFields)
+    if (!_readSchema.equals(requiredSchema)) {
+      logInfo(
+        s"Actual readSchema: ${_readSchema} is not equal to the requiredSchema pushed by Spark: $requiredSchema")
+    }
+    _readSchema
   }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 7fbd286d1640..b9d235a9de1d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -60,7 +60,8 @@ abstract class PaimonBaseScan(
   private lazy val paimonMetricsRegistry: SparkMetricRegistry = SparkMetricRegistry()
 
   lazy val requiredStatsSchema: StructType = {
-    val fieldNames = requiredTableFields.map(_.name) ++ reservedFilters.flatMap(_.references)
+    val fieldNames =
+      readTableRowType.getFields.asScala.map(_.name) ++ reservedFilters.flatMap(_.references)
     StructType(tableSchema.filter(field => fieldNames.contains(field.name)))
   }
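The scan trait is the main consumer of the pruning: rather than translating required field names into top-level projection indices, it now hands the pruned RowType straight to the ReadBuilder, which is what lets pruning reach below the top level. Roughly (a sketch; table and readTableRowType come from the trait above):

    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.source.ReadBuilder;
    import org.apache.paimon.types.RowType;

    public class ReadTypeSketch {
        static ReadBuilder newReadBuilder(Table table, RowType readTableRowType) {
            // Before: table.newReadBuilder().withProjection(new int[] {1, 3}), top-level only.
            // Now: a possibly nested pruned row type drives the read.
            return table.newReadBuilder().withReadType(readTableRowType);
        }
    }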
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
index 963d9fadd297..15a62f266c56 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.stats.ColStats
 import org.apache.paimon.types.{DataField, DataType, RowType}
 
@@ -65,19 +64,13 @@ case class PaimonStatistics[T <: PaimonBaseScan](scan: T) extends Statistics {
 
     val wholeSchemaSize = getSizeForRow(scan.tableRowType)
 
-    val requiredDataSchemaSize = scan.requiredTableFields.map {
-      field =>
-        val dataField = scan.tableRowType.getField(field.name)
-        getSizeForField(dataField)
-    }.sum
+    val requiredDataSchemaSize =
+      scan.readTableRowType.getFields.asScala.map(field => getSizeForField(field)).sum
     val requiredDataSizeInBytes =
       paimonStats.mergedRecordSize().getAsLong * (requiredDataSchemaSize.toDouble / wholeSchemaSize)
 
-    val metadataSchemaSize = scan.metadataFields.map {
-      field =>
-        val dataField = PaimonMetadataColumn.get(field.name, scan.partitionType).toPaimonDataField
-        getSizeForField(dataField)
-    }.sum
+    val metadataSchemaSize =
+      scan.metadataColumns.map(field => getSizeForField(field.toPaimonDataField)).sum
     val metadataSizeInBytes = paimonStats.mergedRecordCount().getAsLong * metadataSchemaSize
 
     val sizeInBytes = (requiredDataSizeInBytes + metadataSizeInBytes).toLong
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
index c1814096fb7d..955b29501ad0 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
@@ -218,6 +218,114 @@ class PaimonQueryTest extends PaimonSparkTestBase {
     }
   }
 
+  test("Paimon Query: query nested cols") {
+    fileFormats.foreach {
+      fileFormat =>
+        bucketModes.foreach {
+          bucketMode =>
+            val bucketProp = if (bucketMode != -1) {
+              s", 'bucket-key'='name', 'bucket' = '$bucketMode' "
+            } else {
+              ""
+            }
+            withTable("students") {
+              sql(s"""
+                     |CREATE TABLE students (
+                     |  name STRING,
+                     |  course STRUCT<course_name: STRING, grade: DOUBLE>,
+                     |  teacher STRUCT<name: STRING, address: STRUCT<street: STRING, city: STRING>>,
+                     |  m MAP<STRING, STRUCT<s: STRING, i: INT, d: DOUBLE>>,
+                     |  l ARRAY<STRUCT<s: STRING, i: INT, d: DOUBLE>>,
+                     |  s STRUCT<s1: STRING, s2: MAP<STRING, STRUCT<s: STRING, i: INT, a: ARRAY<STRUCT<s: STRING, i: INT, d: DOUBLE>>>>>,
+                     |  m2 MAP<STRUCT<s: STRING, i: INT, d: DOUBLE>, STRUCT<s: STRING, i: INT, d: DOUBLE>>
+                     |) USING paimon
+                     |TBLPROPERTIES ('file.format'='$fileFormat' $bucketProp)
+                     |""".stripMargin)
+
+              sql(s"""
+                     |INSERT INTO students VALUES (
+                     |  'Alice',
+                     |  STRUCT('Math', 85.0),
+                     |  STRUCT('John', STRUCT('Street 1', 'City 1')),
+                     |  MAP('k1', STRUCT('s1', 1, 1.0), 'k2', STRUCT('s11', 11, 11.0)),
+                     |  ARRAY(STRUCT('s1', 1, 1.0), STRUCT('s11', 11, 11.0)),
+                     |  STRUCT('a', MAP('k1', STRUCT('s1', 1, ARRAY(STRUCT('s1', 1, 1.0))), 'k3', STRUCT('s11', 11, ARRAY(STRUCT('s11', 11, 11.0))))),
+                     |  MAP(STRUCT('k1', 1, 1.0), STRUCT('s1', 1, 1.0), STRUCT('k2', 1, 1.0), STRUCT('s11', 11, 11.0)))
+                     |""".stripMargin)
+
+              sql(s"""
+                     |INSERT INTO students VALUES (
+                     |  'Bob',
+                     |  STRUCT('Biology', 92.0),
+                     |  STRUCT('Jane', STRUCT('Street 2', 'City 2')),
+                     |  MAP('k2', STRUCT('s2', 2, 2.0)),
+                     |  ARRAY(STRUCT('s2', 2, 2.0), STRUCT('s22', 22, 22.0)),
+                     |  STRUCT('b', MAP('k2', STRUCT('s22', 22, ARRAY(STRUCT('s22', 22, 22.0))))),
+                     |  MAP(STRUCT('k2', 2, 2.0), STRUCT('s22', 22, 22.0)))
+                     |""".stripMargin)
+
+              sql(s"""
+                     |INSERT INTO students VALUES (
+                     |  'Cathy',
+                     |  STRUCT('History', 95.0),
+                     |  STRUCT('Jane', STRUCT('Street 3', 'City 3')),
+                     |  MAP('k1', STRUCT('s3', 3, 3.0), 'k2', STRUCT('s33', 33, 33.0)),
+                     |  ARRAY(STRUCT('s3', 3, 3.0)),
+                     |  STRUCT('c', MAP('k1', STRUCT('s3', 3, ARRAY(STRUCT('s3', 3, 3.0))), 'k2', STRUCT('s33', 33, ARRAY(STRUCT('s33', 33, 33.0))))),
+                     |  MAP(STRUCT('k1', 3, 3.0), STRUCT('s3', 3, 3.0), STRUCT('k2', 3, 3.0), STRUCT('s33', 33, 33.0)))
+                     |""".stripMargin)
+
+              checkAnswer(
+                sql(s"""
+                       |SELECT
+                       |  course.grade, name, teacher.address, course.course_name,
+                       |  m['k1'].d, m['k1'].s,
+                       |  l[1].d, l[1].s,
+                       |  s.s2['k2'].a[0].i,
+                       |  map_keys(m2).i
+                       |FROM students ORDER BY name
+                       |""".stripMargin),
+                Seq(
+                  Row(
+                    85.0,
"Alice", + Row("Street 1", "City 1"), + "Math", + 1.0, + "s1", + 11.0, + "s11", + null, + Seq(1, 1)), + Row( + 92.0, + "Bob", + Row("Street 2", "City 2"), + "Biology", + null, + null, + 22.0, + "s22", + 22, + Seq(2)), + Row( + 95.0, + "Cathy", + Row("Street 3", "City 3"), + "History", + 3.0, + "s3", + null, + null, + 33, + Seq(3, 3)) + ) + ) + } + } + } + } + private def getAllFiles( tableName: String, partitions: Seq[String],