Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kv] Support first row merge engine #240

Merged
merged 4 commits into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.fluss.client.admin.ClientToServerITCaseBase;
import com.alibaba.fluss.client.lookup.PrefixLookupResult;
import com.alibaba.fluss.client.scanner.ScanRecord;
import com.alibaba.fluss.client.scanner.log.LogScan;
import com.alibaba.fluss.client.scanner.log.LogScanner;
import com.alibaba.fluss.client.scanner.log.ScanRecords;
import com.alibaba.fluss.client.table.writer.AppendWriter;
Expand All @@ -32,6 +33,7 @@
import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.metadata.KvFormat;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.metadata.MergeEngine;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableDescriptor;
Expand Down Expand Up @@ -876,4 +878,57 @@ void testInvalidColumnProjection() throws Exception {
.hasMessage(
"Projected field index 2 is out of bound for schema ROW<`a` INT, `b` STRING>");
}

@Test
void testFirstRowMergeEngine() throws Exception {
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(DATA1_SCHEMA_PK)
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.FIRST_ROW)
.build();
RowType rowType = DATA1_SCHEMA_PK.toRowType();
createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
int rows = 5;
int duplicateNum = 3;
try (Table table = conn.getTable(DATA1_TABLE_PATH_PK)) {
// first, put rows
UpsertWriter upsertWriter = table.getUpsertWriter();
List<InternalRow> expectedRows = new ArrayList<>(rows);
for (int id = 0; id < rows; id++) {
for (int num = 0; num < duplicateNum; num++) {
upsertWriter.upsert(compactedRow(rowType, new Object[] {id, "value_" + num}));
}
expectedRows.add(compactedRow(rowType, new Object[] {id, "value_0"}));
}
upsertWriter.flush();

// now, get rows by lookup
for (int id = 0; id < rows; id++) {
InternalRow gotRow =
table.lookup(keyRow(DATA1_SCHEMA_PK, new Object[] {id, "dumpy"}))
.get()
.getRow();
assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id));
}

// check scan change log
LogScanner logScanner = table.getLogScanner(new LogScan());
logScanner.subscribeFromBeginning(0);

List<ScanRecord> actualLogRecords = new ArrayList<>(0);
while (actualLogRecords.size() < rows) {
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
scanRecords.forEach(actualLogRecords::add);
}

assertThat(actualLogRecords).hasSize(rows);
for (int i = 0; i < actualLogRecords.size(); i++) {
ScanRecord scanRecord = actualLogRecords.get(i);
assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT);
assertThatRow(scanRecord.getRow())
.withSchema(rowType)
.isEqualTo(expectedRows.get(i));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.metadata.KvFormat;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.metadata.MergeEngine;
import com.alibaba.fluss.utils.ArrayUtils;

import java.time.Duration;
Expand Down Expand Up @@ -968,6 +969,12 @@ public class ConfigOptions {
+ "When this option is set to ture and the datalake tiering service is up,"
+ " the table will be tiered and compacted into datalake format stored on lakehouse storage.");

public static final ConfigOption<MergeEngine> TABLE_MERGE_ENGINE =
key("table.merge-engine")
.enumType(MergeEngine.class)
.noDefaultValue()
.withDescription("The merge engine for the primary key table.");

// ------------------------------------------------------------------------
// ConfigOptions for Kv
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.metadata;

/**
* The merge engine for primary key table.
*
* @since 0.6
*/
public enum MergeEngine {
FIRST_ROW("first_row");

private final String value;

MergeEngine(String value) {
this.value = value;
}

@Override
public String toString() {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ && getLogFormat() != LogFormat.ARROW) {
throw new IllegalArgumentException(
"For Primary Key Table, if kv format is compacted, log format must be arrow.");
}

if (!hasPrimaryKey() && getMergeEngine() != null) {
throw new IllegalArgumentException(
"Merge-engine is only supported in primary key table.");
}
}

/** Creates a builder for building table descriptor. */
Expand Down Expand Up @@ -275,6 +280,10 @@ public boolean isDataLakeEnabled() {
return configuration().get(ConfigOptions.TABLE_DATALAKE_ENABLED);
}

public @Nullable MergeEngine getMergeEngine() {
return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE);
}

public TableDescriptor copy(Map<String, String> newProperties) {
return new TableDescriptor(
schema, comment, partitionKeys, tableDistribution, newProperties, customProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import java.util.Set;

import static com.alibaba.fluss.connector.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
import static org.apache.flink.configuration.ConfigOptions.key;
import static com.alibaba.fluss.connector.flink.utils.FlinkConversions.toFlinkOption;

/** Factory to create table source and table sink for Fluss. */
public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
Expand Down Expand Up @@ -114,15 +114,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
tableOptions
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
.toMillis();
boolean isDatalakeEnabled =
tableOptions.get(
key(ConfigOptions.TABLE_DATALAKE_ENABLED.key())
.booleanType()
.defaultValue(false));

return new FlinkTableSource(
toFlussTablePath(context.getObjectIdentifier()),
toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
toFlussClientConfig(tableOptions, context.getConfiguration()),
tableOutputType,
primaryKeyIndexes,
bucketKeyIndexes,
Expand All @@ -133,7 +128,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC),
cache,
partitionDiscoveryIntervalMs,
isDatalakeEnabled);
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
}

@Override
Expand All @@ -146,13 +142,15 @@ public DynamicTableSink createDynamicTableSink(Context context) {
== RuntimeExecutionMode.STREAMING;

RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType();
final ReadableConfig tableOptions = helper.getOptions();

return new FlinkTableSink(
toFlussTablePath(context.getObjectIdentifier()),
toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
toFlussClientConfig(tableOptions, context.getConfiguration()),
rowType,
context.getPrimaryKeyIndexes(),
isStreamingMode);
isStreamingMode,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
import com.alibaba.fluss.metadata.MergeEngine;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.GenericRow;

Expand Down Expand Up @@ -63,6 +64,7 @@ public class FlinkTableSink
private final RowType tableRowType;
private final int[] primaryKeyIndexes;
private final boolean streaming;
@Nullable private final MergeEngine mergeEngine;

private boolean appliedUpdates = false;
@Nullable private GenericRow deleteRow;
Expand All @@ -72,12 +74,14 @@ public FlinkTableSink(
Configuration flussConfig,
RowType tableRowType,
int[] primaryKeyIndexes,
boolean streaming) {
boolean streaming,
@Nullable MergeEngine mergeEngine) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.primaryKeyIndexes = primaryKeyIndexes;
this.streaming = streaming;
this.mergeEngine = mergeEngine;
}

@Override
Expand Down Expand Up @@ -112,12 +116,20 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
// is 0, when no column specified, it's not partial update
// see FLINK-36000
&& context.getTargetColumns().get().length != 0) {
// check partial update
if (primaryKeyIndexes.length == 0
&& context.getTargetColumns().get().length != tableRowType.getFieldCount()) {
throw new ValidationException(
"Fluss table sink does not support partial updates for table without primary key. Please make sure the "
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.");

// is partial update, check whether partial update is supported or not
if (context.getTargetColumns().get().length != tableRowType.getFieldCount()) {
if (primaryKeyIndexes.length == 0) {
throw new ValidationException(
"Fluss table sink does not support partial updates for table without primary key. Please make sure the "
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.");
} else if (mergeEngine == MergeEngine.FIRST_ROW) {
throw new ValidationException(
String.format(
"Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the "
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.",
tablePath, MergeEngine.FIRST_ROW));
}
}
int[][] targetColumns = context.getTargetColumns().get();
targetColumnIndexes = new int[targetColumns.length];
Expand Down Expand Up @@ -165,7 +177,12 @@ private List<String> columns(int[] columnIndexes) {
public DynamicTableSink copy() {
FlinkTableSink sink =
new FlinkTableSink(
tablePath, flussConfig, tableRowType, primaryKeyIndexes, streaming);
tablePath,
flussConfig,
tableRowType,
primaryKeyIndexes,
streaming,
mergeEngine);
sink.appliedUpdates = appliedUpdates;
sink.deleteRow = deleteRow;
return sink;
Expand Down Expand Up @@ -281,6 +298,13 @@ private void validateUpdatableAndDeletable() {
"Table %s is a Log Table. Log Table doesn't support DELETE and UPDATE statements.",
tablePath));
}

if (mergeEngine == MergeEngine.FIRST_ROW) {
throw new UnsupportedOperationException(
String.format(
"Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
tablePath, MergeEngine.FIRST_ROW));
}
}

private Map<Integer, LogicalType> getPrimaryKeyTypes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
import com.alibaba.fluss.metadata.MergeEngine;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.types.RowType;

Expand Down Expand Up @@ -104,6 +105,7 @@ public class FlinkTableSource

private final long scanPartitionDiscoveryIntervalMs;
private final boolean isDataLakeEnabled;
@Nullable private final MergeEngine mergeEngine;

// output type after projection pushdown
private LogicalType producedDataType;
Expand Down Expand Up @@ -134,7 +136,8 @@ public FlinkTableSource(
boolean lookupAsync,
@Nullable LookupCache cache,
long scanPartitionDiscoveryIntervalMs,
boolean isDataLakeEnabled) {
boolean isDataLakeEnabled,
@Nullable MergeEngine mergeEngine) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableOutputType = tableOutputType;
Expand All @@ -151,6 +154,7 @@ public FlinkTableSource(

this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.isDataLakeEnabled = isDataLakeEnabled;
this.mergeEngine = mergeEngine;
}

@Override
Expand All @@ -160,7 +164,11 @@ public ChangelogMode getChangelogMode() {
} else {
if (hasPrimaryKey()) {
// pk table
return ChangelogMode.all();
if (mergeEngine == MergeEngine.FIRST_ROW) {
return ChangelogMode.insertOnly();
} else {
return ChangelogMode.all();
}
} else {
// append only
return ChangelogMode.insertOnly();
Expand Down Expand Up @@ -341,7 +349,8 @@ public DynamicTableSource copy() {
lookupAsync,
cache,
scanPartitionDiscoveryIntervalMs,
isDataLakeEnabled);
isDataLakeEnabled,
mergeEngine);
source.producedDataType = producedDataType;
source.projectedFields = projectedFields;
source.singleRowFilter = singleRowFilter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,9 @@ public static List<org.apache.flink.configuration.ConfigOption<?>> toFlinkOption
}

/** Convert Fluss's ConfigOption to Flink's ConfigOption. */
public static org.apache.flink.configuration.ConfigOption<?> toFlinkOption(
ConfigOption<?> flussOption) {
@SuppressWarnings("unchecked")
public static <T> org.apache.flink.configuration.ConfigOption<T> toFlinkOption(
ConfigOption<T> flussOption) {
org.apache.flink.configuration.ConfigOptions.OptionBuilder builder =
org.apache.flink.configuration.ConfigOptions.key(flussOption.key());
org.apache.flink.configuration.ConfigOption<?> option;
Expand Down Expand Up @@ -301,7 +302,7 @@ public static org.apache.flink.configuration.ConfigOption<?> toFlinkOption(
}
option.withDescription(flussOption.description());
// TODO: support fallback keys in the future.
return option;
return (org.apache.flink.configuration.ConfigOption<T>) option;
}

private static Map<String, String> convertFlinkOptionsToFlussTableProperties(
Expand Down
Loading