Skip to content

Commit

Permalink
[core] MergeFunction.getResult return not null (#3136)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Apr 1, 2024
1 parent 3b80e14 commit 2ced694
Show file tree
Hide file tree
Showing 8 changed files with 3 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public void add(KeyValue kv) {
}

@Override
@Nullable
public KeyValue getResult() {
return latestKv;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public void add(KeyValue kv) {
}
}

@Nullable
@Override
public KeyValue getResult() {
return first;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ public void add(KeyValue kv) {
public ChangelogResult getResult() {
reusedResult.reset();
KeyValue result = mergeFunction.getResult();
if (result == null) {
return reusedResult;
}

if (contains.test(result.key())) {
// empty
return reusedResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ public ChangelogResult getResult() {
if (isInitialized) {
KeyValue merged = mergeFunction.getResult();
if (topLevelKv == null) {
if (merged != null && merged.isAdd()) {
if (merged.isAdd()) {
reusedResult.addChangelog(replace(reusedAfter, RowKind.INSERT, merged));
}
} else {
if (merged == null || !merged.isAdd()) {
if (!merged.isAdd()) {
reusedResult.addChangelog(replace(reusedBefore, RowKind.DELETE, topLevelKv));
} else if (!changelogRowDeduplicate
|| !valueEqualiser.equals(topLevelKv.value(), merged.value())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.paimon.KeyValue;

import javax.annotation.Nullable;

/**
* Merge function to merge multiple {@link KeyValue}s.
*
Expand All @@ -44,7 +42,6 @@ public interface MergeFunction<T> {
/** Add the given {@link KeyValue} to the merge function. */
void add(KeyValue kv);

/** Get current merged value. Return null if this merged result should be skipped. */
@Nullable
/** Get current merged value. */
T getResult();
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {

private InternalRow currentKey;
private long latestSequenceNumber;
private boolean isEmpty;
private GenericRow row;
private KeyValue reused;

Expand All @@ -93,7 +92,6 @@ public void reset() {
this.currentKey = null;
this.row = new GenericRow(getters.length);
fieldAggregators.values().forEach(FieldAggregator::reset);
this.isEmpty = true;
}

@Override
Expand All @@ -119,7 +117,6 @@ public void add(KeyValue kv) {
}

latestSequenceNumber = kv.sequenceNumber();
isEmpty = false;
if (fieldSequences.isEmpty()) {
updateNonNullFields(kv);
} else {
Expand Down Expand Up @@ -203,12 +200,7 @@ private void retractWithSequenceGroup(KeyValue kv) {
}

@Override
@Nullable
public KeyValue getResult() {
if (isEmpty) {
return null;
}

if (reused == null) {
reused = new KeyValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public void add(KeyValue kv) {
}
}

@Nullable
@Override
public KeyValue getResult() {
checkNotNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,7 @@ public void add(KeyValue kv) {
}

@Override
@Nullable
public KeyValue getResult() {
if (total == 0) {
return null;
}

if (reused == null) {
reused = new KeyValue();
}
Expand Down

0 comments on commit 2ced694

Please sign in to comment.