Skip to content

Commit

Permalink
address comment2
Browse files Browse the repository at this point in the history
  • Loading branch information
luoyuxia committed Dec 24, 2024
1 parent decc7b2 commit e64b962
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@
/** Factory to create table source and table sink for Fluss. */
public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

private static final ConfigOption<ConfigOptions.MergeEngine> MERGE_ENGINE_OPTION =
key(ConfigOptions.TABLE_MERGE_ENGINE.key())
.enumType(ConfigOptions.MergeEngine.class)
.noDefaultValue();

private volatile LakeTableFactory lakeTableFactory;

@Override
Expand Down Expand Up @@ -106,12 +111,6 @@ public DynamicTableSource createDynamicTableSource(Context context) {
throw new UnsupportedOperationException("Full lookup caching is not supported yet.");
}

ConfigOptions.MergeEngine mergeEngine =
tableOptions.get(
key(ConfigOptions.TABLE_MERGE_ENGINE.key())
.enumType(ConfigOptions.MergeEngine.class)
.noDefaultValue());

return new FlinkTableSource(
toFlussTablePath(context.getObjectIdentifier()),
toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
Expand All @@ -130,7 +129,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
key(ConfigOptions.TABLE_DATALAKE_ENABLED.key())
.booleanType()
.defaultValue(false)),
mergeEngine);
tableOptions.get(MERGE_ENGINE_OPTION));
}

@Override
Expand All @@ -143,20 +142,14 @@ public DynamicTableSink createDynamicTableSink(Context context) {
== RuntimeExecutionMode.STREAMING;

RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType();
ConfigOptions.MergeEngine mergeEngine =
helper.getOptions()
.get(
key(ConfigOptions.TABLE_MERGE_ENGINE.key())
.enumType(ConfigOptions.MergeEngine.class)
.noDefaultValue());

return new FlinkTableSink(
toFlussTablePath(context.getObjectIdentifier()),
toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
rowType,
context.getPrimaryKeyIndexes(),
isStreamingMode,
mergeEngine);
helper.getOptions().get(MERGE_ENGINE_OPTION));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.GenericRow;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.connector.ChangelogMode;
Expand All @@ -40,7 +39,6 @@
import org.apache.flink.types.RowKind;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -116,12 +114,20 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
// is 0, when no column specified, it's not partial update
// see FLINK-36000
&& context.getTargetColumns().get().length != 0) {
// check partial update
if (primaryKeyIndexes.length == 0
&& context.getTargetColumns().get().length != tableRowType.getFieldCount()) {
throw new ValidationException(
"Fluss table sink does not support partial updates for table without primary key. Please make sure the "
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.");

// is partial update, check whether partial update is supported or not
if (context.getTargetColumns().get().length != tableRowType.getFieldCount()) {
if (primaryKeyIndexes.length == 0) {
throw new ValidationException(
"Fluss table sink does not support partial updates for table without primary key. Please make sure the "
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.");
} else if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
throw new ValidationException(
String.format(
"Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the "
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.",
tablePath, ConfigOptions.MergeEngine.FIRST_ROW));
}
}
int[][] targetColumns = context.getTargetColumns().get();
targetColumnIndexes = new int[targetColumns.length];
Expand Down Expand Up @@ -291,13 +297,11 @@ private void validateUpdatableAndDeletable() {
tablePath));
}

if (mergeEngine != null && mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
throw new UnsupportedOperationException(
String.format(
"Table %s is with merge engine '%s'. Table with '%s' merge engine doesn't support DELETE and UPDATE statements.",
tablePath,
ConfigOptions.MergeEngine.FIRST_ROW,
ConfigOptions.MergeEngine.FIRST_ROW));
"Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
tablePath, ConfigOptions.MergeEngine.FIRST_ROW));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -746,21 +747,22 @@ void testUnsupportedDeleteAndUpdateStmtOnPartialPK() {
}

@Test
void testUnsupportedDeleteAndUpdateStmtOnFirstRowMergeEngine() {
void testUnsupportedStmtOnFirstRowMergeEngine() {
String t1 = "firstRowMergeEngineTable";
TablePath tablePath = TablePath.of(DEFAULT_DB, t1);
tBatchEnv.executeSql(
String.format(
"create table %s ("
+ " a int not null,"
+ " b bigint not null, "
+ " b bigint null, "
+ " c string null, "
+ " primary key (a) not enforced"
+ ") with ('table.merge-engine' = 'first_row')",
t1));
assertThatThrownBy(() -> tBatchEnv.executeSql("DELETE FROM " + t1 + " WHERE a = 1").await())
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage(
"Table %s is with merge engine 'first_row'. Table with 'first_row' merge engine doesn't support DELETE and UPDATE statements.",
"Table %s uses the 'first_row' merge engine which does not support DELETE or UPDATE statements.",
tablePath);

assertThatThrownBy(
Expand All @@ -770,7 +772,18 @@ void testUnsupportedDeleteAndUpdateStmtOnFirstRowMergeEngine() {
.await())
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage(
"Table %s is with merge engine 'first_row'. Table with 'first_row' merge engine doesn't support DELETE and UPDATE statements.",
"Table %s uses the 'first_row' merge engine which does not support DELETE or UPDATE statements.",
tablePath);

assertThatThrownBy(
() ->
tBatchEnv
.executeSql("INSERT INTO " + t1 + "(a, c) VALUES(1, 'c1')")
.await())
.isInstanceOf(ValidationException.class)
.hasMessage(
"Table %s uses the 'first_row' merge engine which does not support partial updates."
+ " Please make sure the number of specified columns in INSERT INTO matches columns of the Fluss table.",
tablePath);
}

Expand Down

0 comments on commit e64b962

Please sign in to comment.