Skip to content

Commit

Permalink
input: in KCL, make LeaseDuration configurable (#216)
Browse files Browse the repository at this point in the history
* input: in KCL, make LeaseDuration configurable

* Update CHANGELOG.md
  • Loading branch information
arl authored Mar 27, 2023
1 parent f893acc commit 9a458a0
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add `DiscardEmptyFiles` option to the `FileWriter` output [#204](https://github.com/AdRoll/baker/pull/204)
- Add `URLEscape` and `URLParam` filters [#206](https://github.com/AdRoll/baker/pull/206)
- Add `QueueNames` parameter to SQS Input [#210](https://github.com/AdRoll/baker/pull/210)
- Make `LeaseDuration` configurable on the KCL input [#216](https://github.com/AdRoll/baker/pull/216)

### Changed

Expand Down
8 changes: 5 additions & 3 deletions input/kcl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type KCLConfig struct {
AppName string `help:"Used by KCL to allow multiple app to consume the same stream." required:"true"`
MaxShards int `help:"Max shards this Worker can handle at a time" default:"32767"`
ShardSync time.Duration `help:"Time between tasks to sync leases and Kinesis shards" default:"60s"`
LeaseDuration time.Duration `help:"Time after which a worker should have renewed all shard leases before not being considered owner anymore" default:"60s"`
InitialPosition string `help:"Position in the stream where a new application should start from. Values: LATEST or TRIM_HORIZON" default:"LATEST"`
initialPosition config.InitialPositionInStream
}
Expand Down Expand Up @@ -75,6 +76,9 @@ func (cfg *KCLConfig) fillDefaults() {
if cfg.ShardSync == 0 {
cfg.ShardSync = time.Minute
}
if cfg.LeaseDuration == 0 {
cfg.LeaseDuration = time.Minute
}
}

// KCL is a Baker input reading from Kinesis with the KCL (Kinesis Client Library).
Expand Down Expand Up @@ -119,8 +123,6 @@ func NewKCL(cfg baker.InputParams) (baker.Input, error) {
}

const (
// Leases not renewed within this period will be claimed by others
leaseDuration = 60 * time.Second
// Period before the end of lease during which a lease is refreshed by the owner
leaseRefreshPeriod = 20 * time.Second
// Max records to read per Kinesis getRecords() call
Expand All @@ -136,7 +138,7 @@ func NewKCL(cfg baker.InputParams) (baker.Input, error) {
WithMaxRecords(maxRecords).
WithMaxLeasesForWorker(dcfg.MaxShards).
WithShardSyncIntervalMillis(int(dcfg.ShardSync / time.Millisecond)).
WithFailoverTimeMillis(int(leaseDuration / time.Millisecond)).
WithFailoverTimeMillis(int(dcfg.LeaseDuration / time.Millisecond)).
WithLeaseRefreshPeriodMillis(int(leaseRefreshPeriod / time.Millisecond)).
WithInitialPositionInStream(dcfg.initialPosition).
WithMonitoringService(&kcl.metrics).
Expand Down

0 comments on commit 9a458a0

Please sign in to comment.