Skip to content

Commit

Permalink
Merge pull request #17 from tongcheng-elong/tc-paimon-0.9-delete-by-type
Browse files Browse the repository at this point in the history
[core] When binlog event_type is 'delete', just delete this record
  • Loading branch information
wxplovecc authored Oct 24, 2024
2 parents c9a572f + fffb20a commit 3ece779
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to enable asynchronous IO writing when writing files.");

/**
 * Boolean option keyed {@code support-delete-by-type}; defaults to {@code false}.
 *
 * <p>The description below is the user-facing text for the option.
 */
public static final ConfigOption<Boolean> SUPPORT_DELETE_BY_TYPE =
        key("support-delete-by-type")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "If set to true, a record whose 'binlog_eventtype' field value is "
                                + "'delete' (case-insensitive) will be written as a DELETE row.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -2142,6 +2149,10 @@ public boolean metadataIcebergCompatible() {
return options.get(METADATA_ICEBERG_COMPATIBLE);
}

/**
 * Reads the {@link #SUPPORT_DELETE_BY_TYPE} option from this instance's options.
 *
 * @return the configured value of {@code support-delete-by-type}
 */
public boolean supportDeleteByType() {
    final boolean deleteByTypeEnabled = options.get(SUPPORT_DELETE_BY_TYPE);
    return deleteByTypeEnabled;
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.iceberg.PrimaryKeyIcebergCommitCallback;
Expand All @@ -40,6 +41,7 @@
import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import java.util.List;
Expand Down Expand Up @@ -165,12 +167,13 @@ public TableWriteImpl<KeyValue> newWrite(
rowType(),
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(),
(record, rowKind) ->
kv.replace(
record.primaryKey(),
KeyValue.UNKNOWN_SEQUENCE,
rowKind,
record.row()),
(record, rowKind) -> {
if (store().options().supportDeleteByType()) {
rowKind = getRowKindByBinlogType(record.row(), rowKind);
}
return kv.replace(
record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, record.row());
},
rowKindGenerator(),
CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
}
Expand All @@ -191,4 +194,16 @@ protected List<CommitCallback> createCommitCallbacks(String commitUser) {

return callbacks;
}

/**
 * Resolves the {@link RowKind} to use for a record, taking its {@code binlog_eventtype} field
 * into account.
 *
 * <p>The field is located by name in {@code schema().logicalRowType()}. When the field exists,
 * is not null in {@code row}, and its string value equals {@code "delete"} ignoring case,
 * {@link RowKind#DELETE} is returned. In every other case the supplied {@code rowKind} is
 * returned unchanged.
 *
 * @param row the row to inspect; read at the position of the {@code binlog_eventtype} field
 * @param rowKind the row kind to fall back to when the record is not a delete event
 * @return {@link RowKind#DELETE} for a delete event, otherwise {@code rowKind}
 */
public RowKind getRowKindByBinlogType(InternalRow row, RowKind rowKind) {
    int index = schema().logicalRowType().getFieldNames().indexOf("binlog_eventtype");
    // Nothing to override when the table has no such column or the value is null for this
    // record; without the null guard the getString(...).toString() chain below would fail.
    if (index < 0 || row.isNullAt(index)) {
        return rowKind;
    }
    // NOTE(review): assumes the field is declared as a string type - confirm against schema.
    String eventType = row.getString(index).toString();
    return "delete".equalsIgnoreCase(eventType) ? RowKind.DELETE : rowKind;
}
}

0 comments on commit 3ece779

Please sign in to comment.