Skip to content

Commit

Permalink
branch-2.1: [fix](job)Fix millisecond offset issue in time window sch…
Browse files Browse the repository at this point in the history
…eduling trigger time calculation #45176 (#45353)

Cherry-picked from #45176

Co-authored-by: Calvin Kirs <[email protected]>
  • Loading branch information
github-actions[bot] and CalvinKirs authored Dec 12, 2024
1 parent 667f5e6 commit 95ffc7a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,17 @@ public static long timeStringToLong(String timeStr) {
return d.getTime();
}

/**
* Converts a millisecond timestamp to a second-level timestamp.
*
* @param timestamp The millisecond timestamp to be converted.
* @return The timestamp rounded to the nearest second (in milliseconds).
*/
public static long convertToSecondTimestamp(long timestamp) {
// Divide by 1000 to convert to seconds, then multiply by 1000 to return to milliseconds with no fractional part
return (timestamp / 1000) * 1000;
}

public static long timeStringToLong(String timeStr, TimeZone timeZone) {
DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone();
dateFormatTimeZone.withZone(timeZone.toZoneId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private Long queryDelayTimeSecond(Long currentTimeMs, Long startTimeMs) {
return 0L;
}

return (startTimeMs - currentTimeMs) / 1000;
return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000;
}

// Returns a list of delay times in seconds for executing the job within the specified window
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.job.base;

import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.IntervalUnit;

import com.google.gson.annotations.SerializedName;
Expand All @@ -40,11 +41,15 @@ public class TimerDefinition {

public void checkParams() {
if (null == startTimeMs) {
startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval);
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval);
}
if (null != endTimeMs && endTimeMs < startTimeMs) {
throw new IllegalArgumentException("endTimeMs must be greater than the start time");
}
if (null != endTimeMs) {
endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs);
}

if (null != intervalUnit) {
if (null == interval) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void start() {
taskDisruptorGroupManager = new TaskDisruptorGroupManager();
taskDisruptorGroupManager.init();
this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor();
latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
}
Expand All @@ -94,7 +95,8 @@ public void start() {
* Jobs will be re-registered after the task is completed
*/
private void cycleSystemSchedulerTasks() {
log.info("re-register system scheduler timer tasks" + TimeUtils.longToTimeString(System.currentTimeMillis()));
log.info("re-register system scheduler timer tasks, time is " + TimeUtils
.longToTimeStringWithms(System.currentTimeMillis()));
timerTaskScheduler.newTimeout(timeout -> {
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
Expand Down Expand Up @@ -144,7 +146,9 @@ public void close() throws IOException {


private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(),
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
startTimeWindowMs = TimeUtils.convertToSecondTimestamp(startTimeWindowMs);
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(currentTimeMs,
startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs);
if (CollectionUtils.isEmpty(delaySeconds)) {
log.info("skip job {} scheduler timer job, delay seconds is empty", job.getJobName());
Expand Down Expand Up @@ -190,7 +194,8 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() {

long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs;
if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
}
this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
log.info("execute timer job ids within last ten minutes window, last time window is {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ public void testGetTriggerDelayTimesRecurring() {
timerDefinition.setInterval(1L);
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size());
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size());
timerDefinition.setStartTimeMs(1672531200000L);
timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
timerDefinition.setInterval(1L);
Assertions.assertArrayEquals(new Long[]{0L}, configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 1672531800000L).toArray());

List<Long> expectDelayTimes = configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 1672531850000L);

Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray());
}

@Test
Expand Down

0 comments on commit 95ffc7a

Please sign in to comment.