diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md
index 8c3b695168ac..eb3ade03576c 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -458,3 +458,10 @@ of sequence number will be made up to microsecond by system.
3. Composite pattern: for example, "second-to-micro,row-kind-flag", first, add the micro to the second, and then
pad the row kind flag.
+
+## Row Kind Field
+
+By default, a primary key table takes the row kind from the input row itself. You can also set the
+`'rowkind.field'` option to extract the row kind from a field of the row instead.
+
+The value of this field must be one of the row kind strings `'+I'`, `'-U'`, `'+U'` or `'-D'`.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index a6024d99ae40..82a8fc798b14 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -419,6 +419,12 @@
Integer |
Read batch size for orc and parquet. |
+
+ rowkind.field |
+ (none) |
+ String |
+ The field that generates the row kind for primary key table, the row kind determines which data is '+I', '-U', '+U' or '-D'. |
+
scan.bounded.watermark |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 7a24523e9c5b..4274ea8d2b9e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -427,6 +427,15 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");
+ @Immutable
+ public static final ConfigOption ROWKIND_FIELD =
+ key("rowkind.field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The field that generates the row kind for primary key table,"
+ + " the row kind determines which data is '+I', '-U', '+U' or '-D'.");
+
public static final ConfigOption SEQUENCE_AUTO_PADDING =
key("sequence.auto-padding")
.stringType()
@@ -1311,6 +1320,10 @@ public Optional sequenceField() {
return options.getOptional(SEQUENCE_FIELD);
}
+ public Optional rowkindField() {
+ return options.getOptional(ROWKIND_FIELD);
+ }
+
public List sequenceAutoPadding() {
String padding = options.get(SEQUENCE_AUTO_PADDING);
if (padding == null) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java b/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java
index 6903ab52483f..17096dfbfc6f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java
@@ -128,4 +128,25 @@ public static RowKind fromByteValue(byte value) {
"Unsupported byte value '" + value + "' for row kind.");
}
}
+
+    /**
+     * Creates a {@link RowKind} from its {@link #shortString() short string}; case is ignored
+     * using {@link java.util.Locale#ROOT} so the result is independent of the default locale.
+     *
+     * @throws UnsupportedOperationException if {@code value} is not a valid short string.
+     */
+    public static RowKind fromShortString(String value) {
+        switch (value.toUpperCase(java.util.Locale.ROOT)) {
+            case "+I":
+                return INSERT;
+            case "-U":
+                return UPDATE_BEFORE;
+            case "+U":
+                return UPDATE_AFTER;
+            case "-D":
+                return DELETE;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported short string '" + value + "' for row kind.");
+        }
+    }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index c4c13b5c77c3..538cff2dd9d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -170,6 +170,15 @@ public static void validateTableSchema(TableSchema schema) {
schema.fieldNames().contains(field),
"Nonexistent sequence field: '%s'",
field));
+
+ Optional rowkindField = options.rowkindField();
+ rowkindField.ifPresent(
+ field ->
+ checkArgument(
+ schema.fieldNames().contains(field),
+ "Nonexistent rowkind field: '%s'",
+ field));
+
sequenceField.ifPresent(
field ->
checkArgument(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index feb643937b22..84a586066995 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -36,6 +36,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.InnerTableRead;
@@ -43,6 +44,7 @@
import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
+import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import java.util.List;
@@ -181,22 +183,25 @@ public TableWriteImpl newWrite(String commitUser) {
@Override
public TableWriteImpl newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
- final SequenceGenerator sequenceGenerator =
- SequenceGenerator.create(schema(), store().options());
+ TableSchema schema = schema();
+ CoreOptions options = store().options();
+ final SequenceGenerator sequenceGenerator = SequenceGenerator.create(schema, options);
+ final RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options);
final KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(),
record -> {
+ InternalRow row = record.row();
long sequenceNumber =
sequenceGenerator == null
? KeyValue.UNKNOWN_SEQUENCE
- : sequenceGenerator.generate(record.row());
- return kv.replace(
- record.primaryKey(),
- sequenceNumber,
- record.row().getRowKind(),
- record.row());
+ : sequenceGenerator.generate(row);
+ RowKind rowKind =
+ rowKindGenerator == null
+ ? row.getRowKind()
+ : rowKindGenerator.generate(row);
+ return kv.replace(record.primaryKey(), sequenceNumber, rowKind, row);
});
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java
new file mode 100644
index 000000000000..2735339b419b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import static org.apache.paimon.types.DataTypeFamily.CHARACTER_STRING;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Generate row kind. */
+public class RowKindGenerator {
+
+ private final int index;
+
+ public RowKindGenerator(String field, RowType rowType) {
+ this.index = rowType.getFieldNames().indexOf(field);
+ if (index == -1) {
+ throw new RuntimeException(
+ String.format("Can not find rowkind %s in table schema: %s", field, rowType));
+ }
+ DataType fieldType = rowType.getTypeAt(index);
+ checkArgument(
+ fieldType.is(CHARACTER_STRING),
+ "only support string type for rowkind, but %s is %s",
+ field,
+ fieldType);
+ }
+
+ public RowKind generate(InternalRow row) {
+ if (row.isNullAt(index)) {
+ throw new RuntimeException("Row kind cannot be null.");
+ }
+ return RowKind.fromShortString(row.getString(index).toString());
+ }
+
+ @Nullable
+ public static RowKindGenerator create(TableSchema schema, CoreOptions options) {
+ return options.rowkindField()
+ .map(field -> new RowKindGenerator(field, schema.logicalRowType()))
+ .orElse(null);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
index 293c9df54152..4fe0315aa45a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
@@ -85,6 +85,7 @@ public SequenceGenerator(int index, DataType dataType) {
generator = fieldType.accept(new SequenceGeneratorVisitor());
}
+ @Nullable
public static SequenceGenerator create(TableSchema schema, CoreOptions options) {
List sequenceAutoPadding =
options.sequenceAutoPadding().stream()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index e85c155d28ec..8a37b03bf5ed 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -30,6 +30,7 @@
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.PrimaryKeyTableUtils;
+import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
@@ -62,6 +63,7 @@ public class LocalMergeOperator extends AbstractStreamOperator
private transient long recordCount;
private transient SequenceGenerator sequenceGenerator;
+ private transient RowKindGenerator rowKindGenerator;
private transient MergeFunction mergeFunction;
private transient SortBufferWriteBuffer buffer;
@@ -92,6 +94,7 @@ public void open() throws Exception {
recordCount = 0;
sequenceGenerator = SequenceGenerator.create(schema, options);
+ rowKindGenerator = RowKindGenerator.create(schema, options);
mergeFunction =
PrimaryKeyTableUtils.createMergeFunctionFactory(
schema,
@@ -131,7 +134,8 @@ public void processElement(StreamRecord record) throws Exception {
recordCount++;
InternalRow row = record.getValue();
- RowKind rowKind = row.getRowKind();
+ RowKind rowKind =
+ rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row);
// row kind must be INSERT when it is divided into key and value
row.setRowKind(RowKind.INSERT);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 00c7b4aa4177..fae1f5acf70a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -357,4 +357,15 @@ public void testDynamicPartitionPruningOnTwoFactTables() {
assertThat(tEnv.explainSql(joinSql)).contains("DynamicFilteringDataCollector");
assertThat(sql(joinSql).toString()).isEqualTo(expected2);
}
+
+ @Test
+ public void testRowKindField() {
+ sql(
+ "CREATE TABLE R_T (pk INT PRIMARY KEY NOT ENFORCED, v INT, rf STRING) "
+ + "WITH ('rowkind.field'='rf')");
+ sql("INSERT INTO R_T VALUES (1, 1, '+I')");
+ assertThat(sql("SELECT * FROM R_T")).containsExactly(Row.of(1, 1, "+I"));
+ sql("INSERT INTO R_T VALUES (1, 2, '-D')");
+ assertThat(sql("SELECT * FROM R_T")).isEmpty();
+ }
}