Skip to content

Commit

Permalink
squash
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Dec 16, 2024
1 parent 9b281bc commit 8213c6b
Show file tree
Hide file tree
Showing 85 changed files with 4,203 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.Types;
Expand Down Expand Up @@ -150,6 +151,11 @@ public FieldType visit(LocalZonedTimestampType localZonedTimestampType) {
return new FieldType(localZonedTimestampType.isNullable(), arrowType, null);
}

/**
 * Visitor branch for {@link VariantType}.
 *
 * <p>This visitor has no Arrow field mapping for variants, so the call always fails.
 *
 * @param variantType the variant type being visited
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public FieldType visit(VariantType variantType) {
    // Use a descriptive message instead of an empty string so the failure is diagnosable.
    throw new UnsupportedOperationException("Doesn't support VariantType.");
}

private TimeUnit getTimeUnit(int precision) {
if (precision == 0) {
return TimeUnit.SECOND;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
Expand Down Expand Up @@ -423,6 +424,11 @@ public Timestamp getTimestamp(int i, int precision) {
};
}

/**
 * Visitor branch for {@link VariantType}.
 *
 * <p>No Arrow-to-Paimon vector converter is provided for variants, so the call always fails.
 *
 * @param variantType the variant type being visited
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Arrow2PaimonVectorConverter visit(VariantType variantType) {
    // Carry a message so the stack trace explains which type was rejected.
    throw new UnsupportedOperationException("Doesn't support VariantType.");
}

@Override
public Arrow2PaimonVectorConverter visit(ArrayType arrayType) {
final Arrow2PaimonVectorConverter arrowVectorConvertor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.ListVector;
Expand Down Expand Up @@ -138,6 +139,11 @@ public ArrowFieldWriterFactory visit(LocalZonedTimestampType localZonedTimestamp
fieldVector, localZonedTimestampType.getPrecision(), null);
}

/**
 * Visitor branch for {@link VariantType}; always rejects the type.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ArrowFieldWriterFactory visit(VariantType variantType) {
    final String reason = "Doesn't support VariantType.";
    throw new UnsupportedOperationException(reason);
}

@Override
public ArrowFieldWriterFactory visit(ArrayType arrayType) {
ArrowFieldWriterFactory elementWriterFactory = arrayType.getElementType().accept(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

/** An implementation of {@link InternalRow} which provides a row with the fixed partition value. */
Expand Down Expand Up @@ -153,6 +154,13 @@ public byte[] getBinary(int pos) {
: row.getBinary(partitionInfo.getRealIndex(pos));
}

/**
 * Returns the variant at {@code pos}, read from {@code partition} when
 * {@code partitionInfo} reports the position as a partition field and from {@code row}
 * otherwise. In both cases the position is first translated with
 * {@code partitionInfo.getRealIndex(pos)}.
 */
@Override
public Variant getVariant(int pos) {
    if (partitionInfo.inPartitionRow(pos)) {
        return partition.getVariant(partitionInfo.getRealIndex(pos));
    }
    return row.getVariant(partitionInfo.getRealIndex(pos));
}

@Override
public InternalArray getArray(int pos) {
return partitionInfo.inPartitionRow(pos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

import static org.apache.paimon.utils.Preconditions.checkNotNull;
Expand Down Expand Up @@ -133,6 +134,11 @@ public byte[] getBinary(int pos) {
return castMapping[pos].getFieldOrNull(row);
}

/** Returns the variant produced by the cast mapping registered for {@code pos}. */
@Override
public Variant getVariant(int pos) {
    final Variant converted = castMapping[pos].getFieldOrNull(row);
    return converted;
}

@Override
public InternalArray getArray(int pos) {
return castMapping[pos].getFieldOrNull(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

/**
Expand Down Expand Up @@ -181,6 +182,14 @@ public InternalRow getRow(int pos, int numFields) {
return defaultValueRow.getRow(pos, numFields);
}

/**
 * Returns the variant at {@code pos} from the wrapped row, falling back to
 * {@code defaultValueRow} when the wrapped row holds null at that position.
 */
@Override
public Variant getVariant(int pos) {
    if (row.isNullAt(pos)) {
        return defaultValueRow.getVariant(pos);
    }
    return row.getVariant(pos);
}

/**
 * Static factory that creates a {@link DefaultValueRow} from the given row of default values.
 *
 * @param defaultValueRow the row passed to the {@link DefaultValueRow} constructor
 * @return a new {@link DefaultValueRow}
 */
public static DefaultValueRow from(InternalRow defaultValueRow) {
    final DefaultValueRow wrapper = new DefaultValueRow(defaultValueRow);
    return wrapper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;

Expand Down Expand Up @@ -177,6 +178,23 @@ public void writeTimestamp(int pos, Timestamp value, int precision) {
}
}

/**
 * Writes a variant into the variable-length part at the current cursor.
 *
 * <p>Bytes are laid out as: a 4-byte int holding the value length, then the value bytes,
 * then the metadata bytes. The slot for {@code pos} is pointed at this region via
 * {@code setOffsetAndSize}, and the cursor advances by the word-rounded size.
 *
 * @param pos the field/element position whose offset and size are recorded
 * @param variant the variant whose value and metadata bytes are written
 */
@Override
public void writeVariant(int pos, Variant variant) {
    final byte[] valueBytes = variant.getValue();
    final byte[] metadataBytes = variant.getMetadata();

    // 4 bytes for the value-length prefix, followed by both payloads.
    final int payloadSize = 4 + valueBytes.length + metadataBytes.length;
    final int paddedSize = roundNumberOfBytesToNearestWord(payloadSize);
    ensureCapacity(paddedSize);
    zeroOutPaddingBytes(payloadSize);

    int writeAt = cursor;
    segment.putInt(writeAt, valueBytes.length);
    writeAt += 4;
    segment.put(writeAt, valueBytes, 0, valueBytes.length);
    writeAt += valueBytes.length;
    segment.put(writeAt, metadataBytes, 0, metadataBytes.length);

    setOffsetAndSize(pos, cursor, payloadSize);
    cursor += paddedSize;
}

protected void zeroOutPaddingBytes(int numBytes) {
if ((numBytes & 0x07) > 0) {
segment.putLong(cursor + ((numBytes >> 3) << 3), 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.DataType;
Expand Down Expand Up @@ -83,6 +84,7 @@ public static int calculateFixLengthPartSize(DataType type) {
case MULTISET:
case MAP:
case ROW:
case VARIANT:
// long and double are 8 bytes;
// otherwise it stores the length and offset of the variable-length part for types
// such as string, map, etc.
Expand Down Expand Up @@ -234,6 +236,14 @@ public byte[] getBinary(int pos) {
return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndSize);
}

/**
 * Returns the variant stored at element {@code pos}.
 *
 * <p>After validating the index, reads the long at the element's offset and passes it to
 * {@link MemorySegmentUtils#readVariant} together with this array's segments and base offset.
 */
@Override
public Variant getVariant(int pos) {
    assertIndexIsValid(pos);
    final long packedOffsetAndLen =
            MemorySegmentUtils.getLong(segments, getElementOffset(pos, 8));
    return MemorySegmentUtils.readVariant(segments, offset, packedOffsetAndLen);
}

@Override
public InternalArray getArray(int pos) {
assertIndexIsValid(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.DataType;
Expand Down Expand Up @@ -335,6 +336,14 @@ public byte[] getBinary(int pos) {
return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen);
}

/**
 * Returns the variant stored in field {@code pos}.
 *
 * <p>After validating the index, reads the long at the field's offset from the first segment
 * and passes it to {@link MemorySegmentUtils#readVariant} with this row's segments and base
 * offset.
 */
@Override
public Variant getVariant(int pos) {
    assertIndexIsValid(pos);
    final long packedOffsetAndLen = segments[0].getLong(getFieldOffset(pos));
    return MemorySegmentUtils.readVariant(segments, offset, packedOffsetAndLen);
}

@Override
public InternalArray getArray(int pos) {
assertIndexIsValid(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
Expand Down Expand Up @@ -67,6 +68,8 @@ public interface BinaryWriter {

void writeTimestamp(int pos, Timestamp value, int precision);

/** Writes the given {@link Variant} value at position {@code pos}. */
void writeVariant(int pos, Variant variant);

void writeArray(int pos, InternalArray value, InternalArraySerializer serializer);

void writeMap(int pos, InternalMap value, InternalMapSerializer serializer);
Expand Down Expand Up @@ -139,6 +142,9 @@ static void write(
case VARBINARY:
writer.writeBinary(pos, (byte[]) o);
break;
case VARIANT:
writer.writeVariant(pos, (Variant) o);
break;
default:
throw new UnsupportedOperationException("Not support type: " + type);
}
Expand Down Expand Up @@ -208,6 +214,8 @@ static ValueSetter createValueSetter(DataType elementType, Serializer<?> seriali
return (writer, pos, value) ->
writer.writeRow(
pos, (InternalRow) value, (InternalRowSerializer) rowSerializer);
case VARIANT:
return (writer, pos, value) -> writer.writeVariant(pos, (Variant) value);
default:
String msg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;

/**
* Getters to get data.
Expand Down Expand Up @@ -74,6 +75,8 @@ public interface DataGetters {
/** Returns the binary value at the given position. */
byte[] getBinary(int pos);

/** Returns the variant value at the given position. */
Variant getVariant(int pos);

/** Returns the array value at the given position. */
InternalArray getArray(int pos);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.utils.ArrayUtils;

Expand Down Expand Up @@ -204,6 +205,11 @@ public byte[] getBinary(int pos) {
return (byte[]) getObject(pos);
}

/** Returns the element at {@code pos}, cast to {@link Variant}. */
@Override
public Variant getVariant(int pos) {
    final Object element = getObject(pos);
    return (Variant) element;
}

@Override
public BinaryString getString(int pos) {
return (BinaryString) getObject(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

Expand Down Expand Up @@ -186,6 +187,11 @@ public byte[] getBinary(int pos) {
return (byte[]) this.fields[pos];
}

/** Returns the field stored at {@code pos}, cast to {@link Variant}. */
@Override
public Variant getVariant(int pos) {
    final Object field = this.fields[pos];
    return (Variant) field;
}

@Override
public InternalArray getArray(int pos) {
return (InternalArray) this.fields[pos];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ static ElementGetter createElementGetter(DataType elementType) {
final int rowFieldCount = getFieldCount(elementType);
elementGetter = (array, pos) -> array.getRow(pos, rowFieldCount);
break;
case VARIANT:
elementGetter = InternalArray::getVariant;
break;
default:
String msg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) {
final int rowFieldCount = DataTypeChecks.getFieldCount(fieldType);
fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
break;
case VARIANT:
fieldGetter = row -> row.getVariant(fieldPos);
break;
default:
String msg =
String.format(
Expand Down
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/data/JoinedRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.data;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -224,6 +225,15 @@ public byte[] getBinary(int pos) {
}
}

/**
 * Returns the variant at {@code pos} of the joined row: positions below
 * {@code row1.getFieldCount()} are served by {@code row1}, the rest by {@code row2} with the
 * position shifted down by that count.
 */
@Override
public Variant getVariant(int pos) {
    final int leftFieldCount = row1.getFieldCount();
    if (pos >= leftFieldCount) {
        return row2.getVariant(pos - leftFieldCount);
    }
    return row1.getVariant(pos);
}

@Override
public InternalArray getArray(int pos) {
if (pos < row1.getFieldCount()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.data;

import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;

import java.util.function.Supplier;
Expand Down Expand Up @@ -142,6 +143,11 @@ public byte[] getBinary(int pos) {
return (byte[]) getField(pos);
}

/** Returns the value obtained from {@code getField(pos)}, cast to {@link Variant}. */
@Override
public Variant getVariant(int pos) {
    final Object field = getField(pos);
    return (Variant) field;
}

@Override
public InternalArray getArray(int pos) {
return (InternalArray) getField(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.data;

import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.RowKind;
Expand Down Expand Up @@ -280,6 +281,14 @@ public byte[] getBinary(int pos) {
return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen);
}

/**
 * Returns the variant stored in field {@code pos}.
 *
 * <p>After validating the index, reads the long at the field's offset and passes it to
 * {@link MemorySegmentUtils#readVariant} with this row's segments and base offset.
 */
@Override
public Variant getVariant(int pos) {
    assertIndexIsValid(pos);
    final long packedOffsetAndLen =
            MemorySegmentUtils.getLong(segments, getFieldOffset(pos));
    return MemorySegmentUtils.readVariant(segments, offset, packedOffsetAndLen);
}

@Override
public InternalRow getRow(int pos, int numFields) {
assertIndexIsValid(pos);
Expand Down
Loading

0 comments on commit 8213c6b

Please sign in to comment.