Introduce version merge engine for primary key table alibaba#213
Xiaojian Sun committed Dec 25, 2024
1 parent 1879ddb commit 7dc4b95
Showing 16 changed files with 592 additions and 79 deletions.
@@ -115,6 +115,18 @@ public class ConfigOptions {
                            "The interval of auto partition check. "
                                    + "The default value is 10 minutes.");

    public static final ConfigOption<MergeEngine.Type> TABLE_MERGE_ENGINE =
            key("table.merge-engine")
                    .enumType(MergeEngine.Type.class)
                    .noDefaultValue()
                    .withDescription("The merge engine for the primary key table.");

    public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
            key("table.merge-engine.version.column")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The merge engine version column for the primary key table.");

    // ------------------------------------------------------------------------
    // ConfigOptions for Coordinator Server
    // ------------------------------------------------------------------------
@@ -0,0 +1,86 @@
/*
 * Copyright (c) 2024 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.config;

import java.util.Map;

/** The merge engine for the primary key table. */
public class MergeEngine {

    private final Type type;
    private final String column;

    private MergeEngine(Type type) {
        this(type, null);
    }

    private MergeEngine(Type type, String column) {
        this.type = type;
        this.column = column;
    }

    public static MergeEngine create(Map<String, String> properties) {
        return create(Configuration.fromMap(properties));
    }

    public static MergeEngine create(Configuration options) {
        if (options == null) {
            return null;
        }
        MergeEngine.Type type = options.get(ConfigOptions.TABLE_MERGE_ENGINE);
        if (type == null) {
            return null;
        }

        switch (type) {
            case FIRST_ROW:
                return new MergeEngine(Type.FIRST_ROW);
            case VERSION:
                String column =
                        options.get(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
                if (column == null) {
                    throw new IllegalArgumentException(
                            "When the merge engine is set to version, 'table.merge-engine.version.column' must be set.");
                }
                return new MergeEngine(Type.VERSION, column);
            default:
                throw new UnsupportedOperationException("Unsupported merge engine: " + type);
        }
    }

    public Type getType() {
        return type;
    }

    public String getColumn() {
        return column;
    }

    public enum Type {
        FIRST_ROW("first_row"),
        VERSION("version");

        private final String value;

        Type(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return value;
        }
    }
}
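For orientation, here is a minimal, self-contained sketch of how this factory behaves. It is not part of the commit; the demo class name is invented, and it assumes the Fluss `Configuration` parses enum options case-insensitively, which the lowercase values used in the SQL test later in this commit suggest.

```java
import com.alibaba.fluss.config.MergeEngine;

import java.util.HashMap;
import java.util.Map;

public class MergeEngineDemo {
    public static void main(String[] args) {
        // Both properties present: a well-formed 'version' engine.
        Map<String, String> props = new HashMap<>();
        props.put("table.merge-engine", "version");
        props.put("table.merge-engine.version.column", "ts");
        MergeEngine engine = MergeEngine.create(props);
        System.out.println(engine.getType());   // prints "version"
        System.out.println(engine.getColumn()); // prints "ts"

        // No engine configured: create(...) returns null rather than throwing.
        System.out.println(MergeEngine.create(new HashMap<>()) == null); // true

        // 'version' engine without its column: rejected at creation time.
        Map<String, String> broken = new HashMap<>();
        broken.put("table.merge-engine", "version");
        try {
            MergeEngine.create(broken);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```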
@@ -22,6 +22,7 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.config.ConfigurationUtils;
import com.alibaba.fluss.config.MergeEngine;
import com.alibaba.fluss.utils.AutoPartitionStrategy;
import com.alibaba.fluss.utils.Preconditions;
import com.alibaba.fluss.utils.json.JsonSerdeUtils;
@@ -133,6 +134,11 @@ && getLogFormat() != LogFormat.ARROW) {
            throw new IllegalArgumentException(
                    "For Primary Key Table, if kv format is compacted, log format must be arrow.");
        }

        if (!hasPrimaryKey() && getMergeEngine() != null) {
            throw new IllegalArgumentException(
                    "Merge engine is only supported for primary key tables.");
        }
    }

/** Creates a builder for building table descriptor. */
@@ -244,6 +250,10 @@ public boolean isDataLakeEnabled() {
        return configuration().get(ConfigOptions.TABLE_DATALAKE_ENABLED);
    }

    public @Nullable MergeEngine getMergeEngine() {
        return MergeEngine.create(configuration());
    }

    public TableDescriptor copy(Map<String, String> newProperties) {
        return new TableDescriptor(
                schema, comment, partitionKeys, tableDistribution, newProperties, customProperties);
@@ -18,6 +18,7 @@

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.config.MergeEngine;
import com.alibaba.fluss.connector.flink.FlinkConnectorOptions;
import com.alibaba.fluss.connector.flink.lakehouse.LakeTableFactory;
import com.alibaba.fluss.connector.flink.sink.FlinkTableSink;
@@ -142,7 +143,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
                rowType,
                context.getPrimaryKeyIndexes(),
-                isStreamingMode);
+                isStreamingMode,
+                MergeEngine.create(helper.getOptions().toMap()));
    }

    @Override
@@ -17,6 +17,7 @@
package com.alibaba.fluss.connector.flink.sink;

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.config.MergeEngine;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
@@ -67,17 +68,21 @@ public class FlinkTableSink
    private boolean appliedUpdates = false;
    @Nullable private GenericRow deleteRow;

    private final MergeEngine mergeEngine;

    public FlinkTableSink(
            TablePath tablePath,
            Configuration flussConfig,
            RowType tableRowType,
            int[] primaryKeyIndexes,
-            boolean streaming) {
+            boolean streaming,
+            MergeEngine mergeEngine) {
        this.tablePath = tablePath;
        this.flussConfig = flussConfig;
        this.tableRowType = tableRowType;
        this.primaryKeyIndexes = primaryKeyIndexes;
        this.streaming = streaming;
        this.mergeEngine = mergeEngine;
    }

    @Override
@@ -165,7 +170,12 @@ private List<String> columns(int[] columnIndexes) {
    public DynamicTableSink copy() {
        FlinkTableSink sink =
                new FlinkTableSink(
-                        tablePath, flussConfig, tableRowType, primaryKeyIndexes, streaming);
+                        tablePath,
+                        flussConfig,
+                        tableRowType,
+                        primaryKeyIndexes,
+                        streaming,
+                        mergeEngine);
        sink.appliedUpdates = appliedUpdates;
        sink.deleteRow = deleteRow;
        return sink;
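The excerpt above only threads `mergeEngine` through the constructor and `copy()`; nothing here shows how the sink consults it. One plausible use, offered purely as a hypothetical sketch (the method name and wiring are assumptions, not taken from this diff), is to guard row-level operations, since the merge engine rather than the writer decides which row survives:

```java
// Hypothetical guard inside FlinkTableSink, not code from this commit:
// reject DELETE/UPDATE when a merge engine is configured, because the
// engine owns the row-replacement rule.
private void validateRowLevelOperationOnMergeEngine() {
    if (mergeEngine != null) {
        throw new UnsupportedOperationException(
                "Table with '" + mergeEngine.getType()
                        + "' merge engine does not support DELETE and UPDATE statements.");
    }
}
```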
@@ -33,6 +33,7 @@

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
@@ -712,6 +713,40 @@ void testUnsupportedDeleteAndUpdateStmtOnPartialPK() {
                "Currently, Fluss table only supports UPDATE statement with conditions on primary key.");
    }

    @Test
    void testVersionMergeEngine() throws Exception {
        tEnv.executeSql(
                "create table version_merge_source (a int not null primary key not enforced,"
                        + " b string, ts bigint) with ('table.merge-engine' = 'version',"
                        + " 'table.merge-engine.version.column' = 'ts')");
        tEnv.executeSql(
                "create table log_sink (a int not null primary key not enforced, b string, ts bigint)");

        JobClient insertJobClient =
                tEnv.executeSql("insert into log_sink select * from version_merge_source")
                        .getJobClient()
                        .get();

        // Insert once: (1, 'v11', 999) carries an older version than the row
        // (1, 'v1', 1000) written before it, so the merge engine discards it.
        tEnv.executeSql(
                        "insert into version_merge_source(a, b, ts) VALUES (1, 'v1', 1000),"
                                + " (2, 'v2', 1000), (1, 'v11', 999), (3, 'v3', 1000)")
                .await();

        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from log_sink").collect();

        List<String> expectedRows =
                Arrays.asList("+I[1, v1, 1000]", "+I[2, v2, 1000]", "+I[3, v3, 1000]");

        assertResultsIgnoreOrder(rowIter, expectedRows, false);

        // Insert again: (3, 'v33', 999) is older than the stored (3, 'v3', 1000)
        // and is discarded, so only the new key 4 shows up.
        tEnv.executeSql(
                        "insert into version_merge_source(a, b, ts) VALUES (3, 'v33', 999),"
                                + " (4, 'v44', 1000)")
                .await();
        expectedRows = Collections.singletonList("+I[4, v44, 1000]");
        assertResultsIgnoreOrder(rowIter, expectedRows, true);
        insertJobClient.cancel().get();
    }

    private InsertAndExpectValues rowsToInsertInto(Collection<String> partitions) {
        List<String> insertValues = new ArrayList<>();
        List<String> expectedValues = new ArrayList<>();
@@ -87,7 +87,8 @@ public void invoke(MultiplexCdcRecord record, SinkFunction.Context context) throws
                        flussConfig,
                        FlinkConversions.toFlinkRowType(rowType),
                        tableDescriptor.getSchema().getPrimaryKeyIndexes(),
-                        true);
+                        true,
+                        null);

        sinkFunction =
                ((SinkFunctionProvider)
@@ -18,6 +18,7 @@

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.config.MergeEngine;
import com.alibaba.fluss.exception.KvStorageException;
import com.alibaba.fluss.memory.LazyMemorySegmentPool;
import com.alibaba.fluss.memory.MemorySegmentPool;
@@ -39,6 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import java.io.File;
@@ -130,12 +132,14 @@ public void shutdown() {
     * @param tableBucket the table bucket
     * @param logTablet the cdc log tablet of the kv tablet
     * @param kvFormat the kv format
     * @param mergeEngine the merge engine
     */
    public KvTablet getOrCreateKv(
            PhysicalTablePath tablePath,
            TableBucket tableBucket,
            LogTablet logTablet,
-            KvFormat kvFormat)
+            KvFormat kvFormat,
+            @Nullable MergeEngine mergeEngine)
            throws Exception {
        return inLock(
                tabletCreationOrDeletionLock,
@@ -153,7 +157,8 @@
                            conf,
                            arrowBufferAllocator,
                            memorySegmentPool,
-                            kvFormat);
+                            kvFormat,
+                            mergeEngine);
            currentKvs.put(tableBucket, tablet);

            LOG.info(
@@ -254,7 +259,8 @@ public KvTablet loadKv(File tabletDir) throws Exception {
                        conf,
                        arrowBufferAllocator,
                        memorySegmentPool,
-                        tableDescriptor.getKvFormat());
+                        tableDescriptor.getKvFormat(),
+                        tableDescriptor.getMergeEngine());
        if (this.currentKvs.containsKey(tableBucket)) {
            throw new IllegalStateException(
                    String.format(
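The tablet-server excerpt stops at handing the engine to `KvTablet`, so the replacement rule itself is not visible in this diff. Conceptually, the version engine keeps a stored row unless the incoming row carries a newer value in the configured version column. A hypothetical sketch consistent with the integration test above (tie behavior is an assumption; the test only exercises strictly older versions being discarded):

```java
// Hypothetical illustration of 'version' merge semantics, not code from this
// commit: oldVersion/newVersion are the values of the configured
// 'table.merge-engine.version.column' in the stored and incoming rows.
static boolean replacesStoredRow(long oldVersion, long newVersion) {
    // The test shows older versions being discarded; whether an equal version
    // replaces is not exercised there, so this sketch keeps the stored row on ties.
    return newVersion > oldVersion;
}
```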