From ad01c5336daaf55deccbf98b43917162454bf0ba Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Thu, 9 Jan 2025 11:11:36 +0800 Subject: [PATCH] fixed --- .../alibaba/fluss/metadata/MergeEngine.java | 17 ++++++++++++- .../fluss/metadata/TableDescriptor.java | 24 ++++++++++++++++++- .../com/alibaba/fluss/server/kv/KvTablet.java | 13 ++++------ .../kv/mergeengine/FirstRowMergeEngine.java | 12 ++++------ .../server/kv/mergeengine/RowMergeEngine.java | 8 ++----- .../kv/mergeengine/VersionRowMergeEngine.java | 10 ++++---- 6 files changed, 53 insertions(+), 31 deletions(-) diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java index e76b98b0..d06f3557 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java @@ -18,11 +18,18 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.shaded.guava32.com.google.common.collect.ImmutableSet; +import com.alibaba.fluss.types.BigIntType; +import com.alibaba.fluss.types.IntType; +import com.alibaba.fluss.types.LocalZonedTimestampType; +import com.alibaba.fluss.types.TimeType; +import com.alibaba.fluss.types.TimestampType; import javax.annotation.Nullable; import java.util.Map; import java.util.Objects; +import java.util.Set; /** * The merge engine for primary key table. @@ -31,6 +38,14 @@ */ public class MergeEngine { + public static final Set VERSION_SUPPORTED_DATA_TYPES = + ImmutableSet.of( + BigIntType.class.getName(), + IntType.class.getName(), + TimestampType.class.getName(), + TimeType.class.getName(), + LocalZonedTimestampType.class.getName()); + private final Type type; /** When merge engine type is version, column cannot be null. */ @@ -62,7 +77,7 @@ private static MergeEngine create(Configuration options) { if (column == null) { throw new IllegalArgumentException( String.format( - "When the merge engine is set to version, the '%s' must be set.", + "When the merge engine is set to version, the option '%s' must be set.", ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key())); } return new MergeEngine(Type.VERSION, column); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java index a850201e..f140614a 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java @@ -22,6 +22,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.ConfigurationUtils; +import com.alibaba.fluss.types.DataType; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.AutoPartitionStrategy; import com.alibaba.fluss.utils.Preconditions; @@ -43,6 +44,7 @@ import java.util.Set; import java.util.stream.Collectors; +import static com.alibaba.fluss.metadata.MergeEngine.VERSION_SUPPORTED_DATA_TYPES; import static com.alibaba.fluss.utils.Preconditions.checkArgument; import static com.alibaba.fluss.utils.Preconditions.checkNotNull; import static java.util.Collections.unmodifiableMap; @@ -281,7 +283,27 @@ public boolean isDataLakeEnabled() { } public @Nullable MergeEngine getMergeEngine() { - return MergeEngine.create(properties); + MergeEngine mergeEngine = MergeEngine.create(properties); + if (mergeEngine != null) { + if (mergeEngine.getType() == MergeEngine.Type.VERSION) { + String column = mergeEngine.getColumn(); + RowType rowType = schema.toRowType(); + int fieldIndex = rowType.getFieldIndex(column); + if (fieldIndex == -1) { + throw new IllegalArgumentException( + String.format( + "The version merge engine column %s does not exist.", column)); + } + DataType dataType = rowType.getTypeAt(fieldIndex); + if (!VERSION_SUPPORTED_DATA_TYPES.contains(dataType.getClass().getName())) { + throw new IllegalArgumentException( + String.format( + "The version merge engine column does not support type %s .", + dataType)); + } + } + } + return mergeEngine; } public TableDescriptor copy(Map newProperties) { diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index df568273..5fc2c780 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -276,6 +276,10 @@ public LogAppendInfo putAsLeader( byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); if (kvRecord.getRow() == null) { + // currently, all supported merge engine will ignore delete row. + if (rowMergeEngine.shouldSkipDeletion()) { + continue; + } // it's for deletion byte[] oldValue = getFromBufferOrKv(key); if (oldValue == null) { @@ -284,17 +288,8 @@ public LogAppendInfo putAsLeader( "The specific key can't be found in kv tablet although the kv record is for deletion, " + "ignore it directly as it doesn't exist in the kv tablet yet."); } else { - BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; BinaryRow newRow = deleteRow(oldRow, partialUpdater); - if (rowMergeEngine.shouldSkipDeletion(newRow)) { - continue; - } - newRow = rowMergeEngine.merge(oldRow, newRow); - if (newRow == null) { - continue; - } - // if newRow is null, it means the row should be deleted if (newRow == null) { walBuilder.append(RowKind.DELETE, oldRow); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java index 942d3eef..9c9bfbc8 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java @@ -17,19 +17,15 @@ import com.alibaba.fluss.row.BinaryRow; -/** - * The first row merge engine for primary key table. Always retain the first row. - * - * @since 0.6 - */ +/** The first row merge engine for primary key table. Always retain the first row. */ public class FirstRowMergeEngine implements RowMergeEngine { @Override public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { - return oldRow == null ? newRow : null; + return null; } @Override - public boolean shouldSkipDeletion(BinaryRow newRow) { - return newRow == null; + public boolean shouldSkipDeletion() { + return true; } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java index 56831cdf..528a04fa 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java @@ -17,15 +17,11 @@ import com.alibaba.fluss.row.BinaryRow; -/** - * The row merge engine for primary key table. - * - * @since 0.6 - */ +/** The row merge engine for primary key table. */ public interface RowMergeEngine { BinaryRow merge(BinaryRow oldRow, BinaryRow newRow); - default boolean shouldSkipDeletion(BinaryRow newRow) { + default boolean shouldSkipDeletion() { return false; } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java index 478ef50a..b8eb9bad 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java @@ -15,7 +15,6 @@ */ package com.alibaba.fluss.server.kv.mergeengine; -import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.metadata.MergeEngine; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.row.BinaryRow; @@ -33,8 +32,6 @@ /** * The version row merge engine for primary key table. The update will only occur if the new value * of the specified version field is greater than the old value. - * - * @since 0.6 */ public class VersionRowMergeEngine implements RowMergeEngine { @@ -57,7 +54,7 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { if (fieldIndex == -1) { throw new IllegalArgumentException( String.format( - "When the merge engine is set to version, the column %s does not exist.", + "The merge engine is set to version, but the version column %s does not exist.", mergeEngine.getColumn())); } InternalRow.FieldGetter fieldGetter = currentFieldGetters[fieldIndex]; @@ -72,7 +69,7 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { } @Override - public boolean shouldSkipDeletion(BinaryRow newRow) { + public boolean shouldSkipDeletion() { return true; } @@ -92,7 +89,8 @@ private ValueComparator getValueComparator(DataType dataType) { return (left, right) -> ((TimestampLtz) left).toEpochMicros() > ((TimestampLtz) right).toEpochMicros(); } - throw new FlussRuntimeException("Unsupported data type: " + dataType.asSummaryString()); + throw new UnsupportedOperationException( + "Unsupported data type: " + dataType.asSummaryString()); } interface ValueComparator {