Skip to content

Commit

Permalink
Introduce a dedicated backend key type (#45706) (#45712)
Browse files Browse the repository at this point in the history
For the moment the type is nothing more than an alias for []byte.
This will allow for transitioning enterprise to use the new type
without breaking the builds. Once enterprise is updated and all
assumptions about a Key being a []byte are removed the alias will
be dropped in favor of making a Key its own type.
  • Loading branch information
rosstimothy authored Aug 28, 2024
1 parent e00453e commit b6f5914
Show file tree
Hide file tree
Showing 46 changed files with 253 additions and 237 deletions.
2 changes: 1 addition & 1 deletion lib/auth/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type stateBackend interface {
// exists, updates it otherwise)
Put(ctx context.Context, i backend.Item) (*backend.Lease, error)
// Get returns a single item or not found error
Get(ctx context.Context, key []byte) (*backend.Item, error)
Get(ctx context.Context, key backend.Key) (*backend.Item, error)
}

// ProcessStorage is a backend for local process state,
Expand Down
2 changes: 1 addition & 1 deletion lib/backend/atomicwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func Delete() Action {
type ConditionalAction struct {
// Key is the key against which the associated condition and action are to
// be applied.
Key []byte
Key Key

// Condition must be one of Exists|NotExists|Revision(<revision>)|Whatever
Condition Condition
Expand Down
40 changes: 20 additions & 20 deletions lib/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@ type Backend interface {
Update(ctx context.Context, i Item) (*Lease, error)

// Get returns a single item or not found error
Get(ctx context.Context, key []byte) (*Item, error)
Get(ctx context.Context, key Key) (*Item, error)

// GetRange returns query range
GetRange(ctx context.Context, startKey []byte, endKey []byte, limit int) (*GetResult, error)
GetRange(ctx context.Context, startKey, endKey Key, limit int) (*GetResult, error)

// Delete deletes item by key, returns NotFound error
// if item does not exist
Delete(ctx context.Context, key []byte) error
Delete(ctx context.Context, key Key) error

// DeleteRange deletes range of items with keys between startKey and endKey
DeleteRange(ctx context.Context, startKey, endKey []byte) error
DeleteRange(ctx context.Context, startKey, endKey Key) error

// KeepAlive keeps object from expiring, updates lease on the existing object,
// expires contains the new expiry to set on the lease,
Expand All @@ -90,7 +90,7 @@ type Backend interface {
ConditionalUpdate(ctx context.Context, i Item) (*Lease, error)

// ConditionalDelete deletes the item by key if the revision matches the stored revision.
ConditionalDelete(ctx context.Context, key []byte, revision string) error
ConditionalDelete(ctx context.Context, key Key, revision string) error

// AtomicWrite executes a batch of conditional actions atomically s.t. all actions happen if all
// conditions are met, but no actions happen if any condition fails to hold. If one or more conditions
Expand Down Expand Up @@ -130,7 +130,7 @@ func New(ctx context.Context, backend string, params Params) (Backend, error) {
}

// IterateRange is a helper for stepping over a range
func IterateRange(ctx context.Context, bk Backend, startKey []byte, endKey []byte, limit int, fn func([]Item) (stop bool, err error)) error {
func IterateRange(ctx context.Context, bk Backend, startKey, endKey Key, limit int, fn func([]Item) (stop bool, err error)) error {
if limit == 0 || limit > 10_000 {
limit = 10_000
}
Expand Down Expand Up @@ -167,7 +167,7 @@ func IterateRange(ctx context.Context, bk Backend, startKey []byte, endKey []byt
//
// 2. allow individual backends to expose custom streaming methods s.t. the most performant
// impl for a given backend may be used.
func StreamRange(ctx context.Context, bk Backend, startKey, endKey []byte, pageSize int) stream.Stream[Item] {
func StreamRange(ctx context.Context, bk Backend, startKey, endKey Key, pageSize int) stream.Stream[Item] {
return stream.PageFunc[Item](func() ([]Item, error) {
if startKey == nil {
return nil, io.EOF
Expand Down Expand Up @@ -196,7 +196,7 @@ func StreamRange(ctx context.Context, bk Backend, startKey, endKey []byte, pageS
// err = backend.KeepAlive(ctx, lease, expires)
type Lease struct {
// Key is the resource identifier.
Key []byte
Key Key
// ID is a lease ID, could be empty.
// Deprecated: use Revision instead
ID int64
Expand All @@ -211,7 +211,7 @@ type Watch struct {
Name string
// Prefixes specifies prefixes to watch,
// passed to the backend implementation
Prefixes [][]byte
Prefixes []Key
// QueueSize is an optional queue size
QueueSize int
// MetricComponent if set will start reporting
Expand Down Expand Up @@ -255,7 +255,7 @@ type Event struct {
// Item is a key value item
type Item struct {
// Key is a key of the key value item
Key []byte
Key Key
// Value is a value of the key value item
Value []byte
// Expires is an optional record expiry time
Expand Down Expand Up @@ -308,7 +308,7 @@ const NoLimit = 0
// nextKey returns the next possible key.
// If used with a key prefix, this will return
// the end of the range for that key prefix.
func nextKey(key []byte) []byte {
func nextKey(key Key) Key {
end := make([]byte, len(key))
copy(end, key)
for i := len(end) - 1; i >= 0; i-- {
Expand All @@ -322,10 +322,10 @@ func nextKey(key []byte) []byte {
return noEnd
}

var noEnd = []byte{0}
var noEnd = Key{0}

// RangeEnd returns end of the range for given key.
func RangeEnd(key []byte) []byte {
func RangeEnd(key Key) Key {
return nextKey(key)
}

Expand All @@ -346,7 +346,7 @@ type KeyedItem interface {
// have the HostID part.
func NextPaginationKey(ki KeyedItem) string {
key := GetPaginationKey(ki)
return string(nextKey([]byte(key)))
return string(nextKey(Key(key)))
}

// GetPaginationKey returns the pagination key given item.
Expand All @@ -362,13 +362,13 @@ func GetPaginationKey(ki KeyedItem) string {

// MaskKeyName masks the given key name.
// e.g "123456789" -> "******789"
func MaskKeyName(keyName string) []byte {
func MaskKeyName(keyName string) string {
maskedBytes := []byte(keyName)
hiddenBefore := int(0.75 * float64(len(keyName)))
for i := 0; i < hiddenBefore; i++ {
maskedBytes[i] = '*'
}
return maskedBytes
return string(maskedBytes)
}

// Items is a sortable list of backend items
Expand Down Expand Up @@ -442,20 +442,20 @@ const Separator = '/'

// NewKey joins parts into path separated by Separator,
// makes sure path always starts with Separator ("/")
func NewKey(parts ...string) []byte {
func NewKey(parts ...string) Key {
return internalKey("", parts...)
}

// ExactKey is like NewKey, except a Separator is appended to the result
// path of NewKey. This is to ensure range matching of a path will only
// match child paths and not other paths that have the resulting path
// as a prefix.
func ExactKey(parts ...string) []byte {
func ExactKey(parts ...string) Key {
return append(NewKey(parts...), Separator)
}

func internalKey(internalPrefix string, parts ...string) []byte {
return []byte(strings.Join(append([]string{internalPrefix}, parts...), string(Separator)))
func internalKey(internalPrefix string, parts ...string) Key {
return Key(strings.Join(append([]string{internalPrefix}, parts...), string(Separator)))
}

// CreateRevision generates a new identifier to be used
Expand Down
2 changes: 1 addition & 1 deletion lib/backend/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (c *CircularBuffer) fanOutEvent(r Event) {
}

// RemoveRedundantPrefixes will remove redundant prefixes from the given prefix list.
func RemoveRedundantPrefixes(prefixes [][]byte) [][]byte {
func RemoveRedundantPrefixes(prefixes []Key) []Key {
if len(prefixes) == 0 {
return prefixes
}
Expand Down
63 changes: 32 additions & 31 deletions lib/backend/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package backend

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -50,17 +51,17 @@ func TestWatcherSimple(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: 1}})
b.Emit(Event{Item: Item{Key: Key("/1")}})

select {
case e := <-w.Events():
require.Equal(t, int64(1), e.Item.ID)
require.Equal(t, Key("/1"), e.Item.Key)
case <-time.After(100 * time.Millisecond):
t.Fatalf("Timeout waiting for event.")
}

b.Close()
b.Emit(Event{Item: Item{ID: 2}})
b.Emit(Event{Item: Item{Key: Key("/2")}})

select {
case <-w.Done():
Expand Down Expand Up @@ -103,12 +104,12 @@ func TestWatcherCapacity(t *testing.T) {
// emit and then consume 10 events. this is much larger than our queue size,
// but should succeed since we consume within our grace period.
for i := 0; i < 10; i++ {
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(i + 1)}})
b.Emit(Event{Item: Item{Key: Key(fmt.Sprintf("/%d", i+1))}})
}
for i := 0; i < 10; i++ {
select {
case e := <-w.Events():
require.Equal(t, e.Item.ID, int64(i+1))
require.Equal(t, fmt.Sprintf("/%d", i+1), string(e.Item.Key))
default:
t.Fatalf("Expected events to be immediately available")
}
Expand All @@ -118,7 +119,7 @@ func TestWatcherCapacity(t *testing.T) {
clock.Advance(gracePeriod + time.Second)

// emit another event, which will cause buffer to reevaluate the grace period.
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(11)}})
b.Emit(Event{Item: Item{Key: Key("/11")}})

// ensure that buffer did not close watcher, since previously created backlog
// was drained within grace period.
Expand All @@ -130,13 +131,13 @@ func TestWatcherCapacity(t *testing.T) {

// create backlog again, and this time advance past grace period without draining it.
for i := 0; i < 10; i++ {
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(i + 12)}})
b.Emit(Event{Item: Item{Key: Key(fmt.Sprintf("/%d", i+12))}})
}
clock.Advance(gracePeriod + time.Second)

// emit another event, which will cause buffer to realize that watcher is past
// its grace period.
b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: int64(22)}})
b.Emit(Event{Item: Item{Key: Key("/22")}})

select {
case <-w.Done():
Expand Down Expand Up @@ -176,7 +177,7 @@ func TestWatcherCreationGracePeriod(t *testing.T) {

// emit enough events to create a backlog
for i := 0; i < queueSize*2; i++ {
b.Emit(Event{Item: Item{Key: []byte{Separator}}})
b.Emit(Event{Item: Item{Key: Key{Separator}}})
}

select {
Expand All @@ -191,7 +192,7 @@ func TestWatcherCreationGracePeriod(t *testing.T) {
// advance well past the backlog grace period, but not past the creation grace period
clock.Advance(backlogGracePeriod * 2)

b.Emit(Event{Item: Item{Key: []byte{Separator}}})
b.Emit(Event{Item: Item{Key: Key{Separator}}})

select {
case <-w.Done():
Expand All @@ -202,7 +203,7 @@ func TestWatcherCreationGracePeriod(t *testing.T) {
// advance well past creation grace period
clock.Advance(creationGracePeriod)

b.Emit(Event{Item: Item{Key: []byte{Separator}}})
b.Emit(Event{Item: Item{Key: Key{Separator}}})
select {
case <-w.Done():
default:
Expand Down Expand Up @@ -238,29 +239,29 @@ func TestWatcherClose(t *testing.T) {
// TestRemoveRedundantPrefixes removes redundant prefixes
func TestRemoveRedundantPrefixes(t *testing.T) {
type tc struct {
in [][]byte
out [][]byte
in []Key
out []Key
}
tcs := []tc{
{
in: [][]byte{},
out: [][]byte{},
in: []Key{},
out: []Key{},
},
{
in: [][]byte{[]byte("/a")},
out: [][]byte{[]byte("/a")},
in: []Key{Key("/a")},
out: []Key{Key("/a")},
},
{
in: [][]byte{[]byte("/a"), []byte("/")},
out: [][]byte{[]byte("/")},
in: []Key{Key("/a"), Key("/")},
out: []Key{Key("/")},
},
{
in: [][]byte{[]byte("/b"), []byte("/a")},
out: [][]byte{[]byte("/a"), []byte("/b")},
in: []Key{Key("/b"), Key("/a")},
out: []Key{Key("/a"), Key("/b")},
},
{
in: [][]byte{[]byte("/a/b"), []byte("/a"), []byte("/a/b/c"), []byte("/d")},
out: [][]byte{[]byte("/a"), []byte("/d")},
in: []Key{Key("/a/b"), Key("/a"), Key("/a/b/c"), Key("/d")},
out: []Key{Key("/a"), Key("/d")},
},
}
for _, tc := range tcs {
Expand All @@ -278,7 +279,7 @@ func TestWatcherMulti(t *testing.T) {
defer b.Close()
b.SetInit()

w, err := b.NewWatcher(ctx, Watch{Prefixes: [][]byte{[]byte("/a"), []byte("/a/b")}})
w, err := b.NewWatcher(ctx, Watch{Prefixes: []Key{Key("/a"), Key("/a/b")}})
require.NoError(t, err)
defer w.Close()

Expand All @@ -289,11 +290,11 @@ func TestWatcherMulti(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte("/a/b/c"), ID: 1}})
b.Emit(Event{Item: Item{Key: Key("/a/b/c")}})

select {
case e := <-w.Events():
require.Equal(t, int64(1), e.Item.ID)
require.Equal(t, Key("/a/b/c"), e.Item.Key)
case <-time.After(100 * time.Millisecond):
t.Fatalf("Timeout waiting for event.")
}
Expand Down Expand Up @@ -321,7 +322,7 @@ func TestWatcherReset(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: 1}})
b.Emit(Event{Item: Item{Key: Key("/1")}})
b.Clear()

// make sure watcher has been closed
Expand All @@ -342,11 +343,11 @@ func TestWatcherReset(t *testing.T) {
t.Fatalf("Timeout waiting for event.")
}

b.Emit(Event{Item: Item{Key: []byte{Separator}, ID: 2}})
b.Emit(Event{Item: Item{Key: Key("/2")}})

select {
case e := <-w2.Events():
require.Equal(t, int64(2), e.Item.ID)
require.Equal(t, Key("/2"), e.Item.Key)
case <-time.After(100 * time.Millisecond):
t.Fatalf("Timeout waiting for event.")
}
Expand All @@ -357,10 +358,10 @@ func TestWatcherTree(t *testing.T) {
wt := newWatcherTree()
require.False(t, wt.rm(nil))

w1 := &BufferWatcher{Watch: Watch{Prefixes: [][]byte{[]byte("/a"), []byte("/a/a1"), []byte("/c")}}}
w1 := &BufferWatcher{Watch: Watch{Prefixes: []Key{Key("/a"), Key("/a/a1"), Key("/c")}}}
require.False(t, wt.rm(w1))

w2 := &BufferWatcher{Watch: Watch{Prefixes: [][]byte{[]byte("/a")}}}
w2 := &BufferWatcher{Watch: Watch{Prefixes: []Key{Key("/a")}}}

wt.add(w1)
wt.add(w2)
Expand Down
Loading

0 comments on commit b6f5914

Please sign in to comment.