From 89dcc843e4807c8afebc20a6b68b9fc9e9ce711d Mon Sep 17 00:00:00 2001
From: Forrest <30576607+fspmarshall@users.noreply.github.com>
Date: Sun, 25 Feb 2024 09:41:53 -0800
Subject: [PATCH] add sortcache helper (#38587)
---
lib/utils/sortcache/helpers.go | 42 ++
lib/utils/sortcache/sortcache.go | 262 +++++++++++++
lib/utils/sortcache/sortcache_test.go | 543 ++++++++++++++++++++++++++
3 files changed, 847 insertions(+)
create mode 100644 lib/utils/sortcache/helpers.go
create mode 100644 lib/utils/sortcache/sortcache.go
create mode 100644 lib/utils/sortcache/sortcache_test.go
diff --git a/lib/utils/sortcache/helpers.go b/lib/utils/sortcache/helpers.go
new file mode 100644
index 0000000000000..9e18aab6de121
--- /dev/null
+++ b/lib/utils/sortcache/helpers.go
@@ -0,0 +1,42 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package sortcache
+
+// NextKey returns the lexicographically next key of equivalent length. This can be used to find the upper
+// bound for a range whose lower bound is known (e.g. keys of the form `"prefix/suffix"` will all fall between
+// `"prefix/"` and `NextKey("prefix/")`).
+func NextKey(key string) string {
+ end := []byte(key)
+ for i := len(end) - 1; i >= 0; i-- {
+ if end[i] < 0xff {
+ end[i] = end[i] + 1
+ end = end[:i+1]
+ return string(end)
+ }
+ }
+
+ // key is already the lexicographically last value for this length and therefore there is no
+ // true 'next' value. using a prefix of this form is somewhat nonsensical and unlikely to
+ // come up in real-world usage. for the purposes of this helper, we treat this scenario as
+ // something analogous to zero-length slice indexing (`s[:0]`, `s[len(s):]`, etc), and
+ // return the key unmodified. A valid alternative might be to append `0xff`, making the
+ // range effectively cover all suffixes of key whose leading character is less than
+ // `0xff`, but that is arguably more confusing since ranges would return some but not all
+ // of the valid suffixes.
+ return key
+}
diff --git a/lib/utils/sortcache/sortcache.go b/lib/utils/sortcache/sortcache.go
new file mode 100644
index 0000000000000..38a9c7422220a
--- /dev/null
+++ b/lib/utils/sortcache/sortcache.go
@@ -0,0 +1,262 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package sortcache
+
+import (
+ "sync"
+
+ "github.com/google/btree"
+)
+
+// Config configures a [SortCache].
+type Config[T any] struct {
+ // Indexes is a map of index name to key constructor, and defines the set of indexes
+ // upon which lookups can be made. All indexes are required to be unique, meaning that no two
+ // stored values may produce the same key on any index. A Put operation for a value that
+ // matches an existing value on *any* index
+ // results in the existing value being evicted. This means that special care must be taken
+ // to ensure that values that are not meant to overlap one another do not produce identical
+ // keys across any index. The simplest way to achieve this is to pick one index to be truly
+ // unique, and then use that value as a suffix for all other indexes. For example, if one wanted
+ // to store node resources in a [SortCache], one might create a ServerID index, and then use
+ // ServerID as a suffix for all other indexes (e.g. hostname). The Indexes mapping and its
+ // members *must* be treated as immutable once passed to [New]. Since there is no default index,
+ // at least one index must be supplied for [SortCache] to be usable.
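+ //
+ // For illustration only (the Server type and its accessors are hypothetical, not part of
+ // this package):
+ //
+ //	cache := sortcache.New(sortcache.Config[Server]{
+ //		Indexes: map[string]func(Server) string{
+ //			"ServerID": func(s Server) string { return s.GetName() },
+ //			"Hostname": func(s Server) string { return s.GetHostname() + "/" + s.GetName() },
+ //		},
+ //	})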
+ Indexes map[string]func(T) string
+}
+
+// SortCache is a helper for storing values that must be sortable across
+// multiple indexes simultaneously. It has an internal read-write lock
+// and is safe for concurrent use, but the supplied configuration must not
+// be modified, and it is generally best to never modify stored resources.
+type SortCache[T any] struct {
+ rw sync.RWMutex
+ indexes map[string]func(T) string
+ trees map[string]*btree.BTreeG[entry]
+ values map[uint64]T
+ counter uint64
+}
+
+type entry struct {
+ key string
+ ref uint64
+}
+
+// New sets up a new [SortCache] based on the provided configuration.
+func New[T any](cfg Config[T]) *SortCache[T] {
+ const (
+ // bTreeDegree of 8 is standard across most of the teleport codebase
+ bTreeDegree = 8
+ )
+
+ trees := make(map[string]*btree.BTreeG[entry], len(cfg.Indexes))
+
+ for index := range cfg.Indexes {
+ trees[index] = btree.NewG(bTreeDegree, func(a, b entry) bool {
+ return a.key < b.key
+ })
+ }
+
+ return &SortCache[T]{
+ indexes: cfg.Indexes,
+ trees: trees,
+ values: make(map[uint64]T),
+ }
+}
+
+// Get loads the value associated with the given index and key. ok will be false if either
+// the index does not exist or no value maps to the provided key on that index. Note that
+// mutating a value such that any of its index keys change is not permissible and will result
+// in a permanently bad state. To avoid this, any implementation that might mutate returned
+// values must clone them.
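+//
+// A minimal sketch of a safe read, assuming a stored type with a (hypothetical) Clone method
+// and an illustrative index name:
+//
+//	if srv, ok := cache.Get("ServerID", serverID); ok {
+//		srv = srv.Clone() // work on a copy; never mutate the stored value
+//	}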
+func (c *SortCache[T]) Get(index, key string) (value T, ok bool) {
+ c.rw.RLock()
+ defer c.rw.RUnlock()
+
+ ref, ok := c.lookup(index, key)
+ if !ok {
+ return value, false
+ }
+
+ return c.values[ref], true
+}
+
+// HasIndex checks if the specified index is present in this cache.
+func (c *SortCache[T]) HasIndex(name string) bool {
+ // index map is treated as immutable once created, so no lock is required.
+ _, ok := c.indexes[name]
+ return ok
+}
+
+// KeyOf gets the key of the supplied value on the given index.
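+//
+// This is useful for deriving pagination cursors, e.g. (a sketch mirroring the tests below):
+//
+//	next := cache.KeyOf("Kind", page[len(page)-1])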
+func (c *SortCache[T]) KeyOf(index string, value T) string {
+ // index map is treated as immutable once created, so no lock is required.
+ fn, ok := c.indexes[index]
+ if !ok {
+ return ""
+ }
+ return fn(value)
+}
+
+// lookup is an internal helper that finds the unique reference id for a value given a specific
+// index/key. Must be called either under read or write lock.
+func (c *SortCache[T]) lookup(index, key string) (ref uint64, ok bool) {
+ tree, exists := c.trees[index]
+ if !exists {
+ return 0, false
+ }
+
+ entry, exists := tree.Get(entry{key: key})
+ if !exists {
+ return 0, false
+ }
+
+ return entry.ref, true
+}
+
+// Put inserts a value into the sort cache, removing any existing values that collide with it. Since all indexes
+// are required to be unique, a single Put can end up evicting up to N existing values, where N is the number of
+// indexes, if it happens to collide with a different value on each index. Implementations that expect evictions
+// to always be either zero or one (e.g. caches of resources with unique IDs) should check the returned eviction
+// count to help detect bugs.
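+//
+// A sketch of such a check (the handler name below is hypothetical):
+//
+//	if evicted := cache.Put(rsc); evicted > 1 {
+//		reportUnexpectedEviction(rsc, evicted) // two distinct values shared an index key
+//	}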
+func (c *SortCache[T]) Put(value T) (evicted int) {
+ c.rw.Lock()
+ defer c.rw.Unlock()
+
+ c.counter++
+ c.values[c.counter] = value
+
+ for index, fn := range c.indexes {
+ key := fn(value)
+ // ensure previous entry in this index is deleted if it exists. note that we are fully
+ // deleting it, not just overwriting the reference for this index.
+ if prev, ok := c.lookup(index, key); ok {
+ c.deleteValue(prev)
+ evicted++
+ }
+ c.trees[index].ReplaceOrInsert(entry{
+ key: key,
+ ref: c.counter,
+ })
+ }
+ return
+}
+
+// Delete deletes the value associated with the specified index/key if one exists.
+func (c *SortCache[T]) Delete(index, key string) {
+ c.rw.Lock()
+ defer c.rw.Unlock()
+
+ ref, ok := c.lookup(index, key)
+ if !ok {
+ return
+ }
+
+ c.deleteValue(ref)
+}
+
+// deleteValue is an internal helper that completely deletes the value associated with the specified
+// unique reference id, including removing all of its associated index entries.
+func (c *SortCache[T]) deleteValue(ref uint64) {
+ value, ok := c.values[ref]
+ if !ok {
+ return
+ }
+ delete(c.values, ref)
+
+ for idx, fn := range c.indexes {
+ c.trees[idx].Delete(entry{key: fn(value)})
+ }
+}
+
+// Ascend iterates the specified range from least to greatest. iteration is terminated early if the
+// supplied closure returns false. if this method is being used to read a range, it is strongly recommended
+// that all values retained be cloned. any mutation that results in changing a value's index keys will put
+// the sort cache into a permanently bad state. empty strings are treated as "open" bounds. passing an empty
+// string for both the start and stop bounds iterates all values.
+//
+// NOTE: ascending ranges are equivalent to the default range logic used across most of teleport, so
+// common helpers like `backend.RangeEnd` will function as expected with this method.
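+//
+// A typical prefix scan might look like the following (the index name, prefix, and value type
+// are illustrative and mirror the tests in this patch; retained values should be cloned if
+// they may be mutated):
+//
+//	var out []resource
+//	cache.Ascend("Kind", "node/", NextKey("node/"), func(r resource) bool {
+//		out = append(out, r)
+//		return true
+//	})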
+func (c *SortCache[T]) Ascend(index, start, stop string, iterator func(T) bool) {
+ c.rw.RLock()
+ defer c.rw.RUnlock()
+
+ tree, ok := c.trees[index]
+ if !ok {
+ return
+ }
+
+ fn := func(ent entry) bool {
+ return iterator(c.values[ent.ref])
+ }
+
+ // select the appropriate ascend variant based on whether or not
+ // start/stop points were specified.
+ switch {
+ case start == "" && stop == "":
+ tree.Ascend(fn)
+ case start == "":
+ tree.AscendLessThan(entry{key: stop}, fn)
+ case stop == "":
+ tree.AscendGreaterOrEqual(entry{key: start}, fn)
+ default:
+ tree.AscendRange(entry{key: start}, entry{key: stop}, fn)
+ }
+}
+
+// Descend iterates the specified range from greatest to least. iteration is terminated early if the
+// supplied closure returns false. if this method is being used to read a range, it is strongly recommended
+// that all values retained be cloned. any mutation that results in changing a value's index keys will put
+// the sort cache into a permanently bad state. empty strings are treated as "open" bounds. passing an empty
+// string for both the start and stop bounds iterates all values.
+//
+// NOTE: descending sort order is the *opposite* of what most teleport range-based logic uses, meaning that
+// many common patterns need to be inverted when using this method (e.g. `backend.RangeEnd` actually gives
+// you the start position for descending ranges).
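+//
+// For example, the prefix scan shown for Ascend becomes the following when descending, with
+// the bounds inverted (names are illustrative and mirror the tests in this patch):
+//
+//	var out []resource
+//	cache.Descend("Kind", NextKey("node/"), "node/", func(r resource) bool {
+//		out = append(out, r)
+//		return true
+//	})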
+func (c *SortCache[T]) Descend(index, start, stop string, iterator func(T) bool) {
+ c.rw.RLock()
+ defer c.rw.RUnlock()
+
+ tree, ok := c.trees[index]
+ if !ok {
+ return
+ }
+
+ fn := func(ent entry) bool {
+ return iterator(c.values[ent.ref])
+ }
+
+ // select the appropriate descend variant based on whether or not
+ // start/stop points were specified.
+ switch {
+ case start == "" && stop == "":
+ tree.Descend(fn)
+ case start == "":
+ tree.DescendGreaterThan(entry{key: stop}, fn)
+ case stop == "":
+ tree.DescendLessOrEqual(entry{key: start}, fn)
+ default:
+ tree.DescendRange(entry{key: start}, entry{key: stop}, fn)
+ }
+}
+
+// Len returns the number of values currently stored.
+func (c *SortCache[T]) Len() int {
+ c.rw.RLock()
+ defer c.rw.RUnlock()
+ return len(c.values)
+}
diff --git a/lib/utils/sortcache/sortcache_test.go b/lib/utils/sortcache/sortcache_test.go
new file mode 100644
index 0000000000000..adf50f6e79e41
--- /dev/null
+++ b/lib/utils/sortcache/sortcache_test.go
@@ -0,0 +1,543 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package sortcache
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/require"
+)
+
+const (
+ Kind = "Kind"
+ Name = "Name"
+)
+
+type resource struct {
+ kind string
+ name string
+}
+
+// TestBasics verifies basic expected behaviors for a SortCache.
+func TestBasics(t *testing.T) {
+ t.Parallel()
+
+ cache := New(Config[resource]{
+ Indexes: map[string]func(resource) string{
+ Kind: func(r resource) string {
+ return fmt.Sprintf("%s/%s", r.kind, r.name)
+ },
+ Name: func(r resource) string {
+ return fmt.Sprintf("%s/%s", r.name, r.kind)
+ },
+ },
+ })
+
+ // verify index checking method
+ require.True(t, cache.HasIndex(Name))
+ require.False(t, cache.HasIndex("Foo"))
+
+ // set up some test resources
+ rscs := []resource{
+ {"node", "001"},
+ {"node", "002"},
+ {"kube", "001"},
+ {"kube", "002"},
+ }
+
+ for _, rsc := range rscs {
+ require.Equal(t, 0, cache.Put(rsc))
+ }
+
+ require.Equal(t, 4, cache.Len())
+
+ // perform some basic lookups
+ r, ok := cache.Get(Kind, "node/001")
+ require.True(t, ok)
+ require.Equal(t, resource{"node", "001"}, r)
+
+ r, ok = cache.Get(Name, "002/kube")
+ require.True(t, ok)
+ require.Equal(t, resource{"kube", "002"}, r)
+
+ // check ascending iteration
+ var out []resource
+ cache.Ascend(Kind, "kube/", NextKey("kube/"), func(r resource) bool {
+ out = append(out, r)
+ return true
+ })
+
+ require.Len(t, out, 2)
+ require.Equal(t, []resource{
+ {"kube", "001"},
+ {"kube", "002"},
+ }, out)
+
+ // check descending iteration
+ out = nil
+ cache.Descend(Kind, NextKey("kube/"), "kube/", func(r resource) bool {
+ out = append(out, r)
+ return true
+ })
+
+ require.Len(t, out, 2)
+ require.Equal(t, []resource{
+ {"kube", "002"},
+ {"kube", "001"},
+ }, out)
+
+ // check removal
+ cache.Delete(Kind, "kube/002")
+ require.Equal(t, 3, cache.Len())
+ _, ok = cache.Get(Kind, "kube/002")
+ require.False(t, ok)
+}
+
+func TestOpenBounds(t *testing.T) {
+ t.Parallel()
+
+ cache := New(Config[resource]{
+ Indexes: map[string]func(resource) string{
+ Kind: func(r resource) string {
+ return fmt.Sprintf("%s/%s", r.kind, r.name)
+ },
+ Name: func(r resource) string {
+ return fmt.Sprintf("%s/%s", r.name, r.kind)
+ },
+ },
+ })
+
+ // set up some test resources
+ rscs := []resource{
+ {"node", "001"},
+ {"node", "002"},
+ {"kube", "001"},
+ {"kube", "002"},
+ }
+
+ for _, rsc := range rscs {
+ require.Equal(t, 0, cache.Put(rsc))
+ }
+
+ var out []resource
+ iterator := func(r resource) bool {
+ out = append(out, r)
+ return true
+ }
+
+ // verify fully open ascend
+ cache.Ascend(Name, "", "", iterator)
+ require.Equal(t, []resource{
+ {"kube", "001"},
+ {"node", "001"},
+ {"kube", "002"},
+ {"node", "002"},
+ }, out)
+
+ // verify fully open descend
+ out = nil
+ cache.Descend(Name, "", "", iterator)
+ require.Equal(t, []resource{
+ {"node", "002"},
+ {"kube", "002"},
+ {"node", "001"},
+ {"kube", "001"},
+ }, out)
+
+ // verify open-ended ascend
+ out = nil
+ cache.Ascend(Name, "002/kube", "", iterator)
+ require.Equal(t, []resource{
+ {"kube", "002"},
+ {"node", "002"},
+ }, out)
+
+ // verify open-ended descend
+ out = nil
+ cache.Descend(Name, "001/node", "", iterator)
+ require.Equal(t, []resource{
+ {"node", "001"},
+ {"kube", "001"},
+ }, out)
+
+ // verify open-start ascend
+ out = nil
+ cache.Ascend(Name, "", "002/kube", iterator)
+ require.Equal(t, []resource{
+ {"kube", "001"},
+ {"node", "001"},
+ }, out)
+
+ // verify open-start descend
+ out = nil
+ cache.Descend(Name, "", "001/node", iterator)
+ require.Equal(t, []resource{
+ {"node", "002"},
+ {"kube", "002"},
+ }, out)
+}
+
+// TestAscendingPagination verifies expected behavior using a basic pagination setup.
+func TestAscendingPagination(t *testing.T) {
+ const (
+ totalResources = 100_000
+ pageSize = 101
+ )
+
+ t.Parallel()
+
+ cache := New(Config[resource]{
+ Indexes: map[string]func(resource) string{
+ Kind: func(r resource) string {
+ return fmt.Sprintf("%s/%s", r.kind, r.name)
+ },
+ Name: func(r resource) string {
+ return fmt.Sprintf("%s/%s", r.name, r.kind)
+ },
+ },
+ })
+
+ // insert a bunch of test resources
+ for i := 0; i < totalResources; i++ {
+ require.Equal(t, 0, cache.Put(resource{"node", uuid.New().String()}))
+ }
+
+ // create a paginated getter that accepts an optional start key and returns a non-empty next key
+ // if additional resources exist.
+ nextPage := func(start string) (page []resource, next string) {
+ page = make([]resource, 0, pageSize+1)
+
+ cache.Ascend(Kind, start, "", func(r resource) bool {
+ page = append(page, r)
+ return len(page) <= pageSize
+ })
+
+ if len(page) > pageSize {
+ next = cache.KeyOf(Kind, page[pageSize])
+ page = page[:pageSize]
+ }
+ return
+ }
+
+ // consume and aggregate pages from nextPage.
+ var out []resource
+ var k string
+ var n int
+ for {
+ n++
+ if n > totalResources {
+ require.FailNow(t, "too many iterations")
+ }
+ page, nk := nextPage(k)
+ if len(page) != pageSize {
+ require.Empty(t, nk)
+ }
+ out = append(out, page...)
+ if nk == "" {
+ break
+ }
+ k = nk
+ }
+
+ // verify that we got the expected number of resources
+ require.Len(t, out, totalResources)
+}
+
+// TestDescendingPagination verifies expected behavior using a basic pagination setup.
+func TestDescendingPagination(t *testing.T) {
+ const (
+ totalResources = 100_000
+ pageSize = 101
+ )
+
+ t.Parallel()
+
+ cache := New(Config[resource]{
+ Indexes: map[string]func(resource) string{
+ Kind: func(r resource) string {
+ return fmt.Sprintf("%s/%s", r.kind, r.name)
+ },
+ Name: func(r resource) string {
+ return fmt.Sprintf("%s/%s", r.name, r.kind)
+ },
+ },
+ })
+
+ // insert a bunch of test resources
+ for i := 0; i < totalResources; i++ {
+ require.Equal(t, 0, cache.Put(resource{"node", uuid.New().String()}))
+ }
+
+ // create a paginated getter that accepts an optional start key and returns a non-empty next key
+ // if additional resources exist.
+ nextPage := func(start string) (page []resource, next string) {
+ page = make([]resource, 0, pageSize+1)
+
+ cache.Descend(Kind, start, "", func(r resource) bool {
+ page = append(page, r)
+ return len(page) <= pageSize
+ })
+
+ if len(page) > pageSize {
+ next = cache.KeyOf(Kind, page[pageSize])
+ page = page[:pageSize]
+ }
+
+ return
+ }
+
+ // consume and aggregate pages from nextPage.
+ var out []resource
+ var k string
+ var n int
+ for {
+ n++
+ if n > totalResources {
+ require.FailNow(t, "too many iterations")
+ }
+ page, nk := nextPage(k)
+ if len(page) != pageSize {
+ require.Empty(t, nk)
+ }
+ out = append(out, page...)
+ if nk == "" {
+ break
+ }
+ k = nk
+ }
+
+ // verify that we got the expected number of resources
+ require.Len(t, out, totalResources)
+}
+
+// TestOverlap verifies basic expected behavior when multiple resources map to the same
+// value on an index.
+func TestOverlap(t *testing.T) {
+ t.Parallel()
+
+ // set up indexes s.t. resources with different kinds can collide on the name
+ // index and resources with different names can collide on the kind index.
+ cache := New(Config[resource]{
+ Indexes: map[string]func(resource) string{
+ Kind: func(r resource) string {
+ return r.kind
+ },
+ Name: func(r resource) string {
+ return r.name
+ },
+ },
+ })
+
+ // set up test resources s.t. there is a collision on the name index
+ rscs := []resource{
+ {"node", "001"},
+ {"db", "002"},
+ {"kube", "001"},
+ }
+
+ var totalEvicted int
+ for _, rsc := range rscs {
+ totalEvicted += cache.Put(rsc)
+ }
+ require.Equal(t, 1, totalEvicted)
+
+ // expect one of the three resources that were inserted to be overwritten
+ require.Equal(t, 2, cache.Len())
+
+ // verify that the most recently inserted value for our test collision "won", overwriting the
+ // previous resource.
+ r, ok := cache.Get(Name, "001")
+ require.True(t, ok)
+ require.Equal(t, "kube", r.kind)
+
+ // verify that the preexisting value that wasn't part of the collision is preserved.
+ r, ok = cache.Get(Kind, "db")
+ require.True(t, ok)
+ require.Equal(t, "002", r.name)
+
+ // inserting a resource of a different kind, but with the same name as an existing resource, should
+ // cause the existing resource to be fully deleted, including in the indexes that are not being
+ // overwritten by the new value.
+ require.Equal(t, 1, cache.Put(resource{"desktop", "001"}))
+ _, ok = cache.Get(Kind, "kube")
+ require.False(t, ok)
+
+ require.Equal(t, 2, cache.Len())
+
+ // inserting a resource that collides with multiple existing resources should remove all of them.
+ require.Equal(t, 2, cache.Put(resource{"db", "001"}))
+ require.Equal(t, 1, cache.Len())
+}
+
+// TestNextKey asserts the basic behavior of the [NextKey] helper.
+func TestNextKey(t *testing.T) {
+ t.Parallel()
+
+ tts := []struct {
+ key, out string
+ }{
+ {
+ key: "nodes/",
+ out: "nodes0",
+ },
+ {
+ key: "a",
+ out: "b",
+ },
+ {
+ key: string([]byte{0x00, 0x00, 0x00}),
+ out: string([]byte{0x00, 0x00, 0x01}),
+ },
+ {
+ key: string([]byte{0xff, 0xff, 0xff}),
+ out: string([]byte{0xff, 0xff, 0xff}),
+ },
+ {
+ key: "",
+ out: "",
+ },
+ }
+
+ for _, tt := range tts {
+ require.Equal(t, tt.out, NextKey(tt.key), "NextKey(%q)", tt.key)
+ }
+}
+
+// BenchmarkSortCache attempts to benchmark reads under moderately high concurrent load.
+//
+// goos: linux
+// goarch: amd64
+// pkg: github.com/gravitational/teleport/lib/utils/sortcache
+// cpu: Intel(R) Xeon(R) CPU @ 2.80GHz
+// BenchmarkSortCache-4 12 250820820 ns/op
+func BenchmarkSortCache(b *testing.B) {
+ const (
+ concurrency = 100
+ resourcesPerKind = 50_000
+ )
+
+ // set up a basic cache configuration
+ cache := New(Config[resource]{
+ Indexes: map[string]func(resource) string{
+ Kind: func(r resource) string {
+ return fmt.Sprintf("%s/%s", r.kind, r.name)
+ },
+ Name: func(r resource) string {
+ return fmt.Sprintf("%s/%s", r.name, r.kind)
+ },
+ },
+ })
+
+ // set up some test resources we can use later to
+ // inject writes
+ r1 := resource{
+ kind: "node",
+ name: uuid.New().String(),
+ }
+
+ r2 := resource{
+ kind: "kube",
+ name: uuid.New().String(),
+ }
+
+ cache.Put(r1)
+ cache.Put(r2)
+
+ // seed cache with lots of additional resources to help simulate large reads
+ for i := 0; i < resourcesPerKind-1; i++ {
+ cache.Put(resource{
+ kind: "node",
+ name: uuid.New().String(),
+ })
+ cache.Put(resource{
+ kind: "kube",
+ name: uuid.New().String(),
+ })
+ }
+
+ // set up a background process to inject concurrent writes in a fairly
+ // tight loop (should roughly simulate the load generated by a background
+ // event stream injecting updates into a replica).
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go func() {
+ for {
+ if ctx.Err() != nil {
+ return
+ }
+ cache.Put(r1)
+ cache.Put(r2)
+ }
+ }()
+
+ // set up a bunch of background concurrent read operations to simulate load.
+ for i := 0; i < concurrency; i++ {
+ go func() {
+ var n int
+ buf := make([]resource, 0, resourcesPerKind)
+ for {
+ if ctx.Err() != nil {
+ return
+ }
+ buf = buf[:0]
+ n++
+ key := "node/"
+ if n%2 == 0 {
+ key = "kube/"
+ }
+
+ cache.Ascend(Kind, key, NextKey(key), func(r resource) bool {
+ buf = append(buf, r)
+ return true
+ })
+
+ if len(buf) != resourcesPerKind {
+ panic("benchmark is misconfigured")
+ }
+ }
+ }()
+ }
+
+ // the actual benchmark is performed against a single read loop. the goal here is to
+ // figure out what a single reader would experience when trying to pull a large block of
+ // values from a SortCache that is under high concurrent load.
+ b.ResetTimer()
+
+ var n int
+ buf := make([]resource, 0, resourcesPerKind)
+ for i := 0; i < b.N; i++ {
+ buf = buf[:0]
+ n++
+ key := "node/"
+ if n%2 == 0 {
+ key = "kube/"
+ }
+
+ cache.Ascend(Kind, key, NextKey(key), func(r resource) bool {
+ buf = append(buf, r)
+ return true
+ })
+
+ if len(buf) != resourcesPerKind {
+ panic("benchmark is misconfigured")
+ }
+ }
+}