Skip to content

Commit

Permalink
[core] Add support of sequence field in first row
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Apr 30, 2024
1 parent 8f4f818 commit 3a65c7a
Show file tree
Hide file tree
Showing 23 changed files with 554 additions and 84 deletions.
8 changes: 4 additions & 4 deletions docs/content/primary-key-table/merge-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,13 @@ is an empty map instead of `+I[1->A]`

## First Row

By specifying `'merge-engine' = 'first-row'`, users can keep the first row of the same primary key. It differs from the
`deduplicate` merge engine that in the `first-row` merge engine, it will generate insert only changelog.
By specifying `'merge-engine' = 'first-row'`, users can keep the first row of the same primary key. You can use the first row merge engine with or without setting `sequence.field`.
When the sequence field is set, it retains the first row in a user-defined order and may generate an UPDATE_BEFORE message to adjust the outcome.
If the sequence field is not set, it keeps the first row in natural order, resulting in only an INSERT message being created.

{{< hint info >}}
1. `first-row` merge engine must be used together with `lookup` [changelog producer]({{< ref "primary-key-table/changelog-producer" >}}).
2. You can not specify `sequence.field`.
3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config `ignore-delete` to ignore these two kinds records.
2. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config `ignore-delete` to ignore these two kinds records.
{{< /hint >}}

This is of great help in replacing log deduplication in streaming computation.
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,10 @@ public MergeEngine mergeEngine() {
return options.get(MERGE_ENGINE);
}

public boolean noDupKeysOverLevel0() {
return mergeEngine() == MergeEngine.FIRST_ROW && sequenceField().isEmpty();
}

public boolean ignoreDelete() {
return options.get(IGNORE_DELETE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public void pushdown(Predicate keyFilter) {
options.scanManifestParallelism(),
branchName,
options.deletionVectorsEnabled(),
options.mergeEngine());
options.noDupKeysOverLevel0());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree;

import org.apache.paimon.KeyValue;
import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

/**
* This class is the same as FirstRowMergeFunction, but it is specifically for merging unordered
* input.
*/
public class UnOrderedFirstRowMergeFunction extends FirstRowMergeFunction {
protected UnOrderedFirstRowMergeFunction(RowType keyType, RowType valueType) {
super(keyType, valueType);
}

public static MergeFunctionFactory<KeyValue> factory(RowType keyType, RowType valueType) {
return new UnOrderedFirstRowMergeFunction.Factory(keyType, valueType);
}

private static class Factory implements MergeFunctionFactory<KeyValue> {

private static final long serialVersionUID = 1L;
private final RowType keyType;
private final RowType valueType;

public Factory(RowType keyType, RowType valueType) {
this.keyType = keyType;
this.valueType = valueType;
}

@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
return new UnOrderedFirstRowMergeFunction(keyType, valueType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,16 @@ public void add(KeyValue kv) {
public ChangelogResult getResult() {
// 1. Compute the latest high level record and containLevel0 of candidates
LinkedList<KeyValue> candidates = mergeFunction.candidates();
Iterator<KeyValue> descending = candidates.descendingIterator();
Iterator<KeyValue> iterator =
mergeFunction.getLevelOrder() == LookupMergeFunction.LevelOrder.DESCENDING
? candidates.descendingIterator()
: candidates.iterator();
KeyValue highLevel = null;
boolean containLevel0 = false;
while (descending.hasNext()) {
KeyValue kv = descending.next();
while (iterator.hasNext()) {
KeyValue kv = iterator.next();
if (kv.level() > 0) {
descending.remove();
iterator.remove();
if (highLevel == null) {
highLevel = kv;
}
Expand All @@ -141,14 +144,13 @@ public ChangelogResult getResult() {
}
}
}

// 3. Calculate result
KeyValue result = calculateResult(candidates, highLevel);

// 4. Set changelog when there's level-0 records
reusedResult.reset();
if (containLevel0 && lookupStrategy.produceChangelog) {
setChangelog(highLevel, result);
setChangelog(highLevel, result, reusedResult);
}

return reusedResult.setResult(result);
Expand All @@ -171,7 +173,8 @@ private KeyValue calculateResult(List<KeyValue> candidates, @Nullable KeyValue h
return mergeFunction2.getResult();
}

private void setChangelog(@Nullable KeyValue before, KeyValue after) {
protected void setChangelog(
@Nullable KeyValue before, KeyValue after, ChangelogResult reusedResult) {
if (before == null || !before.isAdd()) {
if (after.isAdd()) {
reusedResult.addChangelog(replaceAfter(RowKind.INSERT, after));
Expand All @@ -196,7 +199,7 @@ private KeyValue replaceAfter(RowKind valueKind, KeyValue from) {
return replace(reusedAfter, valueKind, from);
}

private KeyValue replace(KeyValue reused, RowKind valueKind, KeyValue from) {
protected KeyValue replace(KeyValue reused, RowKind valueKind, KeyValue from) {
return reused.replace(from.key(), from.sequenceNumber(), valueKind, from.value());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.KeyValue;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.mergetree.UnOrderedFirstRowMergeFunction;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Projection;

Expand All @@ -39,12 +40,27 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
private final LinkedList<KeyValue> candidates = new LinkedList<>();
private final InternalRowSerializer keySerializer;
private final InternalRowSerializer valueSerializer;
private final LevelOrder levelOrder;

public LookupMergeFunction(
MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType valueType) {
MergeFunction<KeyValue> mergeFunction,
RowType keyType,
RowType valueType,
LevelOrder levelOrder) {
this.mergeFunction = mergeFunction;
this.keySerializer = new InternalRowSerializer(keyType);
this.valueSerializer = new InternalRowSerializer(valueType);
this.levelOrder = levelOrder;
}

/**
* The relationship between the high-level order and the sequence order.
* <li>For the first row, as the level increases, so does the sequence number.
* <li>For all other merge engine, as the level increases, the sequence number decreases.
*/
enum LevelOrder {
DESCENDING,
ASCENDING
}

@Override
Expand All @@ -60,13 +76,16 @@ public void add(KeyValue kv) {
@Override
public KeyValue getResult() {
// 1. Find the latest high level record
Iterator<KeyValue> descending = candidates.descendingIterator();
Iterator<KeyValue> iterator =
levelOrder == LevelOrder.DESCENDING
? candidates.descendingIterator()
: candidates().iterator();
KeyValue highLevel = null;
while (descending.hasNext()) {
KeyValue kv = descending.next();
while (iterator.hasNext()) {
KeyValue kv = iterator.next();
if (kv.level() > 0) {
if (highLevel != null) {
descending.remove();
iterator.remove();
} else {
highLevel = kv;
}
Expand All @@ -83,14 +102,23 @@ LinkedList<KeyValue> candidates() {
return candidates;
}

LevelOrder getLevelOrder() {
return levelOrder;
}

public static MergeFunctionFactory<KeyValue> wrap(
MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType valueType) {
if (wrapped.create() instanceof FirstRowMergeFunction) {
if (wrapped.create().getClass() == FirstRowMergeFunction.class) {
// don't wrap first row, it is already OK
return wrapped;
}

return new Factory(wrapped, keyType, valueType);
return new Factory(
wrapped,
keyType,
valueType,
wrapped.create() instanceof UnOrderedFirstRowMergeFunction
? LevelOrder.ASCENDING
: LevelOrder.DESCENDING);
}

private static class Factory implements MergeFunctionFactory<KeyValue> {
Expand All @@ -100,18 +128,25 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {
private final MergeFunctionFactory<KeyValue> wrapped;
private final RowType keyType;
private final RowType rowType;
private final LevelOrder levelOrder;

private Factory(MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType rowType) {
private Factory(
MergeFunctionFactory<KeyValue> wrapped,
RowType keyType,
RowType rowType,
LevelOrder levelOrder) {
this.wrapped = wrapped;
this.keyType = keyType;
this.rowType = rowType;
this.levelOrder = levelOrder;
}

@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
RowType valueType =
projection == null ? rowType : Projection.of(projection).project(rowType);
return new LookupMergeFunction(wrapped.create(projection), keyType, valueType);
return new LookupMergeFunction(
wrapped.create(projection), keyType, valueType, levelOrder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,47 @@ public MergeFunctionWrapper<ChangelogResult> create(
});
}
}

/** A {@link MergeFunctionWrapperFactory} for first row. */
public static class UnOrderedFirstRowMergeFunctionWrapperFactory<T>
implements MergeFunctionWrapperFactory<T> {

private final RecordEqualiser valueEqualiser;
private final boolean changelogRowDeduplicate;
private final LookupStrategy lookupStrategy;
@Nullable private final UserDefinedSeqComparator userDefinedSeqComparator;

public UnOrderedFirstRowMergeFunctionWrapperFactory(
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate,
LookupStrategy lookupStrategy,
@Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
this.valueEqualiser = valueEqualiser;
this.changelogRowDeduplicate = changelogRowDeduplicate;
this.lookupStrategy = lookupStrategy;
this.userDefinedSeqComparator = userDefinedSeqComparator;
}

@Override
public MergeFunctionWrapper<ChangelogResult> create(
MergeFunctionFactory<KeyValue> mfFactory,
int outputLevel,
LookupLevels<T> lookupLevels,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
return new UnOrderedFirstRowMergeFunctionWrapper<>(
mfFactory,
key -> {
try {
return lookupLevels.lookup(key, outputLevel + 1);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
},
valueEqualiser,
changelogRowDeduplicate,
lookupStrategy,
deletionVectorsMaintainer,
userDefinedSeqComparator);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact;

import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.UserDefinedSeqComparator;

import javax.annotation.Nullable;

import java.util.function.Function;

/** Wrapper for {@link MergeFunction}s to produce changelog by lookup for first row. */
public class UnOrderedFirstRowMergeFunctionWrapper<T>
extends LookupChangelogMergeFunctionWrapper<T> {

private final KeyValue reusedBefore = new KeyValue();
private final KeyValue reusedAfter = new KeyValue();

public UnOrderedFirstRowMergeFunctionWrapper(
MergeFunctionFactory<KeyValue> mergeFunctionFactory,
Function<InternalRow, T> lookup,
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate,
LookupStrategy lookupStrategy,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
@Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
super(
mergeFunctionFactory,
lookup,
valueEqualiser,
changelogRowDeduplicate,
lookupStrategy,
deletionVectorsMaintainer,
userDefinedSeqComparator);
}

@Override
protected void setChangelog(
@Nullable KeyValue before, KeyValue after, ChangelogResult reusedResult) {
if (after.level() == 0) {
if (before == null) {
reusedResult.addChangelog(after);
} else {
reusedResult
.addChangelog(replace(reusedBefore, RowKind.UPDATE_BEFORE, before))
.addChangelog(replace(reusedAfter, RowKind.UPDATE_AFTER, after));
}
}
}
}
Loading

0 comments on commit 3a65c7a

Please sign in to comment.