Skip to content

Commit

Permalink
[core] Fix that ignore-delete hasn't handle rowkind.field (apache#3365)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored and joyCurry30 committed May 30, 2024
1 parent 4a7b0d5 commit 6e04401
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
import org.apache.paimon.table.source.InnerStreamTableScan;
Expand Down Expand Up @@ -578,6 +579,10 @@ private RollbackHelper rollbackHelper() {
store().newTagDeletion());
}

protected RowKindGenerator rowKindGenerator() {
return RowKindGenerator.create(schema(), store().options());
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ public TableWriteImpl<InternalRow> newWrite(
return new TableWriteImpl<>(
writer,
createRowKeyExtractor(),
record -> {
(record, rowKind) -> {
Preconditions.checkState(
record.row().getRowKind() == RowKind.INSERT,
rowKind.isAdd(),
"Append only writer can not accept row with RowKind %s",
record.row().getRowKind());
rowKind);
return record.row();
},
rowKindGenerator(),
CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
Expand All @@ -35,13 +34,11 @@
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import java.util.List;
Expand Down Expand Up @@ -167,21 +164,17 @@ public TableWriteImpl<KeyValue> newWrite(String commitUser) {
@Override
public TableWriteImpl<KeyValue> newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
TableSchema schema = schema();
CoreOptions options = store().options();
RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options);
KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(),
record -> {
InternalRow row = record.row();
RowKind rowKind =
rowKindGenerator == null
? row.getRowKind()
: rowKindGenerator.generate(row);
return kv.replace(record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
},
(record, rowKind) ->
kv.replace(
record.primaryKey(),
KeyValue.UNKNOWN_SEQUENCE,
rowKind,
record.row()),
rowKindGenerator(),
CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,8 @@ public static RowKindGenerator create(TableSchema schema, CoreOptions options) {
.map(field -> new RowKindGenerator(field, schema.logicalRowType()))
.orElse(null);
}

public static RowKind getRowKind(@Nullable RowKindGenerator rowKindGenerator, InternalRow row) {
return rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite.State;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Restorable;

import javax.annotation.Nullable;
Expand All @@ -49,6 +50,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
private final FileStoreWrite<T> write;
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
private final RecordExtractor<T> recordExtractor;
@Nullable private final RowKindGenerator rowKindGenerator;
private final boolean ignoreDelete;

private boolean batchCommitted = false;
Expand All @@ -58,10 +60,12 @@ public TableWriteImpl(
FileStoreWrite<T> write,
KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
RecordExtractor<T> recordExtractor,
@Nullable RowKindGenerator rowKindGenerator,
boolean ignoreDelete) {
this.write = write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
this.rowKindGenerator = rowKindGenerator;
this.ignoreDelete = ignoreDelete;
}

Expand Down Expand Up @@ -128,21 +132,23 @@ public void write(InternalRow row, int bucket) throws Exception {

@Nullable
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
if (ignoreDelete && row.getRowKind().isRetract()) {
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
return null;
}
SinkRecord record = toSinkRecord(row);
write.write(record.partition(), record.bucket(), recordExtractor.extract(record));
write.write(record.partition(), record.bucket(), recordExtractor.extract(record, rowKind));
return record;
}

@Nullable
public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
if (ignoreDelete && row.getRowKind().isRetract()) {
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
return null;
}
SinkRecord record = toSinkRecord(row, bucket);
write.write(record.partition(), bucket, recordExtractor.extract(record));
write.write(record.partition(), bucket, recordExtractor.extract(record, rowKind));
return record;
}

Expand Down Expand Up @@ -231,6 +237,6 @@ public FileStoreWrite<T> getWrite() {
/** Extractor to extract {@link T} from the {@link SinkRecord}. */
public interface RecordExtractor<T> {

T extract(SinkRecord record);
T extract(SinkRecord record, RowKind rowKind);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ public void processElement(StreamRecord<InternalRow> record) throws Exception {
recordCount++;
InternalRow row = record.getValue();

RowKind rowKind =
rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row);
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,22 @@ public void testIgnoreDelete() {
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B"));
}

@Test
public void testIgnoreDeleteWithRowKindField() {
sql(
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING, kind STRING) "
+ "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')");

sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')");
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I"));

sql("INSERT INTO ignore_delete VALUES (1, 'A', '-D')");
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I"));

sql("INSERT INTO ignore_delete VALUES (1, 'B', '+I')");
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B", "+I"));
}

@Test
public void testDeleteWithPkLookup() throws Exception {
sql(
Expand Down

0 comments on commit 6e04401

Please sign in to comment.