Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] fix delete record exception after partial-update merge engine configuration to lookup changelog producer #4345

Merged
merged 1 commit into from
Oct 29, 2024

Conversation

zhuangchong
Copy link
Contributor

Purpose

Linked issue: close #4344

Tests

PartialUpdateITCase#testRemoveRecordOnDelete()

API and Format

Documentation

@@ -120,7 +120,6 @@ public void add(KeyValue kv) {
currentDeleteRow = true;
row = new GenericRow(getters.length);
}
// ignore -U records
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why delete this?

}
return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row);

RowKind rowKind = currentDeleteRow ? RowKind.DELETE : RowKind.INSERT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why here don't return null could avoid NPE in :
"Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)"
?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before the change, it returned null.

exception:

Caused by: java.io.IOException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:234)
at org.apache.paimon.flink.sink.TableWriteOperator.prepareCommit(TableWriteOperator.java:123)
at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:100)
at org.apache.paimon.flink.sink.PrepareCommitOperator.endInput(PrepareCommitOperator.java:88)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.paimon.compact.CompactFutureManager.obtainCompactResult(CompactFutureManager.java:67)
at org.apache.paimon.compact.CompactFutureManager.innerGetCompactionResult(CompactFutureManager.java:53)
at org.apache.paimon.mergetree.compact.MergeTreeCompactManager.getCompactionResult(MergeTreeCompactManager.java:220)
at org.apache.paimon.mergetree.MergeTreeWriter.trySyncLatestCompaction(MergeTreeWriter.java:327)
at org.apache.paimon.mergetree.MergeTreeWriter.prepareCommit(MergeTreeWriter.java:276)
at org.apache.paimon.operation.AbstractFileStoreWrite.prepareCommit(AbstractFileStoreWrite.java:207)
at org.apache.paimon.operation.MemoryFileStoreWrite.prepareCommit(MemoryFileStoreWrite.java:151)
at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:253)
at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:229)
... 15 more

sql(
"CREATE TABLE remove_record_on_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH ("
+ " 'merge-engine' = 'partial-update',"
+ " 'partial-update.remove-record-on-delete' = 'true'"
+ " 'partial-update.remove-record-on-delete' = 'true',"
+ " 'changelog-producer' = 'lookup'"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a new case for lookup?

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@JingsongLi JingsongLi merged commit a5b1236 into apache:master Oct 29, 2024
12 checks passed
@zhuangchong zhuangchong deleted the partial-updat-delete branch October 30, 2024 01:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] Delete record exception after partial-update merge engine configuration to lookup changelog producer
3 participants