Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Store job entry ID for fixed rate scheduler and also fixes cron sched…
Browse files Browse the repository at this point in the history
…ule for missing key (#431)

* Store job entry ID for fixed rate scheduler

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Add kickoff time input arg only if the user set it in the workflow

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Added unit test for no kick off time arg

Signed-off-by: Prafulla Mahindrakar <[email protected]>
  • Loading branch information
pmahindrakar-oss authored May 27, 2022
1 parent b0ccd98 commit 32161db
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
4 changes: 3 additions & 1 deletion scheduler/core/gocron_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo
var jobFunc cron.TimedFuncJob
jobFunc = job.Run

g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc)
entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc)
// Update the enttry id in the job which is handle to be used for removal
job.entryID = entryID
logger.Infof(ctx, "successfully added the fixed rate schedule %s to the scheduler for schedule %+v",
job.nameOfSchedule, job.schedule)

Expand Down
2 changes: 1 addition & 1 deletion scheduler/executor/executor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (w *executor) Execute(ctx context.Context, scheduledTime time.Time, s model

literalsInputMap := map[string]*core.Literal{}
// Only add kickoff time input arg for cron based schedules
if len(s.CronExpression) > 0 {
if len(s.CronExpression) > 0 && len(s.KickoffTimeInputArg) > 0 {
literalsInputMap[s.KickoffTimeInputArg] = &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Expand Down
42 changes: 29 additions & 13 deletions scheduler/executor/executor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,36 @@ func setupExecutor(scope string) Executor {
func TestExecutor(t *testing.T) {
executor := setupExecutor("testExecutor1")
active := true
schedule := models.SchedulableEntity{
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "cron_schedule",
Version: "v1",
},
CronExpression: "*/1 * * * *",
KickoffTimeInputArg: "kickoff_time",
Active: &active,
}
mockAdminClient.OnCreateExecutionMatch(context.Background(), mock.Anything).Return(&admin.ExecutionCreateResponse{}, nil)
err := executor.Execute(context.Background(), time.Now(), schedule)
assert.Nil(t, err)
t.Run("kickoff_time_arg", func(t *testing.T) {
schedule := models.SchedulableEntity{
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "cron_schedule",
Version: "v1",
},
CronExpression: "*/1 * * * *",
KickoffTimeInputArg: "kickoff_time",
Active: &active,
}
err := executor.Execute(context.Background(), time.Now(), schedule)
assert.Nil(t, err)
})
t.Run("without kickoff_time_arg", func(t *testing.T) {
schedule := models.SchedulableEntity{
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "cron_schedule",
Version: "v1",
},
CronExpression: "*/1 * * * *",
Active: &active,
}
err := executor.Execute(context.Background(), time.Now(), schedule)
assert.Nil(t, err)
})
}

func TestExecutorAlreadyExists(t *testing.T) {
Expand Down

0 comments on commit 32161db

Please sign in to comment.