From 66e00bb653f27ebfb7736ba78f1e0de54364f2b1 Mon Sep 17 00:00:00 2001 From: Lina Jodoin Date: Wed, 27 Nov 2024 15:23:09 -0800 Subject: [PATCH 1/3] sched2 common package WIP --- components/scheduler2/common/config.go | 53 ++++++++++++++++++ components/scheduler2/common/util.go | 77 ++++++++++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 components/scheduler2/common/config.go create mode 100644 components/scheduler2/common/util.go diff --git a/components/scheduler2/common/config.go b/components/scheduler2/common/config.go new file mode 100644 index 00000000000..81480a7d891 --- /dev/null +++ b/components/scheduler2/common/config.go @@ -0,0 +1,53 @@ +package common + +import ( + "time" + + "go.temporal.io/server/common/dynamicconfig" +) + +type ( + Tweakables struct { + DefaultCatchupWindow time.Duration // Default for catchup window + MinCatchupWindow time.Duration // Minimum for catchup window + MaxBufferSize int // MaxBufferSize limits the number of buffered actions pending execution in total + BackfillsPerIteration int // How many backfilled actions to buffer per iteration (implies rate limit since min sleep is 1s) + CanceledTerminatedCountAsFailures bool // Whether cancelled+terminated count for pause-on-failure + + // TODO - incomplete tweakables list + } + + // V2 Scheduler dynamic config, shared among all substate machines. + Config struct { + Tweakables dynamicconfig.TypedPropertyFnWithNamespaceFilter[Tweakables] + ExecutionTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter + } +) + +var ( + CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting( + "component.scheduler.tweakables", + DefaultTweakables, + "A set of tweakable parameters for the V2 scheduler") + + ExecutionTimeout = dynamicconfig.NewNamespaceDurationSetting( + "component.scheduler.executionTimeout", + time.Second*10, + `ExecutionTimeout is the timeout for executing a single scheduler task.`, + ) + + DefaultTweakables = Tweakables{ + DefaultCatchupWindow: 365 * 24 * time.Hour, + MinCatchupWindow: 10 * time.Second, + MaxBufferSize: 1000, + BackfillsPerIteration: 10, + CanceledTerminatedCountAsFailures: false, + } +) + +func ConfigProvider(dc *dynamicconfig.Collection) *Config { + return &Config{ + Tweakables: CurrentTweakables.Get(dc), + ExecutionTimeout: ExecutionTimeout.Get(dc), + } +} diff --git a/components/scheduler2/common/util.go b/components/scheduler2/common/util.go new file mode 100644 index 00000000000..a6c44bc342d --- /dev/null +++ b/components/scheduler2/common/util.go @@ -0,0 +1,77 @@ +package common + +import ( + "context" + "fmt" + "time" + + servercommon "go.temporal.io/server/common" + "go.temporal.io/server/components/scheduler2/core" + "go.temporal.io/server/service/history/consts" + "go.temporal.io/server/service/history/hsm" +) + +func ValidateTask[ + S comparable, + SM hsm.StateMachine[S], + E any](node *hsm.Node, transition hsm.Transition[S, SM, E]) error { + if err := node.CheckRunning(); err != nil { + return err + } + sm, err := hsm.MachineData[SM](node) + if err != nil { + return err + } + if !transition.Possible(sm) { + return fmt.Errorf( + "%w: %w: cannot transition from state %v to %v", + consts.ErrStaleReference, + hsm.ErrInvalidTransition, + sm.State(), + transition.Destination, + ) + } + return nil +} + +// Intended to be called with a substate machine's node. Returns a cloned copy +// of the top-level Scheduler from its persisted state. +func LoadSchedulerFromParent( + ctx context.Context, + env hsm.Environment, + ref hsm.Ref) (scheduler core.Scheduler, err error) { + err = env.Access(ctx, ref, hsm.AccessRead, func(node *hsm.Node) error { + prevScheduler, err := hsm.MachineData[core.Scheduler](node.Parent) + if err != nil { + return err + } + scheduler = core.Scheduler{ + HsmSchedulerV2State: servercommon.CloneProto(prevScheduler.HsmSchedulerV2State), + } + return nil + }) + return +} + +// Generates a deterministic request ID for a buffered action's time. The request +// ID is deterministic because the jittered actual time (as well as the spec's +// nominal time) is, in turn, also deterministic. +// +// backfillID should be left blank for actions that are being started +// automatically, based on the schedule spec. It must be set for backfills, +// as backfills may generate buffered actions that overlap with both +// automatically-buffered actions, as well as other requested backfills. +func GenerateRequestID(scheduler core.Scheduler, backfillID string, nominal, actual time.Time) string { + if backfillID == "" { + backfillID = "auto" + } + return fmt.Sprintf( + "sched-%s-%s-%s-%d-%d-%d", + backfillID, + scheduler.NamespaceId, + scheduler.ScheduleId, + scheduler.ConflictToken, + nominal.UnixMilli(), + actual.UnixMilli(), + ) +} From 85b73c8f43d14571a81421428f5f6b727287417d Mon Sep 17 00:00:00 2001 From: Lina Jodoin Date: Wed, 4 Dec 2024 17:20:02 -0800 Subject: [PATCH 2/3] common feedback --- components/scheduler2/common/config.go | 2 +- components/scheduler2/common/util.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/components/scheduler2/common/config.go b/components/scheduler2/common/config.go index 81480a7d891..64e57cb01f1 100644 --- a/components/scheduler2/common/config.go +++ b/components/scheduler2/common/config.go @@ -17,7 +17,7 @@ type ( // TODO - incomplete tweakables list } - // V2 Scheduler dynamic config, shared among all substate machines. + // V2 Scheduler dynamic config, shared among all sub state machines. Config struct { Tweakables dynamicconfig.TypedPropertyFnWithNamespaceFilter[Tweakables] ExecutionTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter diff --git a/components/scheduler2/common/util.go b/components/scheduler2/common/util.go index a6c44bc342d..360ad88b149 100644 --- a/components/scheduler2/common/util.go +++ b/components/scheduler2/common/util.go @@ -34,7 +34,7 @@ func ValidateTask[ return nil } -// Intended to be called with a substate machine's node. Returns a cloned copy +// Intended to be called with a sub state machine's node. Returns a cloned copy // of the top-level Scheduler from its persisted state. func LoadSchedulerFromParent( ctx context.Context, @@ -46,7 +46,7 @@ func LoadSchedulerFromParent( return err } scheduler = core.Scheduler{ - HsmSchedulerV2State: servercommon.CloneProto(prevScheduler.HsmSchedulerV2State), + SchedulerInternal: servercommon.CloneProto(prevScheduler.SchedulerInternal), } return nil }) From 41430ac888edc6b37ba6c10eebfc9ddc7d0a9973 Mon Sep 17 00:00:00 2001 From: Lina Jodoin Date: Mon, 9 Dec 2024 16:44:58 -0800 Subject: [PATCH 3/3] tests and package move for utils --- components/scheduler2/{common => }/config.go | 7 +-- components/scheduler2/{common => }/util.go | 28 ++--------- components/scheduler2/util_test.go | 52 ++++++++++++++++++++ 3 files changed, 60 insertions(+), 27 deletions(-) rename components/scheduler2/{common => }/config.go (91%) rename components/scheduler2/{common => }/util.go (62%) create mode 100644 components/scheduler2/util_test.go diff --git a/components/scheduler2/common/config.go b/components/scheduler2/config.go similarity index 91% rename from components/scheduler2/common/config.go rename to components/scheduler2/config.go index 64e57cb01f1..21927d1de20 100644 --- a/components/scheduler2/common/config.go +++ b/components/scheduler2/config.go @@ -1,4 +1,4 @@ -package common +package scheduler2 import ( "time" @@ -25,13 +25,14 @@ type ( ) var ( + // TODO - fix namespaces after removal of prototype CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting( - "component.scheduler.tweakables", + "component.scheduler2.tweakables", DefaultTweakables, "A set of tweakable parameters for the V2 scheduler") ExecutionTimeout = dynamicconfig.NewNamespaceDurationSetting( - "component.scheduler.executionTimeout", + "component.scheduler2.executionTimeout", time.Second*10, `ExecutionTimeout is the timeout for executing a single scheduler task.`, ) diff --git a/components/scheduler2/common/util.go b/components/scheduler2/util.go similarity index 62% rename from components/scheduler2/common/util.go rename to components/scheduler2/util.go index 360ad88b149..9214478c129 100644 --- a/components/scheduler2/common/util.go +++ b/components/scheduler2/util.go @@ -1,16 +1,15 @@ -package common +package scheduler2 import ( - "context" "fmt" "time" - servercommon "go.temporal.io/server/common" - "go.temporal.io/server/components/scheduler2/core" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/hsm" ) +// ValidateTask ensures that the given transition is possible with the current +// state machine state. func ValidateTask[ S comparable, SM hsm.StateMachine[S], @@ -34,25 +33,6 @@ func ValidateTask[ return nil } -// Intended to be called with a sub state machine's node. Returns a cloned copy -// of the top-level Scheduler from its persisted state. -func LoadSchedulerFromParent( - ctx context.Context, - env hsm.Environment, - ref hsm.Ref) (scheduler core.Scheduler, err error) { - err = env.Access(ctx, ref, hsm.AccessRead, func(node *hsm.Node) error { - prevScheduler, err := hsm.MachineData[core.Scheduler](node.Parent) - if err != nil { - return err - } - scheduler = core.Scheduler{ - SchedulerInternal: servercommon.CloneProto(prevScheduler.SchedulerInternal), - } - return nil - }) - return -} - // Generates a deterministic request ID for a buffered action's time. The request // ID is deterministic because the jittered actual time (as well as the spec's // nominal time) is, in turn, also deterministic. @@ -61,7 +41,7 @@ func LoadSchedulerFromParent( // automatically, based on the schedule spec. It must be set for backfills, // as backfills may generate buffered actions that overlap with both // automatically-buffered actions, as well as other requested backfills. -func GenerateRequestID(scheduler core.Scheduler, backfillID string, nominal, actual time.Time) string { +func GenerateRequestID(scheduler Scheduler, backfillID string, nominal, actual time.Time) string { if backfillID == "" { backfillID = "auto" } diff --git a/components/scheduler2/util_test.go b/components/scheduler2/util_test.go new file mode 100644 index 00000000000..bc7b52bff5b --- /dev/null +++ b/components/scheduler2/util_test.go @@ -0,0 +1,52 @@ +package scheduler2_test + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.temporal.io/server/api/schedule/v1" + "go.temporal.io/server/components/scheduler2" +) + +func TestGenerateRequestID(t *testing.T) { + scheduler := scheduler2.Scheduler{ + SchedulerInternal: &schedule.SchedulerInternal{ + Namespace: "ns", + NamespaceId: "nsid", + ScheduleId: "mysched", + ConflictToken: 10, + }, + } + nominalTime := time.Now() + actualTime := time.Now() + + // No backfill ID given. + actual := scheduler2.GenerateRequestID( + scheduler, + "", + nominalTime, + actualTime, + ) + expected := fmt.Sprintf( + "sched-auto-nsid-mysched-10-%d-%d", + nominalTime.UnixMilli(), + actualTime.UnixMilli(), + ) + require.Equal(t, expected, actual) + + // Backfill ID given. + actual = scheduler2.GenerateRequestID( + scheduler, + "backfillid", + nominalTime, + actualTime, + ) + expected = fmt.Sprintf( + "sched-backfillid-nsid-mysched-10-%d-%d", + nominalTime.UnixMilli(), + actualTime.UnixMilli(), + ) + require.Equal(t, expected, actual) +}