diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index 12f842a5..7fb2971c 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -115,6 +115,18 @@ public class ConfigOptions { "The interval of auto partition check. " + "The default value is 10 minutes."); + public static final ConfigOption TABLE_MERGE_ENGINE = + key("table.merge-engine") + .enumType(MergeEngine.Type.class) + .noDefaultValue() + .withDescription("The merge engine for the primary key table."); + + public static final ConfigOption TABLE_MERGE_ENGINE_VERSION_COLUMN = + key("table.merge-engine.version.column") + .stringType() + .noDefaultValue() + .withDescription("The merge engine version column for the primary key table."); + // ------------------------------------------------------------------------ // ConfigOptions for Coordinator Server // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/MergeEngine.java b/fluss-common/src/main/java/com/alibaba/fluss/config/MergeEngine.java new file mode 100644 index 00000000..37dea13a --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/MergeEngine.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.config; + +import java.util.Map; + +/** The merge engine for primary key table. */ +public class MergeEngine { + + private final Type type; + private final String column; + + private MergeEngine(Type type) { + this(type, null); + } + + private MergeEngine(Type type, String column) { + this.type = type; + this.column = column; + } + + public static MergeEngine create(Map properties) { + return create(Configuration.fromMap(properties)); + } + + public static MergeEngine create(Configuration options) { + if (options == null) { + return null; + } + MergeEngine.Type type = options.get(ConfigOptions.TABLE_MERGE_ENGINE); + if (type == null) { + return null; + } + + switch (type) { + case FIRST_ROW: + return new MergeEngine(Type.FIRST_ROW); + case VERSION: + String column = options.get(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN); + if (column == null) { + throw new IllegalArgumentException( + "When the merge engine is set to version, the 'table.merge-engine.version.column' cannot be empty."); + } + return new MergeEngine(Type.VERSION, column); + default: + throw new UnsupportedOperationException("Unsupported merge engine: " + type); + } + } + + public Type getType() { + return type; + } + + public String getColumn() { + return column; + } + + public enum Type { + FIRST_ROW("first_row"), + VERSION("version"); + private final String value; + + Type(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java index d3737b93..f6443408 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java @@ -22,6 +22,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.ConfigurationUtils; +import com.alibaba.fluss.config.MergeEngine; import com.alibaba.fluss.utils.AutoPartitionStrategy; import com.alibaba.fluss.utils.Preconditions; import com.alibaba.fluss.utils.json.JsonSerdeUtils; @@ -133,6 +134,11 @@ && getLogFormat() != LogFormat.ARROW) { throw new IllegalArgumentException( "For Primary Key Table, if kv format is compacted, log format must be arrow."); } + + if (!hasPrimaryKey() && getMergeEngine() != null) { + throw new IllegalArgumentException( + "Merge engine is only supported in primary key table."); + } } /** Creates a builder for building table descriptor. */ @@ -244,6 +250,10 @@ public boolean isDataLakeEnabled() { return configuration().get(ConfigOptions.TABLE_DATALAKE_ENABLED); } + public @Nullable MergeEngine getMergeEngine() { + return MergeEngine.create(configuration()); + } + public TableDescriptor copy(Map newProperties) { return new TableDescriptor( schema, comment, partitionKeys, tableDistribution, newProperties, customProperties); diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java index dbe033bc..32cd31e8 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java @@ -18,6 +18,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.config.MergeEngine; import com.alibaba.fluss.connector.flink.FlinkConnectorOptions; import com.alibaba.fluss.connector.flink.lakehouse.LakeTableFactory; import com.alibaba.fluss.connector.flink.sink.FlinkTableSink; @@ -142,7 +143,8 @@ public DynamicTableSink createDynamicTableSink(Context context) { toFlussClientConfig(helper.getOptions(), context.getConfiguration()), rowType, context.getPrimaryKeyIndexes(), - isStreamingMode); + isStreamingMode, + MergeEngine.create(helper.getOptions().toMap())); } @Override diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java index 5699aef9..50dcfd39 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.connector.flink.sink; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.config.MergeEngine; import com.alibaba.fluss.connector.flink.utils.PushdownUtils; import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual; import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion; @@ -67,17 +68,21 @@ public class FlinkTableSink private boolean appliedUpdates = false; @Nullable private GenericRow deleteRow; + private final MergeEngine mergeEngine; + public FlinkTableSink( TablePath tablePath, Configuration flussConfig, RowType tableRowType, int[] primaryKeyIndexes, - boolean streaming) { + boolean streaming, + MergeEngine mergeEngine) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; this.primaryKeyIndexes = primaryKeyIndexes; this.streaming = streaming; + this.mergeEngine = mergeEngine; } @Override @@ -165,7 +170,12 @@ private List columns(int[] columnIndexes) { public DynamicTableSink copy() { FlinkTableSink sink = new FlinkTableSink( - tablePath, flussConfig, tableRowType, primaryKeyIndexes, streaming); + tablePath, + flussConfig, + tableRowType, + primaryKeyIndexes, + streaming, + mergeEngine); sink.appliedUpdates = appliedUpdates; sink.deleteRow = deleteRow; return sink; diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java index 4c9f1f1a..527197b0 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java @@ -33,6 +33,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; @@ -712,6 +713,41 @@ void testUnsupportedDeleteAndUpdateStmtOnPartialPK() { "Currently, Fluss table only supports UPDATE statement with conditions on primary key."); } + @Test + void testVersionMergeEngine() throws Exception { + tEnv.executeSql( + "create table merge_engine_with_version (a int not null primary key not enforced," + + " b string, ts bigint) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')"); + tEnv.executeSql( + "create table log_sink (a int not null primary key not enforced, b string, ts bigint)"); + + JobClient insertJobClient = + tEnv.executeSql("insert into log_sink select * from merge_engine_with_version") + .getJobClient() + .get(); + + // insert once + tEnv.executeSql( + "insert into merge_engine_with_version (a, b, ts) VALUES (1, 'v1', 1000), (2, 'v2', 1000), (1, 'v11', 999), (3, 'v3', 1000)") + .await(); + + CloseableIterator rowIter = tEnv.executeSql("select * from log_sink").collect(); + + // id=1 not update + List expectedRows = + Arrays.asList("+I[1, v1, 1000]", "+I[2, v2, 1000]", "+I[3, v3, 1000]"); + + assertResultsIgnoreOrder(rowIter, expectedRows, false); + + // insert again, update id=3 + tEnv.executeSql( + "insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000)") + .await(); + expectedRows = Arrays.asList("-U[3, v3, 1000]", "+U[3, v33, 1001]", "+I[4, v44, 1000]"); + assertResultsIgnoreOrder(rowIter, expectedRows, true); + insertJobClient.cancel().get(); + } + private InsertAndExpectValues rowsToInsertInto(Collection partitions) { List insertValues = new ArrayList<>(); List expectedValues = new ArrayList<>(); diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java index 3a9927fe..805180d2 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java @@ -87,7 +87,8 @@ public void invoke(MultiplexCdcRecord record, SinkFunction.Context context) thro flussConfig, FlinkConversions.toFlinkRowType(rowType), tableDescriptor.getSchema().getPrimaryKeyIndexes(), - true); + true, + null); sinkFunction = ((SinkFunctionProvider) diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java index d8441ed2..fecab6fa 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java @@ -18,6 +18,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.config.MergeEngine; import com.alibaba.fluss.exception.KvStorageException; import com.alibaba.fluss.memory.LazyMemorySegmentPool; import com.alibaba.fluss.memory.MemorySegmentPool; @@ -39,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import java.io.File; @@ -130,12 +132,14 @@ public void shutdown() { * @param tableBucket the table bucket * @param logTablet the cdc log tablet of the kv tablet * @param kvFormat the kv format + * @param mergeEngine the merge engine */ public KvTablet getOrCreateKv( PhysicalTablePath tablePath, TableBucket tableBucket, LogTablet logTablet, - KvFormat kvFormat) + KvFormat kvFormat, + @Nullable MergeEngine mergeEngine) throws Exception { return inLock( tabletCreationOrDeletionLock, @@ -153,7 +157,8 @@ public KvTablet getOrCreateKv( conf, arrowBufferAllocator, memorySegmentPool, - kvFormat); + kvFormat, + mergeEngine); currentKvs.put(tableBucket, tablet); LOG.info( @@ -254,7 +259,8 @@ public KvTablet loadKv(File tabletDir) throws Exception { conf, arrowBufferAllocator, memorySegmentPool, - tableDescriptor.getKvFormat()); + tableDescriptor.getKvFormat(), + tableDescriptor.getMergeEngine()); if (this.currentKvs.containsKey(tableBucket)) { throw new IllegalStateException( String.format( diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index 7dc740ce..8cb81287 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -19,6 +19,7 @@ import com.alibaba.fluss.annotation.VisibleForTesting; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.config.MergeEngine; import com.alibaba.fluss.exception.KvStorageException; import com.alibaba.fluss.memory.ManagedPagedOutputView; import com.alibaba.fluss.memory.MemorySegmentPool; @@ -31,13 +32,14 @@ import com.alibaba.fluss.record.KvRecord; import com.alibaba.fluss.record.KvRecordBatch; import com.alibaba.fluss.record.KvRecordReadContext; -import com.alibaba.fluss.record.RowKind; import com.alibaba.fluss.row.BinaryRow; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.row.arrow.ArrowWriterPool; import com.alibaba.fluss.row.arrow.ArrowWriterProvider; import com.alibaba.fluss.row.encode.ValueDecoder; -import com.alibaba.fluss.row.encode.ValueEncoder; +import com.alibaba.fluss.server.kv.mergeengine.MergeEngineWrapper; +import com.alibaba.fluss.server.kv.mergeengine.NoMergeEngineWrapper; +import com.alibaba.fluss.server.kv.mergeengine.VersionMergeEngineWrapper; import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; import com.alibaba.fluss.server.kv.partialupdate.PartialUpdaterCache; import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; @@ -56,7 +58,6 @@ import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import com.alibaba.fluss.types.DataType; import com.alibaba.fluss.types.RowType; -import com.alibaba.fluss.utils.BytesUtils; import com.alibaba.fluss.utils.FileUtils; import com.alibaba.fluss.utils.FlussPaths; import com.alibaba.fluss.utils.types.Tuple2; @@ -102,6 +103,7 @@ public final class KvTablet { private final ReadWriteLock kvLock = new ReentrantReadWriteLock(); private final LogFormat logFormat; private final KvFormat kvFormat; + private final @Nullable MergeEngine mergeEngine; /** * The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been @@ -122,7 +124,8 @@ private KvTablet( LogFormat logFormat, BufferAllocator arrowBufferAllocator, MemorySegmentPool memorySegmentPool, - KvFormat kvFormat) { + KvFormat kvFormat, + @Nullable MergeEngine mergeEngine) { this.physicalPath = physicalPath; this.tableBucket = tableBucket; this.logTablet = logTablet; @@ -136,6 +139,7 @@ private KvTablet( // TODO: [FLUSS-58674883] share cache in server level when PartialUpdater is thread-safe this.partialUpdaterCache = new PartialUpdaterCache(); this.kvFormat = kvFormat; + this.mergeEngine = mergeEngine; } public static KvTablet create( @@ -144,7 +148,8 @@ public static KvTablet create( Configuration conf, BufferAllocator arrowBufferAllocator, MemorySegmentPool memorySegmentPool, - KvFormat kvFormat) + KvFormat kvFormat, + @Nullable MergeEngine mergeEngine) throws IOException { Tuple2 tablePathAndBucket = FlussPaths.parseTabletDir(kvTabletDir); @@ -156,7 +161,8 @@ public static KvTablet create( conf, arrowBufferAllocator, memorySegmentPool, - kvFormat); + kvFormat, + mergeEngine); } public static KvTablet create( @@ -167,7 +173,8 @@ public static KvTablet create( Configuration conf, BufferAllocator arrowBufferAllocator, MemorySegmentPool memorySegmentPool, - KvFormat kvFormat) + KvFormat kvFormat, + @Nullable MergeEngine mergeEngine) throws IOException { RocksDBKv kv = buildRocksDBKv(conf, kvTabletDir); return new KvTablet( @@ -180,7 +187,8 @@ public static KvTablet create( logTablet.getLogFormat(), arrowBufferAllocator, memorySegmentPool, - kvFormat); + kvFormat, + mergeEngine); } private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir) @@ -261,70 +269,21 @@ public LogAppendInfo putAsLeader( new ValueDecoder(readContext.getRowDecoder(schemaId)); int appendedRecordCount = 0; + + MergeEngineWrapper mergeEngineWrapper = + getMergeEngineWrapper( + partialUpdater, + valueDecoder, + walBuilder, + kvPreWriteBuffer, + schema, + schemaId, + appendedRecordCount, + logOffset); for (KvRecord kvRecord : kvRecords.records(readContext)) { - byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); - KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); - if (kvRecord.getRow() == null) { - // kv tablet - byte[] oldValue = getFromBufferOrKv(key); - if (oldValue == null) { - // there might be large amount of such deletion, so we don't log - LOG.debug( - "The specific key can't be found in kv tablet although the kv record is for deletion, " - + "ignore it directly as it doesn't exist in the kv tablet yet."); - } else { - BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; - BinaryRow newRow = deleteRow(oldRow, partialUpdater); - // if newRow is null, it means the row should be deleted - if (newRow == null) { - walBuilder.append(RowKind.DELETE, oldRow); - appendedRecordCount += 1; - kvPreWriteBuffer.delete(key, logOffset++); - } else { - // otherwise, it's a partial update, should produce -U,+U - walBuilder.append(RowKind.UPDATE_BEFORE, oldRow); - walBuilder.append(RowKind.UPDATE_AFTER, newRow); - appendedRecordCount += 2; - kvPreWriteBuffer.put( - key, - ValueEncoder.encodeValue(schemaId, newRow), - logOffset + 1); - logOffset += 2; - } - } - } else { - // upsert operation - byte[] oldValue = getFromBufferOrKv(key); - // it's update - if (oldValue != null) { - BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; - BinaryRow newRow = - updateRow(oldRow, kvRecord.getRow(), partialUpdater); - walBuilder.append(RowKind.UPDATE_BEFORE, oldRow); - walBuilder.append(RowKind.UPDATE_AFTER, newRow); - appendedRecordCount += 2; - // logOffset is for -U, logOffset + 1 is for +U, we need to use - // the log offset for +U - kvPreWriteBuffer.put( - key, - ValueEncoder.encodeValue(schemaId, newRow), - logOffset + 1); - logOffset += 2; - } else { - // it's insert - // TODO: we should add guarantees that all non-specified columns - // of the input row are set to null. - BinaryRow newRow = kvRecord.getRow(); - walBuilder.append(RowKind.INSERT, newRow); - appendedRecordCount += 1; - kvPreWriteBuffer.put( - key, - ValueEncoder.encodeValue(schemaId, newRow), - logOffset++); - } - } + mergeEngineWrapper.doPutRecord(kvRecord); } - + appendedRecordCount = mergeEngineWrapper.getAppendedRecordCount(); // if appendedRecordCount is 0, it means there is no record to append, we // should not append. if (appendedRecordCount > 0) { @@ -356,6 +315,43 @@ public LogAppendInfo putAsLeader( }); } + private MergeEngineWrapper getMergeEngineWrapper( + PartialUpdater partialUpdater, + ValueDecoder valueDecoder, + WalBuilder walBuilder, + KvPreWriteBuffer kvPreWriteBuffer, + Schema schema, + short schemaId, + int appendedRecordCount, + long logOffset) { + if (mergeEngine != null) { + switch (mergeEngine.getType()) { + case VERSION: + return new VersionMergeEngineWrapper( + partialUpdater, + valueDecoder, + walBuilder, + kvPreWriteBuffer, + rocksDBKv, + schema, + schemaId, + appendedRecordCount, + logOffset, + mergeEngine); + } + } + return new NoMergeEngineWrapper( + partialUpdater, + valueDecoder, + walBuilder, + kvPreWriteBuffer, + rocksDBKv, + schema, + schemaId, + appendedRecordCount, + logOffset); + } + private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception { switch (logFormat) { case INDEXED: diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/AbstractMergeEngineWrapper.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/AbstractMergeEngineWrapper.java new file mode 100644 index 00000000..020d4383 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/AbstractMergeEngineWrapper.java @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.kv.mergeengine; + +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.record.KvRecord; +import com.alibaba.fluss.record.RowKind; +import com.alibaba.fluss.row.BinaryRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.encode.ValueDecoder; +import com.alibaba.fluss.row.encode.ValueEncoder; +import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; +import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; +import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv; +import com.alibaba.fluss.server.kv.wal.WalBuilder; +import com.alibaba.fluss.utils.BytesUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** A merge engine wrapper for primary table. */ +public abstract class AbstractMergeEngineWrapper implements MergeEngineWrapper { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractMergeEngineWrapper.class); + protected final PartialUpdater partialUpdater; + protected final ValueDecoder valueDecoder; + protected final WalBuilder walBuilder; + protected final KvPreWriteBuffer kvPreWriteBuffer; + protected final RocksDBKv rocksDBKv; + protected final Schema schema; + protected final short schemaId; + protected int appendedRecordCount; + protected long logOffset; + + public AbstractMergeEngineWrapper( + PartialUpdater partialUpdater, + ValueDecoder valueDecoder, + WalBuilder walBuilder, + KvPreWriteBuffer kvPreWriteBuffer, + RocksDBKv rocksDBKv, + Schema schema, + short schemaId, + int appendedRecordCount, + long logOffset) { + this.partialUpdater = partialUpdater; + this.valueDecoder = valueDecoder; + this.walBuilder = walBuilder; + this.kvPreWriteBuffer = kvPreWriteBuffer; + this.rocksDBKv = rocksDBKv; + this.schema = schema; + this.schemaId = schemaId; + this.appendedRecordCount = appendedRecordCount; + this.logOffset = logOffset; + } + + @Override + public int getAppendedRecordCount() { + return appendedRecordCount; + } + + @Override + public long getLogOffset() { + return logOffset; + } + + @Override + public void doPutRecord(KvRecord kvRecord) throws Exception { + byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); + KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); + if (kvRecord.getRow() == null) { + // it's for deletion + byte[] oldValue = getFromBufferOrKv(key); + if (oldValue == null) { + // there might be large amount of such deletion, so we don't log + LOG.debug( + "The specific key can't be found in kv tablet although the kv record is for deletion, " + + "ignore it directly as it doesn't exist in the kv tablet yet."); + } else { + BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; + BinaryRow newRow = deleteRow(oldRow, partialUpdater); + // if newRow is null, it means the row should be deleted + if (newRow == null) { + delete(oldRow, key); + } else { + // otherwise, it's a partial update, should produce -U,+U + update(oldRow, newRow, key); + } + } + } else { + // upsert operation + byte[] oldValue = getFromBufferOrKv(key); + // it's update + if (oldValue != null) { + BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; + BinaryRow newRow = updateRow(oldRow, kvRecord.getRow(), partialUpdater); + update(oldRow, newRow, key); + } else { + // it's insert + // TODO: we should add guarantees that all non-specified columns + // of the input row are set to null. + insert(kvRecord, key); + } + } + } + + protected void delete(BinaryRow oldRow, KvPreWriteBuffer.Key key) throws Exception { + walBuilder.append(RowKind.DELETE, oldRow); + appendedRecordCount += 1; + kvPreWriteBuffer.delete(key, logOffset++); + } + + protected void insert(KvRecord kvRecord, KvPreWriteBuffer.Key key) throws Exception { + BinaryRow newRow = kvRecord.getRow(); + walBuilder.append(RowKind.INSERT, newRow); + appendedRecordCount += 1; + kvPreWriteBuffer.put(key, ValueEncoder.encodeValue(schemaId, newRow), logOffset++); + } + + protected void update(BinaryRow oldRow, BinaryRow newRow, KvPreWriteBuffer.Key key) + throws Exception { + walBuilder.append(RowKind.UPDATE_BEFORE, oldRow); + walBuilder.append(RowKind.UPDATE_AFTER, newRow); + appendedRecordCount += 2; + // logOffset is for -U, logOffset + 1 is for +U, we need to use + // the log offset for +U + kvPreWriteBuffer.put(key, ValueEncoder.encodeValue(schemaId, newRow), logOffset + 1); + logOffset += 2; + } + + // get from kv pre-write buffer first, if can't find, get from rocksdb + protected byte[] getFromBufferOrKv(KvPreWriteBuffer.Key key) throws IOException { + KvPreWriteBuffer.Value value = kvPreWriteBuffer.get(key); + if (value == null) { + return rocksDBKv.get(key.get()); + } + return value.get(); + } + + protected @Nullable BinaryRow deleteRow( + InternalRow oldRow, @Nullable PartialUpdater partialUpdater) { + if (partialUpdater == null) { + return null; + } + return partialUpdater.deleteRow(oldRow); + } + + protected BinaryRow updateRow( + BinaryRow oldRow, BinaryRow updateRow, @Nullable PartialUpdater partialUpdater) { + // if is not partial update, return the update row + if (partialUpdater == null) { + return updateRow; + } + // otherwise, do partial update + return partialUpdater.updateRow(oldRow, updateRow); + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/MergeEngineWrapper.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/MergeEngineWrapper.java new file mode 100644 index 00000000..42e80441 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/MergeEngineWrapper.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.kv.mergeengine; + +import com.alibaba.fluss.record.KvRecord; + +public interface MergeEngineWrapper { + + void doPutRecord(KvRecord kvRecord) throws Exception; + + int getAppendedRecordCount(); + + long getLogOffset(); +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/NoMergeEngineWrapper.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/NoMergeEngineWrapper.java new file mode 100644 index 00000000..2dd244a8 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/NoMergeEngineWrapper.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.kv.mergeengine; + +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.row.encode.ValueDecoder; +import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; +import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; +import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv; +import com.alibaba.fluss.server.kv.wal.WalBuilder; + +/** A wrapper for default. */ +public class NoMergeEngineWrapper extends AbstractMergeEngineWrapper { + + public NoMergeEngineWrapper( + PartialUpdater partialUpdater, + ValueDecoder valueDecoder, + WalBuilder walBuilder, + KvPreWriteBuffer kvPreWriteBuffer, + RocksDBKv rocksDBKv, + Schema schema, + short schemaId, + int appendedRecordCount, + long logOffset) { + super( + partialUpdater, + valueDecoder, + walBuilder, + kvPreWriteBuffer, + rocksDBKv, + schema, + schemaId, + appendedRecordCount, + logOffset); + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionMergeEngineWrapper.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionMergeEngineWrapper.java new file mode 100644 index 00000000..5b2e60a3 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionMergeEngineWrapper.java @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.kv.mergeengine; + +import com.alibaba.fluss.config.MergeEngine; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.record.RowKind; +import com.alibaba.fluss.row.BinaryRow; +import com.alibaba.fluss.row.encode.ValueDecoder; +import com.alibaba.fluss.row.encode.ValueEncoder; +import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; +import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; +import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv; +import com.alibaba.fluss.server.kv.wal.WalBuilder; +import com.alibaba.fluss.types.BigIntType; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.IntType; +import com.alibaba.fluss.types.LocalZonedTimestampType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.TimeType; +import com.alibaba.fluss.types.TimestampType; + +/** A wrapper for version merge engine. */ +public class VersionMergeEngineWrapper extends AbstractMergeEngineWrapper { + + private final MergeEngine mergeEngine; + private final Schema schema; + + public VersionMergeEngineWrapper( + PartialUpdater partialUpdater, + ValueDecoder valueDecoder, + WalBuilder walBuilder, + KvPreWriteBuffer kvPreWriteBuffer, + RocksDBKv rocksDBKv, + Schema schema, + short schemaId, + int appendedRecordCount, + long logOffset, + MergeEngine mergeEngine) { + super( + partialUpdater, + valueDecoder, + walBuilder, + kvPreWriteBuffer, + rocksDBKv, + schema, + schemaId, + appendedRecordCount, + logOffset); + this.mergeEngine = mergeEngine; + this.schema = schema; + } + + @Override + protected void update(BinaryRow oldRow, BinaryRow newRow, KvPreWriteBuffer.Key key) + throws Exception { + RowType rowType = schema.toRowType(); + if (checkVersionMergeEngine(rowType, oldRow, newRow)) { + return; + } + walBuilder.append(RowKind.UPDATE_BEFORE, oldRow); + walBuilder.append(RowKind.UPDATE_AFTER, newRow); + appendedRecordCount += 2; + // logOffset is for -U, logOffset + 1 is for +U, we need to use + // the log offset for +U + kvPreWriteBuffer.put(key, ValueEncoder.encodeValue(schemaId, newRow), logOffset + 1); + logOffset += 2; + } + + private boolean checkVersionMergeEngine(RowType rowType, BinaryRow oldRow, BinaryRow newRow) { + if (!checkRowNewVersion(mergeEngine, rowType, oldRow, newRow)) { + // When the specified field version is less + // than the version number of the old + // record, do not update + return true; + } + return false; + } + + // Check row version. + private boolean checkRowNewVersion( + MergeEngine mergeEngine, RowType rowType, BinaryRow oldRow, BinaryRow newRow) { + int fieldIndex = rowType.getFieldIndex(mergeEngine.getColumn()); + DataType dataType = rowType.getTypeAt(fieldIndex); + if (dataType instanceof BigIntType) { + return newRow.getLong(fieldIndex) > oldRow.getLong(fieldIndex); + } else if (dataType instanceof IntType) { + return newRow.getInt(fieldIndex) > oldRow.getInt(fieldIndex); + } else if (dataType instanceof TimestampType || dataType instanceof TimeType) { + return newRow.getTimestampNtz(fieldIndex, ((TimestampType) dataType).getPrecision()) + .getMillisecond() + > oldRow.getTimestampNtz(fieldIndex, ((TimestampType) dataType).getPrecision()) + .getMillisecond(); + } else if (dataType instanceof LocalZonedTimestampType) { + return newRow.getTimestampLtz(fieldIndex, ((TimestampType) dataType).getPrecision()) + .toEpochMicros() + > oldRow.getTimestampLtz(fieldIndex, ((TimestampType) dataType).getPrecision()) + .toEpochMicros(); + } else { + throw new RuntimeException("Unsupported data type: " + dataType); + } + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java index 337b3f50..1ac3c408 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java @@ -18,6 +18,7 @@ import com.alibaba.fluss.annotation.VisibleForTesting; import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.config.MergeEngine; import com.alibaba.fluss.exception.FencedLeaderEpochException; import com.alibaba.fluss.exception.InvalidColumnProjectionException; import com.alibaba.fluss.exception.InvalidTimestampException; @@ -164,6 +165,7 @@ public final class Replica { private final Schema schema; private final LogFormat logFormat; private final KvFormat kvFormat; + private final @Nullable MergeEngine mergeEngine; private final long logTTLMs; private final boolean dataLakeEnabled; private final int tieredLogLocalSegments; @@ -227,6 +229,7 @@ public Replica( this.logTTLMs = tableDescriptor.getLogTTLMs(); this.dataLakeEnabled = tableDescriptor.isDataLakeEnabled(); this.tieredLogLocalSegments = tableDescriptor.getTieredLogLocalSegments(); + this.mergeEngine = tableDescriptor.getMergeEngine(); this.partitionKeys = tableDescriptor.getPartitionKeys(); this.snapshotContext = snapshotContext; // create a closeable registry for the replica @@ -588,7 +591,9 @@ private Optional initKvTablet() { LOG.info("No snapshot found, restore from log."); // actually, kv manager always create a kv tablet since we will drop the kv // if it exists before init kv tablet - kvTablet = kvManager.getOrCreateKv(physicalPath, tableBucket, logTablet, kvFormat); + kvTablet = + kvManager.getOrCreateKv( + physicalPath, tableBucket, logTablet, kvFormat, mergeEngine); } logTablet.updateMinRetainOffset(restoreStartOffset); recoverKvTablet(restoreStartOffset); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java index 6bbf7632..3a266fd6 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java @@ -260,7 +260,7 @@ private KvTablet getOrCreateKv( LogTablet logTablet = logManager.getOrCreateLog(physicalTablePath, tableBucket, LogFormat.ARROW, 1, true); return kvManager.getOrCreateKv( - physicalTablePath, tableBucket, logTablet, KvFormat.COMPACTED); + physicalTablePath, tableBucket, logTablet, KvFormat.COMPACTED, null); } private byte[] valueOf(KvRecord kvRecord) { diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java index 63b126fb..e4184d6d 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java @@ -124,7 +124,8 @@ void beforeEach() throws Exception { conf, new RootAllocator(Long.MAX_VALUE), new TestingMemorySegmentPool(10 * 1024), - KvFormat.COMPACTED); + KvFormat.COMPACTED, + null); executor = Executors.newFixedThreadPool(2); }