// NextKey returns the lexicographically next key of equivalent length. This can be used to find the higher
// bound for a range whose lower bound is known (e.g. keys of the form `"prefix/suffix"` will all fall between
// `"prefix/"` and `NextKey("prefix/")`).
func NextKey(key string) string {
	end := []byte(key)
	// walk backwards from the last byte, incrementing the first byte that can be
	// incremented and truncating everything after it (a shorter key sorts before
	// any longer key sharing its prefix, so truncation preserves the bound).
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xff {
			end[i]++
			end = end[:i+1]
			return string(end)
		}
	}

	// key is already the lexicographically last value for this length and therefore there is no
	// true 'next' value. using a prefix of this form is somewhat nonsensical and unlikely to
	// come up in real-world usage. for the purposes of this helper, we treat this scenario as
	// something analogous to a zero-length slice indexing (`s[:0]`, `s[len(s):]`, etc), and
	// return the key unmodified. A valid alternative might be to append `0xff`, making the
	// range effectively be over all suffixes of key whose leading character were less than
	// `0xff`, but that is arguably more confusing since ranges would return some but not all
	// of the valid suffixes.
	return key
}
For example, if one wanted + // to store node resources in a [SortCache], one might create a ServerID index, and then use + // ServerID as a suffix for all other indexes (e.g. hostname). The Indexes mapping and its + // members *must* be treated as immutable once passed to [New]. Since there is no default index, + // at least one index must be supplied for [SortCache] to be usable. + Indexes map[string]func(T) string +} + +// SortCache is a helper for storing values that must be sortable across +// multiple indexes simultaneously. It has an internal read-write lock +// and is safe for concurrent use, but the supplied configuration must not +// be modified, and it is generally best to never modify stored resources. +type SortCache[T any] struct { + rw sync.RWMutex + indexes map[string]func(T) string + trees map[string]*btree.BTreeG[entry] + values map[uint64]T + counter uint64 +} + +type entry struct { + key string + ref uint64 +} + +// New sets up a new [SortCache] based on the provided configuration. +func New[T any](cfg Config[T]) *SortCache[T] { + const ( + // bTreeDegree of 8 is standard across most of the teleport codebase + bTreeDegree = 8 + ) + + trees := make(map[string]*btree.BTreeG[entry], len(cfg.Indexes)) + + for index := range cfg.Indexes { + trees[index] = btree.NewG(bTreeDegree, func(a, b entry) bool { + return a.key < b.key + }) + } + + return &SortCache[T]{ + indexes: cfg.Indexes, + trees: trees, + values: make(map[uint64]T), + } +} + +// Get loads the value associated with the given index and key. ok will be false if either +// the index does not exist or no value maps to the provided key on that index. Note that +// mutating a value such that any of its index keys change is not permissible and will result +// in permanently bad state. To avoid this, any implementation that might mutate returned +// values must clone them. 
+func (c *SortCache[T]) Get(index, key string) (value T, ok bool) { + c.rw.RLock() + defer c.rw.RUnlock() + + ref, ok := c.lookup(index, key) + if !ok { + return value, false + } + + return c.values[ref], true +} + +// HasIndex checks if the specified index is present in this cache. +func (c *SortCache[T]) HasIndex(name string) bool { + // index map is treated as immutable once created, so no lock is required. + _, ok := c.indexes[name] + return ok +} + +// KeyOf gets the key of the supplied value on the given index. +func (c *SortCache[T]) KeyOf(index string, value T) string { + // index map is treated as immutable once created, so no lock is required. + fn, ok := c.indexes[index] + if !ok { + return "" + } + return fn(value) +} + +// lookup is an internal helper that finds the unique reference id for a value given a specific +// index/key. Must be called either under read or write lock. +func (c *SortCache[T]) lookup(index, key string) (ref uint64, ok bool) { + tree, exists := c.trees[index] + if !exists { + return 0, false + } + + entry, exists := tree.Get(entry{key: key}) + if !exists { + return 0, false + } + + return entry.ref, true +} + +// Put inserts a value into the sort cache, removing any existing values that collide with it. Since all indexes +// are required to be unique, a single Put can end up evicting up to N existing values where N is the number of +// indexes if it happens to collide with a different value on each index. implementations that expect evictions +// to always either be zero or one (e.g. caches of resources with unique IDs) should check the returned eviction +// count to help detect bugs. +func (c *SortCache[T]) Put(value T) (evicted int) { + c.rw.Lock() + defer c.rw.Unlock() + + c.counter++ + c.values[c.counter] = value + + for index, fn := range c.indexes { + key := fn(value) + // ensure previous entry in this index is deleted if it exists. note that we are fully + // deleting it, not just overwriting the reference for this index. 
+ if prev, ok := c.lookup(index, key); ok { + c.deleteValue(prev) + evicted++ + } + c.trees[index].ReplaceOrInsert(entry{ + key: key, + ref: c.counter, + }) + } + return +} + +// Delete deletes the value associated with the specified index/key if one exists. +func (c *SortCache[T]) Delete(index, key string) { + c.rw.Lock() + defer c.rw.Unlock() + + ref, ok := c.lookup(index, key) + if !ok { + return + } + + c.deleteValue(ref) +} + +// deleteValue is an internal helper that completely deletes the value associated with the specified +// unique reference id, including removing all of its associated index entries. +func (c *SortCache[T]) deleteValue(ref uint64) { + value, ok := c.values[ref] + if !ok { + return + } + delete(c.values, ref) + + for idx, fn := range c.indexes { + c.trees[idx].Delete(entry{key: fn(value)}) + } +} + +// Ascend iterates the specified range from least to greatest. iteration is terminated early if the +// supplied closure returns false. if this method is being used to read a range, it is strongly recommended +// that all values retained be cloned. any mutation that results in changing a value's index keys will put +// the sort cache into a permanently bad state. empty strings are treated as "open" bounds. passing an empty +// string for both the start and stop bounds iterates all values. +// +// NOTE: ascending ranges are equivalent to the default range logic used across most of teleport, so +// common helpers like `backend.RangeEnd` will function as expected with this method. +func (c *SortCache[T]) Ascend(index, start, stop string, iterator func(T) bool) { + c.rw.RLock() + defer c.rw.RUnlock() + + tree, ok := c.trees[index] + if !ok { + return + } + + fn := func(ent entry) bool { + return iterator(c.values[ent.ref]) + } + + // select the appropriate ascend variant based on wether or not + // start/stop points were specified. 
+ switch { + case start == "" && stop == "": + tree.Ascend(fn) + case start == "": + tree.AscendLessThan(entry{key: stop}, fn) + case stop == "": + tree.AscendGreaterOrEqual(entry{key: start}, fn) + default: + tree.AscendRange(entry{key: start}, entry{key: stop}, fn) + } +} + +// Descend iterates the specified range from greatest to least. iteration is terminated early if the +// supplied closure returns false. if this method is being used to read a range, it is strongly recommended +// that all values retained be cloned. any mutation that results in changing a value's index keys will put +// the sort cache into a permanently bad state. empty strings are treated as "open" bounds. passing an empty +// string for both the start and stop bounds iterates all values. +// +// NOTE: descending sort order is the *opposite* of what most teleport range-based logic uses, meaning that +// many common patterns need to be inverted when using this method (e.g. `backend.RangeEnd` actually gives +// you the start position for descending ranges). +func (c *SortCache[T]) Descend(index, start, stop string, iterator func(T) bool) { + c.rw.RLock() + defer c.rw.RUnlock() + + tree, ok := c.trees[index] + if !ok { + return + } + + fn := func(ent entry) bool { + return iterator(c.values[ent.ref]) + } + + // select the appropriate descend variant based on wether or not + // start/stop points were specified. + switch { + case start == "" && stop == "": + tree.Descend(fn) + case start == "": + tree.DescendGreaterThan(entry{key: stop}, fn) + case stop == "": + tree.DescendLessOrEqual(entry{key: start}, fn) + default: + tree.DescendRange(entry{key: start}, entry{key: stop}, fn) + } +} + +// Len returns the number of values currently stored. 
+func (c *SortCache[T]) Len() int { + c.rw.RLock() + defer c.rw.RUnlock() + return len(c.values) +} diff --git a/lib/utils/sortcache/sortcache_test.go b/lib/utils/sortcache/sortcache_test.go new file mode 100644 index 0000000000000..adf50f6e79e41 --- /dev/null +++ b/lib/utils/sortcache/sortcache_test.go @@ -0,0 +1,543 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package sortcache + +import ( + "context" + "fmt" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +const ( + Kind = "Kind" + Name = "Name" +) + +type resource struct { + kind string + name string +} + +// TestBasics verifies basic expected behaviors for a SortCache. 
+func TestBasics(t *testing.T) { + t.Parallel() + + cache := New(Config[resource]{ + Indexes: map[string]func(resource) string{ + Kind: func(r resource) string { + return fmt.Sprintf("%s/%s", r.kind, r.name) + }, + Name: func(r resource) string { + return fmt.Sprintf("%s/%s", r.name, r.kind) + }, + }, + }) + + // verify index checking method + require.True(t, cache.HasIndex(Name)) + require.False(t, cache.HasIndex("Foo")) + + // set up some test resources + rscs := []resource{ + {"node", "001"}, + {"node", "002"}, + {"kube", "001"}, + {"kube", "002"}, + } + + for _, rsc := range rscs { + require.Equal(t, 0, cache.Put(rsc)) + } + + require.Equal(t, 4, cache.Len()) + + // perform some basic lookups + r, ok := cache.Get(Kind, "node/001") + require.True(t, ok) + require.Equal(t, resource{"node", "001"}, r) + + r, ok = cache.Get(Name, "002/kube") + require.True(t, ok) + require.Equal(t, resource{"kube", "002"}, r) + + // check ascending iteration + var out []resource + cache.Ascend(Kind, "kube/", NextKey("kube/"), func(r resource) bool { + out = append(out, r) + return true + }) + + require.Len(t, out, 2) + require.Equal(t, []resource{ + {"kube", "001"}, + {"kube", "002"}, + }, out) + + // check descending iteration + out = nil + cache.Descend(Kind, NextKey("kube/"), "kube/", func(r resource) bool { + out = append(out, r) + return true + }) + + require.Len(t, out, 2) + require.Equal(t, []resource{ + {"kube", "002"}, + {"kube", "001"}, + }, out) + + // check removal + cache.Delete(Kind, "kube/002") + require.Equal(t, 3, cache.Len()) + _, ok = cache.Get(Kind, "kube/002") + require.False(t, ok) +} + +func TestOpenBounds(t *testing.T) { + t.Parallel() + + cache := New(Config[resource]{ + Indexes: map[string]func(resource) string{ + Kind: func(r resource) string { + return fmt.Sprintf("%s/%s", r.kind, r.name) + }, + Name: func(r resource) string { + return fmt.Sprintf("%s/%s", r.name, r.kind) + }, + }, + }) + + // set up some test resources + rscs := []resource{ + {"node", 
"001"}, + {"node", "002"}, + {"kube", "001"}, + {"kube", "002"}, + } + + for _, rsc := range rscs { + require.Equal(t, 0, cache.Put(rsc)) + } + + var out []resource + iterator := func(r resource) bool { + out = append(out, r) + return true + } + + // verify fully open ascend + cache.Ascend(Name, "", "", iterator) + require.Equal(t, []resource{ + {"kube", "001"}, + {"node", "001"}, + {"kube", "002"}, + {"node", "002"}, + }, out) + + // verify fully open descend + out = nil + cache.Descend(Name, "", "", iterator) + require.Equal(t, []resource{ + {"node", "002"}, + {"kube", "002"}, + {"node", "001"}, + {"kube", "001"}, + }, out) + + // verify open-ended ascend + out = nil + cache.Ascend(Name, "002/kube", "", iterator) + require.Equal(t, []resource{ + {"kube", "002"}, + {"node", "002"}, + }, out) + + // verify open-ended descend + out = nil + cache.Descend(Name, "001/node", "", iterator) + require.Equal(t, []resource{ + {"node", "001"}, + {"kube", "001"}, + }, out) + + // verify open-start ascend + out = nil + cache.Ascend(Name, "", "002/kube", iterator) + require.Equal(t, []resource{ + {"kube", "001"}, + {"node", "001"}, + }, out) + + // verify open-start descend + out = nil + cache.Descend(Name, "", "001/node", iterator) + require.Equal(t, []resource{ + {"node", "002"}, + {"kube", "002"}, + }, out) +} + +// TestAscendingPagination verifies expected behavior using a basic pagination setup. 
+func TestAscendingPagination(t *testing.T) { + const ( + totalResources = 100_000 + pageSize = 101 + ) + + t.Parallel() + + cache := New(Config[resource]{ + Indexes: map[string]func(resource) string{ + Kind: func(r resource) string { + return fmt.Sprintf("%s/%s", r.kind, r.name) + }, + Name: func(r resource) string { + return fmt.Sprintf("%s/%s", r.name, r.kind) + }, + }, + }) + + // insert a bunch of test resources + for i := 0; i < totalResources; i++ { + require.Equal(t, 0, cache.Put(resource{"node", uuid.New().String()})) + } + + // create a paginated getter that accepts optional start key and returns a non-empty next key + // if additional resources exist. + nextPage := func(start string) (page []resource, next string) { + page = make([]resource, 0, pageSize+1) + + cache.Ascend(Kind, start, "", func(r resource) bool { + page = append(page, r) + return len(page) <= pageSize + }) + + if len(page) > pageSize { + next = cache.KeyOf(Kind, page[pageSize]) + page = page[:pageSize] + } + return + } + + // consume and aggregate pages from nextPage. + var out []resource + var k string + var n int + for { + n++ + if n > totalResources { + require.FailNow(t, "too many iterations") + } + page, nk := nextPage(k) + if len(page) != pageSize { + require.Empty(t, nk) + } + out = append(out, page...) + if nk == "" { + break + } + k = nk + } + + // verify that we got the expected number of resources + require.Len(t, out, totalResources) +} + +// TestDescendingPagination verifies expected behavior using a basic pagination setup. 
+func TestDescendingPagination(t *testing.T) { + const ( + totalResources = 100_000 + pageSize = 101 + ) + + t.Parallel() + + cache := New(Config[resource]{ + Indexes: map[string]func(resource) string{ + Kind: func(r resource) string { + return fmt.Sprintf("%s/%s", r.kind, r.name) + }, + Name: func(r resource) string { + return fmt.Sprintf("%s/%s", r.name, r.kind) + }, + }, + }) + + // insert a bunch of test resources + for i := 0; i < totalResources; i++ { + require.Equal(t, 0, cache.Put(resource{"node", uuid.New().String()})) + } + + // create a paginated getter that accepts optional start key and returns a non-empty next key + // if additional resources exist. + nextPage := func(start string) (page []resource, next string) { + page = make([]resource, 0, pageSize+1) + + cache.Descend(Kind, start, "", func(r resource) bool { + page = append(page, r) + return len(page) <= pageSize + }) + + if len(page) > pageSize { + next = cache.KeyOf(Kind, page[pageSize]) + page = page[:pageSize] + } + + return + } + + // consume and aggregate pages from nextPage. + var out []resource + var k string + var n int + for { + n++ + if n > totalResources { + require.FailNow(t, "too many iterations") + } + page, nk := nextPage(k) + if len(page) != pageSize { + require.Empty(t, nk) + } + out = append(out, page...) + if nk == "" { + break + } + k = nk + } + + // verify that we got the expected number of resources + require.Len(t, out, totalResources) +} + +// TestOverlap verifies basic expected behavior when multiple resources map to the same +// value on an index. +func TestOverlap(t *testing.T) { + t.Parallel() + + // set up indexes s.t. resources with different kinds can collide on the name + // index and resources with different names can collide on the kind index. 
+ cache := New(Config[resource]{ + Indexes: map[string]func(resource) string{ + Kind: func(r resource) string { + return r.kind + }, + Name: func(r resource) string { + return r.name + }, + }, + }) + + // set up test resources s.t. there is a collision on the name index + rscs := []resource{ + {"node", "001"}, + {"db", "002"}, + {"kube", "001"}, + } + + var totalEvicted int + for _, rsc := range rscs { + totalEvicted += cache.Put(rsc) + } + require.Equal(t, 1, totalEvicted) + + // expect one of the three resources that were inserted to be overwritten + require.Equal(t, 2, cache.Len()) + + // verify that the most recently inserted value for our test collision "won", overwriting the + // previous resource. + r, ok := cache.Get(Name, "001") + require.True(t, ok) + require.Equal(t, "kube", r.kind) + + // verify that the preexisting value that wasn't part of the collision is preserved. + r, ok = cache.Get(Kind, "db") + require.True(t, ok) + require.Equal(t, "002", r.name) + + // inserting a resource of a different kind, but with the same name as an existing resource, should + // cause the existing resource to be fully deleted, including in the indexes that are not being + // overwritten by the new value. + require.Equal(t, 1, cache.Put(resource{"desktop", "001"})) + _, ok = cache.Get(Kind, "kube") + require.False(t, ok) + + require.Equal(t, 2, cache.Len()) + + // inserting a resource that collides with multiple existing resources should remove all of them. + require.Equal(t, 2, cache.Put(resource{"db", "001"})) + require.Equal(t, 1, cache.Len()) +} + +// TestNextKey asserts the basic behavior of the [NextKey] helper. 
+func TestNextKey(t *testing.T) { + t.Parallel() + + tts := []struct { + key, out string + }{ + { + key: "nodes/", + out: "nodes0", + }, + { + key: "a", + out: "b", + }, + { + key: string([]byte{0x00, 0x00, 0x00}), + out: string([]byte{0x00, 0x00, 0x01}), + }, + { + key: string([]byte{0xff, 0xff, 0xff}), + out: string([]byte{0xff, 0xff, 0xff}), + }, + { + key: "", + out: "", + }, + } + + for _, tt := range tts { + require.Equal(t, tt.out, NextKey(tt.key), "NextKey(%q)", tt.key) + } +} + +// BenchmarkSortCache attempts to benchmark reads under moderately high concurrent load. +// +// goos: linux +// goarch: amd64 +// pkg: github.com/gravitational/teleport/lib/utils/sortcache +// cpu: Intel(R) Xeon(R) CPU @ 2.80GHz +// BenchmarkSortCache-4 12 250820820 ns/op +func BenchmarkSortCache(b *testing.B) { + const ( + concurrency = 100 + resourcesPerKind = 50_000 + ) + + // set up a basic cache configuration + cache := New(Config[resource]{ + Indexes: map[string]func(resource) string{ + Kind: func(r resource) string { + return fmt.Sprintf("%s/%s", r.kind, r.name) + }, + Name: func(r resource) string { + return fmt.Sprintf("%s/%s", r.name, r.kind) + }, + }, + }) + + // set up some test resources we can use later to + // inject writes + r1 := resource{ + kind: "node", + name: uuid.New().String(), + } + + r2 := resource{ + kind: "kube", + name: uuid.New().String(), + } + + cache.Put(r1) + cache.Put(r2) + + // seed cache with lots of additional resources to help simulate large reads + for i := 0; i < resourcesPerKind-1; i++ { + cache.Put(resource{ + kind: "node", + name: uuid.New().String(), + }) + cache.Put(resource{ + kind: "kube", + name: uuid.New().String(), + }) + } + + // set up a background process to inject concurrent writes in a fairly + // tight loop (should roughly simulate the load generated by a background + // event stream injecting updates into a replica). 
+ ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + for { + if ctx.Err() != nil { + return + } + cache.Put(r1) + cache.Put(r2) + } + }() + + // set up a bunch of background concurrent read operations to simulate load. + for i := 0; i < concurrency; i++ { + go func() { + var n int + buf := make([]resource, 0, resourcesPerKind) + for { + if ctx.Err() != nil { + return + } + buf = buf[:0] + n++ + key := "node/" + if n%2 == 0 { + key = "kube/" + } + + cache.Ascend(Kind, key, NextKey(key), func(r resource) bool { + buf = append(buf, r) + return true + }) + + if len(buf) != resourcesPerKind { + panic("benchmark is misconfigured") + } + } + }() + } + + // actual benchmark gets performed against one singular read loop. the goal here being to + // figure out what a single reader would experience when trying to pull a large block of + // values from a SortCache that is under high concurrent load. + b.ResetTimer() + + var n int + buf := make([]resource, 0, resourcesPerKind) + for i := 0; i < b.N; i++ { + buf = buf[:0] + n++ + key := "node/" + if n%2 == 0 { + key = "kube/" + } + + cache.Ascend(Kind, key, NextKey(key), func(r resource) bool { + buf = append(buf, r) + return true + }) + + if len(buf) != resourcesPerKind { + panic("benchmark is misconfigured") + } + } +}