Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
luoyuxia committed Dec 23, 2024
1 parent 7f37bbb commit decc7b2
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -804,13 +804,13 @@ void testFirstRowMergeEngine() throws Exception {
RowType rowType = DATA1_SCHEMA_PK.toRowType();
createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
int rows = 5;
int rowNum = 3;
int duplicateNum = 3;
try (Table table = conn.getTable(DATA1_TABLE_PATH_PK)) {
// first, put rows
UpsertWriter upsertWriter = table.getUpsertWriter();
List<InternalRow> expectedRows = new ArrayList<>(rows);
for (int row = 0; row < rows; row++) {
for (int num = 0; num < rowNum; num++) {
for (int num = 0; num < duplicateNum; num++) {
upsertWriter.upsert(compactedRow(rowType, new Object[] {row, "value_" + num}));
}
expectedRows.add(compactedRow(rowType, new Object[] {row, "value_0"}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public DynamicTableSource createDynamicTableSource(Context context) {
throw new UnsupportedOperationException("Full lookup caching is not supported yet.");
}

ConfigOptions.MergeEngine mergeEngine =
tableOptions.get(
key(ConfigOptions.TABLE_MERGE_ENGINE.key())
.enumType(ConfigOptions.MergeEngine.class)
.noDefaultValue());

return new FlinkTableSource(
toFlussTablePath(context.getObjectIdentifier()),
toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
Expand All @@ -124,10 +130,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
key(ConfigOptions.TABLE_DATALAKE_ENABLED.key())
.booleanType()
.defaultValue(false)),
tableOptions.get(
key(ConfigOptions.TABLE_MERGE_ENGINE.key())
.enumType(ConfigOptions.MergeEngine.class)
.noDefaultValue()));
mergeEngine);
}

@Override
Expand All @@ -140,13 +143,20 @@ public DynamicTableSink createDynamicTableSink(Context context) {
== RuntimeExecutionMode.STREAMING;

RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType();
ConfigOptions.MergeEngine mergeEngine =
helper.getOptions()
.get(
key(ConfigOptions.TABLE_MERGE_ENGINE.key())
.enumType(ConfigOptions.MergeEngine.class)
.noDefaultValue());

return new FlinkTableSink(
toFlussTablePath(context.getObjectIdentifier()),
toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
rowType,
context.getPrimaryKeyIndexes(),
isStreamingMode);
isStreamingMode,
mergeEngine);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.fluss.connector.flink.sink;

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class FlinkTableSink
private final RowType tableRowType;
private final int[] primaryKeyIndexes;
private final boolean streaming;
@Nullable private final ConfigOptions.MergeEngine mergeEngine;

private boolean appliedUpdates = false;
@Nullable private GenericRow deleteRow;
Expand All @@ -72,12 +74,14 @@ public FlinkTableSink(
Configuration flussConfig,
RowType tableRowType,
int[] primaryKeyIndexes,
boolean streaming) {
boolean streaming,
@Nullable ConfigOptions.MergeEngine mergeEngine) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.primaryKeyIndexes = primaryKeyIndexes;
this.streaming = streaming;
this.mergeEngine = mergeEngine;
}

@Override
Expand Down Expand Up @@ -165,7 +169,12 @@ private List<String> columns(int[] columnIndexes) {
public DynamicTableSink copy() {
FlinkTableSink sink =
new FlinkTableSink(
tablePath, flussConfig, tableRowType, primaryKeyIndexes, streaming);
tablePath,
flussConfig,
tableRowType,
primaryKeyIndexes,
streaming,
mergeEngine);
sink.appliedUpdates = appliedUpdates;
sink.deleteRow = deleteRow;
return sink;
Expand Down Expand Up @@ -281,6 +290,15 @@ private void validateUpdatableAndDeletable() {
"Table %s is a Log Table. Log Table doesn't support DELETE and UPDATE statements.",
tablePath));
}

if (mergeEngine != null && mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
throw new UnsupportedOperationException(
String.format(
"Table %s is with merge engine '%s'. Table with '%s' merge engine doesn't support DELETE and UPDATE statements.",
tablePath,
ConfigOptions.MergeEngine.FIRST_ROW,
ConfigOptions.MergeEngine.FIRST_ROW));
}
}

private Map<Integer, LogicalType> getPrimaryKeyTypes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,6 @@ public RowLevelModificationScanContext applyRowLevelModificationScan(
RowLevelModificationType rowLevelModificationType,
@Nullable RowLevelModificationScanContext rowLevelModificationScanContext) {
modificationScanType = rowLevelModificationType;
if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
throw new UnsupportedOperationException(
String.format(
"%s is not supported for merge engine %s",
rowLevelModificationType, mergeEngine));
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,12 @@ void testFirstRowMergeEngine() throws Exception {
tEnv.executeSql(
"create table first_row_source (a int not null primary key not enforced,"
+ " b string) with('table.merge-engine' = 'first_row')");
tEnv.executeSql("create table first_row_sink (a int, b string)");
tEnv.executeSql("create table log_sink (a int, b string)");

// insert from first_row_source to first_row_sink
// insert the primary key table with first_row merge engine into a log table to verify
// that the first_row merge engine only generates an append-only stream
JobClient insertJobClient =
tEnv.executeSql("insert into first_row_sink select * from first_row_source")
tEnv.executeSql("insert into log_sink select * from first_row_source")
.getJobClient()
.get();

Expand All @@ -375,16 +376,14 @@ void testFirstRowMergeEngine() throws Exception {
"insert into first_row_source(a, b) VALUES (1, 'v1'), (2, 'v2'), (1, 'v11'), (3, 'v3')")
.await();

CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from first_row_source").collect();
CloseableIterator<Row> rowIter = tEnv.executeSql("select * from log_sink").collect();

List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]");

assertResultsIgnoreOrder(rowIter, expectedRows, false);

// insert again
tEnv.executeSql("insert into first_row_source(a, b) VALUES (3, 'v33'), ('4', 'v44')")
.await();
tEnv.executeSql("insert into first_row_source(a, b) VALUES (3, 'v33'), (4, 'v44')").await();
expectedRows = Collections.singletonList("+I[4, v44]");
assertResultsIgnoreOrder(rowIter, expectedRows, true);
insertJobClient.cancel().get();
Expand Down Expand Up @@ -746,6 +745,35 @@ void testUnsupportedDeleteAndUpdateStmtOnPartialPK() {
"Currently, Fluss table only supports UPDATE statement with conditions on primary key.");
}

@Test
void testUnsupportedDeleteAndUpdateStmtOnFirstRowMergeEngine() {
    // Create a primary-key table backed by the first_row merge engine; such
    // tables only keep the first row per key and therefore reject row-level
    // DELETE and UPDATE statements.
    String tableName = "firstRowMergeEngineTable";
    TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
    String ddl =
            String.format(
                    "create table %s ("
                            + " a int not null,"
                            + " b bigint not null, "
                            + " primary key (a) not enforced"
                            + ") with ('table.merge-engine' = 'first_row')",
                    tableName);
    tBatchEnv.executeSql(ddl);

    // Both statements must fail with the same explanatory message from the sink.
    String expectedError =
            String.format(
                    "Table %s is with merge engine 'first_row'. Table with 'first_row' merge engine doesn't support DELETE and UPDATE statements.",
                    tablePath);

    // DELETE is rejected.
    assertThatThrownBy(
                    () ->
                            tBatchEnv
                                    .executeSql("DELETE FROM " + tableName + " WHERE a = 1")
                                    .await())
            .isInstanceOf(UnsupportedOperationException.class)
            .hasMessage(expectedError);

    // UPDATE is rejected as well.
    assertThatThrownBy(
                    () ->
                            tBatchEnv
                                    .executeSql("UPDATE " + tableName + " SET b = 4004 WHERE a = 1")
                                    .await())
            .isInstanceOf(UnsupportedOperationException.class)
            .hasMessage(expectedError);
}

private InsertAndExpectValues rowsToInsertInto(Collection<String> partitions) {
List<String> insertValues = new ArrayList<>();
List<String> expectedValues = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public void invoke(MultiplexCdcRecord record, SinkFunction.Context context) thro
flussConfig,
FlinkConversions.toFlinkRowType(rowType),
tableDescriptor.getSchema().getPrimaryKeyIndexes(),
true);
true,
null);

sinkFunction =
((SinkFunctionProvider)
Expand Down

0 comments on commit decc7b2

Please sign in to comment.