Skip to content

Commit

Permalink
Introduce a dedicated backend key type (#45706)
Browse files Browse the repository at this point in the history
For the moment the type is nothing more than an alias for []byte.
This will allow for transitioning enterprise to use the new type
without breaking the builds. Once enterprise is updated and all
assumptions about a Key being a []byte are removed the alias will
be dropped in favor of making a Key its own type.
  • Loading branch information
rosstimothy committed Aug 23, 2024
1 parent f3c3fbe commit b5a7b05
Show file tree
Hide file tree
Showing 44 changed files with 230 additions and 214 deletions.
2 changes: 1 addition & 1 deletion lib/auth/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type stateBackend interface {
// exists, updates it otherwise)
Put(ctx context.Context, i backend.Item) (*backend.Lease, error)
// Get returns a single item or not found error
Get(ctx context.Context, key []byte) (*backend.Item, error)
Get(ctx context.Context, key backend.Key) (*backend.Item, error)
}

// ProcessStorage is a backend for local process state,
Expand Down
38 changes: 19 additions & 19 deletions lib/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ type Backend interface {
Update(ctx context.Context, i Item) (*Lease, error)

// Get returns a single item or not found error
Get(ctx context.Context, key []byte) (*Item, error)
Get(ctx context.Context, key Key) (*Item, error)

// GetRange returns query range
GetRange(ctx context.Context, startKey []byte, endKey []byte, limit int) (*GetResult, error)
GetRange(ctx context.Context, startKey, endKey Key, limit int) (*GetResult, error)

// Delete deletes item by key, returns NotFound error
// if item does not exist
Delete(ctx context.Context, key []byte) error
Delete(ctx context.Context, key Key) error

// DeleteRange deletes range of items with keys between startKey and endKey
DeleteRange(ctx context.Context, startKey, endKey []byte) error
DeleteRange(ctx context.Context, startKey, endKey Key) error

// KeepAlive keeps object from expiring, updates lease on the existing object,
// expires contains the new expiry to set on the lease,
Expand All @@ -93,7 +93,7 @@ type Backend interface {
}

// IterateRange is a helper for stepping over a range
func IterateRange(ctx context.Context, bk Backend, startKey []byte, endKey []byte, limit int, fn func([]Item) (stop bool, err error)) error {
func IterateRange(ctx context.Context, bk Backend, startKey, endKey Key, limit int, fn func([]Item) (stop bool, err error)) error {
if limit == 0 || limit > 10_000 {
limit = 10_000
}
Expand Down Expand Up @@ -130,7 +130,7 @@ func IterateRange(ctx context.Context, bk Backend, startKey []byte, endKey []byt
//
// 2. allow individual backends to expose custom streaming methods s.t. the most performant
// impl for a given backend may be used.
func StreamRange(ctx context.Context, bk Backend, startKey, endKey []byte, pageSize int) stream.Stream[Item] {
func StreamRange(ctx context.Context, bk Backend, startKey, endKey Key, pageSize int) stream.Stream[Item] {
return stream.PageFunc[Item](func() ([]Item, error) {
if startKey == nil {
return nil, io.EOF
Expand Down Expand Up @@ -167,7 +167,7 @@ type Batch interface {
// err = backend.KeepAlive(ctx, lease, expires)
type Lease struct {
// Key is an object representing lease
Key []byte
Key Key
// ID is a lease ID, could be empty
// Deprecated: use Revision instead
ID int64
Expand All @@ -187,7 +187,7 @@ type Watch struct {
Name string
// Prefixes specifies prefixes to watch,
// passed to the backend implementation
Prefixes [][]byte
Prefixes []Key
// QueueSize is an optional queue size
QueueSize int
// MetricComponent if set will start reporting
Expand Down Expand Up @@ -231,7 +231,7 @@ type Event struct {
// Item is a key value item
type Item struct {
// Key is a key of the key value item
Key []byte
Key Key
// Value is a value of the key value item
Value []byte
// Expires is an optional record expiry time
Expand Down Expand Up @@ -287,7 +287,7 @@ const NoLimit = 0
// nextKey returns the next possible key.
// If used with a key prefix, this will return
// the end of the range for that key prefix.
func nextKey(key []byte) []byte {
func nextKey(key Key) Key {
end := make([]byte, len(key))
copy(end, key)
for i := len(end) - 1; i >= 0; i-- {
Expand All @@ -301,10 +301,10 @@ func nextKey(key []byte) []byte {
return noEnd
}

var noEnd = []byte{0}
var noEnd = Key{0}

// RangeEnd returns end of the range for given key.
func RangeEnd(key []byte) []byte {
func RangeEnd(key Key) Key {
return nextKey(key)
}

Expand All @@ -325,7 +325,7 @@ type KeyedItem interface {
// have the HostID part.
func NextPaginationKey(ki KeyedItem) string {
key := GetPaginationKey(ki)
return string(nextKey([]byte(key)))
return string(nextKey(Key(key)))
}

// GetPaginationKey returns the pagination key given item.
Expand All @@ -341,13 +341,13 @@ func GetPaginationKey(ki KeyedItem) string {

// MaskKeyName masks the given key name.
// e.g "123456789" -> "******789"
func MaskKeyName(keyName string) []byte {
func MaskKeyName(keyName string) string {
maskedBytes := []byte(keyName)
hiddenBefore := int(0.75 * float64(len(keyName)))
for i := 0; i < hiddenBefore; i++ {
maskedBytes[i] = '*'
}
return maskedBytes
return string(maskedBytes)
}

// Items is a sortable list of backend items
Expand Down Expand Up @@ -421,18 +421,18 @@ const Separator = '/'

// NewKey joins parts into path separated by Separator,
// makes sure path always starts with Separator ("/")
func NewKey(parts ...string) []byte {
func NewKey(parts ...string) Key {
return internalKey("", parts...)
}

// ExactKey is like Key, except a Separator is appended to the result
// path of Key. This is to ensure range matching of a path will only
// math child paths and not other paths that have the resulting path
// as a prefix.
func ExactKey(parts ...string) []byte {
func ExactKey(parts ...string) Key {
return append(NewKey(parts...), Separator)
}

func internalKey(internalPrefix string, parts ...string) []byte {
return []byte(strings.Join(append([]string{internalPrefix}, parts...), string(Separator)))
func internalKey(internalPrefix string, parts ...string) Key {
return Key(strings.Join(append([]string{internalPrefix}, parts...), string(Separator)))
}
2 changes: 1 addition & 1 deletion lib/backend/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (c *CircularBuffer) fanOutEvent(r Event) {
}
}

func removeRedundantPrefixes(prefixes [][]byte) [][]byte {
func removeRedundantPrefixes(prefixes []Key) []Key {
if len(prefixes) == 0 {
return prefixes
}
Expand Down
65 changes: 33 additions & 32 deletions lib/backend/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package backend

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -48,17 +49,17 @@ func TestWatcherSimple(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: 1}})
b.Emit(Event{Item: Item{Key: Key("/1")}})

select {
case e := <-w.Events():
require.Equal(t, e.Item.ID, int64(1))
require.Equal(t, Key("/1"), e.Item.Key)
case <-time.After(100 * time.Millisecond):
t.Fatalf("Timeout waiting for event.")
}

b.Close()
b.Emit(Event{Item: Item{ID: 2}})
b.Emit(Event{Item: Item{Key: Key("/2")}})

select {
case <-w.Done():
Expand Down Expand Up @@ -101,12 +102,12 @@ func TestWatcherCapacity(t *testing.T) {
// emit and then consume 10 events. this is much larger than our queue size,
// but should succeed since we consume within our grace period.
for i := 0; i < 10; i++ {
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(i + 1)}})
b.Emit(Event{Item: Item{Key: Key(fmt.Sprintf("/%d", i+1))}})
}
for i := 0; i < 10; i++ {
select {
case e := <-w.Events():
require.Equal(t, e.Item.ID, int64(i+1))
require.Equal(t, fmt.Sprintf("/%d", i+1), string(e.Item.Key))
default:
t.Fatalf("Expected events to be immediately available")
}
Expand All @@ -116,7 +117,7 @@ func TestWatcherCapacity(t *testing.T) {
clock.Advance(gracePeriod + time.Second)

// emit another event, which will cause buffer to reevaluate the grace period.
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(11)}})
b.Emit(Event{Item: Item{Key: Key("/11")}})

// ensure that buffer did not close watcher, since previously created backlog
// was drained within grace period.
Expand All @@ -128,13 +129,13 @@ func TestWatcherCapacity(t *testing.T) {

// create backlog again, and this time advance past grace period without draining it.
for i := 0; i < 10; i++ {
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(i + 12)}})
b.Emit(Event{Item: Item{Key: Key(fmt.Sprintf("/%d", i+12))}})
}
clock.Advance(gracePeriod + time.Second)

// emit another event, which will cause buffer to realize that watcher is past
// its grace period.
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(22)}})
b.Emit(Event{Item: Item{Key: Key("/22")}})

select {
case <-w.Done():
Expand Down Expand Up @@ -174,7 +175,7 @@ func TestWatcherCreationGracePeriod(t *testing.T) {

// emit enough events to create a backlog
for i := 0; i < queueSize*2; i++ {
b.Emit(Event{Item: Item{Key: []byte{Separator}}})
b.Emit(Event{Item: Item{Key: Key{Separator}}})
}

select {
Expand All @@ -189,7 +190,7 @@ func TestWatcherCreationGracePeriod(t *testing.T) {
// advance well past the backlog grace period, but not past the creation grace period
clock.Advance(backlogGracePeriod * 2)

b.Emit(Event{Item: Item{Key: []byte{Separator}}})
b.Emit(Event{Item: Item{Key: Key{Separator}}})

select {
case <-w.Done():
Expand All @@ -200,7 +201,7 @@ func TestWatcherCreationGracePeriod(t *testing.T) {
// advance well past creation grace period
clock.Advance(creationGracePeriod)

b.Emit(Event{Item: Item{Key: []byte{Separator}}})
b.Emit(Event{Item: Item{Key: Key{Separator}}})
select {
case <-w.Done():
default:
Expand Down Expand Up @@ -236,29 +237,29 @@ func TestWatcherClose(t *testing.T) {
// TestRemoveRedundantPrefixes removes redundant prefixes
func TestRemoveRedundantPrefixes(t *testing.T) {
type tc struct {
in [][]byte
out [][]byte
in []Key
out []Key
}
tcs := []tc{
{
in: [][]byte{},
out: [][]byte{},
in: []Key{},
out: []Key{},
},
{
in: [][]byte{[]byte("/a")},
out: [][]byte{[]byte("/a")},
in: []Key{Key("/a")},
out: []Key{Key("/a")},
},
{
in: [][]byte{[]byte("/a"), []byte("/")},
out: [][]byte{[]byte("/")},
in: []Key{Key("/a"), Key("/")},
out: []Key{Key("/")},
},
{
in: [][]byte{[]byte("/b"), []byte("/a")},
out: [][]byte{[]byte("/a"), []byte("/b")},
in: []Key{Key("/b"), Key("/a")},
out: []Key{Key("/a"), Key("/b")},
},
{
in: [][]byte{[]byte("/a/b"), []byte("/a"), []byte("/a/b/c"), []byte("/d")},
out: [][]byte{[]byte("/a"), []byte("/d")},
in: []Key{Key("/a/b"), Key("/a"), Key("/a/b/c"), Key("/d")},
out: []Key{Key("/a"), Key("/d")},
},
}
for _, tc := range tcs {
Expand All @@ -276,7 +277,7 @@ func TestWatcherMulti(t *testing.T) {
defer b.Close()
b.SetInit()

w, err := b.NewWatcher(ctx, Watch{Prefixes: [][]byte{[]byte("/a"), []byte("/a/b")}})
w, err := b.NewWatcher(ctx, Watch{Prefixes: []Key{Key("/a"), Key("/a/b")}})
require.NoError(t, err)
defer w.Close()

Expand All @@ -287,11 +288,11 @@ func TestWatcherMulti(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte("/a/b/c"), ID: 1}})
b.Emit(Event{Item: Item{Key: Key("/a/b/c")}})

select {
case e := <-w.Events():
require.Equal(t, e.Item.ID, int64(1))
require.Equal(t, Key("/a/b/c"), e.Item.Key)
case <-time.After(100 * time.Millisecond):
t.Fatalf("Timeout waiting for event.")
}
Expand Down Expand Up @@ -320,7 +321,7 @@ func TestWatcherReset(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: 1}})
b.Emit(Event{Item: Item{Key: Key("/1")}})
b.Clear()

// make sure watcher has been closed
Expand All @@ -341,11 +342,11 @@ func TestWatcherReset(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: 2}})
b.Emit(Event{Item: Item{Key: Key("/2")}})

select {
case e := <-w2.Events():
require.Equal(t, e.Item.ID, int64(2))
require.Equal(t, Key("/2"), e.Item.Key)
case <-time.After(100 * time.Millisecond):
t.Fatalf("Timeout waiting for event.")
}
Expand All @@ -356,10 +357,10 @@ func TestWatcherTree(t *testing.T) {
wt := newWatcherTree()
require.Equal(t, wt.rm(nil), false)

w1 := &BufferWatcher{Watch: Watch{Prefixes: [][]byte{[]byte("/a"), []byte("/a/a1"), []byte("/c")}}}
require.Equal(t, wt.rm(w1), false)
w1 := &BufferWatcher{Watch: Watch{Prefixes: []Key{Key("/a"), Key("/a/a1"), Key("/c")}}}
require.False(t, wt.rm(w1))

w2 := &BufferWatcher{Watch: Watch{Prefixes: [][]byte{[]byte("/a")}}}
w2 := &BufferWatcher{Watch: Watch{Prefixes: []Key{Key("/a")}}}

wt.add(w1)
wt.add(w2)
Expand Down
Loading

0 comments on commit b5a7b05

Please sign in to comment.