diff --git a/components/scheduler2/config.go b/components/scheduler2/config.go new file mode 100644 index 00000000000..21927d1de20 --- /dev/null +++ b/components/scheduler2/config.go @@ -0,0 +1,54 @@ +package scheduler2 + +import ( + "time" + + "go.temporal.io/server/common/dynamicconfig" +) + +type ( + Tweakables struct { + DefaultCatchupWindow time.Duration // Default for catchup window + MinCatchupWindow time.Duration // Minimum for catchup window + MaxBufferSize int // MaxBufferSize limits the number of buffered actions pending execution in total + BackfillsPerIteration int // How many backfilled actions to buffer per iteration (implies rate limit since min sleep is 1s) + CanceledTerminatedCountAsFailures bool // Whether cancelled+terminated count for pause-on-failure + + // TODO - incomplete tweakables list + } + + // V2 Scheduler dynamic config, shared among all sub state machines. + Config struct { + Tweakables dynamicconfig.TypedPropertyFnWithNamespaceFilter[Tweakables] + ExecutionTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter + } +) + +var ( + // TODO - fix namespaces after removal of prototype + CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting( + "component.scheduler2.tweakables", + DefaultTweakables, + "A set of tweakable parameters for the V2 scheduler") + + ExecutionTimeout = dynamicconfig.NewNamespaceDurationSetting( + "component.scheduler2.executionTimeout", + time.Second*10, + `ExecutionTimeout is the timeout for executing a single scheduler task.`, + ) + + DefaultTweakables = Tweakables{ + DefaultCatchupWindow: 365 * 24 * time.Hour, + MinCatchupWindow: 10 * time.Second, + MaxBufferSize: 1000, + BackfillsPerIteration: 10, + CanceledTerminatedCountAsFailures: false, + } +) + +func ConfigProvider(dc *dynamicconfig.Collection) *Config { + return &Config{ + Tweakables: CurrentTweakables.Get(dc), + ExecutionTimeout: ExecutionTimeout.Get(dc), + } +} diff --git a/components/scheduler2/util.go b/components/scheduler2/util.go new file mode 100644 index 00000000000..9214478c129 --- /dev/null +++ b/components/scheduler2/util.go @@ -0,0 +1,57 @@ +package scheduler2 + +import ( + "fmt" + "time" + + "go.temporal.io/server/service/history/consts" + "go.temporal.io/server/service/history/hsm" +) + +// ValidateTask ensures that the given transition is possible with the current +// state machine state. +func ValidateTask[ + S comparable, + SM hsm.StateMachine[S], + E any](node *hsm.Node, transition hsm.Transition[S, SM, E]) error { + if err := node.CheckRunning(); err != nil { + return err + } + sm, err := hsm.MachineData[SM](node) + if err != nil { + return err + } + if !transition.Possible(sm) { + return fmt.Errorf( + "%w: %w: cannot transition from state %v to %v", + consts.ErrStaleReference, + hsm.ErrInvalidTransition, + sm.State(), + transition.Destination, + ) + } + return nil +} + +// Generates a deterministic request ID for a buffered action's time. The request +// ID is deterministic because the jittered actual time (as well as the spec's +// nominal time) is, in turn, also deterministic. +// +// backfillID should be left blank for actions that are being started +// automatically, based on the schedule spec. It must be set for backfills, +// as backfills may generate buffered actions that overlap with both +// automatically-buffered actions, as well as other requested backfills. +func GenerateRequestID(scheduler Scheduler, backfillID string, nominal, actual time.Time) string { + if backfillID == "" { + backfillID = "auto" + } + return fmt.Sprintf( + "sched-%s-%s-%s-%d-%d-%d", + backfillID, + scheduler.NamespaceId, + scheduler.ScheduleId, + scheduler.ConflictToken, + nominal.UnixMilli(), + actual.UnixMilli(), + ) +} diff --git a/components/scheduler2/util_test.go b/components/scheduler2/util_test.go new file mode 100644 index 00000000000..bc7b52bff5b --- /dev/null +++ b/components/scheduler2/util_test.go @@ -0,0 +1,52 @@ +package scheduler2_test + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.temporal.io/server/api/schedule/v1" + "go.temporal.io/server/components/scheduler2" +) + +func TestGenerateRequestID(t *testing.T) { + scheduler := scheduler2.Scheduler{ + SchedulerInternal: &schedule.SchedulerInternal{ + Namespace: "ns", + NamespaceId: "nsid", + ScheduleId: "mysched", + ConflictToken: 10, + }, + } + nominalTime := time.Now() + actualTime := time.Now() + + // No backfill ID given. + actual := scheduler2.GenerateRequestID( + scheduler, + "", + nominalTime, + actualTime, + ) + expected := fmt.Sprintf( + "sched-auto-nsid-mysched-10-%d-%d", + nominalTime.UnixMilli(), + actualTime.UnixMilli(), + ) + require.Equal(t, expected, actual) + + // Backfill ID given. + actual = scheduler2.GenerateRequestID( + scheduler, + "backfillid", + nominalTime, + actualTime, + ) + expected = fmt.Sprintf( + "sched-backfillid-nsid-mysched-10-%d-%d", + nominalTime.UnixMilli(), + actualTime.UnixMilli(), + ) + require.Equal(t, expected, actual) +}