From 9a458a009684ce50bec2817fce39a60823ac0cf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rainone?= <476650+arl@users.noreply.github.com> Date: Mon, 27 Mar 2023 17:15:34 +0200 Subject: [PATCH] input: in KCL, make LeaseDuration configurable (#216) * input: in KCL, make LeaseDuration configurable * Update CHANGELOG.md --- CHANGELOG.md | 1 + input/kcl.go | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46464f3b..490f9840 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add `DiscardEmptyFiles` option to the `FileWriter` output [#204](https://github.com/AdRoll/baker/pull/204) - Add `URLEscape` and `URLParam` filters [#206](https://github.com/AdRoll/baker/pull/206) - Add `QueueNames` parameter to SQS Input [#210](https://github.com/AdRoll/baker/pull/210) +- Make `LeaseDuration` configurable on the KCL input [#216](https://github.com/AdRoll/baker/pull/216) ### Changed diff --git a/input/kcl.go b/input/kcl.go index 0b1da3a8..3686a465 100644 --- a/input/kcl.go +++ b/input/kcl.go @@ -43,6 +43,7 @@ type KCLConfig struct { AppName string `help:"Used by KCL to allow multiple app to consume the same stream." required:"true"` MaxShards int `help:"Max shards this Worker can handle at a time" default:"32767"` ShardSync time.Duration `help:"Time between tasks to sync leases and Kinesis shards" default:"60s"` + LeaseDuration time.Duration `help:"Time after which a worker should have renewed all shard leases before not being considered owner anymore" default:"60s"` InitialPosition string `help:"Position in the stream where a new application should start from. Values: LATEST or TRIM_HORIZON" default:"LATEST"` initialPosition config.InitialPositionInStream } @@ -75,6 +76,9 @@ func (cfg *KCLConfig) fillDefaults() { if cfg.ShardSync == 0 { cfg.ShardSync = time.Minute } + if cfg.LeaseDuration == 0 { + cfg.LeaseDuration = time.Minute + } } // KCL is a Baker input reading from Kinesis with the KCL (Kinesis Client Library). @@ -119,8 +123,6 @@ func NewKCL(cfg baker.InputParams) (baker.Input, error) { } const ( - // Leases not renewed within this period will be claimed by others - leaseDuration = 60 * time.Second // Period before the end of lease during which a lease is refreshed by the owner leaseRefreshPeriod = 20 * time.Second // Max records to read per Kinesis getRecords() call @@ -136,7 +138,7 @@ func NewKCL(cfg baker.InputParams) (baker.Input, error) { WithMaxRecords(maxRecords). WithMaxLeasesForWorker(dcfg.MaxShards). WithShardSyncIntervalMillis(int(dcfg.ShardSync / time.Millisecond)). - WithFailoverTimeMillis(int(leaseDuration / time.Millisecond)). + WithFailoverTimeMillis(int(dcfg.LeaseDuration / time.Millisecond)). WithLeaseRefreshPeriodMillis(int(leaseRefreshPeriod / time.Millisecond)). WithInitialPositionInStream(dcfg.initialPosition). WithMonitoringService(&kcl.metrics).