From 934a7d60c8e786d183e49f3836200dac8947ffb4 Mon Sep 17 00:00:00 2001 From: yuzelin <747884505@qq.com> Date: Fri, 8 Mar 2024 17:33:46 +0800 Subject: [PATCH] polish --- .../LookupChangelogMergeFunctionWrapper.java | 45 ++------ .../compact/LookupMergeFunction.java | 17 +-- .../compact/LookupMergeRecordManager.java | 102 ++++++++++++++++++ 3 files changed, 123 insertions(+), 41 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeRecordManager.java diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java index 319f5055a4310..fe3075d6e680a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java @@ -23,8 +23,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.types.RowKind; -import java.util.Iterator; -import java.util.LinkedList; +import java.util.List; import java.util.function.Function; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -47,7 +46,6 @@ public class LookupChangelogMergeFunctionWrapper implements MergeFunctionWrapper { private final LookupMergeFunction mergeFunction; - private final MergeFunction mergeFunction2; private final Function lookup; private final ChangelogResult reusedResult = new ChangelogResult(); @@ -67,7 +65,6 @@ public LookupChangelogMergeFunctionWrapper( "Merge function should be a LookupMergeFunction, but is %s, there is a bug.", mergeFunction.getClass().getName()); this.mergeFunction = (LookupMergeFunction) mergeFunction; - this.mergeFunction2 = mergeFunctionFactory.create(); this.lookup = lookup; this.valueEqualiser = valueEqualiser; this.changelogRowDeduplicate = changelogRowDeduplicate; @@ -85,41 +82,19 @@ public void add(KeyValue kv) { @Override public ChangelogResult getResult() { - // 1. Compute the latest high level record and containLevel0 of candidates - LinkedList candidates = mergeFunction.candidates(); - Iterator descending = candidates.descendingIterator(); - KeyValue highLevel = null; - boolean containLevel0 = false; - while (descending.hasNext()) { - KeyValue kv = descending.next(); - if (kv.level() > 0) { - descending.remove(); - if (highLevel == null) { - highLevel = kv; - } - } else { - containLevel0 = true; - } - } + LookupMergeRecordManager recordManager = mergeFunction.recordManager(); + mergeFunction.reset(); - // 2. Lookup if latest high level record is absent - if (highLevel == null) { - InternalRow lookupKey = candidates.get(0).key(); - highLevel = lookup.apply(lookupKey); - } + // 1. Lookup if latest high level record is absent + List candidates = recordManager.getCandidates(lookup); - // 3. Calculate result - mergeFunction2.reset(); - if (highLevel != null) { - mergeFunction2.add(highLevel); - } - candidates.forEach(mergeFunction2::add); - KeyValue result = mergeFunction2.getResult(); + // 2. Calculate result + KeyValue result = mergeFunction.getResult(candidates); - // 4. Set changelog when there's level-0 records + // 3. Set changelog when there's level-0 records reusedResult.reset(); - if (containLevel0) { - setChangelog(highLevel, result); + if (recordManager.containsLevel0()) { + setChangelog(recordManager.highLevel(), result); } return reusedResult.setResult(result); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java index 7fc3e1a119779..28d41e3eba180 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java @@ -25,7 +25,7 @@ import javax.annotation.Nullable; -import java.util.LinkedList; +import java.util.List; /** * A {@link MergeFunction} for lookup, this wrapper only considers the latest high level record, @@ -35,36 +35,41 @@ public class LookupMergeFunction implements MergeFunction { private final MergeFunction mergeFunction; - private final LinkedList candidates = new LinkedList<>(); private final InternalRowSerializer keySerializer; private final InternalRowSerializer valueSerializer; + private final LookupMergeRecordManager recordManager; public LookupMergeFunction( MergeFunction mergeFunction, RowType keyType, RowType valueType) { this.mergeFunction = mergeFunction; this.keySerializer = new InternalRowSerializer(keyType); this.valueSerializer = new InternalRowSerializer(valueType); + this.recordManager = new LookupMergeRecordManager(); } @Override public void reset() { - candidates.clear(); + recordManager.reset(); } @Override public void add(KeyValue kv) { - candidates.add(kv.copy(keySerializer, valueSerializer)); + recordManager.add(kv.copy(keySerializer, valueSerializer)); } @Override public KeyValue getResult() { + return getResult(recordManager.getCandidates()); + } + + KeyValue getResult(List candidates) { mergeFunction.reset(); candidates.forEach(mergeFunction::add); return mergeFunction.getResult(); } - LinkedList candidates() { - return candidates; + LookupMergeRecordManager recordManager() { + return recordManager.copy(); } public static MergeFunctionFactory wrap( diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeRecordManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeRecordManager.java new file mode 100644 index 0000000000000..8cf78529f6377 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeRecordManager.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.InternalRow; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** A maintainer of records added to {@link LookupMergeFunction}. */ +class LookupMergeRecordManager { + + private final List candidates; + private final List highLevelIndex; + private KeyValue highLevel = null; + + LookupMergeRecordManager() { + this.candidates = new ArrayList<>(); + this.highLevelIndex = new ArrayList<>(); + } + + private LookupMergeRecordManager(List candidates, List highLevelIndex) { + this.candidates = candidates; + this.highLevelIndex = highLevelIndex; + } + + void add(KeyValue kv) { + candidates.add(kv); + if (kv.level() != 0) { + highLevelIndex.add(candidates.size() - 1); + highLevel = kv; + } + } + + void reset() { + candidates.clear(); + highLevelIndex.clear(); + highLevel = null; + } + + List getCandidates() { + return getCandidates(null); + } + + List getCandidates(@Nullable Function lookup) { + setHighLevel(); + + if (lookup != null && highLevel == null) { + InternalRow lookupKey = candidates.get(0).key(); + highLevel = lookup.apply(lookupKey); + } + + List copy = new ArrayList<>(candidates); + for (int i = 0; i < highLevelIndex.size() - 1; i++) { + copy.set(i, null); + } + + return copy.stream().filter(Objects::nonNull).collect(Collectors.toList()); + } + + private void setHighLevel() { + if (!highLevelIndex.isEmpty()) { + this.highLevel = candidates.get(highLevelIndex.get(highLevelIndex.size() - 1)); + } + } + + boolean containsLevel0() { + return candidates.size() > highLevelIndex.size(); + } + + @Nullable + KeyValue highLevel() { + return highLevel; + } + + LookupMergeRecordManager copy() { + return new LookupMergeRecordManager( + new ArrayList<>(this.candidates), new ArrayList<>(this.highLevelIndex)); + } +}