Skip to content

Commit

Permalink
[core] Introduce VariantType (apache#4757)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Dec 23, 2024
1 parent 9977760 commit c0023f0
Show file tree
Hide file tree
Showing 67 changed files with 732 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.Types;
Expand Down Expand Up @@ -150,6 +151,11 @@ public FieldType visit(LocalZonedTimestampType localZonedTimestampType) {
return new FieldType(localZonedTimestampType.isNullable(), arrowType, null);
}

@Override
public FieldType visit(VariantType variantType) {
throw new UnsupportedOperationException();
}

private TimeUnit getTimeUnit(int precision) {
if (precision == 0) {
return TimeUnit.SECOND;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
Expand Down Expand Up @@ -423,6 +424,11 @@ public Timestamp getTimestamp(int i, int precision) {
};
}

@Override
public Arrow2PaimonVectorConverter visit(VariantType variantType) {
throw new UnsupportedOperationException();
}

@Override
public Arrow2PaimonVectorConverter visit(ArrayType arrayType) {
final Arrow2PaimonVectorConverter arrowVectorConvertor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.ListVector;
Expand Down Expand Up @@ -138,6 +139,11 @@ public ArrowFieldWriterFactory visit(LocalZonedTimestampType localZonedTimestamp
fieldVector, localZonedTimestampType.getPrecision(), null);
}

@Override
public ArrowFieldWriterFactory visit(VariantType variantType) {
throw new UnsupportedOperationException("Doesn't support VariantType.");
}

@Override
public ArrowFieldWriterFactory visit(ArrayType arrayType) {
ArrowFieldWriterFactory elementWriterFactory = arrayType.getElementType().accept(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.codegen

import org.apache.paimon.data._
import org.apache.paimon.data.variant.Variant
import org.apache.paimon.memory.MemorySegment
import org.apache.paimon.types._
import org.apache.paimon.types.DataTypeChecks.{getFieldCount, getFieldTypes, getPrecision, getScale}
Expand Down Expand Up @@ -380,6 +381,7 @@ object GenerateUtils {
case ARRAY => className[InternalArray]
case MULTISET | MAP => className[InternalMap]
case ROW => className[InternalRow]
case VARIANT => className[Variant]
case _ =>
throw new IllegalArgumentException("Illegal type: " + t)
}
Expand Down Expand Up @@ -418,6 +420,8 @@ object GenerateUtils {
s"$rowTerm.getMap($indexTerm)"
case ROW =>
s"$rowTerm.getRow($indexTerm, ${getFieldCount(t)})"
case VARIANT =>
s"$rowTerm.getVariant($indexTerm)"
case _ =>
throw new IllegalArgumentException("Illegal type: " + t)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypes;
Expand Down Expand Up @@ -179,6 +180,13 @@ public class EqualiserCodeGeneratorTest {
GenericRow.of(31, BinaryString.fromString("32")),
GenericRow.of(31, BinaryString.fromString("33"))),
new InternalRowSerializer(DataTypes.INT(), DataTypes.VARCHAR(2))));
TEST_DATA.put(
DataTypeRoot.VARIANT,
new GeneratedData(
DataTypes.VARIANT(),
Pair.of(
GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"),
GenericVariant.fromJson("{\"age\":27,\"city\":\"Hangzhou\"}"))));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

/** An implementation of {@link InternalRow} which provides a row the fixed partition value. */
Expand Down Expand Up @@ -153,6 +154,13 @@ public byte[] getBinary(int pos) {
: row.getBinary(partitionInfo.getRealIndex(pos));
}

@Override
public Variant getVariant(int pos) {
return partitionInfo.inPartitionRow(pos)
? partition.getVariant(partitionInfo.getRealIndex(pos))
: row.getVariant(partitionInfo.getRealIndex(pos));
}

@Override
public InternalArray getArray(int pos) {
return partitionInfo.inPartitionRow(pos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;

/**
* An implementation of {@link InternalArray} which provides a casted view of the underlying {@link
Expand Down Expand Up @@ -184,6 +185,11 @@ public byte[] getBinary(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public Variant getVariant(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

@Override
public InternalArray getArray(int pos) {
return castElementGetter.getElementOrNull(array, pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

import static org.apache.paimon.utils.Preconditions.checkNotNull;
Expand Down Expand Up @@ -131,6 +132,11 @@ public byte[] getBinary(int pos) {
return castMapping[pos].getFieldOrNull(row);
}

@Override
public Variant getVariant(int pos) {
return castMapping[pos].getFieldOrNull(row);
}

@Override
public InternalArray getArray(int pos) {
return castMapping[pos].getFieldOrNull(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

/**
Expand Down Expand Up @@ -181,6 +182,14 @@ public InternalRow getRow(int pos, int numFields) {
return defaultValueRow.getRow(pos, numFields);
}

@Override
public Variant getVariant(int pos) {
if (!row.isNullAt(pos)) {
return row.getVariant(pos);
}
return defaultValueRow.getVariant(pos);
}

public static DefaultValueRow from(InternalRow defaultValueRow) {
return new DefaultValueRow(defaultValueRow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;

Expand Down Expand Up @@ -177,6 +178,23 @@ public void writeTimestamp(int pos, Timestamp value, int precision) {
}
}

@Override
public void writeVariant(int pos, Variant variant) {
byte[] value = variant.value();
byte[] metadata = variant.metadata();
int totalSize = 4 + value.length + metadata.length;
final int roundedSize = roundNumberOfBytesToNearestWord(totalSize);
ensureCapacity(roundedSize);
zeroOutPaddingBytes(totalSize);

segment.putInt(cursor, value.length);
segment.put(cursor + 4, value, 0, value.length);
segment.put(cursor + 4 + value.length, metadata, 0, metadata.length);

setOffsetAndSize(pos, cursor, totalSize);
cursor += roundedSize;
}

protected void zeroOutPaddingBytes(int numBytes) {
if ((numBytes & 0x07) > 0) {
segment.putLong(cursor + ((numBytes >> 3) << 3), 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.DataType;
Expand Down Expand Up @@ -83,6 +84,7 @@ public static int calculateFixLengthPartSize(DataType type) {
case MULTISET:
case MAP:
case ROW:
case VARIANT:
// long and double are 8 bytes;
// otherwise it stores the length and offset of the variable-length part for types
// such as is string, map, etc.
Expand Down Expand Up @@ -234,6 +236,14 @@ public byte[] getBinary(int pos) {
return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndSize);
}

@Override
public Variant getVariant(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getElementOffset(pos, 8);
final long offsetAndLen = MemorySegmentUtils.getLong(segments, fieldOffset);
return MemorySegmentUtils.readVariant(segments, offset, offsetAndLen);
}

@Override
public InternalArray getArray(int pos) {
assertIndexIsValid(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.DataType;
Expand Down Expand Up @@ -335,6 +336,14 @@ public byte[] getBinary(int pos) {
return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen);
}

@Override
public Variant getVariant(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
final long offsetAndLen = segments[0].getLong(fieldOffset);
return MemorySegmentUtils.readVariant(segments, offset, offsetAndLen);
}

@Override
public InternalArray getArray(int pos) {
assertIndexIsValid(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
Expand Down Expand Up @@ -67,6 +68,8 @@ public interface BinaryWriter {

void writeTimestamp(int pos, Timestamp value, int precision);

void writeVariant(int pos, Variant variant);

void writeArray(int pos, InternalArray value, InternalArraySerializer serializer);

void writeMap(int pos, InternalMap value, InternalMapSerializer serializer);
Expand Down Expand Up @@ -139,6 +142,9 @@ static void write(
case VARBINARY:
writer.writeBinary(pos, (byte[]) o);
break;
case VARIANT:
writer.writeVariant(pos, (Variant) o);
break;
default:
throw new UnsupportedOperationException("Not support type: " + type);
}
Expand Down Expand Up @@ -208,6 +214,8 @@ static ValueSetter createValueSetter(DataType elementType, Serializer<?> seriali
return (writer, pos, value) ->
writer.writeRow(
pos, (InternalRow) value, (InternalRowSerializer) rowSerializer);
case VARIANT:
return (writer, pos, value) -> writer.writeVariant(pos, (Variant) value);
default:
String msg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;

/**
* Getters to get data.
Expand Down Expand Up @@ -74,6 +75,9 @@ public interface DataGetters {
/** Returns the binary value at the given position. */
byte[] getBinary(int pos);

/** Returns the variant value at the given position. */
Variant getVariant(int pos);

/** Returns the array value at the given position. */
InternalArray getArray(int pos);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.utils.ArrayUtils;

Expand Down Expand Up @@ -204,6 +205,11 @@ public byte[] getBinary(int pos) {
return (byte[]) getObject(pos);
}

@Override
public Variant getVariant(int pos) {
return (Variant) getObject(pos);
}

@Override
public BinaryString getString(int pos) {
return (BinaryString) getObject(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

Expand Down Expand Up @@ -186,6 +187,11 @@ public byte[] getBinary(int pos) {
return (byte[]) this.fields[pos];
}

@Override
public Variant getVariant(int pos) {
return (Variant) this.fields[pos];
}

@Override
public InternalArray getArray(int pos) {
return (InternalArray) this.fields[pos];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ static ElementGetter createElementGetter(DataType elementType) {
final int rowFieldCount = getFieldCount(elementType);
elementGetter = (array, pos) -> array.getRow(pos, rowFieldCount);
break;
case VARIANT:
elementGetter = InternalArray::getVariant;
break;
default:
String msg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) {
final int rowFieldCount = DataTypeChecks.getFieldCount(fieldType);
fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
break;
case VARIANT:
fieldGetter = row -> row.getVariant(fieldPos);
break;
default:
String msg =
String.format(
Expand Down
Loading

0 comments on commit c0023f0

Please sign in to comment.