From dcd758af6e58579cc97e1d743e1efca5acf21682 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Mon, 23 Dec 2024 16:00:13 +0800 Subject: [PATCH] v1 --- .../arrow/ArrowFieldTypeConversion.java | 6 ++ .../Arrow2PaimonVectorConverter.java | 6 ++ .../ArrowFieldWriterFactoryVisitor.java | 6 ++ .../apache/paimon/codegen/GenerateUtils.scala | 4 ++ .../codegen/EqualiserCodeGeneratorTest.java | 8 +++ .../org/apache/paimon/PartitionSettedRow.java | 8 +++ .../apache/paimon/casting/CastedArray.java | 6 ++ .../org/apache/paimon/casting/CastedRow.java | 6 ++ .../paimon/casting/DefaultValueRow.java | 9 +++ .../paimon/data/AbstractBinaryWriter.java | 18 +++++ .../org/apache/paimon/data/BinaryArray.java | 10 +++ .../org/apache/paimon/data/BinaryRow.java | 9 +++ .../org/apache/paimon/data/BinaryWriter.java | 8 +++ .../org/apache/paimon/data/DataGetters.java | 4 ++ .../org/apache/paimon/data/GenericArray.java | 6 ++ .../org/apache/paimon/data/GenericRow.java | 6 ++ .../org/apache/paimon/data/InternalArray.java | 3 + .../org/apache/paimon/data/InternalRow.java | 3 + .../org/apache/paimon/data/JoinedRow.java | 10 +++ .../apache/paimon/data/LazyGenericRow.java | 6 ++ .../org/apache/paimon/data/NestedRow.java | 9 +++ .../paimon/data/columnar/ColumnarArray.java | 10 +++ .../paimon/data/columnar/ColumnarRow.java | 6 ++ .../data/columnar/VectorizedColumnBatch.java | 9 +++ .../paimon/data/safe/SafeBinaryArray.java | 6 ++ .../paimon/data/safe/SafeBinaryRow.java | 6 ++ .../data/serializer/InternalSerializers.java | 2 + .../data/serializer/VariantSerializer.java | 52 ++++++++++++++ .../paimon/data/variant/GenericVariant.java | 11 +++ .../apache/paimon/data/variant/Variant.java | 10 +++ .../fileindex/bitmap/BitmapFileIndexMeta.java | 6 ++ .../fileindex/bloomfilter/FastHash.java | 6 ++ .../org/apache/paimon/memory/BytesUtils.java | 15 ++++ .../paimon/memory/MemorySegmentUtils.java | 12 ++++ .../paimon/sort/hilbert/HilbertIndexer.java | 6 ++ .../paimon/types/DataTypeDefaultVisitor.java | 4 ++ .../paimon/types/DataTypeJsonParser.java | 3 + .../org/apache/paimon/types/DataTypeRoot.java | 2 + .../apache/paimon/types/DataTypeVisitor.java | 2 + .../org/apache/paimon/types/DataTypes.java | 4 ++ .../types/InternalRowToSizeVisitor.java | 11 +++ .../org/apache/paimon/types/VariantType.java | 63 +++++++++++++++++ .../apache/paimon/utils/InternalRowUtils.java | 2 + .../apache/paimon/utils/KeyProjectedRow.java | 6 ++ .../apache/paimon/utils/ProjectedArray.java | 6 ++ .../org/apache/paimon/utils/ProjectedRow.java | 6 ++ .../apache/paimon/utils/TypeCheckUtils.java | 11 ++- .../paimon/utils/VectorMappingUtils.java | 6 ++ .../org/apache/paimon/data/BinaryRowTest.java | 45 ++++++++++++ .../org/apache/paimon/data/RowDataTest.java | 21 ++++-- .../paimon/data/SafeBinaryArrayTest.java | 10 +++ .../paimon/format/FormatReadWriteTest.java | 70 ++++++++++++++++++- .../apache/paimon/sort/zorder/ZIndexer.java | 6 ++ .../paimon/stats/SimpleStatsEvolution.java | 6 ++ .../org/apache/paimon/utils/OffsetRow.java | 6 ++ .../org/apache/paimon/utils/PartialRow.java | 6 ++ .../paimon/flink/DataTypeToLogicalType.java | 6 ++ .../apache/paimon/flink/FlinkRowWrapper.java | 11 +++ .../format/orc/writer/FieldWriterFactory.java | 6 ++ .../parquet/ParquetSchemaConverter.java | 14 ++++ .../parquet/reader/NestedColumnReader.java | 3 +- .../reader/ParquetSplitReaderUtil.java | 36 ++++++++++ .../parquet/writer/ParquetRowDataWriter.java | 28 ++++++++ .../filter2/predicate/ParquetFilters.java | 6 ++ .../org/apache/paimon/hive/HiveTypeUtils.java | 11 +++ .../hive/objectinspector/HivePaimonArray.java | 6 ++ .../org/apache/paimon/spark/SparkRow.java | 11 +++ 67 files changed, 733 insertions(+), 9 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/serializer/VariantSerializer.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/types/VariantType.java diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java index 6572fcadb9d9..ade03918f4d6 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java @@ -40,6 +40,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.Types; @@ -150,6 +151,11 @@ public FieldType visit(LocalZonedTimestampType localZonedTimestampType) { return new FieldType(localZonedTimestampType.isNullable(), arrowType, null); } + @Override + public FieldType visit(VariantType variantType) { + throw new UnsupportedOperationException(); + } + private TimeUnit getTimeUnit(int precision) { if (precision == 0) { return TimeUnit.SECOND; diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java index 2bcaab50beb4..5478e1594086 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java @@ -63,6 +63,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; @@ -423,6 +424,11 @@ public Timestamp getTimestamp(int i, int precision) { }; } + @Override + public Arrow2PaimonVectorConverter visit(VariantType variantType) { + throw new UnsupportedOperationException(); + } + @Override public Arrow2PaimonVectorConverter visit(ArrayType arrayType) { final Arrow2PaimonVectorConverter arrowVectorConvertor = diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java index eef53009cec3..20db359754f0 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java @@ -39,6 +39,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.complex.ListVector; @@ -138,6 +139,11 @@ public ArrowFieldWriterFactory visit(LocalZonedTimestampType localZonedTimestamp fieldVector, localZonedTimestampType.getPrecision(), null); } + @Override + public ArrowFieldWriterFactory visit(VariantType variantType) { + throw new UnsupportedOperationException("Doesn't support VariantType."); + } + @Override public ArrowFieldWriterFactory visit(ArrayType arrayType) { ArrowFieldWriterFactory elementWriterFactory = arrayType.getElementType().accept(this); diff --git a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala index c6a5ab13f7ef..cf5f354ee532 100644 --- a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala +++ b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala @@ -19,6 +19,7 @@ package org.apache.paimon.codegen import org.apache.paimon.data._ +import org.apache.paimon.data.variant.Variant import org.apache.paimon.memory.MemorySegment import org.apache.paimon.types._ import org.apache.paimon.types.DataTypeChecks.{getFieldCount, getFieldTypes, getPrecision, getScale} @@ -380,6 +381,7 @@ object GenerateUtils { case ARRAY => className[InternalArray] case MULTISET | MAP => className[InternalMap] case ROW => className[InternalRow] + case VARIANT => className[Variant] case _ => throw new IllegalArgumentException("Illegal type: " + t) } @@ -418,6 +420,8 @@ object GenerateUtils { s"$rowTerm.getMap($indexTerm)" case ROW => s"$rowTerm.getRow($indexTerm, ${getFieldCount(t)})" + case VARIANT => + s"$rowTerm.getVariant($indexTerm)" case _ => throw new IllegalArgumentException("Illegal type: " + t) } diff --git a/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java b/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java index f72881dfd0ff..434175acd92f 100644 --- a/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java +++ b/paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java @@ -30,6 +30,7 @@ import org.apache.paimon.data.serializer.InternalMapSerializer; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.DataTypes; @@ -179,6 +180,13 @@ public class EqualiserCodeGeneratorTest { GenericRow.of(31, BinaryString.fromString("32")), GenericRow.of(31, BinaryString.fromString("33"))), new InternalRowSerializer(DataTypes.INT(), DataTypes.VARCHAR(2)))); + TEST_DATA.put( + DataTypeRoot.VARIANT, + new GeneratedData( + DataTypes.VARIANT(), + Pair.of( + GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"), + GenericVariant.fromJson("{\"age\":27,\"city\":\"Hangzhou\"}")))); } @ParameterizedTest diff --git a/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java b/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java index 24f43e3a1557..1ddae048ead2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.PartitionInfo; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; /** An implementation of {@link InternalRow} which provides a row the fixed partition value. */ @@ -153,6 +154,13 @@ public byte[] getBinary(int pos) { : row.getBinary(partitionInfo.getRealIndex(pos)); } + @Override + public Variant getVariant(int pos) { + return partitionInfo.inPartitionRow(pos) + ? partition.getVariant(partitionInfo.getRealIndex(pos)) + : row.getVariant(partitionInfo.getRealIndex(pos)); + } + @Override public InternalArray getArray(int pos) { return partitionInfo.inPartitionRow(pos) diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java index 778b11d1f887..da75539a2f1b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; /** * An implementation of {@link InternalArray} which provides a casted view of the underlying {@link @@ -184,6 +185,11 @@ public byte[] getBinary(int pos) { return castElementGetter.getElementOrNull(array, pos); } + @Override + public Variant getVariant(int pos) { + return castElementGetter.getElementOrNull(array, pos); + } + @Override public InternalArray getArray(int pos) { return castElementGetter.getElementOrNull(array, pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java index f9216d10b3a8..f74daaabeb57 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -131,6 +132,11 @@ public byte[] getBinary(int pos) { return castMapping[pos].getFieldOrNull(row); } + @Override + public Variant getVariant(int pos) { + return castMapping[pos].getFieldOrNull(row); + } + @Override public InternalArray getArray(int pos) { return castMapping[pos].getFieldOrNull(row); diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java index 4a9da5704311..99199c0a7996 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; /** @@ -181,6 +182,14 @@ public InternalRow getRow(int pos, int numFields) { return defaultValueRow.getRow(pos, numFields); } + @Override + public Variant getVariant(int pos) { + if (!row.isNullAt(pos)) { + return row.getVariant(pos); + } + return defaultValueRow.getVariant(pos); + } + public static DefaultValueRow from(InternalRow defaultValueRow) { return new DefaultValueRow(defaultValueRow); } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java index b8bc3bd938ed..2cec2bc58063 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.serializer.InternalArraySerializer; import org.apache.paimon.data.serializer.InternalMapSerializer; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySegmentUtils; @@ -177,6 +178,23 @@ public void writeTimestamp(int pos, Timestamp value, int precision) { } } + @Override + public void writeVariant(int pos, Variant variant) { + byte[] value = variant.value(); + byte[] metadata = variant.metadata(); + int totalSize = 4 + value.length + metadata.length; + final int roundedSize = roundNumberOfBytesToNearestWord(totalSize); + ensureCapacity(roundedSize); + zeroOutPaddingBytes(totalSize); + + segment.putInt(cursor, value.length); + segment.put(cursor + 4, value, 0, value.length); + segment.put(cursor + 4 + value.length, metadata, 0, metadata.length); + + setOffsetAndSize(pos, cursor, totalSize); + cursor += roundedSize; + } + protected void zeroOutPaddingBytes(int numBytes) { if ((numBytes & 0x07) > 0) { segment.putLong(cursor + ((numBytes >> 3) << 3), 0L); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java index 0c3437fb4e94..285d7354cf2b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java @@ -19,6 +19,7 @@ package org.apache.paimon.data; import org.apache.paimon.annotation.Public; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySegmentUtils; import org.apache.paimon.types.DataType; @@ -83,6 +84,7 @@ public static int calculateFixLengthPartSize(DataType type) { case MULTISET: case MAP: case ROW: + case VARIANT: // long and double are 8 bytes; // otherwise it stores the length and offset of the variable-length part for types // such as is string, map, etc. @@ -234,6 +236,14 @@ public byte[] getBinary(int pos) { return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndSize); } + @Override + public Variant getVariant(int pos) { + assertIndexIsValid(pos); + int fieldOffset = getElementOffset(pos, 8); + final long offsetAndLen = MemorySegmentUtils.getLong(segments, fieldOffset); + return MemorySegmentUtils.readVariant(segments, offset, offsetAndLen); + } + @Override public InternalArray getArray(int pos) { assertIndexIsValid(pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java index d08c580be5ff..5a3f7e7382b4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java @@ -19,6 +19,7 @@ package org.apache.paimon.data; import org.apache.paimon.annotation.Public; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySegmentUtils; import org.apache.paimon.types.DataType; @@ -335,6 +336,14 @@ public byte[] getBinary(int pos) { return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen); } + @Override + public Variant getVariant(int pos) { + assertIndexIsValid(pos); + int fieldOffset = getFieldOffset(pos); + final long offsetAndLen = segments[0].getLong(fieldOffset); + return MemorySegmentUtils.readVariant(segments, offset, offsetAndLen); + } + @Override public InternalArray getArray(int pos) { assertIndexIsValid(pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java index 3fb27cc4ff1e..698f261bda35 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.InternalSerializers; import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.LocalZonedTimestampType; @@ -67,6 +68,8 @@ public interface BinaryWriter { void writeTimestamp(int pos, Timestamp value, int precision); + void writeVariant(int pos, Variant variant); + void writeArray(int pos, InternalArray value, InternalArraySerializer serializer); void writeMap(int pos, InternalMap value, InternalMapSerializer serializer); @@ -139,6 +142,9 @@ static void write( case VARBINARY: writer.writeBinary(pos, (byte[]) o); break; + case VARIANT: + writer.writeVariant(pos, (Variant) o); + break; default: throw new UnsupportedOperationException("Not support type: " + type); } @@ -208,6 +214,8 @@ static ValueSetter createValueSetter(DataType elementType, Serializer seriali return (writer, pos, value) -> writer.writeRow( pos, (InternalRow) value, (InternalRowSerializer) rowSerializer); + case VARIANT: + return (writer, pos, value) -> writer.writeVariant(pos, (Variant) value); default: String msg = String.format( diff --git a/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java b/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java index fe4235e8f28b..cf9b3fd96c93 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java @@ -19,6 +19,7 @@ package org.apache.paimon.data; import org.apache.paimon.annotation.Public; +import org.apache.paimon.data.variant.Variant; /** * Getters to get data. @@ -74,6 +75,9 @@ public interface DataGetters { /** Returns the binary value at the given position. */ byte[] getBinary(int pos); + /** Returns the variant value at the given position. */ + Variant getVariant(int pos); + /** Returns the array value at the given position. */ InternalArray getArray(int pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java b/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java index 78cfa2350c80..a5149e478947 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java @@ -19,6 +19,7 @@ package org.apache.paimon.data; import org.apache.paimon.annotation.Public; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.ArrayType; import org.apache.paimon.utils.ArrayUtils; @@ -204,6 +205,11 @@ public byte[] getBinary(int pos) { return (byte[]) getObject(pos); } + @Override + public Variant getVariant(int pos) { + return (Variant) getObject(pos); + } + @Override public BinaryString getString(int pos) { return (BinaryString) getObject(pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java b/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java index 929d51b055e7..1940d04cae5e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java @@ -19,6 +19,7 @@ package org.apache.paimon.data; import org.apache.paimon.annotation.Public; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -186,6 +187,11 @@ public byte[] getBinary(int pos) { return (byte[]) this.fields[pos]; } + @Override + public Variant getVariant(int pos) { + return (Variant) this.fields[pos]; + } + @Override public InternalArray getArray(int pos) { return (InternalArray) this.fields[pos]; diff --git a/paimon-common/src/main/java/org/apache/paimon/data/InternalArray.java b/paimon-common/src/main/java/org/apache/paimon/data/InternalArray.java index ce552fa04bc6..45f7af070217 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/InternalArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/InternalArray.java @@ -130,6 +130,9 @@ static ElementGetter createElementGetter(DataType elementType) { final int rowFieldCount = getFieldCount(elementType); elementGetter = (array, pos) -> array.getRow(pos, rowFieldCount); break; + case VARIANT: + elementGetter = InternalArray::getVariant; + break; default: String msg = String.format( diff --git a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java index a83cbaec7396..4b1bdf6531bb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java @@ -221,6 +221,9 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) { final int rowFieldCount = DataTypeChecks.getFieldCount(fieldType); fieldGetter = row -> row.getRow(fieldPos, rowFieldCount); break; + case VARIANT: + fieldGetter = row -> row.getVariant(fieldPos); + break; default: String msg = String.format( diff --git a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java index 3650000b6b39..818faf154dce 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java @@ -19,6 +19,7 @@ package org.apache.paimon.data; import org.apache.paimon.annotation.Public; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; import javax.annotation.Nullable; @@ -224,6 +225,15 @@ public byte[] getBinary(int pos) { } } + @Override + public Variant getVariant(int pos) { + if (pos < row1.getFieldCount()) { + return row1.getVariant(pos); + } else { + return row2.getVariant(pos - row1.getFieldCount()); + } + } + @Override public InternalArray getArray(int pos) { if (pos < row1.getFieldCount()) { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java b/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java index ecb6bb1be346..eba7fc3b618d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/LazyGenericRow.java @@ -18,6 +18,7 @@ package org.apache.paimon.data; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; import java.util.function.Supplier; @@ -142,6 +143,11 @@ public byte[] getBinary(int pos) { return (byte[]) getField(pos); } + @Override + public Variant getVariant(int pos) { + return (Variant) getField(pos); + } + @Override public InternalArray getArray(int pos) { return (InternalArray) getField(pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java b/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java index 1b7b077cdb6a..a8370af7211f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java @@ -18,6 +18,7 @@ package org.apache.paimon.data; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySegmentUtils; import org.apache.paimon.types.RowKind; @@ -280,6 +281,14 @@ public byte[] getBinary(int pos) { return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen); } + @Override + public Variant getVariant(int pos) { + assertIndexIsValid(pos); + int fieldOffset = getFieldOffset(pos); + final long offsetAndLen = MemorySegmentUtils.getLong(segments, fieldOffset); + return MemorySegmentUtils.readVariant(segments, offset, offsetAndLen); + } + @Override public InternalRow getRow(int pos, int numFields) { assertIndexIsValid(pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java index d16ddd508da8..ebf6d93e6dca 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java @@ -25,6 +25,8 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.GenericVariant; +import org.apache.paimon.data.variant.Variant; import java.io.Serializable; import java.util.Arrays; @@ -121,6 +123,14 @@ public byte[] getBinary(int pos) { } } + @Override + public Variant getVariant(int pos) { + InternalRow row = getRow(pos, 2); + byte[] value = row.getBinary(0); + byte[] metadata = row.getBinary(1); + return new GenericVariant(value, metadata); + } + @Override public InternalArray getArray(int pos) { return ((ArrayColumnVector) data).getArray(offset + pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java index 8bfe26606dfc..8614983d09c8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; import java.io.Serializable; @@ -140,6 +141,11 @@ public byte[] getBinary(int pos) { return vectorizedColumnBatch.getBinary(rowId, pos); } + @Override + public Variant getVariant(int pos) { + return vectorizedColumnBatch.getVariant(rowId, pos); + } + @Override public InternalRow getRow(int pos, int numFields) { return vectorizedColumnBatch.getRow(rowId, pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java index 4cf5f4c7c26b..18eecd29ed59 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java @@ -25,6 +25,8 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.columnar.BytesColumnVector.Bytes; +import org.apache.paimon.data.variant.GenericVariant; +import org.apache.paimon.data.variant.Variant; import java.io.Serializable; @@ -126,6 +128,13 @@ public InternalRow getRow(int rowId, int colId) { return ((RowColumnVector) columns[colId]).getRow(rowId); } + public Variant getVariant(int rowId, int colId) { + InternalRow row = getRow(rowId, colId); + byte[] value = row.getBinary(0); + byte[] metadata = row.getBinary(1); + return new GenericVariant(value, metadata); + } + public InternalMap getMap(int rowId, int colId) { return ((MapColumnVector) columns[colId]).getMap(rowId); } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java index 4170d391473a..c5c4ae624a44 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.memory.BytesUtils; import static org.apache.paimon.data.BinaryArray.calculateHeaderInBytes; @@ -146,6 +147,11 @@ public byte[] getBinary(int pos) { return BytesUtils.readBinary(bytes, offset, getElementOffset(pos, 8), getLong(pos)); } + @Override + public Variant getVariant(int pos) { + return BytesUtils.readVariant(bytes, offset, getLong(pos)); + } + @Override public InternalArray getArray(int pos) { throw new UnsupportedOperationException(); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java index ce6b3ae62e5f..0f4c5d851f00 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.memory.BytesUtils; import org.apache.paimon.types.RowKind; @@ -152,6 +153,11 @@ public byte[] getBinary(int pos) { return BytesUtils.readBinary(bytes, offset, getFieldOffset(pos), getLong(pos)); } + @Override + public Variant getVariant(int pos) { + return BytesUtils.readVariant(bytes, offset, getLong(pos)); + } + @Override public InternalArray getArray(int pos) { return readArrayData(bytes, offset, getLong(pos)); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java index 624dda481615..ce74eed590d6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java @@ -83,6 +83,8 @@ private static Serializer createInternal(DataType type) { return new InternalMapSerializer(mapType.getKeyType(), mapType.getValueType()); case ROW: return new InternalRowSerializer(getFieldTypes(type).toArray(new DataType[0])); + case VARIANT: + return VariantSerializer.INSTANCE; default: throw new UnsupportedOperationException( "Unsupported type '" + type + "' to get internal serializer"); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/VariantSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/VariantSerializer.java new file mode 100644 index 000000000000..1a652023b8ef --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/VariantSerializer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.serializer; + +import org.apache.paimon.data.variant.GenericVariant; +import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.io.DataOutputView; + +import java.io.IOException; + +/** Type serializer for {@code Variant}. */ +public class VariantSerializer extends SerializerSingleton { + + private static final long serialVersionUID = 1L; + + public static final VariantSerializer INSTANCE = new VariantSerializer(); + + @Override + public Variant copy(Variant from) { + return from.copy(); + } + + @Override + public void serialize(Variant record, DataOutputView target) throws IOException { + BinarySerializer.INSTANCE.serialize(record.value(), target); + BinarySerializer.INSTANCE.serialize(record.metadata(), target); + } + + @Override + public Variant deserialize(DataInputView source) throws IOException { + byte[] value = BinarySerializer.INSTANCE.deserialize(source); + byte[] metadata = BinarySerializer.INSTANCE.deserialize(source); + return new GenericVariant(value, metadata); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java index 355e9123cc2d..d79eb09054d9 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java @@ -183,6 +183,17 @@ public Object variantGet(String path) { } } + @Override + public long sizeInBytes() { + return metadata.length + value.length; + } + + @Override + public Variant copy() { + return new GenericVariant( + Arrays.copyOf(value, value.length), Arrays.copyOf(metadata, metadata.length), pos); + } + // Get a boolean value from the variant. public boolean getBoolean() { return GenericVariantUtil.getBoolean(value, pos); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java index bfecfd573ad7..af37d18b26fc 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java @@ -32,6 +32,10 @@ */ public interface Variant { + String METADATA = "metadata"; + + String VALUE = "value"; + /** Returns the variant metadata. */ byte[] metadata(); @@ -49,4 +53,10 @@ public interface Variant { *

access array's first elem: `$.array[0]` */ Object variantGet(String path); + + /** Returns the size of the variant in bytes. */ + long sizeInBytes(); + + /** Returns a copy of the variant. */ + Variant copy(); } diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMeta.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMeta.java index 595e1219110f..e18e4169ece6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMeta.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMeta.java @@ -41,6 +41,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; import java.io.DataInput; import java.io.DataOutput; @@ -373,5 +374,10 @@ public final R visit(MapType mapType) { public final R visit(RowType rowType) { throw new UnsupportedOperationException("Does not support type row"); } + + @Override + public final R visit(VariantType rowType) { + throw new UnsupportedOperationException("Does not support type variant"); + } } } diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java index d76b86c2e48e..2e87058ae953 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java @@ -42,6 +42,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; import net.openhft.hashing.LongHashFunction; @@ -159,6 +160,11 @@ public FastHash visit(LocalZonedTimestampType localZonedTimestampType) { }; } + @Override + public FastHash visit(VariantType variantType) { + throw new UnsupportedOperationException("Does not support type variant"); + } + @Override public FastHash visit(ArrayType arrayType) { throw new UnsupportedOperationException("Does not support type array"); diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java b/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java index db171707379f..c88151b42e50 100644 --- a/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/memory/BytesUtils.java @@ -18,6 +18,9 @@ package org.apache.paimon.memory; +import org.apache.paimon.data.variant.GenericVariant; +import org.apache.paimon.data.variant.Variant; + import static org.apache.paimon.data.BinarySection.HIGHEST_FIRST_BIT; import static org.apache.paimon.data.BinarySection.HIGHEST_SECOND_TO_EIGHTH_BIT; import static org.apache.paimon.memory.MemorySegment.LITTLE_ENDIAN; @@ -67,4 +70,16 @@ public static byte[] readBinary( return ret; } } + + public static Variant readVariant(byte[] bytes, int baseOffset, long offsetAndLen) { + int offset = baseOffset + (int) (offsetAndLen >> 32); + int totalSize = (int) offsetAndLen; + int valueSize = getInt(bytes, offset); + int metadataSize = totalSize - 4 - valueSize; + byte[] value = new byte[valueSize]; + byte[] metadata = new byte[metadataSize]; + System.arraycopy(bytes, offset + 4, value, 0, valueSize); + System.arraycopy(bytes, offset + 4 + valueSize, metadata, 0, metadataSize); + return new GenericVariant(value, metadata); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java index a021c47ee14b..691c1e60e37d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentUtils.java @@ -28,6 +28,8 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.NestedRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.GenericVariant; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.io.DataOutputView; import org.apache.paimon.utils.MurmurHashUtils; @@ -1121,6 +1123,16 @@ public static BinaryString readBinaryString( } } + public static Variant readVariant(MemorySegment[] segments, int baseOffset, long offsetAndLen) { + int offset = baseOffset + (int) (offsetAndLen >> 32); + int totalSize = (int) offsetAndLen; + int valueSize = getInt(segments, offset); + int metadataSize = totalSize - 4 - valueSize; + byte[] value = copyToBytes(segments, offset + 4, valueSize); + byte[] metadata = copyToBytes(segments, offset + 4 + valueSize, metadataSize); + return new GenericVariant(value, metadata); + } + /** Gets an instance of {@link InternalMap} from underlying {@link MemorySegment}. */ public static InternalMap readMapData( MemorySegment[] segments, int baseOffset, long offsetAndSize) { diff --git a/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java b/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java index 14637a6b1a29..12f3e390165b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java +++ b/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java @@ -45,6 +45,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; import org.apache.paimon.utils.ConvertBinaryUtil; import org.davidmoten.hilbert.HilbertCurve; @@ -259,6 +260,11 @@ public HProcessFunction visit(LocalZonedTimestampType localZonedTimestampType) { }; } + @Override + public HProcessFunction visit(VariantType variantType) { + throw new RuntimeException("Unsupported type"); + } + @Override public HProcessFunction visit(ArrayType arrayType) { throw new RuntimeException("Unsupported type"); diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java index ca39d43cf25b..c033dd675926 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java @@ -109,6 +109,10 @@ public R visit(LocalZonedTimestampType localZonedTimestampType) { return defaultMethod(localZonedTimestampType); } + public R visit(VariantType variantType) { + return defaultMethod(variantType); + } + @Override public R visit(ArrayType arrayType) { return defaultMethod(arrayType); diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java index 1afde1b2db03..19f2dbfe7b4f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java @@ -301,6 +301,7 @@ private enum Keyword { NULL, RAW, LEGACY, + VARIANT, NOT } @@ -515,6 +516,8 @@ private DataType parseTypeByKeyword() { return parseTimestampType(); case TIMESTAMP_LTZ: return parseTimestampLtzType(); + case VARIANT: + return new VariantType(); default: throw parsingError("Unsupported type: " + token().value); } diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeRoot.java b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeRoot.java index d014a10cc9ad..620f71875c5c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeRoot.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeRoot.java @@ -100,6 +100,8 @@ public enum DataTypeRoot { DataTypeFamily.TIMESTAMP, DataTypeFamily.EXTENSION), + VARIANT(DataTypeFamily.PREDEFINED), + ARRAY(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION), MULTISET(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION), diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeVisitor.java b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeVisitor.java index c83fcdbac569..22e8fbfdb79a 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/DataTypeVisitor.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataTypeVisitor.java @@ -62,6 +62,8 @@ public interface DataTypeVisitor { R visit(LocalZonedTimestampType localZonedTimestampType); + R visit(VariantType variantType); + R visit(ArrayType arrayType); R visit(MultisetType multisetType); diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java b/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java index b025b6a838e0..bc2a56dfb722 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java @@ -151,6 +151,10 @@ public static MultisetType MULTISET(DataType elementType) { return new MultisetType(elementType); } + public static VariantType VARIANT() { + return new VariantType(); + } + public static OptionalInt getPrecision(DataType dataType) { return dataType.accept(PRECISION_EXTRACTOR); } diff --git a/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java b/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java index 4e1ad782cf21..e6dca10832c7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java @@ -211,6 +211,17 @@ public BiFunction visit( }; } + @Override + public BiFunction visit(VariantType variantType) { + return (row, index) -> { + if (row.isNullAt(index)) { + return NULL_SIZE; + } else { + return Math.toIntExact(row.getVariant(index).sizeInBytes()); + } + }; + } + @Override public BiFunction visit(ArrayType arrayType) { return (row, index) -> { diff --git a/paimon-common/src/main/java/org/apache/paimon/types/VariantType.java b/paimon-common/src/main/java/org/apache/paimon/types/VariantType.java new file mode 100644 index 000000000000..3537dd6e15c9 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/types/VariantType.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.types; + +import org.apache.paimon.annotation.Public; +import org.apache.paimon.data.variant.Variant; + +/** + * Data type of {@link Variant}. + * + * @since 1.1.0 + */ +@Public +public class VariantType extends DataType { + + private static final long serialVersionUID = 1L; + + private static final String FORMAT = "VARIANT"; + + public VariantType(boolean isNullable) { + super(isNullable, DataTypeRoot.VARIANT); + } + + public VariantType() { + this(true); + } + + @Override + public int defaultSize() { + return 2048; + } + + @Override + public DataType copy(boolean isNullable) { + return new VariantType(isNullable); + } + + @Override + public String asSQLString() { + return withNullability(FORMAT); + } + + @Override + public R accept(DataTypeVisitor visitor) { + return visitor.visit(this); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java index eaa077fa81ff..bd46bae6312b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java @@ -195,6 +195,8 @@ public static Object get(DataGetters dataGetters, int pos, DataType fieldType) { case BINARY: case VARBINARY: return dataGetters.getBinary(pos); + case VARIANT: + return dataGetters.getVariant(pos); default: throw new UnsupportedOperationException("Unsupported type: " + fieldType); } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java b/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java index 01d9c313ad7d..359f2d676ade 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/KeyProjectedRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; import java.util.Arrays; @@ -123,6 +124,11 @@ public byte[] getBinary(int pos) { return row.getBinary(indexMapping[pos]); } + @Override + public Variant getVariant(int pos) { + return row.getVariant(indexMapping[pos]); + } + @Override public InternalArray getArray(int pos) { return row.getArray(indexMapping[pos]); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedArray.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedArray.java index 2182ea1a32bc..766d05a530ca 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedArray.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedArray.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.DataType; /** @@ -125,6 +126,11 @@ public byte[] getBinary(int pos) { return array.getBinary(indexMapping[pos]); } + @Override + public Variant getVariant(int pos) { + return array.getVariant(indexMapping[pos]); + } + @Override public InternalArray getArray(int pos) { return array.getArray(indexMapping[pos]); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java index ee29a0c2b243..816a1d011168 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -140,6 +141,11 @@ public byte[] getBinary(int pos) { return row.getBinary(indexMapping[pos]); } + @Override + public Variant getVariant(int pos) { + return row.getVariant(indexMapping[pos]); + } + @Override public InternalArray getArray(int pos) { return row.getArray(indexMapping[pos]); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeCheckUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeCheckUtils.java index be93a6574c04..0cb943a12cb9 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeCheckUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeCheckUtils.java @@ -31,6 +31,7 @@ import static org.apache.paimon.types.DataTypeRoot.ROW; import static org.apache.paimon.types.DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; import static org.apache.paimon.types.DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE; +import static org.apache.paimon.types.DataTypeRoot.VARIANT; /** Utils for type. */ public class TypeCheckUtils { @@ -95,8 +96,16 @@ public static boolean isRow(DataType type) { return type.getTypeRoot() == ROW; } + public static boolean isVariant(DataType type) { + return type.getTypeRoot() == VARIANT; + } + public static boolean isComparable(DataType type) { - return !isMap(type) && !isMultiset(type) && !isRow(type) && !isArray(type); + return !isMap(type) + && !isMultiset(type) + && !isRow(type) + && !isArray(type) + && !isVariant(type); } public static boolean isMutable(DataType type) { diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java index 02b011a2f1cf..dc8a910dd692 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java @@ -62,6 +62,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; /** * This is a util about how to expand the {@link ColumnVector}s with the partition row and index @@ -321,6 +322,11 @@ public boolean isNullAt(int i) { }; } + @Override + public ColumnVector visit(VariantType variantType) { + throw new UnsupportedOperationException("VariantType is not supported."); + } + @Override public ColumnVector visit(ArrayType arrayType) { return new ArrayColumnVector() { diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java index f7f64a536c20..005197d2ce64 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.InternalSerializers; import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -915,4 +916,48 @@ public void testNestedRowWithBinaryRowEquals() { assertThat(nestedBinaryRow.getRow(1, 2)).isEqualTo(innerBinaryRow); assertThat(innerBinaryRow).isEqualTo(nestedBinaryRow.getRow(1, 2)); } + + @Test + public void testVariant() { + BinaryRow row = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(row); + + writer.writeVariant(0, GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}")); + writer.setNullAt(1); + writer.complete(); + + assertThat(row.getVariant(0).toJson()).isEqualTo("{\"age\":27,\"city\":\"Beijing\"}"); + assertThat(row.isNullAt(1)).isTrue(); + } + + @Test + public void testVariantArray() { + // 1. array test + BinaryArray array = new BinaryArray(); + BinaryArrayWriter arrayWriter = + new BinaryArrayWriter( + array, 3, BinaryArray.calculateFixLengthPartSize(DataTypes.VARIANT())); + + arrayWriter.writeVariant(0, GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}")); + arrayWriter.setNullAt(1); + arrayWriter.writeVariant(2, GenericVariant.fromJson("{\"age\":27,\"city\":\"Hangzhou\"}")); + arrayWriter.complete(); + + assertThat(array.getVariant(0).toJson()).isEqualTo("{\"age\":27,\"city\":\"Beijing\"}"); + assertThat(array.isNullAt(1)).isTrue(); + assertThat(array.getVariant(2).toJson()).isEqualTo("{\"age\":27,\"city\":\"Hangzhou\"}"); + + // 2. test write array to binary row + BinaryRow row = new BinaryRow(1); + BinaryRowWriter rowWriter = new BinaryRowWriter(row); + InternalArraySerializer serializer = new InternalArraySerializer(DataTypes.VARIANT()); + rowWriter.writeArray(0, array, serializer); + rowWriter.complete(); + + BinaryArray array2 = (BinaryArray) row.getArray(0); + assertThat(array2).isEqualTo(array); + assertThat(array2.getVariant(0).toJson()).isEqualTo("{\"age\":27,\"city\":\"Beijing\"}"); + assertThat(array2.isNullAt(1)).isTrue(); + assertThat(array2.getVariant(2).toJson()).isEqualTo("{\"age\":27,\"city\":\"Hangzhou\"}"); + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java b/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java index 2998eade6400..05e5ba13d623 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/RowDataTest.java @@ -22,6 +22,8 @@ import org.apache.paimon.data.serializer.InternalArraySerializer; import org.apache.paimon.data.serializer.InternalMapSerializer; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.variant.GenericVariant; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowKind; @@ -47,6 +49,7 @@ public class RowDataTest { private byte[] bytes; private Timestamp timestamp1; private Timestamp timestamp2; + private Variant variant; @BeforeEach public void before() { @@ -71,6 +74,7 @@ public void before() { bytes = new byte[] {1, 5, 6}; timestamp1 = Timestamp.fromEpochMillis(123L); timestamp2 = Timestamp.fromLocalDateTime(LocalDateTime.of(1969, 1, 1, 0, 0, 0, 123456789)); + variant = GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"); } @Test @@ -87,7 +91,7 @@ public void testNestedRow() { writer.writeRow(0, getBinaryRow(), null); writer.complete(); - InternalRow nestedRow = row.getRow(0, 18); + InternalRow nestedRow = row.getRow(0, 19); testGetters(nestedRow); testSetters(nestedRow); } @@ -96,7 +100,7 @@ public void testNestedRow() { public void testSafeBinaryRow() { BinaryRow binaryRow = getBinaryRow(); SafeBinaryRow row = new SafeBinaryRow(binaryRow.getFieldCount(), binaryRow.toBytes(), 0); - assertThat(row.getFieldCount()).isEqualTo(18); + assertThat(row.getFieldCount()).isEqualTo(19); // test header assertThat(row.getRowKind()).isEqualTo(RowKind.INSERT); @@ -128,6 +132,7 @@ public void testSafeBinaryRow() { assertThat(row.getBinary(15)).isEqualTo(bytes); assertThat(row.getTimestamp(16, 3)).isEqualTo(timestamp1); assertThat(row.getTimestamp(17, 9)).isEqualTo(timestamp2); + assertThat(row.getVariant(18).toJson()).isEqualTo(variant.toJson()); // test isNull binaryRow.setNullAt(10); @@ -136,7 +141,7 @@ public void testSafeBinaryRow() { } private BinaryRow getBinaryRow() { - BinaryRow row = new BinaryRow(18); + BinaryRow row = new BinaryRow(19); BinaryRowWriter writer = new BinaryRowWriter(row); writer.writeBoolean(0, true); writer.writeByte(1, (byte) 1); @@ -156,12 +161,13 @@ private BinaryRow getBinaryRow() { writer.writeBinary(15, bytes); writer.writeTimestamp(16, timestamp1, 3); writer.writeTimestamp(17, timestamp2, 9); + writer.writeVariant(18, variant); return row; } @Test public void testGenericRow() { - GenericRow row = new GenericRow(18); + GenericRow row = new GenericRow(19); row.setField(0, true); row.setField(1, (byte) 1); row.setField(2, (short) 2); @@ -180,6 +186,7 @@ public void testGenericRow() { row.setField(15, bytes); row.setField(16, timestamp1); row.setField(17, timestamp2); + row.setField(18, variant); testGetters(row); } @@ -192,7 +199,7 @@ public void testJoinedRow() { row1.setField(3, 3); row1.setField(4, (long) 4); - GenericRow row2 = new GenericRow(13); + GenericRow row2 = new GenericRow(14); row2.setField(0, (float) 5); row2.setField(1, (double) 6); row2.setField(2, (char) 7); @@ -206,11 +213,12 @@ public void testJoinedRow() { row2.setField(10, bytes); row2.setField(11, timestamp1); row2.setField(12, timestamp2); + row2.setField(13, variant); testGetters(new JoinedRow(row1, row2)); } private void testGetters(InternalRow row) { - assertThat(row.getFieldCount()).isEqualTo(18); + assertThat(row.getFieldCount()).isEqualTo(19); // test header assertThat(row.getRowKind()).isEqualTo(RowKind.INSERT); @@ -236,6 +244,7 @@ private void testGetters(InternalRow row) { assertThat(row.getBinary(15)).isEqualTo(bytes); assertThat(row.getTimestamp(16, 3)).isEqualTo(timestamp1); assertThat(row.getTimestamp(17, 9)).isEqualTo(timestamp2); + assertThat(row.getVariant(18).toJson()).isEqualTo(variant.toJson()); } private void testSetters(InternalRow row) { diff --git a/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java b/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java index 08490f18c66c..4163a7eaad2b 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/SafeBinaryArrayTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.safe.SafeBinaryArray; import org.apache.paimon.data.serializer.InternalArraySerializer; +import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -125,6 +126,15 @@ public void test() { BinaryString.fromString("14611asdfadsaf").toBytes()); converted = toBinaryArray(DataTypes.BYTES(), new SafeBinaryArray(expected.toBytes(), 0)); assertThat(converted).isEqualTo(expected); + + expected = + toBinaryArray( + DataTypes.VARIANT(), + GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"), + null, + GenericVariant.fromJson("{\"age\":27,\"city\":\"Hangzhou\"}")); + converted = toBinaryArray(DataTypes.VARIANT(), new SafeBinaryArray(expected.toBytes(), 0)); + assertThat(converted).isEqualTo(expected); } private BinaryArray toBinaryArray(DataType eleType, Object... values) { diff --git a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java index d3114cee6d76..717439072c59 100644 --- a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.format; +import org.apache.paimon.data.BinaryArray; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericMap; @@ -27,6 +28,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.PositionOutputStream; @@ -36,6 +38,7 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VariantType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -70,7 +73,7 @@ protected FormatReadWriteTest(String formatType) { @BeforeEach public void beforeEach() { this.fileIO = LocalFileIO.create(); - this.file = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString()); + this.file = new Path(new Path(tempPath.toUri()), UUID.randomUUID() + "." + formatType); } protected abstract FileFormat fileFormat(); @@ -171,6 +174,71 @@ public void testNestedReadPruning() throws Exception { assertThat(result).containsExactly(GenericRow.of(GenericRow.of(10, 12))); } + @Test + public void testReadWriteVariant() throws IOException { + FileFormat format = fileFormat(); + // todo: support other format types + if (!format.getFormatIdentifier().equals("parquet")) { + return; + } + + RowType writeType = DataTypes.ROW(DataTypes.FIELD(0, "v", DataTypes.VARIANT())); + + try (PositionOutputStream out = fileIO.newOutputStream(file, false); + FormatWriter writer = format.createWriterFactory(writeType).create(out, "zstd")) { + writer.addElement( + GenericRow.of(GenericVariant.fromJson("{\"age\":35,\"city\":\"Chicago\"}"))); + } + + List result = new ArrayList<>(); + try (RecordReader reader = + format.createReaderFactory(writeType) + .createReader( + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)))) { + InternalRowSerializer serializer = new InternalRowSerializer(writeType); + reader.forEachRemaining(row -> result.add(serializer.copy(row))); + } + + assertThat(result.get(0).getVariant(0).toJson()) + .isEqualTo("{\"age\":35,\"city\":\"Chicago\"}"); + } + + @Test + public void testReadWriteVariantList() throws IOException { + FileFormat format = fileFormat(); + // todo: support other format types + if (!format.getFormatIdentifier().equals("parquet")) { + return; + } + + RowType writeType = DataTypes.ROW(new ArrayType(true, new VariantType())); + + try (PositionOutputStream out = fileIO.newOutputStream(file, false); + FormatWriter writer = format.createWriterFactory(writeType).create(out, "zstd")) { + writer.addElement( + GenericRow.of( + new GenericArray( + new Object[] { + GenericVariant.fromJson( + "{\"age\":35,\"city\":\"Chicago\"}"), + GenericVariant.fromJson("{\"age\":45,\"city\":\"Beijing\"}") + }))); + } + + List result = new ArrayList<>(); + try (RecordReader reader = + format.createReaderFactory(writeType) + .createReader( + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)))) { + InternalRowSerializer serializer = new InternalRowSerializer(writeType); + reader.forEachRemaining(row -> result.add(serializer.copy(row))); + } + InternalRow internalRow = result.get(0); + BinaryArray array = (BinaryArray) internalRow.getArray(0); + assertThat(array.getVariant(0).toJson()).isEqualTo("{\"age\":35,\"city\":\"Chicago\"}"); + assertThat(array.getVariant(1).toJson()).isEqualTo("{\"age\":45,\"city\":\"Beijing\"}"); + } + private RowType rowTypeForFullTypesTest() { RowType.Builder builder = RowType.builder() diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java index 4a4071476c71..14af2a95a7b2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java +++ b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java @@ -46,6 +46,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; import org.apache.paimon.utils.ZOrderByteUtils; import java.io.Serializable; @@ -348,6 +349,11 @@ public ZProcessFunction visit(LocalZonedTimestampType localZonedTimestampType) { }; } + @Override + public ZProcessFunction visit(VariantType variantType) { + throw new RuntimeException("Unsupported type"); + } + @Override public ZProcessFunction visit(ArrayType arrayType) { throw new RuntimeException("Unsupported type"); diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java index 079300a89dd2..fb029eccdb9f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java @@ -28,6 +28,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.format.SimpleColStats; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ProjectedArray; @@ -214,6 +215,11 @@ public byte[] getBinary(int pos) { throw new UnsupportedOperationException(); } + @Override + public Variant getVariant(int pos) { + throw new UnsupportedOperationException(); + } + @Override public InternalArray getArray(int pos) { throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java b/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java index 4078e2b9257f..822488f27124 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; /** A {@link InternalRow} to wrap row with offset. */ @@ -120,6 +121,11 @@ public byte[] getBinary(int pos) { return row.getBinary(offset + pos); } + @Override + public Variant getVariant(int pos) { + return row.getVariant(offset + pos); + } + @Override public InternalArray getArray(int pos) { return row.getArray(offset + pos); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PartialRow.java b/paimon-core/src/main/java/org/apache/paimon/utils/PartialRow.java index 7794084d4c2f..a090352a9da8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/PartialRow.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartialRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; /** A {@link InternalRow} to wrap row with partial fields. */ @@ -122,6 +123,11 @@ public byte[] getBinary(int pos) { return row.getBinary(pos); } + @Override + public Variant getVariant(int pos) { + return row.getVariant(pos); + } + @Override public InternalArray getArray(int pos) { return row.getArray(pos); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java index 03f2f367bba2..8ffc9c4efc52 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java @@ -41,6 +41,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; import org.apache.flink.table.types.logical.LogicalType; @@ -140,6 +141,11 @@ public LogicalType visit(LocalZonedTimestampType localZonedTimestampType) { localZonedTimestampType.isNullable(), localZonedTimestampType.getPrecision()); } + @Override + public LogicalType visit(VariantType variantType) { + throw new UnsupportedOperationException("VariantType is not supported."); + } + @Override public LogicalType visit(ArrayType arrayType) { return new org.apache.flink.table.types.logical.ArrayType( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java index 08359784db60..91ab7cfd5be3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; import org.apache.flink.table.data.DecimalData; @@ -118,6 +119,11 @@ public byte[] getBinary(int pos) { return row.getBinary(pos); } + @Override + public Variant getVariant(int pos) { + throw new UnsupportedOperationException(); + } + @Override public InternalArray getArray(int pos) { return new FlinkArrayWrapper(row.getArray(pos)); @@ -206,6 +212,11 @@ public byte[] getBinary(int pos) { return array.getBinary(pos); } + @Override + public Variant getVariant(int pos) { + throw new UnsupportedOperationException(); + } + @Override public InternalArray getArray(int pos) { return new FlinkArrayWrapper(array.getArray(pos)); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java index 77fe130a1963..26b008ab1c60 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java @@ -43,6 +43,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; @@ -193,6 +194,11 @@ public FieldWriter visit(LocalZonedTimestampType localZonedTimestampType) { }; } + @Override + public FieldWriter visit(VariantType variantType) { + throw new UnsupportedOperationException("Unsupported type: " + variantType); + } + @Override public FieldWriter visit(DecimalType decimalType) { return (rowId, column, getters, columnId) -> { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java index 708e5eb7ea3d..ba6d4e5e0009 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java @@ -18,6 +18,7 @@ package org.apache.paimon.format.parquet; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; @@ -206,6 +207,19 @@ private static Type convertToParquetType(String name, DataType type, int fieldId RowType rowType = (RowType) type; return new GroupType(repetition, name, convertToParquetTypes(rowType)) .withId(fieldId); + case VARIANT: + return Types.buildGroup(repetition) + .addField( + Types.primitive( + PrimitiveType.PrimitiveTypeName.BINARY, + Type.Repetition.REQUIRED) + .named(Variant.VALUE)) + .addField( + Types.primitive( + PrimitiveType.PrimitiveTypeName.BINARY, + Type.Repetition.REQUIRED) + .named(Variant.METADATA)) + .named(name); default: throw new UnsupportedOperationException("Unsupported type: " + type); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java index 0dab88320863..3e6da303b3d4 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java @@ -35,6 +35,7 @@ import org.apache.paimon.types.MapType; import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VariantType; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; @@ -94,7 +95,7 @@ public void readToVector(int readNumber, WritableColumnVector vector) throws IOE private Pair readData( ParquetField field, int readNumber, ColumnVector vector, boolean inside) throws IOException { - if (field.getType() instanceof RowType) { + if (field.getType() instanceof RowType || field.getType() instanceof VariantType) { return readRow((ParquetGroupField) field, readNumber, vector, inside); } else if (field.getType() instanceof MapType || field.getType() instanceof MultisetType) { return readMap((ParquetGroupField) field, readNumber, vector, inside); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java index a2be77414d5a..0f88167eafd7 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java @@ -31,11 +31,13 @@ import org.apache.paimon.data.columnar.heap.HeapShortVector; import org.apache.paimon.data.columnar.heap.HeapTimestampVector; import org.apache.paimon.data.columnar.writable.WritableColumnVector; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.format.parquet.ParquetSchemaConverter; import org.apache.paimon.format.parquet.type.ParquetField; import org.apache.paimon.format.parquet.type.ParquetGroupField; import org.apache.paimon.format.parquet.type.ParquetPrimitiveField; import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BinaryType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeChecks; @@ -44,6 +46,7 @@ import org.apache.paimon.types.MapType; import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VariantType; import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -128,6 +131,11 @@ public static ColumnReader createColumnReader( pages, ((DecimalType) fieldType).getPrecision()); } + case VARIANT: + List fieldReaders = new ArrayList<>(); + fieldReaders.add(new BytesColumnReader(descriptors.get(0), pages)); + fieldReaders.add(new BytesColumnReader(descriptors.get(1), pages)); + return new RowColumnReader(fieldReaders); case ARRAY: case MAP: case MULTISET: @@ -331,6 +339,11 @@ public static WritableColumnVector createWritableColumnVector( depth + 1); } return new HeapRowVector(batchSize, columnVectors); + case VARIANT: + WritableColumnVector[] vectors = new WritableColumnVector[2]; + vectors[0] = new HeapBytesVector(batchSize); + vectors[1] = new HeapBytesVector(batchSize); + return new HeapRowVector(batchSize, vectors); default: throw new UnsupportedOperationException(fieldType + " is not supported now."); } @@ -390,6 +403,29 @@ private static ParquetField constructField(DataField dataField, ColumnIO columnI type, repetitionLevel, definitionLevel, required, fieldsBuilder.build()); } + if (type instanceof VariantType) { + GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO; + ImmutableList.Builder fieldsBuilder = ImmutableList.builder(); + PrimitiveColumnIO value = + (PrimitiveColumnIO) lookupColumnByName(groupColumnIO, Variant.VALUE); + fieldsBuilder.add( + new ParquetPrimitiveField( + new BinaryType(), + required, + value.getColumnDescriptor(), + value.getId())); + PrimitiveColumnIO metadata = + (PrimitiveColumnIO) lookupColumnByName(groupColumnIO, Variant.METADATA); + fieldsBuilder.add( + new ParquetPrimitiveField( + new BinaryType(), + required, + metadata.getColumnDescriptor(), + metadata.getId())); + return new ParquetGroupField( + type, repetitionLevel, definitionLevel, required, fieldsBuilder.build()); + } + if (type instanceof MapType) { GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO; GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java index 2ba2812508d0..3f9f14af45be 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.format.parquet.ParquetSchemaConverter; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataType; @@ -33,6 +34,7 @@ import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.VariantType; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; @@ -127,6 +129,8 @@ private FieldWriter createWriter(DataType t, Type type) { ((MultisetType) t).getElementType(), new IntType(false), groupType); } else if (t instanceof RowType && type instanceof GroupType) { return new RowWriter((RowType) t, groupType); + } else if (t instanceof VariantType && type instanceof GroupType) { + return new VariantWriter(); } else { throw new UnsupportedOperationException("Unsupported type: " + type); } @@ -543,6 +547,30 @@ public void write(InternalArray arrayData, int ordinal) { } } + private class VariantWriter implements FieldWriter { + + @Override + public void write(InternalRow row, int ordinal) { + writeVariant(row.getVariant(ordinal)); + } + + @Override + public void write(InternalArray arrayData, int ordinal) { + writeVariant(arrayData.getVariant(ordinal)); + } + + private void writeVariant(Variant variant) { + recordConsumer.startGroup(); + recordConsumer.startField(Variant.VALUE, 0); + recordConsumer.addBinary(Binary.fromReusedByteArray(variant.value())); + recordConsumer.endField(Variant.VALUE, 0); + recordConsumer.startField(Variant.METADATA, 1); + recordConsumer.addBinary(Binary.fromReusedByteArray(variant.metadata())); + recordConsumer.endField(Variant.METADATA, 1); + recordConsumer.endGroup(); + } + } + private Binary timestampToInt96(Timestamp timestamp) { int julianDay; long nanosOfDay; diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java index 96cf2fe726cf..cb2e95122773 100644 --- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java +++ b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java @@ -43,6 +43,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.io.api.Binary; @@ -281,6 +282,11 @@ public Operators.Column visit(LocalZonedTimestampType localZonedTimestampType throw new UnsupportedOperationException(); } + @Override + public Operators.Column visit(VariantType variantType) { + throw new UnsupportedOperationException(); + } + // ===================== can not support ========================= @Override diff --git a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java index 33cd45a351a4..844128539d35 100644 --- a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java +++ b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java @@ -18,6 +18,7 @@ package org.apache.paimon.hive; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; @@ -42,6 +43,7 @@ import org.apache.paimon.types.TinyIntType; import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveVarchar; @@ -56,6 +58,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -218,6 +221,14 @@ public TypeInfo visit(RowType rowType) { return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos); } + @Override + public TypeInfo visit(VariantType variantType) { + List fieldNames = Arrays.asList(Variant.VALUE, Variant.METADATA); + List typeInfos = + Arrays.asList(TypeInfoFactory.binaryTypeInfo, TypeInfoFactory.binaryTypeInfo); + return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos); + } + @Override protected TypeInfo defaultMethod(org.apache.paimon.types.DataType dataType) { throw new UnsupportedOperationException("Unsupported type: " + dataType); diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java index e79ed82bf544..31a99624e541 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataType; @@ -114,6 +115,11 @@ public byte[] getBinary(int i) { return getAs(i); } + @Override + public Variant getVariant(int pos) { + return getAs(pos); + } + @Override public InternalArray getArray(int i) { return new HivePaimonArray( diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java index 60648a8385dc..1ad50286a2af 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.spark.util.shim.TypeUtils; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataType; @@ -143,6 +144,11 @@ public byte[] getBinary(int i) { return row.getAs(i); } + @Override + public Variant getVariant(int pos) { + throw new UnsupportedOperationException(); + } + @Override public InternalArray getArray(int i) { return new PaimonArray(((ArrayType) type.getTypeAt(i)).getElementType(), row.getList(i)); @@ -300,6 +306,11 @@ public byte[] getBinary(int i) { return getAs(i); } + @Override + public Variant getVariant(int pos) { + throw new UnsupportedOperationException(); + } + @Override public InternalArray getArray(int i) { Object o = getAs(i);