Skip to content

Commit

Permalink
[Fix](job)Fix for Duplicate Scheduling of Tasks (#46872)
Browse files Browse the repository at this point in the history
### Problem Description
The current scheduling logic calculates the next scheduled time and adds
it to the task queue when the condition triggerTime <= windowEndTimeMs
is met. However, this can lead to a task being scheduled twice if its
triggerTime is exactly equal to windowEndTimeMs:

- The task is added to the current scheduling window.
- At the same time, this timestamp becomes the startTime for the next
scheduling window, causing the task to be scheduled again.

### Changes Made
Updated the condition from triggerTime <= windowEndTimeMs to triggerTime
< windowEndTimeMs. This ensures that the scheduling time doesn’t overlap
with the window’s end time, preventing duplicate scheduling.
  • Loading branch information
CalvinKirs authored Jan 13, 2025
1 parent 1f9e426 commit 77aadf1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private List<Long> getExecutionDelaySeconds(long windowStartTimeMs, long windowE
}

// Calculate the trigger time list
for (long triggerTime = firstTriggerTime; triggerTime <= windowEndTimeMs; triggerTime += intervalMs) {
for (long triggerTime = firstTriggerTime; triggerTime < windowEndTimeMs; triggerTime += intervalMs) {
if (null == timerDefinition.getEndTimeMs()
|| triggerTime < timerDefinition.getEndTimeMs()) {
timerDefinition.setLatestSchedulerTimeMs(triggerTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@

package org.apache.doris.job.base;

import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.IntervalUnit;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class JobExecutionConfigurationTest {

private static final Logger LOG = LoggerFactory.getLogger(JobExecutionConfigurationTest.class);

@Test
public void testGetTriggerDelayTimesOneTime() {
JobExecutionConfiguration configuration = new JobExecutionConfiguration();
Expand Down Expand Up @@ -73,16 +78,64 @@ public void testGetTriggerDelayTimesRecurring() {
long second = 1000L;
timerDefinition.setStartTimeMs(second);
timerDefinition.setInterval(1L);
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size());
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size());
Assertions.assertEquals(2, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size());
Assertions.assertEquals(2, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size());
timerDefinition.setStartTimeMs(1672531200000L);
timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
timerDefinition.setInterval(1L);
Assertions.assertArrayEquals(new Long[]{0L}, configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 1672531800000L).toArray());

List<Long> expectDelayTimes = configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 1672531850000L);

Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray());
timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
timerDefinition.setInterval(1L);
timerDefinition.setStartTimeMs(1577808000000L);
// Log detailed time information
LOG.info("Current time is: "
+ TimeUtils.longToTimeStringWithms(1736459699000L));
LOG.info("Start time window is: "
+ TimeUtils.longToTimeStringWithms(1736459698000L));
LOG.info("Latest batch scheduler timer task time is: "
+ TimeUtils.longToTimeStringWithms(1736460299000L));

// Get and log trigger delay times
delayTimes = configuration.getTriggerDelayTimes(1736459699000L, 1736459698000L,
1736460299000L);
Assertions.assertEquals(10, delayTimes.size());
LOG.info("Trigger delay times size: " + delayTimes.size());
delayTimes.forEach(a -> LOG.info(TimeUtils.longToTimeStringWithms(a * 1000 + 1736459699000L)));

LOG.info("----");

// Log detailed time information
LOG.info("Current time is: "
+ TimeUtils.longToTimeStringWithms(1736460901000L));
LOG.info("Start time window is: "
+ TimeUtils.longToTimeStringWithms(1736460900000L));
LOG.info("Latest batch scheduler timer task time is: "
+ TimeUtils.longToTimeStringWithms(1736461501000L));

// Get and log trigger delay times
delayTimes = configuration.getTriggerDelayTimes(1736460901000L, 1736460900000L,
1736461501000L);
Assertions.assertEquals(11, delayTimes.size());
LOG.info("Trigger delay times size: " + delayTimes.size());
delayTimes.forEach(a -> LOG.info(TimeUtils.longToTimeStringWithms(a * 1000 + 1736460901000L)));

LOG.info("----");

// Log detailed time information
LOG.info("Current time is: " + TimeUtils.longToTimeStringWithms(1736461502000L));
LOG.info("Start time window is: " + TimeUtils.longToTimeStringWithms(1736461501000L));
LOG.info("Latest batch scheduler timer task time is: "
+ TimeUtils.longToTimeStringWithms(1736462102000L));

// Get and log trigger delay times
delayTimes = configuration.getTriggerDelayTimes(1736461502000L, 1736461501000L,
1736462102000L);
Assertions.assertEquals(10, delayTimes.size());
LOG.info("Trigger delay times size: " + delayTimes.size());
delayTimes.forEach(a -> LOG.info(TimeUtils.longToTimeStringWithms(a * 1000 + 1736461502000L)));
}

@Test
Expand Down

0 comments on commit 77aadf1

Please sign in to comment.