Solved the problem of files getting stuck in the temporary folder due… #666
… to the lack of new messages in the topic despite the set property rotate.interval.ms. Property flush.size now also works correctly.
Problem
Occasionally, files got stuck in the +tmp folder even though rotate.interval.ms was set. This happened when new messages stopped arriving in the topic: rotate.interval.ms had no effect because nothing triggered a check of it.
While working on this problem, I also fixed the rotation mechanism and, with it, the flush.size parameter. According to the documentation, rotation can be controlled by the file collection time (rotate.interval.ms) and/or by the number of messages in the file (flush.size). In practice, time-based rotation did not use the file collection time but the time in the field used for partitioning. If one Kafka partition contained messages whose values in that field differed by more than rotate.interval.ms, rotation occurred and the file was flushed prematurely from the temporary folder to the permanent one. This produced a large number of small files and made it impossible to control writing correctly with flush.size. This problem no longer occurs.
Solution
I added a check for rotate.interval.ms in HdfsSinkTask.preCommit(). I also fixed the rotation logic in the TopicPartitionWriter.shouldRotateAndMaybeUpdateTimers() method.
Does this solution apply anywhere else?
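For reviewers, the rotation rule described under Solution can be sketched roughly as follows. This is a minimal illustration, not the connector's code: the class RotationSketch and its methods are hypothetical, the clock is passed in as a parameter to keep the example deterministic, and onPeriodicCheck only plays the role that HdfsSinkTask.preCommit() has in this PR, assuming that hook is invoked periodically even when no records arrive.

```java
// Illustrative sketch only: all names here are hypothetical, not the connector's API.
final class RotationSketch {
    private final long rotateIntervalMs; // stands in for rotate.interval.ms
    private final int flushSize;         // stands in for flush.size
    private long fileOpenedAtMs = -1L;   // wall-clock time the temp file was opened
    private int recordsInFile = 0;       // records written to the current temp file

    RotationSketch(long rotateIntervalMs, int flushSize) {
        this.rotateIntervalMs = rotateIntervalMs;
        this.flushSize = flushSize;
    }

    // Called for every record appended to the temp file.
    void onRecord(long nowMs) {
        if (recordsInFile == 0) {
            fileOpenedAtMs = nowMs; // the file "collection time" starts here
        }
        recordsInFile++;
    }

    // Decision based on file age and record count only.
    // Timestamps taken from the partitioning field are deliberately not consulted.
    boolean shouldRotate(long nowMs) {
        if (recordsInFile == 0) {
            return false; // nothing buffered, nothing to rotate
        }
        boolean sizeReached = recordsInFile >= flushSize;
        boolean intervalElapsed =
                rotateIntervalMs > 0 && nowMs - fileOpenedAtMs >= rotateIntervalMs;
        return sizeReached || intervalElapsed;
    }

    // Periodic hook: runs even when the topic is idle, so an open file
    // cannot stay in the temporary folder forever.
    boolean onPeriodicCheck(long nowMs) {
        if (shouldRotate(nowMs)) {
            commitFile();
            return true;
        }
        return false;
    }

    // Stands in for moving the temp file to its permanent location.
    private void commitFile() {
        recordsInFile = 0;
        fileOpenedAtMs = -1L;
    }

    public static void main(String[] args) {
        RotationSketch writer = new RotationSketch(1000L, 3);
        writer.onRecord(0L);
        System.out.println("rotated at 500 ms: " + writer.onPeriodicCheck(500L));
        System.out.println("rotated at 1000 ms: " + writer.onPeriodicCheck(1000L));
    }
}
```

The point of the sketch is that the interval is measured from when the file was opened, and that the check is driven by a periodic call rather than by incoming records, so an idle topic still gets its file committed.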
Test Strategy
Testing done: