Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce VariantType #4757

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.Types;
Expand Down Expand Up @@ -150,6 +151,11 @@ public FieldType visit(LocalZonedTimestampType localZonedTimestampType) {
return new FieldType(localZonedTimestampType.isNullable(), arrowType, null);
}

/**
 * Visitor entry point for {@link VariantType}; no conversion is implemented for this type.
 *
 * @param variantType the variant type being visited (unused)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public FieldType visit(VariantType variantType) {
    // Carry an explicit message so the failure names the unsupported type.
    throw new UnsupportedOperationException("Doesn't support VariantType.");
}

private TimeUnit getTimeUnit(int precision) {
if (precision == 0) {
return TimeUnit.SECOND;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
Expand Down Expand Up @@ -423,6 +424,11 @@ public Timestamp getTimestamp(int i, int precision) {
};
}

/**
 * Visitor entry point for {@link VariantType}; no converter is implemented for this type.
 *
 * @param variantType the variant type being visited (unused)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Arrow2PaimonVectorConverter visit(VariantType variantType) {
    // Carry an explicit message so the failure names the unsupported type.
    throw new UnsupportedOperationException("Doesn't support VariantType.");
}

@Override
public Arrow2PaimonVectorConverter visit(ArrayType arrayType) {
final Arrow2PaimonVectorConverter arrowVectorConvertor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.ListVector;
Expand Down Expand Up @@ -138,6 +139,11 @@ public ArrowFieldWriterFactory visit(LocalZonedTimestampType localZonedTimestamp
fieldVector, localZonedTimestampType.getPrecision(), null);
}

/**
 * Visitor entry point for {@link VariantType}; no writer factory is implemented for this type.
 *
 * @param variantType the variant type being visited (unused)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public ArrowFieldWriterFactory visit(VariantType variantType) {
    final String reason = "Doesn't support VariantType.";
    throw new UnsupportedOperationException(reason);
}

@Override
public ArrowFieldWriterFactory visit(ArrayType arrayType) {
ArrowFieldWriterFactory elementWriterFactory = arrayType.getElementType().accept(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.codegen

import org.apache.paimon.data._
import org.apache.paimon.data.variant.Variant
import org.apache.paimon.memory.MemorySegment
import org.apache.paimon.types._
import org.apache.paimon.types.DataTypeChecks.{getFieldCount, getFieldTypes, getPrecision, getScale}
Expand Down Expand Up @@ -380,6 +381,7 @@ object GenerateUtils {
case ARRAY => className[InternalArray]
case MULTISET | MAP => className[InternalMap]
case ROW => className[InternalRow]
case VARIANT => className[Variant]
case _ =>
throw new IllegalArgumentException("Illegal type: " + t)
}
Expand Down Expand Up @@ -418,6 +420,8 @@ object GenerateUtils {
s"$rowTerm.getMap($indexTerm)"
case ROW =>
s"$rowTerm.getRow($indexTerm, ${getFieldCount(t)})"
case VARIANT =>
s"$rowTerm.getVariant($indexTerm)"
case _ =>
throw new IllegalArgumentException("Illegal type: " + t)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypes;
Expand Down Expand Up @@ -179,6 +180,13 @@ public class EqualiserCodeGeneratorTest {
GenericRow.of(31, BinaryString.fromString("32")),
GenericRow.of(31, BinaryString.fromString("33"))),
new InternalRowSerializer(DataTypes.INT(), DataTypes.VARCHAR(2))));
TEST_DATA.put(
DataTypeRoot.VARIANT,
new GeneratedData(
DataTypes.VARIANT(),
Pair.of(
GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"),
GenericVariant.fromJson("{\"age\":27,\"city\":\"Hangzhou\"}"))));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

/** An implementation of {@link InternalRow} which provides a row with the fixed partition value. */
Expand Down Expand Up @@ -153,6 +154,13 @@ public byte[] getBinary(int pos) {
: row.getBinary(partitionInfo.getRealIndex(pos));
}

/**
 * Returns the variant at {@code pos}, read from {@code partition} when {@code partitionInfo}
 * reports the position as belonging to the partition row, and from {@code row} otherwise. In
 * both cases the position is first translated with {@code partitionInfo.getRealIndex(pos)}.
 */
@Override
public Variant getVariant(int pos) {
    if (partitionInfo.inPartitionRow(pos)) {
        return partition.getVariant(partitionInfo.getRealIndex(pos));
    }
    return row.getVariant(partitionInfo.getRealIndex(pos));
}

@Override
public InternalArray getArray(int pos) {
return partitionInfo.inPartitionRow(pos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;

/**
* An implementation of {@link InternalArray} which provides a casted view of the underlying {@link
Expand Down Expand Up @@ -184,6 +185,11 @@ public byte[] getBinary(int pos) {
return castElementGetter.getElementOrNull(array, pos);
}

/** Returns the variant at {@code pos} as produced by {@code castElementGetter} for the array. */
@Override
public Variant getVariant(int pos) {
    final Variant element = castElementGetter.getElementOrNull(array, pos);
    return element;
}

@Override
public InternalArray getArray(int pos) {
return castElementGetter.getElementOrNull(array, pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

import static org.apache.paimon.utils.Preconditions.checkNotNull;
Expand Down Expand Up @@ -131,6 +132,11 @@ public byte[] getBinary(int pos) {
return castMapping[pos].getFieldOrNull(row);
}

/** Returns the variant produced by the cast mapping registered at {@code pos} for the row. */
@Override
public Variant getVariant(int pos) {
    final Variant field = castMapping[pos].getFieldOrNull(row);
    return field;
}

@Override
public InternalArray getArray(int pos) {
return castMapping[pos].getFieldOrNull(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

/**
Expand Down Expand Up @@ -181,6 +182,14 @@ public InternalRow getRow(int pos, int numFields) {
return defaultValueRow.getRow(pos, numFields);
}

/**
 * Returns the variant at {@code pos} from the wrapped row, or from {@code defaultValueRow} when
 * the wrapped row reports that position as null.
 */
@Override
public Variant getVariant(int pos) {
    return row.isNullAt(pos) ? defaultValueRow.getVariant(pos) : row.getVariant(pos);
}

/**
 * Static factory that builds a {@link DefaultValueRow} from the given row of default values.
 *
 * @param defaultValueRow the row passed to the {@link DefaultValueRow} constructor
 * @return a newly constructed {@link DefaultValueRow}
 */
public static DefaultValueRow from(InternalRow defaultValueRow) {
    final DefaultValueRow created = new DefaultValueRow(defaultValueRow);
    return created;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;

Expand Down Expand Up @@ -177,6 +178,23 @@ public void writeTimestamp(int pos, Timestamp value, int precision) {
}
}

/**
 * Writes a variant into the variable-length area at the current cursor and records its offset
 * and size for {@code pos}.
 *
 * <p>Bytes written, in order: a 4-byte int holding the value length, the value bytes, then the
 * metadata bytes. The cursor advances by the size returned from
 * {@code roundNumberOfBytesToNearestWord}.
 *
 * @param pos position whose offset and size are recorded
 * @param variant source of the value and metadata byte arrays
 */
@Override
public void writeVariant(int pos, Variant variant) {
    final byte[] valueBytes = variant.value();
    final byte[] metadataBytes = variant.metadata();
    final int valueLength = valueBytes.length;
    final int metadataLength = metadataBytes.length;

    // 4 bytes for the value-length prefix, followed by both payloads.
    final int payloadSize = 4 + valueLength + metadataLength;
    final int alignedSize = roundNumberOfBytesToNearestWord(payloadSize);

    // Reserve space and clear the trailing padding before any payload bytes are written.
    ensureCapacity(alignedSize);
    zeroOutPaddingBytes(payloadSize);

    final int valueStart = cursor + 4;
    segment.putInt(cursor, valueLength);
    segment.put(valueStart, valueBytes, 0, valueLength);
    segment.put(valueStart + valueLength, metadataBytes, 0, metadataLength);

    setOffsetAndSize(pos, cursor, payloadSize);
    cursor += alignedSize;
}

protected void zeroOutPaddingBytes(int numBytes) {
if ((numBytes & 0x07) > 0) {
segment.putLong(cursor + ((numBytes >> 3) << 3), 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.DataType;
Expand Down Expand Up @@ -83,6 +84,7 @@ public static int calculateFixLengthPartSize(DataType type) {
case MULTISET:
case MAP:
case ROW:
case VARIANT:
// long and double are 8 bytes;
// otherwise it stores the length and offset of the variable-length part for types
// such as string, map, etc.
Expand Down Expand Up @@ -234,6 +236,14 @@ public byte[] getBinary(int pos) {
return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndSize);
}

/**
 * Reads the variant element at {@code pos}: validates the index, loads the 8-byte slot for the
 * element, and passes that long to {@code MemorySegmentUtils.readVariant}.
 */
@Override
public Variant getVariant(int pos) {
    assertIndexIsValid(pos);
    final long packedOffsetAndLength =
            MemorySegmentUtils.getLong(segments, getElementOffset(pos, 8));
    return MemorySegmentUtils.readVariant(segments, offset, packedOffsetAndLength);
}

@Override
public InternalArray getArray(int pos) {
assertIndexIsValid(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.DataType;
Expand Down Expand Up @@ -335,6 +336,14 @@ public byte[] getBinary(int pos) {
return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen);
}

/**
 * Reads the variant field at {@code pos}: validates the index, loads the long stored at the
 * field's offset in the first segment, and passes it to {@code MemorySegmentUtils.readVariant}.
 */
@Override
public Variant getVariant(int pos) {
    assertIndexIsValid(pos);
    final int slotOffset = getFieldOffset(pos);
    final long packedOffsetAndLength = segments[0].getLong(slotOffset);
    return MemorySegmentUtils.readVariant(segments, offset, packedOffsetAndLength);
}

@Override
public InternalArray getArray(int pos) {
assertIndexIsValid(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
Expand Down Expand Up @@ -67,6 +68,8 @@ public interface BinaryWriter {

void writeTimestamp(int pos, Timestamp value, int precision);

void writeVariant(int pos, Variant variant);

void writeArray(int pos, InternalArray value, InternalArraySerializer serializer);

void writeMap(int pos, InternalMap value, InternalMapSerializer serializer);
Expand Down Expand Up @@ -139,6 +142,9 @@ static void write(
case VARBINARY:
writer.writeBinary(pos, (byte[]) o);
break;
case VARIANT:
writer.writeVariant(pos, (Variant) o);
break;
default:
throw new UnsupportedOperationException("Not support type: " + type);
}
Expand Down Expand Up @@ -208,6 +214,8 @@ static ValueSetter createValueSetter(DataType elementType, Serializer<?> seriali
return (writer, pos, value) ->
writer.writeRow(
pos, (InternalRow) value, (InternalRowSerializer) rowSerializer);
case VARIANT:
return (writer, pos, value) -> writer.writeVariant(pos, (Variant) value);
default:
String msg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;

/**
* Getters to get data.
Expand Down Expand Up @@ -74,6 +75,9 @@ public interface DataGetters {
/** Returns the binary value at the given position. */
byte[] getBinary(int pos);

/** Returns the variant value at the given position. */
Variant getVariant(int pos);

/** Returns the array value at the given position. */
InternalArray getArray(int pos);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.utils.ArrayUtils;

Expand Down Expand Up @@ -204,6 +205,11 @@ public byte[] getBinary(int pos) {
return (byte[]) getObject(pos);
}

/** Returns the object stored at {@code pos}, cast to {@link Variant}. */
@Override
public Variant getVariant(int pos) {
    final Object element = getObject(pos);
    return (Variant) element;
}

@Override
public BinaryString getString(int pos) {
return (BinaryString) getObject(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

Expand Down Expand Up @@ -186,6 +187,11 @@ public byte[] getBinary(int pos) {
return (byte[]) this.fields[pos];
}

/** Returns the field stored at {@code pos}, cast to {@link Variant}. */
@Override
public Variant getVariant(int pos) {
    final Object field = this.fields[pos];
    return (Variant) field;
}

@Override
public InternalArray getArray(int pos) {
return (InternalArray) this.fields[pos];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ static ElementGetter createElementGetter(DataType elementType) {
final int rowFieldCount = getFieldCount(elementType);
elementGetter = (array, pos) -> array.getRow(pos, rowFieldCount);
break;
case VARIANT:
elementGetter = InternalArray::getVariant;
break;
default:
String msg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) {
final int rowFieldCount = DataTypeChecks.getFieldCount(fieldType);
fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
break;
case VARIANT:
fieldGetter = row -> row.getVariant(fieldPos);
break;
default:
String msg =
String.format(
Expand Down
Loading
Loading