-
Notifications
You must be signed in to change notification settings - Fork 988
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[cdc] fix cdc watermark value emit error #2763
Conversation
@MonsterChenzhuo @JingsongLi PTAL. thx |
@@ -56,13 +56,10 @@ public void onEvent(String record, long timestamp, WatermarkOutput output) { | |||
throw new RuntimeException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there is a need to throw an exception here. From my understanding, even if there is a problem with the watermark calculation, it will not cause problems with the task, so I think you can just ignore and printing a warning log here. Avoid bugs caused by issues we haven't considered. But this is obviously not the issue that this PR needs to deal with
WDYT @MonsterChenzhuo
cc @yuzelin |
@huyuanfeng2018 Have you encountered any practical problems? |
@huyuanfeng2018 "onPeriodicEmit is called regularly, so the system timestamp will be sent as a watermark regularly." This is to solve the mechanism of normal tag creation when no data is written. This is not a bug. |
So if periodic calls are always sent using system time, what is the point of extracting watermark? I do not quite understand. In a scenario where tags are created based on watermarks every day, if there is a delay or backlog in binlog consumption at 0 o'clock, I will still get the system time as the watermark, and my tags will still be created, even if my data has not completely arrived. So I think that if want to avoid creating tags when no data arrives, you should use other methods to handle it instead of sending the system time as a watermark. |
I did encounter some problems, but due to online tasks, I directly recompiled and restored it. At that time, I did not debug what data caused this error. |
|
So, do we need a mechanism to determine that the source has no data input for a certain period of time, just like a parameter |
You can take a look at this:#2646 |
Well, this pr looks good, but I still have a problem. This pr is only used to force the generation of snapshots, even if the data has not been sent, and watermarks need to be used to determine whether the conditions for generating snapshots are met. Then we produce watermarks here. Can it be done in a similar way? For example, if I have no data written for 5 minutes, I will force the current time to be sent as the watermark. @Override
public void onEvent(String record, long timestamp, WatermarkOutput output) {
long tMs;
try {
tMs = timestampExtractor.extractTimestamp(record);
currentMaxTimestamp = Math.max(currentMaxTimestamp, tMs);
lastEventTimestamp = System.currentTimeMillis();
} catch (Exception e) {
// ignore
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
long timeMillis = System.currentTimeMillis();
if (timeMillis - lastEventTimestamp > 1000 * 60 * 5) {
currentMaxTimestamp = timeMillis;
}
output.emitWatermark(new Watermark(currentMaxTimestamp - 1));
} |
It seems we didn't find a way to fix this. Close this now, feel free to re-open if you have more thoughts. |
Purpose
Linked issue: close #2745
Tests
API and Format
Documentation