Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 9, 2025
1 parent fce5768 commit ad01c53
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,18 @@

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.shaded.guava32.com.google.common.collect.ImmutableSet;
import com.alibaba.fluss.types.BigIntType;
import com.alibaba.fluss.types.IntType;
import com.alibaba.fluss.types.LocalZonedTimestampType;
import com.alibaba.fluss.types.TimeType;
import com.alibaba.fluss.types.TimestampType;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* The merge engine for primary key table.
Expand All @@ -31,6 +38,14 @@
*/
public class MergeEngine {

public static final Set<String> VERSION_SUPPORTED_DATA_TYPES =
ImmutableSet.of(
BigIntType.class.getName(),
IntType.class.getName(),
TimestampType.class.getName(),
TimeType.class.getName(),
LocalZonedTimestampType.class.getName());

private final Type type;

/** When merge engine type is version, column cannot be null. */
Expand Down Expand Up @@ -62,7 +77,7 @@ private static MergeEngine create(Configuration options) {
if (column == null) {
throw new IllegalArgumentException(
String.format(
"When the merge engine is set to version, the '%s' must be set.",
"When the merge engine is set to version, the option '%s' must be set.",
ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key()));
}
return new MergeEngine(Type.VERSION, column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.config.ConfigurationUtils;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.utils.AutoPartitionStrategy;
import com.alibaba.fluss.utils.Preconditions;
Expand All @@ -43,6 +44,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.alibaba.fluss.metadata.MergeEngine.VERSION_SUPPORTED_DATA_TYPES;
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
import static java.util.Collections.unmodifiableMap;
Expand Down Expand Up @@ -281,7 +283,27 @@ public boolean isDataLakeEnabled() {
}

public @Nullable MergeEngine getMergeEngine() {
return MergeEngine.create(properties);
MergeEngine mergeEngine = MergeEngine.create(properties);
if (mergeEngine != null) {
if (mergeEngine.getType() == MergeEngine.Type.VERSION) {
String column = mergeEngine.getColumn();
RowType rowType = schema.toRowType();
int fieldIndex = rowType.getFieldIndex(column);
if (fieldIndex == -1) {
throw new IllegalArgumentException(
String.format(
"The version merge engine column %s does not exist.", column));
}
DataType dataType = rowType.getTypeAt(fieldIndex);
if (!VERSION_SUPPORTED_DATA_TYPES.contains(dataType.getClass().getName())) {
throw new IllegalArgumentException(
String.format(
"The version merge engine column does not support type %s .",
dataType));
}
}
}
return mergeEngine;
}

public TableDescriptor copy(Map<String, String> newProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ public LogAppendInfo putAsLeader(
byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
if (kvRecord.getRow() == null) {
// currently, all supported merge engine will ignore delete row.
if (rowMergeEngine.shouldSkipDeletion()) {
continue;
}
// it's for deletion
byte[] oldValue = getFromBufferOrKv(key);
if (oldValue == null) {
Expand All @@ -284,17 +288,8 @@ public LogAppendInfo putAsLeader(
"The specific key can't be found in kv tablet although the kv record is for deletion, "
+ "ignore it directly as it doesn't exist in the kv tablet yet.");
} else {

BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
BinaryRow newRow = deleteRow(oldRow, partialUpdater);
if (rowMergeEngine.shouldSkipDeletion(newRow)) {
continue;
}
newRow = rowMergeEngine.merge(oldRow, newRow);
if (newRow == null) {
continue;
}

// if newRow is null, it means the row should be deleted
if (newRow == null) {
walBuilder.append(RowKind.DELETE, oldRow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,15 @@

import com.alibaba.fluss.row.BinaryRow;

/**
* The first row merge engine for primary key table. Always retain the first row.
*
* @since 0.6
*/
/** The first row merge engine for primary key table. Always retain the first row. */
public class FirstRowMergeEngine implements RowMergeEngine {
@Override
public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
return oldRow == null ? newRow : null;
return null;
}

@Override
public boolean shouldSkipDeletion(BinaryRow newRow) {
return newRow == null;
public boolean shouldSkipDeletion() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@

import com.alibaba.fluss.row.BinaryRow;

/**
* The row merge engine for primary key table.
*
* @since 0.6
*/
/** The row merge engine for primary key table. */
public interface RowMergeEngine {
BinaryRow merge(BinaryRow oldRow, BinaryRow newRow);

default boolean shouldSkipDeletion(BinaryRow newRow) {
default boolean shouldSkipDeletion() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.alibaba.fluss.server.kv.mergeengine;

import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.metadata.MergeEngine;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.row.BinaryRow;
Expand All @@ -33,8 +32,6 @@
/**
* The version row merge engine for primary key table. The update will only occur if the new value
* of the specified version field is greater than the old value.
*
* @since 0.6
*/
public class VersionRowMergeEngine implements RowMergeEngine {

Expand All @@ -57,7 +54,7 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
if (fieldIndex == -1) {
throw new IllegalArgumentException(
String.format(
"When the merge engine is set to version, the column %s does not exist.",
"The merge engine is set to version, but the version column %s does not exist.",
mergeEngine.getColumn()));
}
InternalRow.FieldGetter fieldGetter = currentFieldGetters[fieldIndex];
Expand All @@ -72,7 +69,7 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
}

@Override
public boolean shouldSkipDeletion(BinaryRow newRow) {
public boolean shouldSkipDeletion() {
return true;
}

Expand All @@ -92,7 +89,8 @@ private ValueComparator getValueComparator(DataType dataType) {
return (left, right) ->
((TimestampLtz) left).toEpochMicros() > ((TimestampLtz) right).toEpochMicros();
}
throw new FlussRuntimeException("Unsupported data type: " + dataType.asSummaryString());
throw new UnsupportedOperationException(
"Unsupported data type: " + dataType.asSummaryString());
}

interface ValueComparator {
Expand Down

0 comments on commit ad01c53

Please sign in to comment.