Skip to content

Commit

Permalink
[core] when binlog event_type is 'delete',just delete this record
Browse files Browse the repository at this point in the history
  • Loading branch information
pongandnoon committed Nov 1, 2023
1 parent 4f8faeb commit f7e81aa
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,12 @@ public class CoreOptions implements Serializable {
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(10))
.withDescription("The threshold for read file async.");
public static final ConfigOption<Boolean> SUPPORT_DELETE_BY_TYPE =
key("support-delete-by-type")
.booleanType()
.defaultValue(false)
.withDescription(
"If set to true, when binlog_eventtype value is 'delete',the row will be deleted");

private final Options options;

Expand Down Expand Up @@ -1352,6 +1358,11 @@ public int varTypeSize() {
return options.get(ZORDER_VAR_LENGTH_CONTRIBUTION);
}

public boolean supportDeleteByType() {
return options.get(SUPPORT_DELETE_BY_TYPE);
}


/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import java.util.List;
Expand Down Expand Up @@ -195,6 +197,9 @@ public TableWriteImpl<KeyValue> newWrite(
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(),
record -> {
if(store().options().supportDeleteByType()){
setRowKindByBinlogType(record.row());
}
long sequenceNumber =
sequenceGenerator == null
? KeyValue.UNKNOWN_SEQUENCE
Expand All @@ -206,4 +211,16 @@ record -> {
record.row());
});
}

public void setRowKindByBinlogType(InternalRow row){
RowType rowType = schema().logicalRowType();
int index = rowType.getFieldNames().indexOf("binlog_eventtype");
if(index <0){
return;
}
String binlog_eventtype = row.getString(index).toString();
if(binlog_eventtype.equalsIgnoreCase("delete")){
row.setRowKind(RowKind.DELETE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public abstract class AbstractFlinkTableFactory

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
System.out.println("createDynamicTableSource:2023-09-07");
System.out.println("createDynamicTableSource:2023-10-25");
CatalogTable origin = context.getCatalogTable().getOrigin();
boolean isStreamingMode =
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
Expand Down Expand Up @@ -124,7 +124,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
System.out.println("createDynamicTableSink:2023-09-07");
System.out.println("createDynamicTableSink:2023-10-25");
Table table = buildPaimonTable(context);
if (table instanceof FileStoreTable) {
storeTableLineage(
Expand Down

0 comments on commit f7e81aa

Please sign in to comment.