Skip to content

Commit

Permalink
nesttable
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Dec 20, 2023
1 parent 51fe015 commit 624dd06
Showing 1 changed file with 34 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.apache.paimon.mergetree.compact;

import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;
Expand All @@ -12,21 +14,25 @@
import java.util.List;
import java.util.Map;

public class NestTableMergeFunction implements MergeFunction<KeyValue>{
private Map<Integer,GenericArray> nestRowContainer;
private Map<Integer,RowType> nestTableRowType;
public class NestTableMergeFunction implements MergeFunction<KeyValue> {
private Map<Integer, List<GenericArray>> nestRowContainer;
private Map<Integer, InternalRow.FieldGetter[]> nestTableFieldGetter;

private InternalRow.FieldGetter[] getters;
private InternalRow currentKey;

private GenericRow mergedRow;

public NestTableMergeFunction(Map<Integer, RowType> nestTableRowType,InternalRow.FieldGetter[] getters) {
private KeyValue reused;

private Long latestSequenceNumber;

public NestTableMergeFunction(Map<Integer, InternalRow.FieldGetter[]> nestTableFieldGetter, InternalRow.FieldGetter[] getters) {
this.nestRowContainer = new HashMap<>();
this.mergedRow =new GenericRow(getters.length);
this.mergedRow = new GenericRow(getters.length);

this.nestTableRowType = nestTableRowType;
this.getters =getters;
this.nestTableFieldGetter = nestTableFieldGetter;
this.getters = getters;
}

@Override
Expand All @@ -38,11 +44,31 @@ public void reset() {
@Override
public void add(KeyValue kv) {
currentKey = kv.key();
InternalRow value = kv.value();
latestSequenceNumber = kv.sequenceNumber();
for (int i = 0; i < getters.length; i++) {
if (nestTableFieldGetter.containsKey(i)) {
GenericArray nestTable = (GenericArray)getters[i].getFieldOrNull(value);
if (nestTable!=null){
// nestRowContainer.getOrDefault(i,new GenericArray())
// InternalRow[] nestTableRows = (InternalRow[])nestTable.toObjectArray();
nestRowContainer.getOrDefault(i,new ArrayList<>()).add(nestTable);
}
} else {
Object field = getters[i].getFieldOrNull(value);
if (field != null) {
mergedRow.setField(i, field);
}
}
}
}

@Nullable
@Override
public KeyValue getResult() {
return null;
if (reused == null) {
reused = new KeyValue();
}
return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, mergedRow);
}
}

0 comments on commit 624dd06

Please sign in to comment.