From f39ebe5facacd65b739e4cd372f8609ef761f7dd Mon Sep 17 00:00:00 2001 From: Daniel Cosme Date: Wed, 12 Jun 2024 14:02:21 -0400 Subject: [PATCH] Make configurable the number of concurrent workflows (#616) --- enduro.toml | 6 ++- internal/workflow/semaphore.go | 2 +- main.go | 13 +++-- .../en/docs/user-manual/configuration.md | 51 +++++++++++++++++++ 4 files changed, 64 insertions(+), 8 deletions(-) diff --git a/enduro.toml b/enduro.toml index a6bce9ab..e24e6cf1 100644 --- a/enduro.toml +++ b/enduro.toml @@ -85,7 +85,9 @@ disabled = true processNameMetadata = false [worker] -heartbeatThrottleInterval = "60s" +heartbeatThrottleInterval = "1m" +maxConcurrentWorkflowsExecutionsSize = 15 +maxConcurrentSessionExecutionSize = 15 [workflow] -activityHeartbeatTimeout = "20s" +activityHeartbeatTimeout = "30s" diff --git a/internal/workflow/semaphore.go b/internal/workflow/semaphore.go index 88d413f9..7a6e79b9 100644 --- a/internal/workflow/semaphore.go +++ b/internal/workflow/semaphore.go @@ -33,7 +33,7 @@ func acquirePipeline(ctx temporalsdk_workflow.Context, colsvc collection.Service // Acquire the pipeline semaphore. 
{ ctx := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{ - HeartbeatTimeout: time.Minute, + HeartbeatTimeout: heartBeatTimeout, WaitForCancellation: false, ScheduleToStartTimeout: forever, StartToCloseTimeout: forever, diff --git a/main.go b/main.go index 37dd21ab..f3e58207 100644 --- a/main.go +++ b/main.go @@ -253,10 +253,11 @@ func main() { done := make(chan struct{}) w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{ - EnableSessionWorker: true, - MaxConcurrentSessionExecutionSize: 5000, - MaxHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval, - DefaultHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval, + EnableSessionWorker: true, + MaxConcurrentSessionExecutionSize: config.Worker.MaxConcurrentSessionExecutionSize, + MaxConcurrentWorkflowTaskExecutionSize: config.Worker.MaxConcurrentWorkflowsExecutionsSize, + MaxHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval, + DefaultHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval, }) if err != nil { logger.Error(err, "Error creating Temporal worker.") @@ -402,7 +403,9 @@ type configuration struct { } type WorkerConfig struct { - HeartbeatThrottleInterval time.Duration + HeartbeatThrottleInterval time.Duration + MaxConcurrentSessionExecutionSize int + MaxConcurrentWorkflowsExecutionsSize int } func (c configuration) Validate() error { diff --git a/website/content/en/docs/user-manual/configuration.md b/website/content/en/docs/user-manual/configuration.md index e4a4ad24..c40f5241 100644 --- a/website/content/en/docs/user-manual/configuration.md +++ b/website/content/en/docs/user-manual/configuration.md @@ -462,6 +462,57 @@ does not include a document with checksums, e.g. `checksum.sha1`. E.g.: `false` + +### `[worker]` + +#### `heartbeatThrottleInterval` (String) + +Specifies the interval at which Enduro sends heartbeats to the workflow engine. 
+ +The string should be constructed as a sequence of decimal numbers, each with +optional fraction and a unit suffix, such as "30m", "24h" or "2h30m". +Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + +E.g.: `5m` (String) + +#### `maxConcurrentWorkflowsExecutionsSize` (int) + +Sets the maximum number of concurrent workflow executions that Enduro will +accept from the workflow engine. + +A good rule of thumb is to set this value to twice the sum of the capacities +of all the configured pipelines. For example, if two pipelines are configured +with a capacity of `5` each, the value should be `20`. + +E.g.: `10` + +#### `maxConcurrentSessionExecutionSize` (int) + +Sets the maximum number of concurrently running (workflow engine) sessions that +Enduro supports. This value governs how many concurrent SIPs are going to be +processed at any given time, regardless of pipeline +capacity. This setting can be used to throttle from a single place how many +concurrent pipelines Enduro will run. + +We recommend setting this value to be directly proportional to (or higher than) +the Archivematica pipeline capacity. + +E.g.: `5` + +### `[workflow]` + +#### `activityHeartbeatTimeout` (String) + +Specifies the timeout duration for activities that send heartbeats to the workflow engine. +If the activity takes more time to send a heartbeat to the workflow engine, the workflow will fail +with a `heartbeatTimeout` error. + +The string should be constructed as a sequence of decimal numbers, each with +optional fraction and a unit suffix, such as "30m", "24h" or "2h30m". +Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + +E.g.: `5m` (String) + ## Configuration example Source: https://github.com/artefactual-labs/enduro/blob/main/enduro.toml.