Skip to content

Commit

Permalink
[core] Introduce 'rowkind.field' to define rowkind from data (#2476)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Dec 11, 2023
1 parent 9b80042 commit b70d1f7
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 9 deletions.
7 changes: 7 additions & 0 deletions docs/content/concepts/primary-key-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,10 @@ of sequence number will be made up to microsecond by system.

3. Composite pattern: for example, "second-to-micro,row-kind-flag", first, add the micro to the second, and then
pad the row kind flag.

## Row Kind Field

By default, the primary key table determines the row kind according to the input row. You can also set the
`'rowkind.field'` option to extract the row kind from a field of the input row instead.

The valid row kind strings are `'+I'`, `'-U'`, `'+U'` and `'-D'`.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,12 @@
<td>Integer</td>
<td>Read batch size for orc and parquet.</td>
</tr>
<tr>
<td><h5>rowkind.field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The field that generates the row kind for primary key table, the row kind determines which data is '+I', '-U', '+U' or '-D'.</td>
</tr>
<tr>
<td><h5>scan.bounded.watermark</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
13 changes: 13 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,15 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");

// Table option 'rowkind.field': a string-typed option declared without a default value.
// Its value names the field from which the row kind is taken (see the description below).
@Immutable
public static final ConfigOption<String> ROWKIND_FIELD =
key("rowkind.field")
.stringType()
.noDefaultValue()
.withDescription(
"The field that generates the row kind for primary key table,"
+ " the row kind determines which data is '+I', '-U', '+U' or '-D'.");

public static final ConfigOption<String> SEQUENCE_AUTO_PADDING =
key("sequence.auto-padding")
.stringType()
Expand Down Expand Up @@ -1311,6 +1320,10 @@ public Optional<String> sequenceField() {
return options.getOptional(SEQUENCE_FIELD);
}

/**
 * Looks up the {@link #ROWKIND_FIELD} option in the underlying options.
 *
 * @return the result of {@code options.getOptional(ROWKIND_FIELD)}
 */
public Optional<String> rowkindField() {
    final Optional<String> configuredField = options.getOptional(ROWKIND_FIELD);
    return configuredField;
}

public List<String> sequenceAutoPadding() {
String padding = options.get(SEQUENCE_AUTO_PADDING);
if (padding == null) {
Expand Down
21 changes: 21 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/types/RowKind.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,25 @@ public static RowKind fromByteValue(byte value) {
"Unsupported byte value '" + value + "' for row kind.");
}
}

/**
 * Creates a {@link RowKind} from the given short string.
 *
 * <p>Matching is case-insensitive. The input is upper-cased with {@link java.util.Locale#ROOT}
 * so that the result does not depend on the JVM default locale; with a locale-sensitive
 * upper-casing (for example under a Turkish default locale) a lower-case {@code "+i"} would
 * not map to {@code "+I"} and would be rejected.
 *
 * @param value the short string, one of {@code "+I"}, {@code "-U"}, {@code "+U"} or {@code
 *     "-D"} (in any letter case)
 * @return the matching {@link RowKind}
 * @throws UnsupportedOperationException if the string does not denote any row kind
 * @see #shortString() for mapping of string and {@link RowKind}.
 */
public static RowKind fromShortString(String value) {
    // Locale.ROOT keeps the case mapping stable regardless of the platform's default locale.
    switch (value.toUpperCase(java.util.Locale.ROOT)) {
        case "+I":
            return INSERT;
        case "-U":
            return UPDATE_BEFORE;
        case "+U":
            return UPDATE_AFTER;
        case "-D":
            return DELETE;
        default:
            throw new UnsupportedOperationException(
                    "Unsupported short string '" + value + "' for row kind.");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ public static void validateTableSchema(TableSchema schema) {
schema.fieldNames().contains(field),
"Nonexistent sequence field: '%s'",
field));

Optional<String> rowkindField = options.rowkindField();
rowkindField.ifPresent(
field ->
checkArgument(
schema.fieldNames().contains(field),
"Nonexistent rowkind field: '%s'",
field));

sequenceField.ifPresent(
field ->
checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import java.util.List;
Expand Down Expand Up @@ -181,22 +183,25 @@ public TableWriteImpl<KeyValue> newWrite(String commitUser) {
@Override
public TableWriteImpl<KeyValue> newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
final SequenceGenerator sequenceGenerator =
SequenceGenerator.create(schema(), store().options());
TableSchema schema = schema();
CoreOptions options = store().options();
final SequenceGenerator sequenceGenerator = SequenceGenerator.create(schema, options);
final RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options);
final KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(),
record -> {
InternalRow row = record.row();
long sequenceNumber =
sequenceGenerator == null
? KeyValue.UNKNOWN_SEQUENCE
: sequenceGenerator.generate(record.row());
return kv.replace(
record.primaryKey(),
sequenceNumber,
record.row().getRowKind(),
record.row());
: sequenceGenerator.generate(row);
RowKind rowKind =
rowKindGenerator == null
? row.getRowKind()
: rowKindGenerator.generate(row);
return kv.replace(record.primaryKey(), sequenceNumber, rowKind, row);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import static org.apache.paimon.types.DataTypeFamily.CHARACTER_STRING;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
 * Derives the {@link RowKind} of a row from one of its fields.
 *
 * <p>The field is located by name in a {@link RowType} when the generator is constructed; each
 * call to {@link #generate(InternalRow)} then reads that field from the row and parses it with
 * {@link RowKind#fromShortString(String)}.
 */
public class RowKindGenerator {

    /** Position of the row kind field within the row type. */
    private final int index;

    /**
     * Resolves the row kind field in the given row type.
     *
     * <p>The field's type is validated with {@code checkArgument} to belong to the {@code
     * CHARACTER_STRING} family.
     *
     * @param field name of the field that carries the row kind
     * @param rowType row type in which the field is looked up
     * @throws RuntimeException if the row type has no field with the given name
     */
    public RowKindGenerator(String field, RowType rowType) {
        int position = rowType.getFieldNames().indexOf(field);
        if (position < 0) {
            String message =
                    String.format("Can not find rowkind %s in table schema: %s", field, rowType);
            throw new RuntimeException(message);
        }

        DataType fieldType = rowType.getTypeAt(position);
        boolean isString = fieldType.is(CHARACTER_STRING);
        checkArgument(
                isString, "only support string type for rowkind, but %s is %s", field, fieldType);

        this.index = position;
    }

    /**
     * Reads the row kind field of the given row and converts it to a {@link RowKind}.
     *
     * @param row the row to inspect
     * @return the row kind parsed from the field's string value
     * @throws RuntimeException if the row kind field is null in this row
     */
    public RowKind generate(InternalRow row) {
        if (!row.isNullAt(index)) {
            String shortString = row.getString(index).toString();
            return RowKind.fromShortString(shortString);
        }
        throw new RuntimeException("Row kind cannot be null.");
    }

    /**
     * Creates a generator for the row kind field configured in the options.
     *
     * @param schema table schema whose logical row type is used to resolve the field
     * @param options options consulted through {@code rowkindField()}
     * @return a generator for the configured field, or {@code null} when no field is configured
     */
    @Nullable
    public static RowKindGenerator create(TableSchema schema, CoreOptions options) {
        String field = options.rowkindField().orElse(null);
        if (field == null) {
            return null;
        }
        return new RowKindGenerator(field, schema.logicalRowType());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public SequenceGenerator(int index, DataType dataType) {
generator = fieldType.accept(new SequenceGeneratorVisitor());
}

@Nullable
public static SequenceGenerator create(TableSchema schema, CoreOptions options) {
List<SequenceAutoPadding> sequenceAutoPadding =
options.sequenceAutoPadding().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>

private transient long recordCount;
private transient SequenceGenerator sequenceGenerator;
private transient RowKindGenerator rowKindGenerator;
private transient MergeFunction<KeyValue> mergeFunction;

private transient SortBufferWriteBuffer buffer;
Expand Down Expand Up @@ -92,6 +94,7 @@ public void open() throws Exception {

recordCount = 0;
sequenceGenerator = SequenceGenerator.create(schema, options);
rowKindGenerator = RowKindGenerator.create(schema, options);
mergeFunction =
PrimaryKeyTableUtils.createMergeFunctionFactory(
schema,
Expand Down Expand Up @@ -131,7 +134,8 @@ public void processElement(StreamRecord<InternalRow> record) throws Exception {
recordCount++;
InternalRow row = record.getValue();

RowKind rowKind = row.getRowKind();
RowKind rowKind =
rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row);
// row kind must be INSERT when it is divided into key and value
row.setRowKind(RowKind.INSERT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,15 @@ public void testDynamicPartitionPruningOnTwoFactTables() {
assertThat(tEnv.explainSql(joinSql)).contains("DynamicFilteringDataCollector");
assertThat(sql(joinSql).toString()).isEqualTo(expected2);
}

/** Checks that rows written to a table with 'rowkind.field' take their kind from that field. */
@Test
public void testRowKindField() {
    final String createTable =
            "CREATE TABLE R_T (pk INT PRIMARY KEY NOT ENFORCED, v INT, rf STRING) "
                    + "WITH ('rowkind.field'='rf')";
    final String selectAll = "SELECT * FROM R_T";

    sql(createTable);

    // A record whose 'rf' column is '+I' is expected to be visible afterwards.
    sql("INSERT INTO R_T VALUES (1, 1, '+I')");
    assertThat(sql(selectAll)).containsExactly(Row.of(1, 1, "+I"));

    // A record for the same key whose 'rf' column is '-D' is expected to leave the table empty.
    sql("INSERT INTO R_T VALUES (1, 2, '-D')");
    assertThat(sql(selectAll)).isEmpty();
}
}

0 comments on commit b70d1f7

Please sign in to comment.