Skip to content

Commit

Permalink
[core][flink] Introduce HashMapLocalMerger to 'local-merge-buffer-size'
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Nov 11, 2024
1 parent b8a4def commit 9605534
Show file tree
Hide file tree
Showing 15 changed files with 1,925 additions and 41 deletions.
32 changes: 32 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import org.apache.paimon.annotation.Public;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.TimestampType;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -442,4 +446,32 @@ public static BinaryRow singleColumn(@Nullable BinaryString string) {
writer.complete();
return row;
}

/**
* If it is a fixed-length field, we can call this BinaryRowData's setXX method for in-place
* updates. If it is variable-length field, can't use this method, because the underlying data
* is stored continuously.
*/
public static boolean isInFixedLengthPart(DataType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case BIGINT:
case FLOAT:
case DOUBLE:
return true;
case DECIMAL:
return Decimal.isCompact(((DecimalType) type).getPrecision());
case TIMESTAMP_WITHOUT_TIME_ZONE:
return Timestamp.isCompact(((TimestampType) type).getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return Timestamp.isCompact(((LocalZonedTimestampType) type).getPrecision());
default:
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,98 @@ interface FieldGetter extends Serializable {
@Nullable
Object getFieldOrNull(InternalRow row);
}

/**
* Creates a {@link FieldSetter} for setting elements to a row from a row at the given position.
*
* @param fieldType the element type of the row
* @param fieldPos the element position of the row
*/
static FieldSetter createFieldSetter(DataType fieldType, int fieldPos) {
final FieldSetter fieldSetter;
// ordered by type root definition
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
fieldSetter = (from, to) -> to.setBoolean(fieldPos, from.getBoolean(fieldPos));
break;
case DECIMAL:
final int decimalPrecision = getPrecision(fieldType);
final int decimalScale = getScale(fieldType);
fieldSetter =
(from, to) ->
to.setDecimal(
fieldPos,
from.getDecimal(fieldPos, decimalPrecision, decimalScale),
decimalPrecision);
if (fieldType.isNullable() && !Decimal.isCompact(decimalPrecision)) {
return (from, to) -> {
if (from.isNullAt(fieldPos)) {
to.setNullAt(fieldPos);
to.setDecimal(fieldPos, null, decimalPrecision);
} else {
fieldSetter.setFieldFrom(from, to);
}
};
}
break;
case TINYINT:
fieldSetter = (from, to) -> to.setByte(fieldPos, from.getByte(fieldPos));
break;
case SMALLINT:
fieldSetter = (from, to) -> to.setShort(fieldPos, from.getShort(fieldPos));
break;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
fieldSetter = (from, to) -> to.setInt(fieldPos, from.getInt(fieldPos));
break;
case BIGINT:
fieldSetter = (from, to) -> to.setLong(fieldPos, from.getLong(fieldPos));
break;
case FLOAT:
fieldSetter = (from, to) -> to.setFloat(fieldPos, from.getFloat(fieldPos));
break;
case DOUBLE:
fieldSetter = (from, to) -> to.setDouble(fieldPos, from.getDouble(fieldPos));
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final int timestampPrecision = getPrecision(fieldType);
fieldSetter =
(from, to) ->
to.setTimestamp(
fieldPos,
from.getTimestamp(fieldPos, timestampPrecision),
timestampPrecision);
if (fieldType.isNullable() && !Timestamp.isCompact(timestampPrecision)) {
return (from, to) -> {
if (from.isNullAt(fieldPos)) {
to.setNullAt(fieldPos);
to.setTimestamp(fieldPos, null, timestampPrecision);
} else {
fieldSetter.setFieldFrom(from, to);
}
};
}
break;
default:
throw new IllegalArgumentException(
String.format("type %s not support for setting", fieldType));
}
if (!fieldType.isNullable()) {
return fieldSetter;
}
return (from, to) -> {
if (from.isNullAt(fieldPos)) {
to.setNullAt(fieldPos);
} else {
fieldSetter.setFieldFrom(from, to);
}
};
}

/** Accessor for setting the field of a row during runtime. */
interface FieldSetter extends Serializable {
void setFieldFrom(DataGetters from, DataSetters to);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public BinaryRow toBinaryRow(BinaryRow rowData) throws IOException {

// ============================ Page related operations ===================================

@Override
public BinaryRow createReuseInstance() {
return new BinaryRow(numFields);
}

@Override
public int serializeToPages(BinaryRow record, AbstractPagedOutputView headerLessView)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public BinaryRow toBinaryRow(InternalRow row) {
return reuseRow;
}

@Override
public InternalRow createReuseInstance() {
return binarySerializer.createReuseInstance();
}

@Override
public int serializeToPages(InternalRow row, AbstractPagedOutputView target)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
/** A type serializer which provides paged serialize and deserialize methods. */
public interface PagedTypeSerializer<T> extends Serializer<T> {

/** Creates a new instance for reusing. */
T createReuseInstance();

/**
* Serializes the given record to the given target paged output view. Some implementations may
* skip some bytes if current page does not have enough space left, .e.g {@link BinaryRow}.
Expand Down
Loading

0 comments on commit 9605534

Please sign in to comment.