Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental Co-operative Rebalancing Support for HDFS Connector #711

Closed
wants to merge 1 commit into from

Conversation

ychernysh
Copy link

Problem

See #625.
If consumer.partition.assignment.strategy is set to org.apache.kafka.clients.consumer.CooperativeStickyAssignor in config/connect-distributed.properties, after a partial partition revocation (say, a new worker joins and takes over some partitions from some other worker) in a task it will be killed due to such NullPointerException:

[2024-11-15 13:46:37,190] ERROR [hdfs-sink-2|task-1] WorkerSinkTask{id=hdfs-sink-2-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:364)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:133)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
        ... 11 more

The reason for that is that during onPartitionsRevoked callback the DataWriter currently closes and removes all of its TopicPartitionWriters, probably assuming eager rebalancing.
If the consumer only gets some partitions revoked and none new assigned, the onPartitionsAssigned will be called with empty collection and the topicPartitionWriters collection will remain empty.
When new data arrives (from partitions that the consumer kept ownership of) to HdfsSinkTask#put, the NullPointerException will be thrown when accessing the topicPartitionsWriters.

Solution

Only close and remove from topicPartitionsWriters those partitions retrieved from HdfsSinkTask#close(Collection<TopicPartition>).
Note: there is a 9 years old comment explaining why should we close all of the topic partition writers, which I didn't really understand. This solution simply ignores and removes it. @ewencp, can you please review and put some comments if it would be safe to do so?

Does this solution apply anywhere else?
  • yes
  • no
If yes, where?

Test Strategy

Added HdfsSinkTaskTest#testPartialRevocation unit test simulating a partial revocation that throws NullPointerException without the fix.
Plus, tested manually (after adding a second worker (partial revocation happens) and writing some data to topic no NPE is thrown).

Testing done:
  • Unit tests
  • Integration tests
  • System tests
  • Manual tests

Release Plan

Please see a similar issue for Connect framework itself, KAFKA-12487. I was testing the fix on Kafka 7.7.2-18-ccs where this fix is already present, but haven't tested on earlier versions without it. Should I do it?

@ychernysh ychernysh requested a review from a team as a code owner November 15, 2024 16:07
@confluent-cla-assistant
Copy link

Please sign the Contributor License Agreement here before this PR can be approved.
❌ ychernysh
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@yarocher
Copy link

yarocher commented Nov 22, 2024

Please close this one. I am reopening it as #712 from my personal account, where I can sign the CLA. Please further use that PR. Sorry for the inconvenience...

@ychernysh ychernysh closed this Nov 22, 2024
@ychernysh
Copy link
Author

Closed this one. Please see #712

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants