Incremental Co-operative Rebalancing Support for HDFS Connector (#625) #712

Conversation

yarocher

This is a copy of #711, but from my personal email, with which I can sign the CLA.

Problem

See #625.
If consumer.partition.assignment.strategy is set to org.apache.kafka.clients.consumer.CooperativeStickyAssignor in config/connect-distributed.properties, then after a partial partition revocation in a task (say, a new worker joins and takes over some partitions from another worker), the task is killed with the following NullPointerException:

[2024-11-15 13:46:37,190] ERROR [hdfs-sink-2|task-1] WorkerSinkTask{id=hdfs-sink-2-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:364)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:133)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
        ... 11 more
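For reference, the reproduction only requires the cooperative assignor override in the worker properties. This is a fragment, assuming an otherwise standard distributed worker configuration; the consumer. prefix passes the setting through to the sink task's underlying consumer:

```properties
# config/connect-distributed.properties (fragment)
# Switch the sink task's consumer from eager to incremental cooperative rebalancing.
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```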

The reason is that during the onPartitionsRevoked callback, the DataWriter currently closes and removes all of its TopicPartitionWriters, presumably assuming eager rebalancing.
If the consumer only has some partitions revoked and none newly assigned, onPartitionsAssigned is called with an empty collection, so the topicPartitionWriters collection remains empty.
When new data arrives at HdfsSinkTask#put from partitions the consumer kept ownership of, a NullPointerException is thrown when accessing topicPartitionWriters.
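The failure mode can be reproduced in isolation with a toy model of the per-partition writer map. All names below are simplified stand-ins for illustration, not the actual connector classes:

```java
import java.util.*;

// Simplified stand-in for DataWriter's per-partition writer map.
// Models the PRE-fix behavior: the revocation callback closes and removes
// ALL writers, even for partitions the task still owns (eager assumption).
class ToyDataWriter {
    final Map<String, StringBuilder> writers = new HashMap<>();

    void open(Collection<String> partitions) {
        for (String p : partitions) writers.put(p, new StringBuilder());
    }

    // Pre-fix close: drops every writer, regardless of what was revoked.
    void closeAll() {
        writers.clear();
    }

    void write(String partition, String record) {
        // This mirrors the NPE at DataWriter.write(DataWriter.java:364):
        // the map entry for a still-owned partition is gone.
        writers.get(partition).append(record);
    }
}

public class PartialRevocationDemo {
    public static void main(String[] args) {
        ToyDataWriter w = new ToyDataWriter();
        w.open(List.of("topic-0", "topic-1"));
        // Cooperative rebalance revokes only topic-1...
        w.closeAll(); // ...but the old code closes everything.
        try {
            w.write("topic-0", "record"); // still-owned partition
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException");
        }
    }
}
```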

Solution

Only close and remove from topicPartitionWriters the partitions passed to HdfsSinkTask#close(Collection<TopicPartition>).
Note: there is a nine-year-old comment explaining why all of the topic partition writers should be closed, which I didn't fully understand. This change simply ignores and removes it. @ewencp, can you please review and comment on whether that is safe?
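Under the same toy model, the idea of the fix can be sketched as closing only the writers for the partitions actually passed to close. This is an illustrative sketch of the approach, not the actual patch:

```java
import java.util.*;

// Simplified stand-in showing the cooperative-friendly close:
// only the explicitly revoked partitions are closed and removed,
// so writers for still-owned partitions survive the rebalance.
class ToyDataWriterFixed {
    final Map<String, StringBuilder> writers = new HashMap<>();

    void open(Collection<String> partitions) {
        for (String p : partitions) writers.put(p, new StringBuilder());
    }

    // Post-fix close: remove only the writers for the revoked partitions.
    void close(Collection<String> revoked) {
        for (String p : revoked) writers.remove(p);
    }

    void write(String partition, String record) {
        writers.get(partition).append(record);
    }
}

public class PartialRevocationFixedDemo {
    public static void main(String[] args) {
        ToyDataWriterFixed w = new ToyDataWriterFixed();
        w.open(List.of("topic-0", "topic-1"));
        w.close(List.of("topic-1"));  // partial revocation of topic-1 only
        w.write("topic-0", "record"); // still-owned partition: no NPE
        System.out.println("remaining writers: " + w.writers.keySet());
    }
}
```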

Does this solution apply anywhere else?
  • yes
  • no
If yes, where?

Test Strategy

Added a HdfsSinkTaskTest#testPartialRevocation unit test that simulates a partial revocation; without the fix it throws the NullPointerException.
Also tested manually: after adding a second worker (which triggers a partial revocation) and writing some data to the topic, no NPE is thrown.

Testing done:
  • Unit tests
  • Integration tests
  • System tests
  • Manual tests

Release Plan

Please see KAFKA-12487, a similar issue in the Connect framework itself. I tested this fix on Kafka 7.7.2-18-ccs, where the framework fix is already present, but haven't tested on earlier versions without it. Should I?

@yarocher yarocher requested a review from a team as a code owner November 22, 2024 11:08
@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
✅ yarocher
