Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Jun 7, 2024
1 parent ecf7809 commit 8d9c897
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.TimestampType;
Expand Down Expand Up @@ -150,6 +151,11 @@ static void write(
* @param elementType the element type
*/
static ValueSetter createValueSetter(DataType elementType) {
return createValueSetter(elementType, null);
}

static ValueSetter createValueSetter(DataType elementType, Serializer<?> serializer) {
Serializer<?> finalSerializer = createSerializerIfNeed(elementType, serializer);
// ordered by type root definition
switch (elementType.getTypeRoot()) {
case CHAR:
Expand Down Expand Up @@ -184,81 +190,35 @@ static ValueSetter createValueSetter(DataType elementType) {
return (writer, pos, value) ->
writer.writeTimestamp(pos, (Timestamp) value, timestampPrecision);
case ARRAY:
final Serializer<InternalArray> arraySerializer =
InternalSerializers.create(elementType);
return (writer, pos, value) ->
writer.writeArray(
pos,
(InternalArray) value,
(InternalArraySerializer) arraySerializer);
(InternalArraySerializer) finalSerializer);
case MULTISET:
case MAP:
final Serializer<InternalMap> mapSerializer =
InternalSerializers.create(elementType);
return (writer, pos, value) ->
writer.writeMap(
pos, (InternalMap) value, (InternalMapSerializer) mapSerializer);
pos, (InternalMap) value, (InternalMapSerializer) finalSerializer);
case ROW:
final Serializer<InternalRow> rowSerializer =
InternalSerializers.create(elementType);
return (writer, pos, value) ->
writer.writeRow(
pos, (InternalRow) value, (InternalRowSerializer) rowSerializer);
pos, (InternalRow) value, (InternalRowSerializer) finalSerializer);
default:
throw new IllegalArgumentException();
}
}

static ValueSetter createValueSetter(DataType elementType, Serializer<?> serializer) {
// ordered by type root definition
switch (elementType.getTypeRoot()) {
case CHAR:
case VARCHAR:
return (writer, pos, value) -> writer.writeString(pos, (BinaryString) value);
case BOOLEAN:
return (writer, pos, value) -> writer.writeBoolean(pos, (boolean) value);
case BINARY:
case VARBINARY:
return (writer, pos, value) -> writer.writeBinary(pos, (byte[]) value);
case DECIMAL:
final int decimalPrecision = getPrecision(elementType);
return (writer, pos, value) ->
writer.writeDecimal(pos, (Decimal) value, decimalPrecision);
case TINYINT:
return (writer, pos, value) -> writer.writeByte(pos, (byte) value);
case SMALLINT:
return (writer, pos, value) -> writer.writeShort(pos, (short) value);
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
return (writer, pos, value) -> writer.writeInt(pos, (int) value);
case BIGINT:
return (writer, pos, value) -> writer.writeLong(pos, (long) value);
case FLOAT:
return (writer, pos, value) -> writer.writeFloat(pos, (float) value);
case DOUBLE:
return (writer, pos, value) -> writer.writeDouble(pos, (double) value);
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final int timestampPrecision = getPrecision(elementType);
return (writer, pos, value) ->
writer.writeTimestamp(pos, (Timestamp) value, timestampPrecision);
case ARRAY:
return (writer, pos, value) ->
writer.writeArray(
pos, (InternalArray) value, (InternalArraySerializer) serializer);
case MULTISET:
case MAP:
return (writer, pos, value) ->
writer.writeMap(
pos, (InternalMap) value, (InternalMapSerializer) serializer);
case ROW:
return (writer, pos, value) ->
writer.writeRow(
pos, (InternalRow) value, (InternalRowSerializer) serializer);
default:
throw new IllegalArgumentException();
static Serializer<?> createSerializerIfNeed(DataType elementType, Serializer<?> serializer) {
Serializer<?> finalSerializer = serializer;
DataTypeRoot typeRoot = elementType.getTypeRoot();
if (finalSerializer == null
&& (typeRoot == DataTypeRoot.MAP
|| typeRoot == DataTypeRoot.ROW
|| typeRoot == DataTypeRoot.ARRAY)) {
finalSerializer = InternalSerializers.create(elementType);
}
return finalSerializer;
}

/** Accessor for setting the elements of a binary writer during runtime. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public InternalRowSerializer(DataType[] types, Serializer<?>[] fieldSerializers)
for (int i = 0; i < types.length; i++) {
DataType type = types[i];
fieldGetters[i] = InternalRow.createFieldGetter(type, i);
valueSetters[i] = BinaryWriter.createValueSetter(type, this);
valueSetters[i] = BinaryWriter.createValueSetter(type, fieldSerializers[i]);
}
}

Expand Down

0 comments on commit 8d9c897

Please sign in to comment.