Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangyuf committed Jun 11, 2024
1 parent daa1dbd commit a1494b1
Showing 1 changed file with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;

Expand All @@ -36,6 +37,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -58,7 +60,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {

private final InternalRow.FieldGetter[] getters;
private final boolean ignoreDelete;
private final Map<Integer, UserDefinedSeqComparator> fieldSeqComparators;
private final Map<Integer, FieldsComparator> fieldSeqComparators;
private final boolean fieldSequenceEnabled;
private final Map<Integer, FieldAggregator> fieldAggregators;

Expand All @@ -70,7 +72,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
protected PartialUpdateMergeFunction(
InternalRow.FieldGetter[] getters,
boolean ignoreDelete,
Map<Integer, UserDefinedSeqComparator> fieldSeqComparators,
Map<Integer, FieldsComparator> fieldSeqComparators,
Map<Integer, FieldAggregator> fieldAggregators,
boolean fieldSequenceEnabled) {
this.getters = getters;
Expand Down Expand Up @@ -135,7 +137,7 @@ private void updateNonNullFields(KeyValue kv) {
private void updateWithSequenceGroup(KeyValue kv) {
for (int i = 0; i < getters.length; i++) {
Object field = getters[i].getFieldOrNull(kv.value());
UserDefinedSeqComparator seqComparator = fieldSeqComparators.get(i);
FieldsComparator seqComparator = fieldSeqComparators.get(i);
FieldAggregator aggregator = fieldAggregators.get(i);
Object accumulator = getters[i].getFieldOrNull(row);
if (seqComparator == null) {
Expand Down Expand Up @@ -170,7 +172,7 @@ private void updateWithSequenceGroup(KeyValue kv) {
}
}

private boolean isEmptySequenceGroup(KeyValue kv, UserDefinedSeqComparator comparator) {
private boolean isEmptySequenceGroup(KeyValue kv, FieldsComparator comparator) {
for (int fieldIndex : comparator.compareFields()) {
if (getters[fieldIndex].getFieldOrNull(kv.value()) != null) {
return false;
Expand All @@ -181,8 +183,10 @@ private boolean isEmptySequenceGroup(KeyValue kv, UserDefinedSeqComparator compa
}

private void retractWithSequenceGroup(KeyValue kv) {
Set<Integer> updatedSequenceFields = new HashSet<>();

for (int i = 0; i < getters.length; i++) {
UserDefinedSeqComparator seqComparator = fieldSeqComparators.get(i);
FieldsComparator seqComparator = fieldSeqComparators.get(i);
if (seqComparator != null) {
FieldAggregator aggregator = fieldAggregators.get(i);
if (isEmptySequenceGroup(kv, seqComparator)) {
Expand All @@ -197,7 +201,10 @@ private void retractWithSequenceGroup(KeyValue kv) {
if (Arrays.stream(seqComparator.compareFields())
.anyMatch(field -> field == index)) {
for (int field : seqComparator.compareFields()) {
row.setField(field, getters[field].getFieldOrNull(kv.value()));
if (!updatedSequenceFields.contains(field)) {
row.setField(field, getters[field].getFieldOrNull(kv.value()));
updatedSequenceFields.add(field);
}
}
} else {
// retract normal field
Expand Down Expand Up @@ -245,7 +252,7 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {

private final List<DataType> tableTypes;

private final Map<Integer, UserDefinedSeqComparator> fieldSeqComparators;
private final Map<Integer, FieldsComparator> fieldSeqComparators;

private final Map<Integer, FieldAggregator> fieldAggregators;

Expand Down Expand Up @@ -308,7 +315,7 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
if (projection != null) {
Map<Integer, UserDefinedSeqComparator> projectedSeqComparators = new HashMap<>();
Map<Integer, FieldsComparator> projectedSeqComparators = new HashMap<>();
Map<Integer, FieldAggregator> projectedAggregators = new HashMap<>();
int[] projects = Projection.of(projection).toTopLevelIndexes();
Map<Integer, Integer> indexMap = new HashMap<>();
Expand Down Expand Up @@ -385,7 +392,7 @@ public AdjustedProjection adjustProjection(@Nullable int[][] projection) {
int[] topProjects = Projection.of(projection).toTopLevelIndexes();
Set<Integer> indexSet = Arrays.stream(topProjects).boxed().collect(Collectors.toSet());
for (int index : topProjects) {
UserDefinedSeqComparator comparator = fieldSeqComparators.get(index);
FieldsComparator comparator = fieldSeqComparators.get(index);
if (comparator == null) {
continue;
}
Expand Down

0 comments on commit a1494b1

Please sign in to comment.