diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/NestTableMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/NestTableMergeFunction.java index 9c674670901e2..db7da46268ab2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/NestTableMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/NestTableMergeFunction.java @@ -1,9 +1,11 @@ package org.apache.paimon.mergetree.compact; import org.apache.paimon.KeyValue; +import org.apache.paimon.data.BinaryArray; import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import javax.annotation.Nullable; @@ -12,21 +14,25 @@ import java.util.List; import java.util.Map; -public class NestTableMergeFunction implements MergeFunction{ - private Map nestRowContainer; - private Map nestTableRowType; +public class NestTableMergeFunction implements MergeFunction { + private Map> nestRowContainer; + private Map nestTableFieldGetter; private InternalRow.FieldGetter[] getters; private InternalRow currentKey; private GenericRow mergedRow; - public NestTableMergeFunction(Map nestTableRowType,InternalRow.FieldGetter[] getters) { + private KeyValue reused; + + private Long latestSequenceNumber; + + public NestTableMergeFunction(Map nestTableFieldGetter, InternalRow.FieldGetter[] getters) { this.nestRowContainer = new HashMap<>(); - this.mergedRow =new GenericRow(getters.length); + this.mergedRow = new GenericRow(getters.length); - this.nestTableRowType = nestTableRowType; - this.getters =getters; + this.nestTableFieldGetter = nestTableFieldGetter; + this.getters = getters; } @Override @@ -38,11 +44,31 @@ public void reset() { @Override public void add(KeyValue kv) { currentKey = kv.key(); + InternalRow value = kv.value(); + latestSequenceNumber = kv.sequenceNumber(); + for (int i = 0; i < getters.length; i++) { + if (nestTableFieldGetter.containsKey(i)) { + GenericArray nestTable = (GenericArray)getters[i].getFieldOrNull(value); + if (nestTable!=null){ +// nestRowContainer.getOrDefault(i,new GenericArray()) +// InternalRow[] nestTableRows = (InternalRow[])nestTable.toObjectArray(); + nestRowContainer.getOrDefault(i,new ArrayList<>()).add(nestTable); + } + } else { + Object field = getters[i].getFieldOrNull(value); + if (field != null) { + mergedRow.setField(i, field); + } + } + } } @Nullable @Override public KeyValue getResult() { - return null; + if (reused == null) { + reused = new KeyValue(); + } + return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, mergedRow); } }