Commit
[kv] Support first row merge engine
luoyuxia committed Dec 20, 2024
1 parent 9255d03 commit 7f37bbb
Showing 11 changed files with 281 additions and 40 deletions.
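In short, this commit adds a `table.merge-engine` table option (with `first_row` as the only supported value so far) for primary key tables: once a key holds a value, later upserts and deletes for that key are skipped in the kv tablet, and the table's changelog is exposed to Flink as insert-only. A minimal sketch of setting the option through the client API, reusing the DATA1_SCHEMA_PK / DATA1_TABLE_PATH_PK test fixtures and the createTable helper that appear in the IT case below:

    TableDescriptor tableDescriptor =
            TableDescriptor.builder()
                    .schema(DATA1_SCHEMA_PK) // merge engines are only allowed on primary key tables
                    .property(
                            ConfigOptions.TABLE_MERGE_ENGINE,
                            ConfigOptions.MergeEngine.FIRST_ROW)
                    .build();
    createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);

In Flink SQL the same option is set in the WITH clause, e.g. with('table.merge-engine' = 'first_row'), as the new Flink IT case further down shows.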
@@ -20,6 +20,7 @@
import com.alibaba.fluss.client.ConnectionFactory;
import com.alibaba.fluss.client.admin.ClientToServerITCaseBase;
import com.alibaba.fluss.client.scanner.ScanRecord;
import com.alibaba.fluss.client.scanner.log.LogScan;
import com.alibaba.fluss.client.scanner.log.LogScanner;
import com.alibaba.fluss.client.scanner.log.ScanRecords;
import com.alibaba.fluss.client.table.writer.AppendWriter;
@@ -790,4 +791,59 @@ void testInvalidColumnProjection() throws Exception {
"Only ARROW log format supports column projection, but the log format "
+ "of table 'test_db_1.test_non_pk_table_1' is INDEXED");
}

@Test
void testFirstRowMergeEngine() throws Exception {
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(DATA1_SCHEMA_PK)
.property(
ConfigOptions.TABLE_MERGE_ENGINE,
ConfigOptions.MergeEngine.FIRST_ROW)
.build();
RowType rowType = DATA1_SCHEMA_PK.toRowType();
createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
int rows = 5;
int rowNum = 3;
try (Table table = conn.getTable(DATA1_TABLE_PATH_PK)) {
// first, put rows
UpsertWriter upsertWriter = table.getUpsertWriter();
List<InternalRow> expectedRows = new ArrayList<>(rows);
for (int row = 0; row < rows; row++) {
for (int num = 0; num < rowNum; num++) {
upsertWriter.upsert(compactedRow(rowType, new Object[] {row, "value_" + num}));
}
expectedRows.add(compactedRow(rowType, new Object[] {row, "value_0"}));
}
upsertWriter.flush();

// now, get rows by lookup
for (int row = 0; row < rows; row++) {
InternalRow gotRow =
table.lookup(keyRow(DATA1_SCHEMA_PK, new Object[] {row, "dumpy"}))
.get()
.getRow();
assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(row));
}

// check scan change log
LogScanner logScanner = table.getLogScanner(new LogScan());
logScanner.subscribeFromBeginning(0);

List<ScanRecord> actualLogRecords = new ArrayList<>(0);
while (actualLogRecords.size() < rows) {
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
scanRecords.forEach(actualLogRecords::add);
}

assertThat(actualLogRecords).hasSize(rows);
for (int i = 0; i < actualLogRecords.size(); i++) {
ScanRecord scanRecord = actualLogRecords.get(i);
assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT);
assertThatRow(scanRecord.getRow())
.withSchema(rowType)
.isEqualTo(expectedRows.get(i));
}
}
}
}
@@ -891,6 +891,12 @@ public class ConfigOptions {
+ "When this option is set to ture and the datalake tiering service is up,"
+ " the table will be tiered and compacted into datalake format stored on lakehouse storage.");

public static final ConfigOption<MergeEngine> TABLE_MERGE_ENGINE =
key("table.merge-engine")
.enumType(MergeEngine.class)
.noDefaultValue()
.withDescription("The merge engine for the primary key table.");

// ------------------------------------------------------------------------
// ConfigOptions for Kv
// ------------------------------------------------------------------------
@@ -1238,4 +1244,20 @@ public enum CompressionType {
LZ4,
ZSTD
}

/** The merge engine for primary key table. */
public enum MergeEngine {
FIRST_ROW("first_row");

private final String value;

MergeEngine(String value) {
this.value = value;
}

@Override
public String toString() {
return value;
}
}
}
@@ -133,6 +133,11 @@ && getLogFormat() != LogFormat.ARROW) {
throw new IllegalArgumentException(
"For Primary Key Table, if kv format is compacted, log format must be arrow.");
}

if (!hasPrimaryKey() && getMergeEngine() != null) {
throw new IllegalArgumentException(
"Merge-engine is only supported in primary key table.");
}
}

/** Creates a builder for building table descriptor. */
@@ -244,6 +249,10 @@ public boolean isDataLakeEnabled() {
return configuration().get(ConfigOptions.TABLE_DATALAKE_ENABLED);
}

public @Nullable ConfigOptions.MergeEngine getMergeEngine() {
return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE);
}

public TableDescriptor copy(Map<String, String> newProperties) {
return new TableDescriptor(
schema, comment, partitionKeys, tableDistribution, newProperties, customProperties);
@@ -123,7 +123,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
tableOptions.get(
key(ConfigOptions.TABLE_DATALAKE_ENABLED.key())
.booleanType()
.defaultValue(false)));
.defaultValue(false)),
tableOptions.get(
key(ConfigOptions.TABLE_MERGE_ENGINE.key())
.enumType(ConfigOptions.MergeEngine.class)
.noDefaultValue()));
}

@Override
@@ -16,6 +16,7 @@

package com.alibaba.fluss.connector.flink.source;

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.FlinkConnectorOptions;
import com.alibaba.fluss.connector.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -101,6 +102,7 @@ public class FlinkTableSource

private final long scanPartitionDiscoveryIntervalMs;
private final boolean isDataLakeEnabled;
@Nullable private final ConfigOptions.MergeEngine mergeEngine;

// output type after projection pushdown
private LogicalType producedDataType;
@@ -130,7 +132,8 @@ public FlinkTableSource(
boolean lookupAsync,
@Nullable LookupCache cache,
long scanPartitionDiscoveryIntervalMs,
boolean isDataLakeEnabled) {
boolean isDataLakeEnabled,
@Nullable ConfigOptions.MergeEngine mergeEngine) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableOutputType = tableOutputType;
@@ -146,6 +149,7 @@

this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.isDataLakeEnabled = isDataLakeEnabled;
this.mergeEngine = mergeEngine;
}

@Override
@@ -155,7 +159,11 @@ public ChangelogMode getChangelogMode() {
} else {
if (hasPrimaryKey()) {
// pk table
return ChangelogMode.all();
if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
return ChangelogMode.insertOnly();
} else {
return ChangelogMode.all();
}
} else {
// append only
return ChangelogMode.insertOnly();
@@ -328,7 +336,8 @@ public DynamicTableSource copy() {
lookupAsync,
cache,
scanPartitionDiscoveryIntervalMs,
isDataLakeEnabled);
isDataLakeEnabled,
mergeEngine);
source.producedDataType = producedDataType;
source.projectedFields = projectedFields;
source.singleRowFilter = singleRowFilter;
@@ -396,6 +405,12 @@ public RowLevelModificationScanContext applyRowLevelModificationScan(
RowLevelModificationType rowLevelModificationType,
@Nullable RowLevelModificationScanContext rowLevelModificationScanContext) {
modificationScanType = rowLevelModificationType;
if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
throw new UnsupportedOperationException(
String.format(
"%s is not supported for merge engine %s",
rowLevelModificationType, mergeEngine));
}
return null;
}

@@ -33,6 +33,7 @@

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
@@ -356,6 +357,39 @@ void testPartialUpsert() throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}

@Test
void testFirstRowMergeEngine() throws Exception {
tEnv.executeSql(
"create table first_row_source (a int not null primary key not enforced,"
+ " b string) with('table.merge-engine' = 'first_row')");
tEnv.executeSql("create table first_row_sink (a int, b string)");

// insert from first_row_source to first_row_sink
JobClient insertJobClient =
tEnv.executeSql("insert into first_row_sink select * from first_row_source")
.getJobClient()
.get();

// insert once
tEnv.executeSql(
"insert into first_row_source(a, b) VALUES (1, 'v1'), (2, 'v2'), (1, 'v11'), (3, 'v3')")
.await();

CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from first_row_source").collect();

List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]");

assertResultsIgnoreOrder(rowIter, expectedRows, false);

// insert again
tEnv.executeSql("insert into first_row_source(a, b) VALUES (3, 'v33'), ('4', 'v44')")
.await();
expectedRows = Collections.singletonList("+I[4, v44]");
assertResultsIgnoreOrder(rowIter, expectedRows, true);
insertJobClient.cancel().get();
}

@Test
void testInsertWithoutSpecifiedCols() {
tEnv.executeSql("create table sink_insert_all (a int, b bigint, c string)");
@@ -39,6 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import java.io.File;
@@ -130,12 +131,14 @@ public void shutdown() {
* @param tableBucket the table bucket
* @param logTablet the cdc log tablet of the kv tablet
* @param kvFormat the kv format
* @param mergeEngine the merge engine
*/
public KvTablet getOrCreateKv(
PhysicalTablePath tablePath,
TableBucket tableBucket,
LogTablet logTablet,
KvFormat kvFormat)
KvFormat kvFormat,
@Nullable ConfigOptions.MergeEngine mergeEngine)
throws Exception {
return inLock(
tabletCreationOrDeletionLock,
@@ -153,7 +156,8 @@
conf,
arrowBufferAllocator,
memorySegmentPool,
kvFormat);
kvFormat,
mergeEngine);
currentKvs.put(tableBucket, tablet);

LOG.info(
@@ -254,7 +258,8 @@ public KvTablet loadKv(File tabletDir) throws Exception {
conf,
arrowBufferAllocator,
memorySegmentPool,
tableDescriptor.getKvFormat());
tableDescriptor.getKvFormat(),
tableDescriptor.getMergeEngine());
if (this.currentKvs.containsKey(tableBucket)) {
throw new IllegalStateException(
String.format(
@@ -102,6 +102,7 @@ public final class KvTablet {
private final ReadWriteLock kvLock = new ReentrantReadWriteLock();
private final LogFormat logFormat;
private final KvFormat kvFormat;
private final @Nullable ConfigOptions.MergeEngine mergeEngine;

/**
* The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been
@@ -122,7 +123,8 @@ private KvTablet(
LogFormat logFormat,
BufferAllocator arrowBufferAllocator,
MemorySegmentPool memorySegmentPool,
KvFormat kvFormat) {
KvFormat kvFormat,
@Nullable ConfigOptions.MergeEngine mergeEngine) {
this.physicalPath = physicalPath;
this.tableBucket = tableBucket;
this.logTablet = logTablet;
@@ -136,6 +138,7 @@
// TODO: [FLUSS-58674883] share cache in server level when PartialUpdater is thread-safe
this.partialUpdaterCache = new PartialUpdaterCache();
this.kvFormat = kvFormat;
this.mergeEngine = mergeEngine;
}

public static KvTablet create(
@@ -144,7 +147,8 @@
Configuration conf,
BufferAllocator arrowBufferAllocator,
MemorySegmentPool memorySegmentPool,
KvFormat kvFormat)
KvFormat kvFormat,
@Nullable ConfigOptions.MergeEngine mergeEngine)
throws IOException {
Tuple2<PhysicalTablePath, TableBucket> tablePathAndBucket =
FlussPaths.parseTabletDir(kvTabletDir);
@@ -156,7 +160,8 @@
conf,
arrowBufferAllocator,
memorySegmentPool,
kvFormat);
kvFormat,
mergeEngine);
}

public static KvTablet create(
@@ -167,7 +172,8 @@
Configuration conf,
BufferAllocator arrowBufferAllocator,
MemorySegmentPool memorySegmentPool,
KvFormat kvFormat)
KvFormat kvFormat,
@Nullable ConfigOptions.MergeEngine mergeEngine)
throws IOException {
RocksDBKv kv = buildRocksDBKv(conf, kvTabletDir);
return new KvTablet(
Expand All @@ -180,7 +186,8 @@ public static KvTablet create(
logTablet.getLogFormat(),
arrowBufferAllocator,
memorySegmentPool,
kvFormat);
kvFormat,
mergeEngine);
}

private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir)
@@ -265,14 +272,18 @@ public LogAppendInfo putAsLeader(
byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
if (kvRecord.getRow() == null) {
// kv tablet
// it's for deletion
byte[] oldValue = getFromBufferOrKv(key);
if (oldValue == null) {
// there might be large amount of such deletion, so we don't log
LOG.debug(
"The specific key can't be found in kv tablet although the kv record is for deletion, "
+ "ignore it directly as it doesn't exist in the kv tablet yet.");
} else {
if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
// if the merge engine is first row, skip the deletion
continue;
}
BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
BinaryRow newRow = deleteRow(oldRow, partialUpdater);
// if newRow is null, it means the row should be deleted
Expand All @@ -297,6 +308,10 @@ public LogAppendInfo putAsLeader(
byte[] oldValue = getFromBufferOrKv(key);
// it's update
if (oldValue != null) {
if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
// if the merge engine is first row, skip the update
continue;
}
BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
BinaryRow newRow =
updateRow(oldRow, kvRecord.getRow(), partialUpdater);
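Net effect of the putAsLeader changes: with the first_row merge engine an existing key is never rewritten. If an old value is found, both the update path and the deletion path hit the new checks and simply continue, so only the first value written for a key survives and a single +I change-log record is emitted per key. A small illustrative sequence, a sketch that reuses the upsertWriter, rowType and compactedRow helpers from the client IT case above (not code from the commit):

    upsertWriter.upsert(compactedRow(rowType, new Object[] {1, "v1"})); // first write for key 1: stored, emits +I (1, v1)
    upsertWriter.upsert(compactedRow(rowType, new Object[] {1, "v2"})); // key 1 already exists: skipped, no change log entry
    upsertWriter.flush();
    // a lookup on key 1 still returns (1, "v1"); scanning the change log yields exactly one INSERT record for that key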
(diffs for the remaining changed files were not loaded)