Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](job)Fix millisecond offset issue in time window scheduling trigger time calculation #45176

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,17 @@ public static long timeStringToLong(String timeStr) {
return d.getTime();
}

/**
* Converts a millisecond timestamp to a second-level timestamp.
*
* @param timestamp The millisecond timestamp to be converted.
* @return The timestamp rounded to the nearest second (in milliseconds).
*/
public static long convertToSecondTimestamp(long timestamp) {
// Divide by 1000 to convert to seconds, then multiply by 1000 to return to milliseconds with no fractional part
return (timestamp / 1000) * 1000;
}

public static long timeStringToLong(String timeStr, TimeZone timeZone) {
DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone();
dateFormatTimeZone.withZone(timeZone.toZoneId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private Long queryDelayTimeSecond(Long currentTimeMs, Long startTimeMs) {
return 0L;
}

return (startTimeMs - currentTimeMs) / 1000;
return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000;
}

// Returns a list of delay times in seconds for executing the job within the specified window
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.job.base;

import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.IntervalUnit;

import com.google.gson.annotations.SerializedName;
Expand All @@ -40,11 +41,15 @@ public class TimerDefinition {

public void checkParams() {
if (null == startTimeMs) {
startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval);
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval);
}
if (null != endTimeMs && endTimeMs < startTimeMs) {
throw new IllegalArgumentException("endTimeMs must be greater than the start time");
}
if (null != endTimeMs) {
endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs);
}

if (null != intervalUnit) {
if (null == interval) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void start() {
taskDisruptorGroupManager = new TaskDisruptorGroupManager();
taskDisruptorGroupManager.init();
this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor();
latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
}
Expand All @@ -94,7 +95,8 @@ public void start() {
* Jobs will be re-registered after the task is completed
*/
private void cycleSystemSchedulerTasks() {
log.info("re-register system scheduler timer tasks" + TimeUtils.longToTimeString(System.currentTimeMillis()));
log.info("re-register system scheduler timer tasks, time is " + TimeUtils
.longToTimeStringWithms(System.currentTimeMillis()));
timerTaskScheduler.newTimeout(timeout -> {
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
Expand Down Expand Up @@ -144,7 +146,9 @@ public void close() throws IOException {


private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(),
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
startTimeWindowMs = TimeUtils.convertToSecondTimestamp(startTimeWindowMs);
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(currentTimeMs,
startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs);
if (CollectionUtils.isNotEmpty(delaySeconds)) {
delaySeconds.forEach(delaySecond -> {
Expand Down Expand Up @@ -185,7 +189,8 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() {

long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs;
if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
}
this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
if (jobMap.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ public void testGetTriggerDelayTimesRecurring() {
timerDefinition.setInterval(1L);
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size());
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size());
timerDefinition.setStartTimeMs(1672531200000L);
timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
timerDefinition.setInterval(1L);
Assertions.assertArrayEquals(new Long[]{0L}, configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 1672531800000L).toArray());

List<Long> expectDelayTimes = configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 1672531850000L);

Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray());
}

@Test
Expand Down
Loading