feat: Generate Idle Watermark if the source is idling #1385
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: chandankumar4 <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
threshold: 5s
incrementBy: 3s
stepInterval: 2s
Can we add some comments on these numbers on how they are used to test source idling? The comment should make it clear why removing these configurations will break the test.
I am un-resolving this one. We configured the idle source and updated the simple keyed reduce test with a new start time and a new wait time before sending the next batch.
I have a hard time understanding how this test verifies that the idle source works. Can we write some comments on why this test exercises the source idling functionality? How do the numbers `5s`, `3s`, `2s`, `startTime := 100`, `time.Sleep(10 * time.Millisecond)` and `WithTimeout(120*time.Second)` work together to achieve the idle source test?
Can we add a unit test for the idle handler?
	return 0
// Partitions returns the partitions from which the source is reading.
func (r *kafkaSource) Partitions() []int32 {
	for topic, partitions := range r.handler.sess.Claims() {
Please make sure this doesn't involve a network call. I am quite sure it wouldn't.
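The concern is whether `Partitions()` can remain a purely local lookup. A minimal sketch, assuming the session's `Claims()` returns an in-memory `map[string][]int32` (the signature that sarama's `ConsumerGroupSession` exposes). The names `claimer`, `fakeSession`, and `partitionsFor` are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// claimer is a hypothetical stand-in for the part of the consumer group
// session used here: a snapshot of claimed partitions keyed by topic.
type claimer interface {
	Claims() map[string][]int32
}

// fakeSession is a test double that serves claims from memory.
type fakeSession struct {
	claims map[string][]int32
}

func (f fakeSession) Claims() map[string][]int32 { return f.claims }

// partitionsFor returns a sorted copy of the partitions claimed for topic.
// It only reads the map returned by Claims(), so it performs no I/O itself.
func partitionsFor(s claimer, topic string) []int32 {
	claimed := s.Claims()[topic]
	out := make([]int32, len(claimed))
	copy(out, claimed) // copy so callers cannot mutate the session's slice
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	s := fakeSession{claims: map[string][]int32{"orders": {2, 0, 1}}}
	fmt.Println(partitionsFor(s, "orders"))
}
```

Hiding the session behind a small interface like this also makes it straightforward to unit test the partition lookup without a broker.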
readMessages, err := u.sourceApplier.ApplyReadFn(ctx, count, u.readTimeout)
if err != nil {
	return nil, err
}
for _, msg := range readMessages {
	if _, ok := u.partitions[msg.ReadOffset.PartitionIdx()]; !ok {
		u.partitions[msg.ReadOffset.PartitionIdx()] = struct{}{}
	}
}
This is not right: we only append but never remove. We will have to ask users to add Partitions() to the ud-source interface.
@KeranYang WDYT?
I agree this is not right. With this change we now have two sources of truth for how many partitions a source has. One is the read batch and the other is the source Partitions() interface. We should stick with the second one.
The removing part can be tricky because if we remove a certain partition, we also need to remove the corresponding watermark KV store etc. We can look into details as part of issue 1405 that you created.
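To illustrate the single-source-of-truth idea, here is a minimal sketch of reconciling a tracked set against whatever an authoritative partition list reports, instead of only appending from read batches. It is not the PR's implementation; `partitionSet` and `reconcile` are hypothetical names, and the `removed` result marks where cleanup such as dropping per-partition watermark state would have to hook in.

```go
package main

import (
	"fmt"
	"sort"
)

// partitionSet tracks the partitions we currently hold state for.
type partitionSet map[int32]struct{}

// reconcile makes the tracked set equal to current (the authoritative list)
// and reports which partitions appeared and which disappeared.
func (p partitionSet) reconcile(current []int32) (added, removed []int32) {
	seen := make(map[int32]struct{}, len(current))
	for _, idx := range current {
		seen[idx] = struct{}{}
		if _, ok := p[idx]; !ok {
			p[idx] = struct{}{}
			added = append(added, idx)
		}
	}
	for idx := range p {
		if _, ok := seen[idx]; !ok {
			delete(p, idx) // deleting during range is safe in Go
			removed = append(removed, idx)
		}
	}
	// Map iteration order is random, so sort for deterministic results.
	sort.Slice(added, func(i, j int) bool { return added[i] < added[j] })
	sort.Slice(removed, func(i, j int) bool { return removed[i] < removed[j] })
	return added, removed
}

func main() {
	p := partitionSet{}
	p.reconcile([]int32{0, 1, 2})
	added, removed := p.reconcile([]int32{1, 2, 3})
	fmt.Println("added:", added, "removed:", removed)
}
```

An append-only set would still contain partition 0 after the second call, which is the staleness the comments above point out.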
	sourcePublishWM := NewPublish(sp.ctx, processorEntity, sp.srcPublishWMStores, 1, IsSource(), WithDelay(sp.opts.delay))
	sp.sourcePublishWMs[sp.opts.defaultPartitionIdx] = sourcePublishWM
}
Please document the `else` branch.
Needs a few changes, as discussed.
I have only one minor comment open. Approving.
Explain what this PR does.
Fixes #633
Example Pipeline to test the changes:
Note:
This will not work in the case below: