Skip to content

Commit

Permalink
[fix](job)Fix millisecond offset issue in time window scheduling trig…
Browse files Browse the repository at this point in the history
…ger time calculation (#45176)

### Abstract:
In the current time window scheduling logic, the calculation of trigger
times was not strictly aligned to the second level, which could lead to
millisecond offsets. This offset caused issues such as consecutive
trigger times at 14:56:59 and 14:57:00, disrupting the correctness of
the scheduling.

This PR optimizes the calculation of trigger times to ensure that time
points are strictly aligned to the second level, preventing the
accumulation of millisecond errors.

### Issue Description:

Under a specified window (e.g., 14:50:00 to 14:59:00) and a fixed
interval (e.g., every minute), the scheduler generated erroneous trigger
times such as:

```
| 2024-12-04 14:56:59 |
| 2024-12-04 14:57:00 |
| 2024-12-04 14:57:59 |
| 2024-12-04 14:58:00 |
```
#### Cause:
The current firstTriggerTime and the loop calculation did not strictly
align trigger times to the second level, resulting in erroneous trigger
points due to floating-point or millisecond offset accumulation. The end
condition for the time window was not aligned to the second level, which
could lead to additional trigger times being included.

### Fix:
Modification 1: Strictly align the trigger time to the second level.
  • Loading branch information
CalvinKirs authored Dec 12, 2024
1 parent 7335ea8 commit 13add3c
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,17 @@ public static long timeStringToLong(String timeStr) {
return d.getTime();
}

/**
* Converts a millisecond timestamp to a second-level timestamp.
*
* @param timestamp The millisecond timestamp to be converted.
* @return The timestamp rounded to the nearest second (in milliseconds).
*/
public static long convertToSecondTimestamp(long timestamp) {
// Divide by 1000 to convert to seconds, then multiply by 1000 to return to milliseconds with no fractional part
return (timestamp / 1000) * 1000;
}

public static long timeStringToLong(String timeStr, TimeZone timeZone) {
DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone();
dateFormatTimeZone.withZone(timeZone.toZoneId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private Long queryDelayTimeSecond(Long currentTimeMs, Long startTimeMs) {
return 0L;
}

return (startTimeMs - currentTimeMs) / 1000;
return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000;
}

// Returns a list of delay times in seconds for executing the job within the specified window
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.job.base;

import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.IntervalUnit;

import com.google.gson.annotations.SerializedName;
Expand All @@ -40,11 +41,15 @@ public class TimerDefinition {

public void checkParams() {
if (null == startTimeMs) {
startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval);
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval);
}
if (null != endTimeMs && endTimeMs < startTimeMs) {
throw new IllegalArgumentException("endTimeMs must be greater than the start time");
}
if (null != endTimeMs) {
endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs);
}

if (null != intervalUnit) {
if (null == interval) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void start() {
taskDisruptorGroupManager = new TaskDisruptorGroupManager();
taskDisruptorGroupManager.init();
this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor();
latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
}
Expand All @@ -94,7 +95,8 @@ public void start() {
* Jobs will be re-registered after the task is completed
*/
private void cycleSystemSchedulerTasks() {
log.info("re-register system scheduler timer tasks" + TimeUtils.longToTimeString(System.currentTimeMillis()));
log.info("re-register system scheduler timer tasks, time is " + TimeUtils
.longToTimeStringWithms(System.currentTimeMillis()));
timerTaskScheduler.newTimeout(timeout -> {
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
Expand Down Expand Up @@ -144,7 +146,9 @@ public void close() throws IOException {


private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(),
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
startTimeWindowMs = TimeUtils.convertToSecondTimestamp(startTimeWindowMs);
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(currentTimeMs,
startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs);
if (CollectionUtils.isEmpty(delaySeconds)) {
log.info("skip job {} scheduler timer job, delay seconds is empty", job.getJobName());
Expand Down Expand Up @@ -190,7 +194,8 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() {

long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs;
if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
}
this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
log.info("execute timer job ids within last ten minutes window, last time window is {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ public void testGetTriggerDelayTimesRecurring() {
timerDefinition.setInterval(1L);
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size());
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size());
timerDefinition.setStartTimeMs(1672531200000L);
timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
timerDefinition.setInterval(1L);
Assertions.assertArrayEquals(new Long[]{0L}, configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 1672531800000L).toArray());

List<Long> expectDelayTimes = configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 1672531850000L);

Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray());
}

@Test
Expand Down

0 comments on commit 13add3c

Please sign in to comment.