From 32161db97d70ba1e8c97980a641f77d07bde2dcb Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Fri, 27 May 2022 21:35:33 +0530 Subject: [PATCH] Store job entry ID for fixed rate scheduler and also fixes cron schedule for missing key (#431) * Store job entry ID for fixed rate scheduler Signed-off-by: Prafulla Mahindrakar * Add kickoff time input arg only if the user set it in the workflow Signed-off-by: Prafulla Mahindrakar * Added unit test for no kick off time arg Signed-off-by: Prafulla Mahindrakar --- scheduler/core/gocron_scheduler.go | 4 ++- scheduler/executor/executor_impl.go | 2 +- scheduler/executor/executor_impl_test.go | 42 ++++++++++++++++-------- 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/scheduler/core/gocron_scheduler.go b/scheduler/core/gocron_scheduler.go index c1abbfa12..15ca50f8c 100644 --- a/scheduler/core/gocron_scheduler.go +++ b/scheduler/core/gocron_scheduler.go @@ -271,7 +271,9 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo var jobFunc cron.TimedFuncJob jobFunc = job.Run - g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc) + entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc) + // Update the enttry id in the job which is handle to be used for removal + job.entryID = entryID logger.Infof(ctx, "successfully added the fixed rate schedule %s to the scheduler for schedule %+v", job.nameOfSchedule, job.schedule) diff --git a/scheduler/executor/executor_impl.go b/scheduler/executor/executor_impl.go index f46dc3542..cc1705b3a 100644 --- a/scheduler/executor/executor_impl.go +++ b/scheduler/executor/executor_impl.go @@ -37,7 +37,7 @@ func (w *executor) Execute(ctx context.Context, scheduledTime time.Time, s model literalsInputMap := map[string]*core.Literal{} // Only add kickoff time input arg for cron based schedules - if len(s.CronExpression) > 0 { + if len(s.CronExpression) > 0 && len(s.KickoffTimeInputArg) > 0 { literalsInputMap[s.KickoffTimeInputArg] = &core.Literal{ Value: &core.Literal_Scalar{ Scalar: &core.Scalar{ diff --git a/scheduler/executor/executor_impl_test.go b/scheduler/executor/executor_impl_test.go index a1f653296..c84a6d527 100644 --- a/scheduler/executor/executor_impl_test.go +++ b/scheduler/executor/executor_impl_test.go @@ -28,20 +28,36 @@ func setupExecutor(scope string) Executor { func TestExecutor(t *testing.T) { executor := setupExecutor("testExecutor1") active := true - schedule := models.SchedulableEntity{ - SchedulableEntityKey: models.SchedulableEntityKey{ - Project: "project", - Domain: "domain", - Name: "cron_schedule", - Version: "v1", - }, - CronExpression: "*/1 * * * *", - KickoffTimeInputArg: "kickoff_time", - Active: &active, - } mockAdminClient.OnCreateExecutionMatch(context.Background(), mock.Anything).Return(&admin.ExecutionCreateResponse{}, nil) - err := executor.Execute(context.Background(), time.Now(), schedule) - assert.Nil(t, err) + t.Run("kickoff_time_arg", func(t *testing.T) { + schedule := models.SchedulableEntity{ + SchedulableEntityKey: models.SchedulableEntityKey{ + Project: "project", + Domain: "domain", + Name: "cron_schedule", + Version: "v1", + }, + CronExpression: "*/1 * * * *", + KickoffTimeInputArg: "kickoff_time", + Active: &active, + } + err := executor.Execute(context.Background(), time.Now(), schedule) + assert.Nil(t, err) + }) + t.Run("without kickoff_time_arg", func(t *testing.T) { + schedule := models.SchedulableEntity{ + SchedulableEntityKey: models.SchedulableEntityKey{ + Project: "project", + Domain: "domain", + Name: "cron_schedule", + Version: "v1", + }, + CronExpression: "*/1 * * * *", + Active: &active, + } + err := executor.Execute(context.Background(), time.Now(), schedule) + assert.Nil(t, err) + }) } func TestExecutorAlreadyExists(t *testing.T) {