diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java index 7068e25e6e60..d08c580be5ff 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java @@ -21,7 +21,11 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySegmentUtils; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.TimestampType; import javax.annotation.Nullable; @@ -442,4 +446,32 @@ public static BinaryRow singleColumn(@Nullable BinaryString string) { writer.complete(); return row; } + + /** + * If it is a fixed-length field, we can call this BinaryRowData's setXX method for in-place + * updates. If it is variable-length field, can't use this method, because the underlying data + * is stored continuously. + */ + public static boolean isInFixedLengthPart(DataType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + case TINYINT: + case SMALLINT: + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case BIGINT: + case FLOAT: + case DOUBLE: + return true; + case DECIMAL: + return Decimal.isCompact(((DecimalType) type).getPrecision()); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return Timestamp.isCompact(((TimestampType) type).getPrecision()); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return Timestamp.isCompact(((LocalZonedTimestampType) type).getPrecision()); + default: + return false; + } + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java index 4c4f3f978d56..a83cbaec7396 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java @@ -244,4 +244,98 @@ interface FieldGetter extends Serializable { @Nullable Object getFieldOrNull(InternalRow row); } + + /** + * Creates a {@link FieldSetter} for setting elements to a row from a row at the given position. 
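+ *
+ * <p>The returned setter copies the field at the given position from a source row into the target
+ * row in place; variable-length types such as strings are not covered by the switch below and cause
+ * an {@link IllegalArgumentException}. Null handling is included for nullable field types.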
+ * + * @param fieldType the element type of the row + * @param fieldPos the element position of the row + */ + static FieldSetter createFieldSetter(DataType fieldType, int fieldPos) { + final FieldSetter fieldSetter; + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case BOOLEAN: + fieldSetter = (from, to) -> to.setBoolean(fieldPos, from.getBoolean(fieldPos)); + break; + case DECIMAL: + final int decimalPrecision = getPrecision(fieldType); + final int decimalScale = getScale(fieldType); + fieldSetter = + (from, to) -> + to.setDecimal( + fieldPos, + from.getDecimal(fieldPos, decimalPrecision, decimalScale), + decimalPrecision); + if (fieldType.isNullable() && !Decimal.isCompact(decimalPrecision)) { + return (from, to) -> { + if (from.isNullAt(fieldPos)) { + to.setNullAt(fieldPos); + to.setDecimal(fieldPos, null, decimalPrecision); + } else { + fieldSetter.setFieldFrom(from, to); + } + }; + } + break; + case TINYINT: + fieldSetter = (from, to) -> to.setByte(fieldPos, from.getByte(fieldPos)); + break; + case SMALLINT: + fieldSetter = (from, to) -> to.setShort(fieldPos, from.getShort(fieldPos)); + break; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + fieldSetter = (from, to) -> to.setInt(fieldPos, from.getInt(fieldPos)); + break; + case BIGINT: + fieldSetter = (from, to) -> to.setLong(fieldPos, from.getLong(fieldPos)); + break; + case FLOAT: + fieldSetter = (from, to) -> to.setFloat(fieldPos, from.getFloat(fieldPos)); + break; + case DOUBLE: + fieldSetter = (from, to) -> to.setDouble(fieldPos, from.getDouble(fieldPos)); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampPrecision = getPrecision(fieldType); + fieldSetter = + (from, to) -> + to.setTimestamp( + fieldPos, + from.getTimestamp(fieldPos, timestampPrecision), + timestampPrecision); + if (fieldType.isNullable() && !Timestamp.isCompact(timestampPrecision)) { + return (from, to) -> { + if (from.isNullAt(fieldPos)) { + to.setNullAt(fieldPos); + to.setTimestamp(fieldPos, null, timestampPrecision); + } else { + fieldSetter.setFieldFrom(from, to); + } + }; + } + break; + default: + throw new IllegalArgumentException( + String.format("type %s not support for setting", fieldType)); + } + if (!fieldType.isNullable()) { + return fieldSetter; + } + return (from, to) -> { + if (from.isNullAt(fieldPos)) { + to.setNullAt(fieldPos); + } else { + fieldSetter.setFieldFrom(from, to); + } + }; + } + + /** Accessor for setting the field of a row during runtime. 
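+ *
+ * <p>Implementations read the field from a {@link DataGetters} source and write it into the same
+ * position of a {@link DataSetters} target, typically to update a {@link BinaryRow} in place.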
*/ + interface FieldSetter extends Serializable { + void setFieldFrom(DataGetters from, DataSetters to); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java index e5f3c9e7d0f5..8773e734d891 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java @@ -107,6 +107,11 @@ public BinaryRow toBinaryRow(BinaryRow rowData) throws IOException { // ============================ Page related operations =================================== + @Override + public BinaryRow createReuseInstance() { + return new BinaryRow(numFields); + } + @Override public int serializeToPages(BinaryRow record, AbstractPagedOutputView headerLessView) throws IOException { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java index 8a32a222c1df..ac8cc34e0c01 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java @@ -167,6 +167,11 @@ public BinaryRow toBinaryRow(InternalRow row) { return reuseRow; } + @Override + public InternalRow createReuseInstance() { + return binarySerializer.createReuseInstance(); + } + @Override public int serializeToPages(InternalRow row, AbstractPagedOutputView target) throws IOException { diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java index f916d01b5e20..ede6c9b103be 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java @@ -27,6 +27,9 @@ /** A type serializer which provides paged serialize and deserialize methods. */ public interface PagedTypeSerializer extends Serializer { + /** Creates a new instance for reusing. */ + T createReuseInstance(); + /** * Serializes the given record to the given target paged output view. Some implementations may * skip some bytes if current page does not have enough space left, .e.g {@link BinaryRow}. diff --git a/paimon-core/src/main/java/org/apache/paimon/hash/BytesHashMap.java b/paimon-core/src/main/java/org/apache/paimon/hash/BytesHashMap.java new file mode 100644 index 000000000000..d739289e6bee --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/hash/BytesHashMap.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hash; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.data.AbstractPagedInputView; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.RandomAccessInputView; +import org.apache.paimon.data.SimpleCollectingOutputView; +import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.data.serializer.PagedTypeSerializer; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.KeyValueIterator; +import org.apache.paimon.utils.MathUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Bytes based hash map. It can be used for performing aggregations where the aggregated values are + * fixed-width, because the data is stored in continuous memory, AggBuffer of variable length cannot + * be applied to this HashMap. The KeyValue form in hash map is designed to reduce the cost of key + * fetching in lookup. The memory is divided into two areas: + * + *

+ * Bucket area: pointer + hashcode.
+ *
+ *   Bytes 0 to 4: a pointer to the record in the record area.
+ *   Bytes 4 to 8: the key's full 32-bit hashcode.
+ *
+ * Record area: the actual data in linked list records, a record has four parts:
+ * the key length, the key data, the value length and the value data, written one after another.
+ *

{@code BytesHashMap} are influenced by Apache Spark BytesToBytesMap. + */ +public class BytesHashMap extends BytesMap { + + private static final Logger LOG = LoggerFactory.getLogger(BytesHashMap.class); + + /** + * Set true when valueTypeInfos.length == 0. Usually in this case the BytesHashMap will be used + * as a HashSet. The value from {@link BytesHashMap#append(LookupInfo info, BinaryRow value)} + * will be ignored when hashSetMode set. The reusedValue will always point to a 16 bytes long + * MemorySegment acted as each BytesHashMap entry's value part when appended to make the + * BytesHashMap's spilling work compatible. + */ + private final boolean hashSetMode; + + /** Used to serialize map key into RecordArea's MemorySegments. */ + protected final PagedTypeSerializer keySerializer; + + /** Used to serialize hash map value into RecordArea's MemorySegments. */ + private final BinaryRowSerializer valueSerializer; + + private volatile RecordArea.EntryIterator destructiveIterator = null; + + public BytesHashMap( + MemorySegmentPool memoryPool, PagedTypeSerializer keySerializer, int valueArity) { + super(memoryPool, keySerializer); + + this.recordArea = new RecordArea(); + + this.keySerializer = keySerializer; + this.valueSerializer = new BinaryRowSerializer(valueArity); + if (valueArity == 0) { + this.hashSetMode = true; + this.reusedValue = new BinaryRow(0); + this.reusedValue.pointTo(MemorySegment.wrap(new byte[8]), 0, 8); + LOG.info("BytesHashMap with hashSetMode = true."); + } else { + this.hashSetMode = false; + this.reusedValue = this.valueSerializer.createInstance(); + } + + final int initBucketSegmentNum = + MathUtils.roundDownToPowerOf2((int) (INIT_BUCKET_MEMORY_IN_BYTES / segmentSize)); + + // allocate and initialize MemorySegments for bucket area + initBucketSegments(initBucketSegmentNum); + + LOG.info( + "BytesHashMap with initial memory segments {}, {} in bytes, init allocating {} for bucket area.", + reservedNumBuffers, + reservedNumBuffers * segmentSize, + initBucketSegmentNum); + } + + // ----------------------- Abstract Interface ----------------------- + + @Override + public long getNumKeys() { + return numElements; + } + + // ----------------------- Public interface ----------------------- + + /** + * Append an value into the hash map's record area. + * + * @return An BinaryRow mapping to the memory segments in the map's record area belonging to the + * newly appended value. + * @throws EOFException if the map can't allocate much more memory. + */ + public BinaryRow append(LookupInfo lookupInfo, BinaryRow value) + throws IOException { + try { + if (numElements >= growthThreshold) { + growAndRehash(); + // update info's bucketSegmentIndex and bucketOffset + lookup(lookupInfo.key); + } + BinaryRow toAppend = hashSetMode ? 
reusedValue : value; + int pointerToAppended = recordArea.appendRecord(lookupInfo, toAppend); + bucketSegments + .get(lookupInfo.bucketSegmentIndex) + .putInt(lookupInfo.bucketOffset, pointerToAppended); + bucketSegments + .get(lookupInfo.bucketSegmentIndex) + .putInt(lookupInfo.bucketOffset + ELEMENT_POINT_LENGTH, lookupInfo.keyHashCode); + numElements++; + recordArea.setReadPosition(pointerToAppended); + ((RecordArea) recordArea).skipKey(); + return recordArea.readValue(reusedValue); + } catch (EOFException e) { + numSpillFiles++; + spillInBytes += recordArea.getSegmentsSize(); + throw e; + } + } + + public long getNumSpillFiles() { + return numSpillFiles; + } + + public long getUsedMemoryInBytes() { + return bucketSegments.size() * ((long) segmentSize) + recordArea.getSegmentsSize(); + } + + public long getSpillInBytes() { + return spillInBytes; + } + + public int getNumElements() { + return numElements; + } + + /** Returns an iterator for iterating over the entries of this map. */ + @SuppressWarnings("WeakerAccess") + public KeyValueIterator getEntryIterator(boolean requiresCopy) { + if (destructiveIterator != null) { + throw new IllegalArgumentException( + "DestructiveIterator is not null, so this method can't be invoke!"); + } + return ((RecordArea) recordArea).entryIterator(requiresCopy); + } + + /** @return the underlying memory segments of the hash map's record area */ + @SuppressWarnings("WeakerAccess") + public ArrayList getRecordAreaMemorySegments() { + return ((RecordArea) recordArea).segments; + } + + @SuppressWarnings("WeakerAccess") + public List getBucketAreaMemorySegments() { + return bucketSegments; + } + + public void free() { + recordArea.release(); + destructiveIterator = null; + super.free(); + } + + /** reset the map's record and bucket area's memory segments for reusing. */ + public void reset() { + // reset the record segments. + recordArea.reset(); + destructiveIterator = null; + super.reset(); + } + + /** + * @return true when BytesHashMap's valueTypeInfos.length == 0. Any appended value will be + * ignored and replaced with a reusedValue as a present tag. 
+ */ + @VisibleForTesting + boolean isHashSetMode() { + return hashSetMode; + } + + // ----------------------- Private methods ----------------------- + + static int getVariableLength(DataType[] types) { + int length = 0; + for (DataType type : types) { + if (!BinaryRow.isInFixedLengthPart(type)) { + // find a better way of computing generic type field variable-length + // right now we use a small value assumption + length += 16; + } + } + return length; + } + + // ----------------------- Record Area ----------------------- + + private final class RecordArea implements BytesMap.RecordArea { + private final ArrayList segments = new ArrayList<>(); + + private final RandomAccessInputView inView; + private final SimpleCollectingOutputView outView; + + RecordArea() { + this.outView = new SimpleCollectingOutputView(segments, memoryPool, segmentSize); + this.inView = new RandomAccessInputView(segments, segmentSize); + } + + public void release() { + returnSegments(segments); + segments.clear(); + } + + public void reset() { + release(); + // request a new memory segment from freeMemorySegments + // reset segmentNum and positionInSegment + outView.reset(); + inView.setReadPosition(0); + } + + // ----------------------- Append ----------------------- + public int appendRecord(LookupInfo lookupInfo, BinaryRow value) + throws IOException { + final long oldLastPosition = outView.getCurrentOffset(); + // serialize the key into the BytesHashMap record area + int skip = keySerializer.serializeToPages(lookupInfo.getKey(), outView); + long offset = oldLastPosition + skip; + + // serialize the value into the BytesHashMap record area + valueSerializer.serializeToPages(value, outView); + if (offset > Integer.MAX_VALUE) { + LOG.warn( + "We can't handle key area with more than Integer.MAX_VALUE bytes," + + " because the pointer is a integer."); + throw new EOFException(); + } + return (int) offset; + } + + @Override + public long getSegmentsSize() { + return segments.size() * ((long) segmentSize); + } + + // ----------------------- Read ----------------------- + public void setReadPosition(int position) { + inView.setReadPosition(position); + } + + public boolean readKeyAndEquals(K lookupKey) throws IOException { + reusedKey = keySerializer.mapFromPages(reusedKey, inView); + return lookupKey.equals(reusedKey); + } + + /** @throws IOException when invalid memory address visited. */ + void skipKey() throws IOException { + keySerializer.skipRecordFromPages(inView); + } + + public BinaryRow readValue(BinaryRow reuse) throws IOException { + // depends on BinaryRowSerializer to check writing skip + // and to find the real start offset of the data + return valueSerializer.mapFromPages(reuse, inView); + } + + // ----------------------- Iterator ----------------------- + + private KeyValueIterator entryIterator(boolean requiresCopy) { + return new EntryIterator(requiresCopy); + } + + private final class EntryIterator extends AbstractPagedInputView + implements KeyValueIterator { + + private int count = 0; + private int currentSegmentIndex = 0; + private final boolean requiresCopy; + + private EntryIterator(boolean requiresCopy) { + super(segments.get(0), segmentSize); + destructiveIterator = this; + this.requiresCopy = requiresCopy; + } + + @Override + public boolean advanceNext() throws IOException { + if (count < numElements) { + count++; + // segment already is useless any more. 
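+ // The key and value are deserialized into the shared reuse objects, so the entries
+ // returned by getKey()/getValue() are overwritten on the next call to advanceNext()
+ // unless requiresCopy is set.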
+ keySerializer.mapFromPages(reusedKey, this); + valueSerializer.mapFromPages(reusedValue, this); + return true; + } + return false; + } + + @Override + public K getKey() { + return requiresCopy ? keySerializer.copy(reusedKey) : reusedKey; + } + + @Override + public BinaryRow getValue() { + return requiresCopy ? reusedValue.copy() : reusedValue; + } + + public boolean hasNext() { + return count < numElements; + } + + @Override + protected int getLimitForSegment(MemorySegment segment) { + return segmentSize; + } + + @Override + protected MemorySegment nextSegment(MemorySegment current) { + return segments.get(++currentSegmentIndex); + } + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/hash/BytesMap.java b/paimon-core/src/main/java/org/apache/paimon/hash/BytesMap.java new file mode 100644 index 000000000000..215cfface98e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/hash/BytesMap.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hash; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.serializer.PagedTypeSerializer; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.utils.MathUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Base class for {@code BytesHashMap}. + * + * @param type of the map key. + * @param type of the map value. + */ +public abstract class BytesMap { + + private static final Logger LOG = LoggerFactory.getLogger(BytesMap.class); + + public static final int BUCKET_SIZE = 8; + protected static final int END_OF_LIST = Integer.MAX_VALUE; + protected static final int STEP_INCREMENT = 1; + protected static final int ELEMENT_POINT_LENGTH = 4; + public static final int RECORD_EXTRA_LENGTH = 8; + protected static final int BUCKET_SIZE_BITS = 3; + + protected final int numBucketsPerSegment; + protected final int numBucketsPerSegmentBits; + protected final int numBucketsPerSegmentMask; + protected final int lastBucketPosition; + + protected final int segmentSize; + protected final MemorySegmentPool memoryPool; + protected List bucketSegments; + + protected final int reservedNumBuffers; + + protected int numElements = 0; + protected int numBucketsMask; + // get the second hashcode based log2NumBuckets and numBucketsMask2 + protected int log2NumBuckets; + protected int numBucketsMask2; + + protected static final double LOAD_FACTOR = 0.75; + // a smaller bucket can make the best of l1/l2/l3 cache. 
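+ // The bucket area initially gets at most 1 MB of segments; it is doubled in
+ // growAndRehash() once the number of elements exceeds the LOAD_FACTOR based threshold.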
+ protected static final long INIT_BUCKET_MEMORY_IN_BYTES = 1024 * 1024L; + + /** The map will be expanded once the number of elements exceeds this threshold. */ + protected int growthThreshold; + + /** The segments where the actual data is stored. */ + protected RecordArea recordArea; + + /** Used as a reused object when lookup and iteration. */ + protected K reusedKey; + + /** Used as a reused object when retrieve the map's value by key and iteration. */ + protected V reusedValue; + + /** Used as a reused object which lookup returned. */ + private final LookupInfo reuseLookupInfo; + + // metric + protected long numSpillFiles; + protected long spillInBytes; + + public BytesMap(MemorySegmentPool memoryPool, PagedTypeSerializer keySerializer) { + this.memoryPool = memoryPool; + this.segmentSize = memoryPool.pageSize(); + this.reservedNumBuffers = memoryPool.freePages(); + this.numBucketsPerSegment = segmentSize / BUCKET_SIZE; + this.numBucketsPerSegmentBits = MathUtils.log2strict(this.numBucketsPerSegment); + this.numBucketsPerSegmentMask = (1 << this.numBucketsPerSegmentBits) - 1; + this.lastBucketPosition = (numBucketsPerSegment - 1) * BUCKET_SIZE; + + this.reusedKey = keySerializer.createReuseInstance(); + this.reuseLookupInfo = new LookupInfo<>(); + } + + /** Returns the number of keys in this map. */ + public abstract long getNumKeys(); + + protected void initBucketSegments(int numBucketSegments) { + if (numBucketSegments < 1) { + throw new RuntimeException("Too small memory allocated for BytesHashMap"); + } + this.bucketSegments = new ArrayList<>(numBucketSegments); + for (int i = 0; i < numBucketSegments; i++) { + MemorySegment segment = memoryPool.nextSegment(); + if (segment == null) { + throw new RuntimeException("Memory for hash map is too small."); + } + bucketSegments.add(i, segment); + } + + resetBucketSegments(this.bucketSegments); + int numBuckets = numBucketSegments * numBucketsPerSegment; + this.log2NumBuckets = MathUtils.log2strict(numBuckets); + this.numBucketsMask = (1 << MathUtils.log2strict(numBuckets)) - 1; + this.numBucketsMask2 = (1 << MathUtils.log2strict(numBuckets >> 1)) - 1; + this.growthThreshold = (int) (numBuckets * LOAD_FACTOR); + } + + protected void resetBucketSegments(List resetBucketSegs) { + for (MemorySegment segment : resetBucketSegs) { + for (int j = 0; j <= lastBucketPosition; j += BUCKET_SIZE) { + segment.putInt(j, END_OF_LIST); + } + } + } + + public long getNumSpillFiles() { + return numSpillFiles; + } + + public long getSpillInBytes() { + return spillInBytes; + } + + public int getNumElements() { + return numElements; + } + + public void free() { + returnSegments(this.bucketSegments); + this.bucketSegments.clear(); + numElements = 0; + } + + /** reset the map's record and bucket area's memory segments for reusing. */ + public void reset() { + setBucketVariables(bucketSegments); + resetBucketSegments(bucketSegments); + numElements = 0; + LOG.debug( + "reset BytesHashMap with record memory segments {}, {} in bytes, init allocating {} for bucket area.", + memoryPool.freePages(), + memoryPool.freePages() * segmentSize, + bucketSegments.size()); + } + + /** + * @param key by which looking up the value in the hash map. Only support the key in the + * BinaryRowData form who has only one MemorySegment. 
+ * @return {@link LookupInfo} + */ + public LookupInfo lookup(K key) { + final int hashCode1 = key.hashCode(); + int newPos = hashCode1 & numBucketsMask; + // which segment contains the bucket + int bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits; + // offset of the bucket in the segment + int bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS; + + boolean found = false; + int step = STEP_INCREMENT; + int hashCode2 = 0; + int findElementPtr; + try { + do { + findElementPtr = bucketSegments.get(bucketSegmentIndex).getInt(bucketOffset); + if (findElementPtr == END_OF_LIST) { + // This is a new key. + break; + } else { + final int storedHashCode = + bucketSegments + .get(bucketSegmentIndex) + .getInt(bucketOffset + ELEMENT_POINT_LENGTH); + if (hashCode1 == storedHashCode) { + recordArea.setReadPosition(findElementPtr); + if (recordArea.readKeyAndEquals(key)) { + // we found an element with a matching key, and not just a hash + // collision + found = true; + reusedValue = recordArea.readValue(reusedValue); + break; + } + } + } + if (step == 1) { + hashCode2 = calcSecondHashCode(hashCode1); + } + newPos = (hashCode1 + step * hashCode2) & numBucketsMask; + // which segment contains the bucket + bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits; + // offset of the bucket in the segment + bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS; + step += STEP_INCREMENT; + } while (true); + } catch (IOException ex) { + throw new RuntimeException( + "Error reading record from the aggregate map: " + ex.getMessage(), ex); + } + reuseLookupInfo.set(found, hashCode1, key, reusedValue, bucketSegmentIndex, bucketOffset); + return reuseLookupInfo; + } + + /** @throws EOFException if the map can't allocate much more memory. */ + protected void growAndRehash() throws EOFException { + // allocate the new data structures + int required = 2 * bucketSegments.size(); + if (required * (long) numBucketsPerSegment > Integer.MAX_VALUE) { + LOG.warn( + "We can't handle more than Integer.MAX_VALUE buckets (eg. 
because hash functions return int)"); + throw new EOFException(); + } + + int numAllocatedSegments = required - memoryPool.freePages(); + if (numAllocatedSegments > 0) { + LOG.warn( + "BytesHashMap can't allocate {} pages, and now used {} pages", + required, + reservedNumBuffers); + throw new EOFException(); + } + + List newBucketSegments = new ArrayList<>(required); + for (int i = 0; i < required; i++) { + newBucketSegments.add(memoryPool.nextSegment()); + } + setBucketVariables(newBucketSegments); + + long reHashStartTime = System.currentTimeMillis(); + resetBucketSegments(newBucketSegments); + // Re-mask (we don't recompute the hashcode because we stored all 32 bits of it) + for (MemorySegment memorySegment : bucketSegments) { + for (int j = 0; j < numBucketsPerSegment; j++) { + final int recordPointer = memorySegment.getInt(j * BUCKET_SIZE); + if (recordPointer != END_OF_LIST) { + final int hashCode1 = + memorySegment.getInt(j * BUCKET_SIZE + ELEMENT_POINT_LENGTH); + int newPos = hashCode1 & numBucketsMask; + int bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits; + int bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS; + int step = STEP_INCREMENT; + long hashCode2 = 0; + while (newBucketSegments.get(bucketSegmentIndex).getInt(bucketOffset) + != END_OF_LIST) { + if (step == 1) { + hashCode2 = calcSecondHashCode(hashCode1); + } + newPos = (int) ((hashCode1 + step * hashCode2) & numBucketsMask); + // which segment contains the bucket + bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits; + // offset of the bucket in the segment + bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS; + step += STEP_INCREMENT; + } + newBucketSegments.get(bucketSegmentIndex).putInt(bucketOffset, recordPointer); + newBucketSegments + .get(bucketSegmentIndex) + .putInt(bucketOffset + ELEMENT_POINT_LENGTH, hashCode1); + } + } + } + LOG.info( + "The rehash take {} ms for {} segments", + (System.currentTimeMillis() - reHashStartTime), + required); + this.memoryPool.returnAll(this.bucketSegments); + this.bucketSegments = newBucketSegments; + } + + protected void returnSegments(List segments) { + memoryPool.returnAll(segments); + } + + private void setBucketVariables(List bucketSegments) { + int numBuckets = bucketSegments.size() * numBucketsPerSegment; + this.log2NumBuckets = MathUtils.log2strict(numBuckets); + this.numBucketsMask = (1 << MathUtils.log2strict(numBuckets)) - 1; + this.numBucketsMask2 = (1 << MathUtils.log2strict(numBuckets >> 1)) - 1; + this.growthThreshold = (int) (numBuckets * LOAD_FACTOR); + } + + // M(the num of buckets) is the nth power of 2, so the second hash code must be odd, and always + // is + // H2(K) = 1 + 2 * ((H1(K)/M) mod (M-1)) + protected int calcSecondHashCode(final int firstHashCode) { + return ((((firstHashCode >> log2NumBuckets)) & numBucketsMask2) << 1) + 1; + } + + /** Record area. */ + interface RecordArea { + + void setReadPosition(int position); + + boolean readKeyAndEquals(K lookupKey) throws IOException; + + V readValue(V reuse) throws IOException; + + int appendRecord(LookupInfo lookupInfo, BinaryRow value) throws IOException; + + long getSegmentsSize(); + + void release(); + + void reset(); + } + + /** Result fetched when looking up a key. */ + public static final class LookupInfo { + boolean found; + K key; + V value; + + /** + * The hashcode of the look up key passed to {@link BytesMap#lookup(K)}, Caching this + * hashcode here allows us to avoid re-hashing the key when inserting a value for that key. 
+ * The same purpose with bucketSegmentIndex, bucketOffset. + */ + int keyHashCode; + + int bucketSegmentIndex; + int bucketOffset; + + LookupInfo() { + this.found = false; + this.keyHashCode = -1; + this.key = null; + this.value = null; + this.bucketSegmentIndex = -1; + this.bucketOffset = -1; + } + + void set( + boolean found, + int keyHashCode, + K key, + V value, + int bucketSegmentIndex, + int bucketOffset) { + this.found = found; + this.keyHashCode = keyHashCode; + this.key = key; + this.value = value; + this.bucketSegmentIndex = bucketSegmentIndex; + this.bucketOffset = bucketOffset; + } + + public boolean isFound() { + return found; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/HashMapLocalMerger.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/HashMapLocalMerger.java new file mode 100644 index 000000000000..1a395fb36c56 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/HashMapLocalMerger.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.localmerge; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalRow.FieldSetter; +import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.hash.BytesHashMap; +import org.apache.paimon.hash.BytesMap.LookupInfo; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.mergetree.compact.MergeFunction; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.FieldsComparator; +import org.apache.paimon.utils.KeyValueIterator; + +import javax.annotation.Nullable; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +import static org.apache.paimon.data.InternalRow.createFieldSetter; + +/** A {@link LocalMerger} which stores records in {@link BytesHashMap}. 
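+ *
+ * <p>The map is keyed by the primary-key row and stores the full value row (with its row kind) in
+ * the map's record area; when a key is seen again, the stored row and the new row are combined
+ * through the {@link MergeFunction} and the non-key fields are updated in place via
+ * {@link FieldSetter}s.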
*/ +public class HashMapLocalMerger implements LocalMerger { + + private final InternalRowSerializer valueSerializer; + private final MergeFunction mergeFunction; + @Nullable private final FieldsComparator udsComparator; + private final BytesHashMap buffer; + private final List nonKeySetters; + + public HashMapLocalMerger( + RowType rowType, + List primaryKeys, + MemorySegmentPool memoryPool, + MergeFunction mergeFunction, + @Nullable FieldsComparator userDefinedSeqComparator) { + this.valueSerializer = new InternalRowSerializer(rowType); + this.mergeFunction = mergeFunction; + this.udsComparator = userDefinedSeqComparator; + this.buffer = + new BytesHashMap<>( + memoryPool, + new BinaryRowSerializer(primaryKeys.size()), + rowType.getFieldCount()); + + this.nonKeySetters = new ArrayList<>(); + for (int i = 0; i < rowType.getFieldCount(); i++) { + DataField field = rowType.getFields().get(i); + if (primaryKeys.contains(field.name())) { + continue; + } + nonKeySetters.add(createFieldSetter(field.type(), i)); + } + } + + @Override + public boolean put(RowKind rowKind, BinaryRow key, InternalRow value) throws IOException { + // we store row kind in value + value.setRowKind(rowKind); + + LookupInfo lookup = buffer.lookup(key); + if (!lookup.isFound()) { + try { + buffer.append(lookup, valueSerializer.toBinaryRow(value)); + return true; + } catch (EOFException eof) { + return false; + } + } + + mergeFunction.reset(); + BinaryRow stored = lookup.getValue(); + KeyValue previousKv = new KeyValue().replace(key, stored.getRowKind(), stored); + KeyValue newKv = new KeyValue().replace(key, value.getRowKind(), value); + if (udsComparator != null && udsComparator.compare(stored, value) > 0) { + mergeFunction.add(newKv); + mergeFunction.add(previousKv); + } else { + mergeFunction.add(previousKv); + mergeFunction.add(newKv); + } + + KeyValue result = mergeFunction.getResult(); + stored.setRowKind(result.valueKind()); + for (FieldSetter setter : nonKeySetters) { + setter.setFieldFrom(result.value(), stored); + } + return true; + } + + @Override + public int size() { + return buffer.getNumElements(); + } + + @Override + public void forEach(Consumer consumer) throws IOException { + KeyValueIterator iterator = buffer.getEntryIterator(false); + while (iterator.advanceNext()) { + consumer.accept(iterator.getValue()); + } + } + + @Override + public void clear() { + buffer.reset(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/LocalMerger.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/LocalMerger.java new file mode 100644 index 000000000000..bec71808a75e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/LocalMerger.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.localmerge; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.RowKind; + +import java.io.IOException; +import java.util.function.Consumer; + +/** Local merger to merge in memory. */ +public interface LocalMerger { + + boolean put(RowKind rowKind, BinaryRow key, InternalRow value) throws IOException; + + int size(); + + void forEach(Consumer consumer) throws IOException; + + void clear(); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/SortBufferLocalMerger.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/SortBufferLocalMerger.java new file mode 100644 index 000000000000..198e6c67d304 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/SortBufferLocalMerger.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.localmerge; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.codegen.RecordComparator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.mergetree.SortBufferWriteBuffer; +import org.apache.paimon.mergetree.compact.MergeFunction; +import org.apache.paimon.types.RowKind; + +import java.io.IOException; +import java.util.function.Consumer; + +/** A {@link LocalMerger} which stores records in {@link SortBufferWriteBuffer}. 
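+ *
+ * <p>Records are appended to the sort buffer with an increasing sequence number and are only merged
+ * by the {@link MergeFunction} when {@link #forEach} iterates over the sorted buffer.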
*/ +public class SortBufferLocalMerger implements LocalMerger { + + private final SortBufferWriteBuffer sortBuffer; + private final RecordComparator keyComparator; + private final MergeFunction mergeFunction; + + private long recordCount; + + public SortBufferLocalMerger( + SortBufferWriteBuffer sortBuffer, + RecordComparator keyComparator, + MergeFunction mergeFunction) { + this.sortBuffer = sortBuffer; + this.keyComparator = keyComparator; + this.mergeFunction = mergeFunction; + this.recordCount = 0; + } + + @Override + public boolean put(RowKind rowKind, BinaryRow key, InternalRow value) throws IOException { + return sortBuffer.put(recordCount++, rowKind, key, value); + } + + @Override + public int size() { + return sortBuffer.size(); + } + + @Override + public void forEach(Consumer consumer) throws IOException { + sortBuffer.forEach( + keyComparator, + mergeFunction, + null, + kv -> { + InternalRow row = kv.value(); + row.setRowKind(kv.valueKind()); + consumer.accept(row); + }); + } + + @Override + public void clear() { + sortBuffer.clear(); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTest.java b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTest.java new file mode 100644 index 000000000000..4020e3d489d1 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hash; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.types.DataType; + +/** Test case for {@link BytesHashMap}. */ +public class BytesHashMapTest extends BytesHashMapTestBase { + + public BytesHashMapTest() { + super(new BinaryRowSerializer(KEY_TYPES.length)); + } + + @Override + public BytesHashMap createBytesHashMap( + MemorySegmentPool pool, DataType[] keyTypes, DataType[] valueTypes) { + return new BytesHashMap<>( + pool, new BinaryRowSerializer(keyTypes.length), valueTypes.length); + } + + @Override + public BinaryRow[] generateRandomKeys(int num) { + return getRandomizedInputs(num); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTestBase.java b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTestBase.java new file mode 100644 index 000000000000..01bbd46e63aa --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTestBase.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hash; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.RandomAccessInputView; +import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.data.serializer.PagedTypeSerializer; +import org.apache.paimon.memory.HeapMemorySegmentPool; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.KeyValueIterator; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Base test class for both {@link BytesHashMap}. */ +abstract class BytesHashMapTestBase extends BytesMapTestBase { + + private static final int NUM_REWRITES = 10; + + static final DataType[] KEY_TYPES = + new DataType[] { + new IntType(), + VarCharType.STRING_TYPE, + new DoubleType(), + new BigIntType(), + new BooleanType(), + new FloatType(), + new SmallIntType() + }; + + static final DataType[] VALUE_TYPES = + new DataType[] { + new DoubleType(), + new BigIntType(), + new BooleanType(), + new FloatType(), + new SmallIntType() + }; + + protected final BinaryRow defaultValue; + protected final PagedTypeSerializer keySerializer; + protected final BinaryRowSerializer valueSerializer; + + public BytesHashMapTestBase(PagedTypeSerializer keySerializer) { + this.keySerializer = keySerializer; + this.valueSerializer = new BinaryRowSerializer(VALUE_TYPES.length); + this.defaultValue = valueSerializer.createInstance(); + int valueSize = defaultValue.getFixedLengthPartSize(); + this.defaultValue.pointTo(MemorySegment.wrap(new byte[valueSize]), 0, valueSize); + } + + /** Creates the specific BytesHashMap, either {@link BytesHashMap}. */ + public abstract BytesHashMap createBytesHashMap( + MemorySegmentPool memorySegmentPool, DataType[] keyTypes, DataType[] valueTypes); + + /** + * Generates {@code num} random keys, the types of key fields are defined in {@link #KEY_TYPES}. 
+ */ + public abstract K[] generateRandomKeys(int num); + + // ------------------------------------------------------------------------------------------ + // Tests + // ------------------------------------------------------------------------------------------ + + @Test + void testHashSetMode() throws IOException { + final int numMemSegments = + needNumMemSegments( + NUM_ENTRIES, + rowLength(RowType.of(VALUE_TYPES)), + rowLength(RowType.of(KEY_TYPES)), + PAGE_SIZE); + int memorySize = numMemSegments * PAGE_SIZE; + MemorySegmentPool pool = new HeapMemorySegmentPool(memorySize, PAGE_SIZE); + + BytesHashMap table = createBytesHashMap(pool, KEY_TYPES, new DataType[] {}); + assertThat(table.isHashSetMode()).isTrue(); + + K[] keys = generateRandomKeys(NUM_ENTRIES); + verifyKeyInsert(keys, table); + verifyKeyPresent(keys, table); + table.free(); + } + + @Test + void testBuildAndRetrieve() throws Exception { + + final int numMemSegments = + needNumMemSegments( + NUM_ENTRIES, + rowLength(RowType.of(VALUE_TYPES)), + rowLength(RowType.of(KEY_TYPES)), + PAGE_SIZE); + int memorySize = numMemSegments * PAGE_SIZE; + MemorySegmentPool pool = new HeapMemorySegmentPool(memorySize, PAGE_SIZE); + + BytesHashMap table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES); + + K[] keys = generateRandomKeys(NUM_ENTRIES); + List expected = new ArrayList<>(NUM_ENTRIES); + verifyInsert(keys, expected, table); + verifyRetrieve(table, keys, expected); + table.free(); + } + + @Test + void testBuildAndUpdate() throws Exception { + final int numMemSegments = + needNumMemSegments( + NUM_ENTRIES, + rowLength(RowType.of(VALUE_TYPES)), + rowLength(RowType.of(KEY_TYPES)), + PAGE_SIZE); + int memorySize = numMemSegments * PAGE_SIZE; + MemorySegmentPool pool = new HeapMemorySegmentPool(memorySize, PAGE_SIZE); + + BytesHashMap table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES); + + K[] keys = generateRandomKeys(NUM_ENTRIES); + List expected = new ArrayList<>(NUM_ENTRIES); + verifyInsertAndUpdate(keys, expected, table); + verifyRetrieve(table, keys, expected); + table.free(); + } + + @Test + void testRest() throws Exception { + final int numMemSegments = + needNumMemSegments( + NUM_ENTRIES, + rowLength(RowType.of(VALUE_TYPES)), + rowLength(RowType.of(KEY_TYPES)), + PAGE_SIZE); + + int memorySize = numMemSegments * PAGE_SIZE; + + MemorySegmentPool pool = new HeapMemorySegmentPool(memorySize, PAGE_SIZE); + + BytesHashMap table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES); + + final K[] keys = generateRandomKeys(NUM_ENTRIES); + List expected = new ArrayList<>(NUM_ENTRIES); + verifyInsertAndUpdate(keys, expected, table); + verifyRetrieve(table, keys, expected); + + table.reset(); + assertThat(table.getNumElements()).isEqualTo(0); + assertThat(table.getRecordAreaMemorySegments()).hasSize(1); + + expected.clear(); + verifyInsertAndUpdate(keys, expected, table); + verifyRetrieve(table, keys, expected); + table.free(); + } + + @Test + void testResetAndOutput() throws Exception { + final Random rnd = new Random(RANDOM_SEED); + final int reservedMemSegments = 64; + + int minMemorySize = reservedMemSegments * PAGE_SIZE; + MemorySegmentPool pool = new HeapMemorySegmentPool(minMemorySize, PAGE_SIZE); + + BytesHashMap table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES); + + K[] keys = generateRandomKeys(NUM_ENTRIES); + List expected = new ArrayList<>(NUM_ENTRIES); + List actualValues = new ArrayList<>(NUM_ENTRIES); + List actualKeys = new ArrayList<>(NUM_ENTRIES); + for (int i = 0; i < NUM_ENTRIES; i++) { + K groupKey = keys[i]; 
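+ // With only 64 pages of memory, append may run out of space and throw EOFException;
+ // the catch block below then drains the current entries, resets the map and retries.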
+ // look up and insert + BytesMap.LookupInfo lookupInfo = table.lookup(groupKey); + assertThat(lookupInfo.isFound()).isFalse(); + try { + BinaryRow entry = table.append(lookupInfo, defaultValue); + assertThat(entry).isNotNull(); + // mock multiple updates + for (int j = 0; j < NUM_REWRITES; j++) { + updateOutputBuffer(entry, rnd); + } + expected.add(entry.copy()); + } catch (Exception e) { + ArrayList segments = table.getRecordAreaMemorySegments(); + RandomAccessInputView inView = + new RandomAccessInputView(segments, segments.get(0).size()); + K reuseKey = keySerializer.createReuseInstance(); + BinaryRow reuseValue = valueSerializer.createInstance(); + for (int index = 0; index < table.getNumElements(); index++) { + reuseKey = keySerializer.mapFromPages(reuseKey, inView); + reuseValue = valueSerializer.mapFromPages(reuseValue, inView); + actualKeys.add(keySerializer.copy(reuseKey)); + actualValues.add(reuseValue.copy()); + } + table.reset(); + // retry + lookupInfo = table.lookup(groupKey); + BinaryRow entry = table.append(lookupInfo, defaultValue); + assertThat(entry).isNotNull(); + // mock multiple updates + for (int j = 0; j < NUM_REWRITES; j++) { + updateOutputBuffer(entry, rnd); + } + expected.add(entry.copy()); + } + } + KeyValueIterator iter = table.getEntryIterator(false); + while (iter.advanceNext()) { + actualKeys.add(keySerializer.copy(iter.getKey())); + actualValues.add(iter.getValue().copy()); + } + assertThat(expected).hasSize(NUM_ENTRIES); + assertThat(actualKeys).hasSize(NUM_ENTRIES); + assertThat(actualValues).hasSize(NUM_ENTRIES); + assertThat(actualValues).isEqualTo(expected); + table.free(); + } + + @Test + void testSingleKeyMultipleOps() throws Exception { + final int numMemSegments = + needNumMemSegments( + NUM_ENTRIES, + rowLength(RowType.of(VALUE_TYPES)), + rowLength(RowType.of(KEY_TYPES)), + PAGE_SIZE); + + int memorySize = numMemSegments * PAGE_SIZE; + + MemorySegmentPool pool = new HeapMemorySegmentPool(memorySize, PAGE_SIZE); + + BytesHashMap table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES); + final K key = generateRandomKeys(1)[0]; + for (int i = 0; i < 3; i++) { + BytesMap.LookupInfo lookupInfo = table.lookup(key); + assertThat(lookupInfo.isFound()).isFalse(); + } + + for (int i = 0; i < 3; i++) { + BytesMap.LookupInfo lookupInfo = table.lookup(key); + BinaryRow entry = lookupInfo.getValue(); + if (i == 0) { + assertThat(lookupInfo.isFound()).isFalse(); + entry = table.append(lookupInfo, defaultValue); + } else { + assertThat(lookupInfo.isFound()).isTrue(); + } + assertThat(entry).isNotNull(); + } + table.free(); + } + + // ---------------------------------------------- + /** It will be codegened when in HashAggExec using rnd to mock update/initExprs resultTerm. 
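+ *
+ * <p>Only fixed-length fields of the reused value row are overwritten, so the in-place update never
+ * changes the size of the record already stored in the map's record area.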
*/ + private void updateOutputBuffer(BinaryRow reuse, Random rnd) { + long longVal = rnd.nextLong(); + double doubleVal = rnd.nextDouble(); + boolean boolVal = longVal % 2 == 0; + reuse.setDouble(2, doubleVal); + reuse.setLong(3, longVal); + reuse.setBoolean(4, boolVal); + } + + // ----------------------- Utilities ----------------------- + + private void verifyRetrieve(BytesHashMap table, K[] keys, List expected) { + assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES); + for (int i = 0; i < NUM_ENTRIES; i++) { + K groupKey = keys[i]; + // look up and retrieve + BytesMap.LookupInfo lookupInfo = table.lookup(groupKey); + assertThat(lookupInfo.isFound()).isTrue(); + assertThat(lookupInfo.getValue()).isNotNull(); + assertThat(lookupInfo.getValue()).isEqualTo(expected.get(i)); + } + } + + private void verifyInsert(K[] keys, List inserted, BytesHashMap table) + throws IOException { + for (int i = 0; i < NUM_ENTRIES; i++) { + K groupKey = keys[i]; + // look up and insert + BytesMap.LookupInfo lookupInfo = table.lookup(groupKey); + assertThat(lookupInfo.isFound()).isFalse(); + BinaryRow entry = table.append(lookupInfo, defaultValue); + assertThat(entry).isNotNull(); + assertThat(defaultValue).isEqualTo(entry); + inserted.add(entry.copy()); + } + assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES); + } + + private void verifyInsertAndUpdate(K[] keys, List inserted, BytesHashMap table) + throws IOException { + final Random rnd = new Random(RANDOM_SEED); + for (int i = 0; i < NUM_ENTRIES; i++) { + K groupKey = keys[i]; + // look up and insert + BytesMap.LookupInfo lookupInfo = table.lookup(groupKey); + assertThat(lookupInfo.isFound()).isFalse(); + BinaryRow entry = table.append(lookupInfo, defaultValue); + assertThat(entry).isNotNull(); + // mock multiple updates + for (int j = 0; j < NUM_REWRITES; j++) { + updateOutputBuffer(entry, rnd); + } + inserted.add(entry.copy()); + } + assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES); + } + + private void verifyKeyPresent(K[] keys, BytesHashMap table) { + assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES); + BinaryRow present = new BinaryRow(0); + present.pointTo(MemorySegment.wrap(new byte[8]), 0, 8); + for (int i = 0; i < NUM_ENTRIES; i++) { + K groupKey = keys[i]; + // look up and retrieve + BytesMap.LookupInfo lookupInfo = table.lookup(groupKey); + assertThat(lookupInfo.isFound()).isTrue(); + assertThat(lookupInfo.getValue()).isNotNull(); + assertThat(lookupInfo.getValue()).isEqualTo(present); + } + } + + private void verifyKeyInsert(K[] keys, BytesHashMap table) throws IOException { + BinaryRow present = new BinaryRow(0); + present.pointTo(MemorySegment.wrap(new byte[8]), 0, 8); + for (int i = 0; i < NUM_ENTRIES; i++) { + K groupKey = keys[i]; + // look up and insert + BytesMap.LookupInfo lookupInfo = table.lookup(groupKey); + assertThat(lookupInfo.isFound()).isFalse(); + BinaryRow entry = table.append(lookupInfo, defaultValue); + assertThat(entry).isNotNull(); + assertThat(present).isEqualTo(entry); + } + assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/hash/BytesMapTestBase.java b/paimon-core/src/test/java/org/apache/paimon/hash/BytesMapTestBase.java new file mode 100644 index 000000000000..acde6d9a1689 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/hash/BytesMapTestBase.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hash; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowType; + +import java.util.Random; + +/** Test case for {@link BytesMap}. */ +public class BytesMapTestBase { + + protected static final long RANDOM_SEED = 76518743207143L; + protected static final int PAGE_SIZE = 32 * 1024; + protected static final int NUM_ENTRIES = 10000; + + protected BinaryRow[] getRandomizedInputs(int num) { + final Random rnd = new Random(RANDOM_SEED); + return getRandomizedInputs(num, rnd, true); + } + + protected BinaryRow[] getRandomizedInputs(int num, Random rnd, boolean nullable) { + BinaryRow[] lists = new BinaryRow[num]; + for (int i = 0; i < num; i++) { + int intVal = rnd.nextInt(Integer.MAX_VALUE); + long longVal = -rnd.nextLong(); + boolean boolVal = longVal % 2 == 0; + String strVal = nullable && boolVal ? null : getString(intVal, intVal % 1024) + i; + Double doubleVal = rnd.nextDouble(); + Short shotVal = (short) intVal; + Float floatVal = nullable && boolVal ? 
+            lists[i] = createRow(intVal, strVal, doubleVal, longVal, boolVal, floatVal, shortVal);
+        }
+        return lists;
+    }
+
+    protected BinaryRow createRow(
+            Integer f0, String f1, Double f2, Long f3, Boolean f4, Float f5, Short f6) {
+
+        BinaryRow row = new BinaryRow(7);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+
+        // int, string, double, long, boolean, float, short
+        if (f0 == null) {
+            writer.setNullAt(0);
+        } else {
+            writer.writeInt(0, f0);
+        }
+        if (f1 == null) {
+            writer.setNullAt(1);
+        } else {
+            writer.writeString(1, BinaryString.fromString(f1));
+        }
+        if (f2 == null) {
+            writer.setNullAt(2);
+        } else {
+            writer.writeDouble(2, f2);
+        }
+        if (f3 == null) {
+            writer.setNullAt(3);
+        } else {
+            writer.writeLong(3, f3);
+        }
+        if (f4 == null) {
+            writer.setNullAt(4);
+        } else {
+            writer.writeBoolean(4, f4);
+        }
+        if (f5 == null) {
+            writer.setNullAt(5);
+        } else {
+            writer.writeFloat(5, f5);
+        }
+        if (f6 == null) {
+            writer.setNullAt(6);
+        } else {
+            writer.writeShort(6, f6);
+        }
+        writer.complete();
+        return row;
+    }
+
+    protected int needNumMemSegments(int numEntries, int valLen, int keyLen, int pageSize) {
+        return 2 * (valLen + keyLen + 1024 * 3 + 4 + 8 + 8) * numEntries / pageSize;
+    }
+
+    protected int rowLength(RowType tpe) {
+        return BinaryRow.calculateFixPartSizeInBytes(tpe.getFieldCount())
+                + BytesHashMap.getVariableLength(tpe.getFieldTypes().toArray(new DataType[0]));
+    }
+
+    private String getString(int count, int length) {
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < length; i++) {
+            builder.append(count);
+        }
+        return builder.toString();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index aba891e44100..6931fe907218 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -20,13 +20,17 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
-import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.mergetree.SortBufferWriteBuffer;
 import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
+import org.apache.paimon.mergetree.localmerge.LocalMerger;
+import org.apache.paimon.mergetree.localmerge.SortBufferLocalMerger;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.TableSchema;
@@ -43,6 +47,7 @@
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -63,13 +68,10 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
     private final boolean ignoreDelete;
 
     private transient Projection keyProjection;
-    private transient RecordComparator keyComparator;
-
-    private transient long recordCount;
     private transient RowKindGenerator rowKindGenerator;
-    private transient MergeFunction<KeyValue> mergeFunction;
-    private transient SortBufferWriteBuffer buffer;
+    private transient LocalMerger merger;
 
     private transient long currentWatermark;
     private transient boolean endOfInput;
@@ -87,17 +89,14 @@ public LocalMergeOperator(TableSchema schema) {
     public void open() throws Exception {
         super.open();
 
-        RowType keyType = PrimaryKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
+        List<String> primaryKeys = schema.primaryKeys();
         RowType valueType = schema.logicalRowType();
         CoreOptions options = new CoreOptions(schema.options());
 
-        keyProjection =
-                CodeGenUtils.newProjection(valueType, schema.projection(schema.primaryKeys()));
-        keyComparator = new KeyComparatorSupplier(keyType).get();
+        keyProjection = CodeGenUtils.newProjection(valueType, schema.projection(primaryKeys));
 
-        recordCount = 0;
         rowKindGenerator = RowKindGenerator.create(schema, options);
-        mergeFunction =
+        MergeFunction<KeyValue> mergeFunction =
                 PrimaryKeyTableUtils.createMergeFunctionFactory(
                                 schema,
                                 new KeyValueFieldsExtractor() {
@@ -117,26 +116,51 @@ public List<DataField> valueFields(TableSchema schema) {
                                 })
                         .create();
 
-        buffer =
-                new SortBufferWriteBuffer(
-                        keyType,
-                        valueType,
-                        UserDefinedSeqComparator.create(valueType, options),
-                        new HeapMemorySegmentPool(
-                                options.localMergeBufferSize(), options.pageSize()),
-                        false,
-                        MemorySize.MAX_VALUE,
-                        options.localSortMaxNumFileHandles(),
-                        options.spillCompressOptions(),
-                        null);
-        currentWatermark = Long.MIN_VALUE;
+        boolean canHashMerger = true;
+        for (DataField field : valueType.getFields()) {
+            if (primaryKeys.contains(field.name())) {
+                continue;
+            }
+
+            if (!BinaryRow.isInFixedLengthPart(field.type())) {
+                canHashMerger = false;
+                break;
+            }
+        }
+
+        HeapMemorySegmentPool pool =
+                new HeapMemorySegmentPool(options.localMergeBufferSize(), options.pageSize());
+        UserDefinedSeqComparator udsComparator =
+                UserDefinedSeqComparator.create(valueType, options);
+        if (canHashMerger) {
+            merger =
+                    new HashMapLocalMerger(
+                            valueType, primaryKeys, pool, mergeFunction, udsComparator);
+        } else {
+            RowType keyType =
+                    PrimaryKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
+            SortBufferWriteBuffer sortBuffer =
+                    new SortBufferWriteBuffer(
+                            keyType,
+                            valueType,
+                            udsComparator,
+                            pool,
+                            false,
+                            MemorySize.MAX_VALUE,
+                            options.localSortMaxNumFileHandles(),
+                            options.spillCompressOptions(),
+                            null);
+            merger =
+                    new SortBufferLocalMerger(
+                            sortBuffer, new KeyComparatorSupplier(keyType).get(), mergeFunction);
+        }
+        currentWatermark = Long.MIN_VALUE;
         endOfInput = false;
     }
 
     @Override
     public void processElement(StreamRecord<InternalRow> record) throws Exception {
-        recordCount++;
         InternalRow row = record.getValue();
 
         RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
@@ -147,10 +171,10 @@ public void processElement(StreamRecord<InternalRow> record) throws Exception {
         // row kind must be INSERT when it is divided into key and value
         row.setRowKind(RowKind.INSERT);
 
-        InternalRow key = keyProjection.apply(row);
-        if (!buffer.put(recordCount, rowKind, key, row)) {
+        BinaryRow key = keyProjection.apply(row);
+        if (!merger.put(rowKind, key, row)) {
             flushBuffer();
-            if (!buffer.put(recordCount, rowKind, key, row)) {
+            if (!merger.put(rowKind, key, row)) {
                 // change row kind back
                 row.setRowKind(rowKind);
                 output.collect(record);
@@ -180,28 +204,20 @@ public void endInput() throws Exception {
 
     @Override
     public void close() throws Exception {
-        if (buffer != null) {
-            buffer.clear();
+        if (merger != null) {
+            merger.clear();
         }
 
         super.close();
     }
 
     private void flushBuffer() throws Exception {
-        if (buffer.size() == 0) {
+        if (merger.size() == 0) {
             return;
         }
 
-        buffer.forEach(
-                keyComparator,
-                mergeFunction,
-                null,
-                kv -> {
-                    InternalRow row = kv.value();
-                    row.setRowKind(kv.valueKind());
-                    output.collect(new StreamRecord<>(row));
-                });
-        buffer.clear();
+        merger.forEach(row -> output.collect(new StreamRecord<>(row)));
+        merger.clear();
 
         if (currentWatermark != Long.MIN_VALUE) {
             super.processWatermark(new Watermark(currentWatermark));
@@ -209,4 +225,14 @@ private void flushBuffer() throws Exception {
             currentWatermark = Long.MIN_VALUE;
         }
     }
+
+    @VisibleForTesting
+    LocalMerger merger() {
+        return merger;
+    }
+
+    @VisibleForTesting
+    void setOutput(Output<StreamRecord<InternalRow>> output) {
+        this.output = output;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
new file mode 100644
index 000000000000..59967b727080
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.OutputTag;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Consumer;
+
+import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
+import static org.apache.paimon.data.BinaryString.fromString;
+import static org.apache.paimon.types.RowKind.DELETE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class LocalMergeOperatorTest {
+
+    private LocalMergeOperator operator;
+
+    @Test
+    public void testHashNormal() throws Exception {
+        prepareHashOperator();
+        List<String> result = new ArrayList<>();
+        operator.setOutput(
+                new TestOutput(
+                        row ->
+                                result.add(
+                                        row.getRowKind().shortString()
+                                                + ":"
+                                                + row.getString(0)
+                                                + "->"
+                                                + row.getInt(1))));
+
+        // first test
+        processElement("a", 1);
+        processElement("b", 1);
+        processElement("a", 2);
+        processElement(DELETE, "b", 2);
+        operator.prepareSnapshotPreBarrier(0);
+        assertThat(result).containsExactlyInAnyOrder("+I:a->2", "-D:b->2");
+        result.clear();
+
+        // second test
+        processElement("c", 1);
+        processElement("d", 1);
+        operator.prepareSnapshotPreBarrier(0);
+        assertThat(result).containsExactlyInAnyOrder("+I:c->1", "+I:d->1");
+        result.clear();
+
+        // large records
+        Map<String, String> expected = new HashMap<>();
+        Random rnd = new Random();
+        int records = 10_000;
+        for (int i = 0; i < records; i++) {
+            String key = rnd.nextInt(records) + "";
+            expected.put(key, "+I:" + key + "->" + i);
+            processElement(key, i);
+        }
+
+        operator.prepareSnapshotPreBarrier(0);
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expected.values());
+        result.clear();
+    }
+
+    @Test
+    public void testUserDefineSequence() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(SEQUENCE_FIELD.key(), "f1");
+        prepareHashOperator(options);
+
+        List<String> result = new ArrayList<>();
+        operator.setOutput(
+                new TestOutput(
+                        row ->
+                                result.add(
+                                        row.getRowKind().shortString()
+                                                + ":"
+                                                + row.getString(0)
+                                                + "->"
+                                                + row.getInt(1))));
+
+        processElement("a", 2);
+        processElement("b", 1);
+        processElement("a", 1);
+        operator.prepareSnapshotPreBarrier(0);
+        assertThat(result).containsExactlyInAnyOrder("+I:a->2", "+I:b->1");
+        result.clear();
+    }
+
+    private void prepareHashOperator() throws Exception {
+        prepareHashOperator(new HashMap<>());
+    }
+
+    private void prepareHashOperator(Map<String, String> options) throws Exception {
+        options.put(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key(), "10 m");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.STRING(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        TableSchema schema =
+                new TableSchema(
+                        0L,
+                        rowType.getFields(),
+                        rowType.getFieldCount(),
+                        Collections.emptyList(),
+                        Collections.singletonList("f0"),
+                        options,
+                        null);
+        operator = new LocalMergeOperator(schema);
+        operator.open();
+        assertThat(operator.merger()).isInstanceOf(HashMapLocalMerger.class);
+    }
+
+    private void processElement(String key, int value) throws Exception {
+        processElement(RowKind.INSERT, key, value);
+    }
+
+    private void processElement(RowKind rowKind, String key, int value) throws Exception {
+        operator.processElement(
+                new StreamRecord<>(
+                        GenericRow.ofKind(rowKind, fromString(key), value, value, value, value)));
+    }
+
+    private static class TestOutput implements Output<StreamRecord<InternalRow>> {
+
+        private final Consumer<InternalRow> consumer;
+
+        private TestOutput(Consumer<InternalRow> consumer) {
+            this.consumer = consumer;
+        }
+
+        @Override
+        public void emitWatermark(Watermark mark) {}
+
+        @Override
+        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}
+
+        @Override
+        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {}
+
+        @Override
+        public void emitLatencyMarker(LatencyMarker latencyMarker) {}
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) {}
+
+        @Override
+        public void collect(StreamRecord<InternalRow> record) {
+            consumer.accept(record.getValue());
+        }
+
+        @Override
+        public void close() {}
+    }
+}
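
Note (not part of the patch): the selection between HashMapLocalMerger and SortBufferLocalMerger hinges on the new BinaryRow.isInFixedLengthPart check in LocalMergeOperator#open(). The standalone sketch below illustrates that decision under an assumed schema; the class name MergerChoiceSketch and the field names are hypothetical, only the Paimon types and the isInFixedLengthPart call come from this change.

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MergerChoiceSketch {
    public static void main(String[] args) {
        // hypothetical value schema: primary key "k" (STRING) plus fixed-length payload columns
        RowType valueType =
                new RowType(
                        Arrays.asList(
                                new DataField(0, "k", DataTypes.STRING()),
                                new DataField(1, "cnt", DataTypes.INT()),
                                new DataField(2, "ts", DataTypes.BIGINT())));
        List<String> primaryKeys = Collections.singletonList("k");

        boolean canHashMerger = true;
        for (DataField field : valueType.getFields()) {
            if (primaryKeys.contains(field.name())) {
                continue; // key fields may be variable-length; they become the hash map key
            }
            if (!BinaryRow.isInFixedLengthPart(field.type())) {
                canHashMerger = false; // a non-key STRING/BYTES column forces the sort-based path
                break;
            }
        }
        // prints true here: all non-key fields (INT, BIGINT) can be updated in place
        System.out.println("use HashMapLocalMerger: " + canHashMerger);
    }
}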