Skip to content

Commit

Permalink
Add dynamic config to limit multi cursor predicate size (#6458)
Browse files Browse the repository at this point in the history
## What changed?

Add dynamic config to limit multi-cursor predicate size.
There are two separate configs:
- `history.queueMaxPredicateSize` for all history queues except for the
outbound queue - set to unlimited by default to avoid changing behavior
- `history.outboundQueueMaxPredicateSize` for the outbound queue - set
to 10 KiB (10*1024 bytes) by default

## Why?

Limit the size of the shard info record. In stress test scenarios we've
seen this record grow to over 10MB.

## How did you test it?

Added unit tests.

## Is hotfix candidate?

This should be applied to any server deployments enabling Nexus
workloads.
  • Loading branch information
bergundy authored Aug 29, 2024
1 parent ec4023b commit 1ed0b3e
Show file tree
Hide file tree
Showing 25 changed files with 260 additions and 25 deletions.
19 changes: 19 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,15 @@ limit is in addition to queuePendingTaskCriticalCount which controls when to unl
prevent loading new tasks. Ideally this max count limit should not be hit and task unloading should happen once critical
count is exceeded. But since queue action is async, we need this hard limit.
NOTE: The outbound queue has a separate configuration: outboundQueuePendingTaskMaxCount.
`,
)
// QueueMaxPredicateSize bounds the estimated size of the multi-cursor predicate kept in the shard info record
// for history queues other than the outbound queue. The default of 0 leaves the size unlimited.
QueueMaxPredicateSize = NewGlobalIntSetting(
"history.queueMaxPredicateSize",
0,
`The max size of the multi-cursor predicate structure stored in the shard info record. 0 is considered
unlimited. When the predicate size is surpassed for a given scope, the predicate is converted to a universal predicate,
which causes all tasks in the scope's range to eventually be reprocessed without applying any filtering logic.
NOTE: The outbound queue has a separate configuration: outboundQueueMaxPredicateSize.
`,
)

Expand Down Expand Up @@ -1657,6 +1666,16 @@ critical count is exceeded. But since queue action is async, we need this hard l
9000,
`Max number of pending tasks in the outbound queue before triggering slice splitting and unloading.`,
)
// OutboundQueueMaxPredicateSize bounds the estimated size of the multi-cursor predicate kept in the shard info
// record for the outbound queue. The default is 10*1024; a value of 0 leaves the size unlimited.
OutboundQueueMaxPredicateSize = NewGlobalIntSetting(
"history.outboundQueueMaxPredicateSize",
10*1024,
`The max size of the multi-cursor predicate structure stored in the shard info record for the outbound queue. 0
is considered unlimited. When the predicate size is surpassed for a given scope, the predicate is converted to a
universal predicate, which causes all tasks in the scope's range to eventually be reprocessed without applying any
filtering logic.
`,
)

OutboundProcessorMaxPollRPS = NewGlobalIntSetting(
"history.outboundProcessorMaxPollRPS",
20,
Expand Down
9 changes: 9 additions & 0 deletions common/predicates/and.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ func (a *AndImpl[T]) Equals(
return predicatesEqual(a.Predicates, andPredicate.Predicates)
}

// Size returns the estimated size in bytes of the And predicate: the fixed
// EmptyPredicateProtoSize overhead for the wrapper itself plus the estimated
// size of every operand predicate.
func (a *AndImpl[T]) Size() int {
	total := EmptyPredicateProtoSize
	for i := range a.Predicates {
		total += a.Predicates[i].Size()
	}
	return total
}

// appendPredicates adds new predicates to the slice of existing predicates
// dropping any duplicated predicates where duplication is determined by Predicate.Equals.
// appendPredicates assumes that there's no duplication in new predicates.
Expand Down
8 changes: 8 additions & 0 deletions common/predicates/and_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,11 @@ func (s *andSuite) TestAnd_Equals() {
s.False(p.Equals(Empty[int]()))
s.False(p.Equals(Universal[int]()))
}

// TestAnd_Size checks that an And predicate reports the combined size of its
// two operands plus the fixed overhead of the And wrapper.
func (s *andSuite) TestAnd_Size() {
	left := newTestPredicate(1, 2, 3)
	right := newTestPredicate(2, 3, 4)

	// 6 ints at 8 bytes each (testPredicate sizes ints via strconv.IntSize, so
	// this assumes a 64-bit int) + 4 bytes of overhead.
	const want = 52
	s.Equal(want, And(left, right).Size())
}
16 changes: 16 additions & 0 deletions common/predicates/empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,22 @@

package predicates

import (
"go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
)

type (
// EmptyImpl is the concrete Predicate type returned by Empty. It carries no
// fields, so all values of a given EmptyImpl[T] are interchangeable.
EmptyImpl[T any] struct{}
)

// EmptyPredicateProtoSize is the size, as reported by the proto message's Size
// method, of a persistence Predicate of type PREDICATE_TYPE_EMPTY with empty
// attributes. It is evaluated once when the package is initialized. The Size
// methods of the predicates in this package use it as the fixed per-predicate
// overhead.
var EmptyPredicateProtoSize = (&persistencespb.Predicate{
PredicateType: enums.PREDICATE_TYPE_EMPTY,
Attributes: &persistencespb.Predicate_EmptyPredicateAttributes{
EmptyPredicateAttributes: &persistencespb.EmptyPredicateAttributes{},
},
}).Size()

// Empty returns a new *EmptyImpl[T] as a Predicate[T].
func Empty[T any]() Predicate[T] {
	return new(EmptyImpl[T])
}
Expand All @@ -42,3 +54,7 @@ func (n *EmptyImpl[T]) Equals(
_, ok := predicate.(*EmptyImpl[T])
return ok
}

// Size returns the estimated size in bytes of the empty predicate, which is
// the fixed EmptyPredicateProtoSize overhead and nothing else.
func (n *EmptyImpl[T]) Size() int {
	return EmptyPredicateProtoSize
}
4 changes: 4 additions & 0 deletions common/predicates/not.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ func (n *NotImpl[T]) Equals(
}
return n.Predicate.Equals(notPredicate.Predicate)
}

// Size returns the estimated size in bytes of the Not predicate: the size of
// the negated predicate plus the fixed EmptyPredicateProtoSize overhead for the
// Not wrapper.
func (n *NotImpl[T]) Size() int {
	innerSize := n.Predicate.Size()
	return EmptyPredicateProtoSize + innerSize
}
7 changes: 7 additions & 0 deletions common/predicates/not_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,10 @@ func (s *notSuite) TestNot_Equals() {
s.False(p.Equals(Empty[int]()))
s.False(p.Equals(Universal[int]()))
}

// TestNot_Size checks that a Not predicate reports the size of the predicate
// it negates plus the fixed overhead of the Not wrapper.
func (s *notSuite) TestNot_Size() {
	inner := newTestPredicate(1, 2, 3)

	// 3 ints at 8 bytes each (testPredicate sizes ints via strconv.IntSize, so
	// this assumes a 64-bit int) + 4 bytes of overhead.
	const want = 28
	s.Equal(want, Not(inner).Size())
}
9 changes: 9 additions & 0 deletions common/predicates/or.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,12 @@ func (o *OrImpl[T]) Equals(

return predicatesEqual(o.Predicates, orPredicate.Predicates)
}

// Size returns the estimated size in bytes of the Or predicate: the fixed
// EmptyPredicateProtoSize overhead for the wrapper itself plus the estimated
// size of every operand predicate.
func (o *OrImpl[T]) Size() int {
	total := EmptyPredicateProtoSize
	for i := range o.Predicates {
		total += o.Predicates[i].Size()
	}
	return total
}
8 changes: 8 additions & 0 deletions common/predicates/or_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,11 @@ func (s *orSuite) TestOr_Equals() {
s.False(p.Equals(Empty[int]()))
s.False(p.Equals(Universal[int]()))
}

// TestOr_Size checks that an Or predicate reports the combined size of its
// two operands plus the fixed overhead of the Or wrapper.
func (s *orSuite) TestOr_Size() {
	left := newTestPredicate(1, 2, 3)
	right := newTestPredicate(2, 3, 4)

	// 6 ints at 8 bytes each (testPredicate sizes ints via strconv.IntSize, so
	// this assumes a 64-bit int) + 4 bytes of overhead.
	const want = 52
	s.Equal(want, Or(left, right).Size())
}
5 changes: 5 additions & 0 deletions common/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,10 @@ type (
// two predicates are mathematically equivalent, Equals may still
// return false.
Equals(Predicate[T]) bool

// Size gets the estimated size in bytes of this predicate.
// Implementations may keep this estimate rough and mostly account for elements that may take up considerable
// space such as strings and slices.
Size() int
}
)
5 changes: 5 additions & 0 deletions common/predicates/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package predicates

import (
"maps"
"strconv"
)

var _ Predicate[int] = (*testPredicate)(nil)
Expand Down Expand Up @@ -59,3 +60,7 @@ func (p *testPredicate) Equals(predicate Predicate[int]) bool {

return maps.Equal(p.nums, testPrediate.nums)
}

// Size reports the test predicate's size as the platform's int width in bytes
// multiplied by the number of entries in nums.
func (p *testPredicate) Size() int {
	const bytesPerInt = strconv.IntSize / 8
	return bytesPerInt * len(p.nums)
}
4 changes: 4 additions & 0 deletions common/predicates/universal.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ func (a *UniversalImpl[T]) Equals(
_, ok := predicate.(*UniversalImpl[T])
return ok
}

// Size returns the estimated size in bytes of the universal predicate, which
// is the fixed EmptyPredicateProtoSize overhead and nothing else.
func (a *UniversalImpl[T]) Size() int {
	return EmptyPredicateProtoSize
}
1 change: 1 addition & 0 deletions service/history/archival_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q
BatchSize: f.Config.ArchivalTaskBatchSize,
MaxPendingTasksCount: f.Config.QueuePendingTaskMaxCount,
PollBackoffInterval: f.Config.ArchivalProcessorPollBackoffInterval,
MaxPredicateSize: f.Config.QueueMaxPredicateSize,
},
MonitorOptions: queues.MonitorOptions{
PendingTasksCriticalCount: f.Config.QueuePendingTaskCriticalCount,
Expand Down
4 changes: 4 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type Config struct {
QueueReaderStuckCriticalAttempts dynamicconfig.IntPropertyFn
QueueCriticalSlicesCount dynamicconfig.IntPropertyFn
QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn
QueueMaxPredicateSize dynamicconfig.IntPropertyFn

TaskDLQEnabled dynamicconfig.BoolPropertyFn
TaskDLQUnexpectedErrorAttempts dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -169,6 +170,7 @@ type Config struct {
OutboundProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
OutboundQueuePendingTaskCriticalCount dynamicconfig.IntPropertyFn
OutboundQueuePendingTaskMaxCount dynamicconfig.IntPropertyFn
OutboundQueueMaxPredicateSize dynamicconfig.IntPropertyFn
OutboundQueueMaxReaderCount dynamicconfig.IntPropertyFn
OutboundQueueGroupLimiterBufferSize dynamicconfig.IntPropertyFnWithDestinationFilter
OutboundQueueGroupLimiterConcurrency dynamicconfig.IntPropertyFnWithDestinationFilter
Expand Down Expand Up @@ -445,6 +447,7 @@ func NewConfig(
QueueReaderStuckCriticalAttempts: dynamicconfig.QueueReaderStuckCriticalAttempts.Get(dc),
QueueCriticalSlicesCount: dynamicconfig.QueueCriticalSlicesCount.Get(dc),
QueuePendingTaskMaxCount: dynamicconfig.QueuePendingTaskMaxCount.Get(dc),
QueueMaxPredicateSize: dynamicconfig.QueueMaxPredicateSize.Get(dc),

TaskDLQEnabled: dynamicconfig.HistoryTaskDLQEnabled.Get(dc),
TaskDLQUnexpectedErrorAttempts: dynamicconfig.HistoryTaskDLQUnexpectedErrorAttempts.Get(dc),
Expand Down Expand Up @@ -500,6 +503,7 @@ func NewConfig(
OutboundProcessorPollBackoffInterval: dynamicconfig.OutboundProcessorPollBackoffInterval.Get(dc),
OutboundQueuePendingTaskCriticalCount: dynamicconfig.OutboundQueuePendingTaskCriticalCount.Get(dc),
OutboundQueuePendingTaskMaxCount: dynamicconfig.OutboundQueuePendingTaskMaxCount.Get(dc),
OutboundQueueMaxPredicateSize: dynamicconfig.OutboundQueueMaxPredicateSize.Get(dc),
OutboundQueueMaxReaderCount: dynamicconfig.OutboundQueueMaxReaderCount.Get(dc),
OutboundQueueGroupLimiterBufferSize: dynamicconfig.OutboundQueueGroupLimiterBufferSize.Get(dc),
OutboundQueueGroupLimiterConcurrency: dynamicconfig.OutboundQueueGroupLimiterConcurrency.Get(dc),
Expand Down
1 change: 1 addition & 0 deletions service/history/outbound_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func (f *outboundQueueFactory) CreateQueue(
BatchSize: f.Config.OutboundTaskBatchSize,
MaxPendingTasksCount: f.Config.OutboundQueuePendingTaskMaxCount,
PollBackoffInterval: f.Config.OutboundProcessorPollBackoffInterval,
MaxPredicateSize: f.Config.OutboundQueueMaxPredicateSize,
},
MonitorOptions: queues.MonitorOptions{
PendingTasksCriticalCount: f.Config.OutboundQueuePendingTaskCriticalCount,
Expand Down
3 changes: 2 additions & 1 deletion service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func newQueueBase(

slices := make([]Slice, 0, len(scopes))
for _, scope := range scopes {
slices = append(slices, NewSlice(paginationFnProvider, executableFactory, monitor, scope, grouper))
slices = append(slices, NewSlice(paginationFnProvider, executableFactory, monitor, scope, grouper, options.ReaderOptions.MaxPredicateSize))
}
readerGroup.NewReader(readerID, slices...)

Expand Down Expand Up @@ -281,6 +281,7 @@ func (p *queueBase) processNewRange() {
p.monitor,
newReadScope,
p.grouper,
p.options.ReaderOptions.MaxPredicateSize,
))
}

Expand Down
1 change: 1 addition & 0 deletions service/history/queues/queue_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var testQueueOptions = &Options{
BatchSize: dynamicconfig.GetIntPropertyFn(10),
MaxPendingTasksCount: dynamicconfig.GetIntPropertyFn(100),
PollBackoffInterval: dynamicconfig.GetDurationPropertyFn(200 * time.Millisecond),
MaxPredicateSize: dynamicconfig.GetIntPropertyFn(0),
},
MonitorOptions: MonitorOptions{
PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000),
Expand Down
1 change: 1 addition & 0 deletions service/history/queues/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type (
BatchSize dynamicconfig.IntPropertyFn
MaxPendingTasksCount dynamicconfig.IntPropertyFn
PollBackoffInterval dynamicconfig.DurationPropertyFn
MaxPredicateSize dynamicconfig.IntPropertyFn
}

SliceIterator func(s Slice)
Expand Down
7 changes: 4 additions & 3 deletions service/history/queues/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (s *readerSuite) TestMergeSlices() {
incomingScopes := NewRandomScopes(rand.Intn(10))
incomingSlices := make([]Slice, 0, len(incomingScopes))
for _, incomingScope := range incomingScopes {
incomingSlices = append(incomingSlices, NewSlice(nil, s.executableFactory, s.monitor, incomingScope, GrouperNamespaceID{}))
incomingSlices = append(incomingSlices, NewSlice(nil, s.executableFactory, s.monitor, incomingScope, GrouperNamespaceID{}, noPredicateSizeLimit))
}

reader.MergeSlices(incomingSlices...)
Expand All @@ -217,7 +217,7 @@ func (s *readerSuite) TestAppendSlices() {
incomingScopes := scopes[totalScopes/2:]
incomingSlices := make([]Slice, 0, len(incomingScopes))
for _, incomingScope := range incomingScopes {
incomingSlices = append(incomingSlices, NewSlice(nil, s.executableFactory, s.monitor, incomingScope, GrouperNamespaceID{}))
incomingSlices = append(incomingSlices, NewSlice(nil, s.executableFactory, s.monitor, incomingScope, GrouperNamespaceID{}, noPredicateSizeLimit))
}

reader.AppendSlices(incomingSlices...)
Expand Down Expand Up @@ -501,7 +501,7 @@ func (s *readerSuite) newTestReader(
) *ReaderImpl {
slices := make([]Slice, 0, len(scopes))
for _, scope := range scopes {
slice := NewSlice(paginationFnProvider, s.executableFactory, s.monitor, scope, GrouperNamespaceID{})
slice := NewSlice(paginationFnProvider, s.executableFactory, s.monitor, scope, GrouperNamespaceID{}, noPredicateSizeLimit)
slices = append(slices, slice)
}

Expand All @@ -512,6 +512,7 @@ func (s *readerSuite) newTestReader(
BatchSize: dynamicconfig.GetIntPropertyFn(10),
MaxPendingTasksCount: dynamicconfig.GetIntPropertyFn(100),
PollBackoffInterval: dynamicconfig.GetDurationPropertyFn(200 * time.Millisecond),
MaxPredicateSize: dynamicconfig.GetIntPropertyFn(10),
},
s.mockScheduler,
s.mockRescheduler,
Expand Down
30 changes: 25 additions & 5 deletions service/history/queues/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package queues
import (
"fmt"

"go.temporal.io/server/common/predicates"
"go.temporal.io/server/service/history/tasks"
expmaps "golang.org/x/exp/maps"
)
Expand All @@ -47,8 +48,8 @@ type (
SplitByRange(tasks.Key) (left Slice, right Slice)
SplitByPredicate(tasks.Predicate) (pass Slice, fail Slice)
CanMergeWithSlice(Slice) bool
MergeWithSlice(Slice) []Slice
CompactWithSlice(Slice) Slice
MergeWithSlice(slice Slice) []Slice
CompactWithSlice(slice Slice) Slice
ShrinkScope() int
SelectTasks(readerID int64, batchSize int) ([]Executable, error)
MoreTasks() bool
Expand All @@ -71,6 +72,8 @@ type (

*executableTracker
monitor Monitor

maxPredicateSizeFn func() int
}
)

Expand All @@ -80,17 +83,21 @@ func NewSlice(
monitor Monitor,
scope Scope,
grouper Grouper,
maxPredicateSizeFn func() int,
) *SliceImpl {
return &SliceImpl{
s := &SliceImpl{
paginationFnProvider: paginationFnProvider,
executableFactory: executableFactory,
scope: scope,
iterators: []Iterator{
NewIterator(paginationFnProvider, scope.Range),
},
executableTracker: newExecutableTracker(grouper),
monitor: monitor,
executableTracker: newExecutableTracker(grouper),
monitor: monitor,
maxPredicateSizeFn: maxPredicateSizeFn,
}
s.ensurePredicateSizeLimit()
return s
}

func (s *SliceImpl) Scope() Scope {
Expand Down Expand Up @@ -359,6 +366,7 @@ func (s *SliceImpl) shrinkPredicate() {
}

s.scope.Predicate = s.grouper.Predicate(expmaps.Keys(pendingPerKey))
s.ensurePredicateSizeLimit()
}

func (s *SliceImpl) SelectTasks(readerID int64, batchSize int) ([]Executable, error) {
Expand Down Expand Up @@ -459,12 +467,24 @@ func (s *SliceImpl) newSlice(
iterators: iterators,
executableTracker: tracker,
monitor: s.monitor,
maxPredicateSizeFn: s.maxPredicateSizeFn,
}
slice.ensurePredicateSizeLimit()
slice.monitor.SetSlicePendingTaskCount(slice, len(slice.executableTracker.pendingExecutables))

return slice
}

// ensurePredicateSizeLimit replaces the slice's scope predicate with the
// universal predicate when the predicate's estimated size exceeds the maximum
// returned by maxPredicateSizeFn. A maximum that is not positive disables the
// check.
func (s *SliceImpl) ensurePredicateSizeLimit() {
	limit := s.maxPredicateSizeFn()
	if limit <= 0 {
		// 0 == unlimited
		return
	}
	if s.scope.Predicate.Size() <= limit {
		return
	}

	// Due to the limitations in predicate merging logic, the predicate size can easily grow unbounded.
	// The simplest mitigation is to stop merging and replace with the universal predicate.
	s.scope.Predicate = predicates.Universal[tasks.Task]()
}

func appendMergedSlice(
mergedSlices []Slice,
s *SliceImpl,
Expand Down
Loading

0 comments on commit 1ed0b3e

Please sign in to comment.