Skip to content

Commit

Permalink
pkg/datastore: add an option to disable retries on RWTs
Browse files Browse the repository at this point in the history
  • Loading branch information
jakedt committed May 12, 2023
1 parent 08c6918 commit 83890d3
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 12 deletions.
4 changes: 3 additions & 1 deletion internal/datastore/proxy/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"unsafe"

"github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/util"

"github.com/authzed/spicedb/pkg/schemadsl/compiler"
Expand Down Expand Up @@ -79,11 +80,12 @@ func (p *definitionCachingProxy) SnapshotReader(rev datastore.Revision) datastor
func (p *definitionCachingProxy) ReadWriteTx(
ctx context.Context,
f datastore.TxUserFunc,
opts ...options.RWTOptionsOption,
) (datastore.Revision, error) {
return p.Datastore.ReadWriteTx(ctx, func(delegateRWT datastore.ReadWriteTransaction) error {
rwt := &definitionCachingRWT{delegateRWT, &sync.Map{}}
return f(rwt)
})
}, opts...)
}

const (
Expand Down
7 changes: 5 additions & 2 deletions internal/datastore/proxy/caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/authzed/spicedb/pkg/caveats"
caveattypes "github.com/authzed/spicedb/pkg/caveats/types"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/datastore/revision"
ns "github.com/authzed/spicedb/pkg/namespace"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
Expand All @@ -29,6 +30,8 @@ var (
zero = revision.NewFromDecimal(decimal.NewFromInt(0))
one = revision.NewFromDecimal(decimal.NewFromInt(1))
two = revision.NewFromDecimal(decimal.NewFromInt(2))

nilOpts []options.RWTOptionsOption
)

const (
Expand Down Expand Up @@ -209,7 +212,7 @@ func TestRWTCaching(t *testing.T) {

require := require.New(t)

dsMock.On("ReadWriteTx").Return(rwtMock, one, nil).Once()
dsMock.On("ReadWriteTx", nilOpts).Return(rwtMock, one, nil).Once()
rwtMock.On(tester.readSingleFunctionName, nsA).Return(nil, zero, nil).Once()

ctx := context.Background()
Expand Down Expand Up @@ -246,7 +249,7 @@ func TestRWTCacheWithWrites(t *testing.T) {

require := require.New(t)

dsMock.On("ReadWriteTx").Return(rwtMock, one, nil).Once()
dsMock.On("ReadWriteTx", nilOpts).Return(rwtMock, one, nil).Once()
rwtMock.On(tester.readSingleFunctionName, nsA).Return(nil, zero, tester.notFoundErr).Once()

ctx := context.Background()
Expand Down
14 changes: 12 additions & 2 deletions internal/datastore/proxy/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/datastore/test"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

Expand Down Expand Up @@ -38,8 +39,12 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.Datasto

type ctxProxy struct{ delegate datastore.Datastore }

func (p *ctxProxy) ReadWriteTx(ctx context.Context, f datastore.TxUserFunc) (datastore.Revision, error) {
return p.delegate.ReadWriteTx(ctx, f)
func (p *ctxProxy) ReadWriteTx(
ctx context.Context,
f datastore.TxUserFunc,
opts ...options.RWTOptionsOption,
) (datastore.Revision, error) {
return p.delegate.ReadWriteTx(ctx, f, opts...)
}

func (p *ctxProxy) OptimizedRevision(ctx context.Context) (datastore.Revision, error) {
Expand Down Expand Up @@ -85,6 +90,11 @@ func (p *ctxProxy) Unwrap() datastore.Datastore {
return p.delegate
}

// Implement the TestableDatastore interface
func (p *ctxProxy) ExampleRetryableError() error {
return p.delegate.(test.TestableDatastore).ExampleRetryableError()
}

type ctxReader struct{ delegate datastore.Reader }

func (r *ctxReader) ReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) {
Expand Down
8 changes: 6 additions & 2 deletions internal/datastore/proxy/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reade
return &observableReader{delegateReader}
}

func (p *observableProxy) ReadWriteTx(ctx context.Context, f datastore.TxUserFunc) (datastore.Revision, error) {
func (p *observableProxy) ReadWriteTx(
ctx context.Context,
f datastore.TxUserFunc,
opts ...options.RWTOptionsOption,
) (datastore.Revision, error) {
return p.delegate.ReadWriteTx(ctx, func(delegateRWT datastore.ReadWriteTransaction) error {
return f(&observableRWT{&observableReader{delegateRWT}, delegateRWT})
})
}, opts...)
}

func (p *observableProxy) OptimizedRevision(ctx context.Context) (datastore.Revision, error) {
Expand Down
3 changes: 2 additions & 1 deletion internal/datastore/proxy/proxy_test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader
func (dm *MockDatastore) ReadWriteTx(
_ context.Context,
f datastore.TxUserFunc,
opts ...options.RWTOptionsOption,
) (datastore.Revision, error) {
args := dm.Called()
args := dm.Called(opts)
mockRWT := args.Get(0).(datastore.ReadWriteTransaction)

if err := f(mockRWT); err != nil {
Expand Down
7 changes: 6 additions & 1 deletion internal/datastore/proxy/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
)

var errReadOnly = datastore.NewReadonlyErr()
Expand All @@ -18,6 +19,10 @@ func NewReadonlyDatastore(delegate datastore.Datastore) datastore.Datastore {
return roDatastore{Datastore: delegate}
}

func (rd roDatastore) ReadWriteTx(context.Context, datastore.TxUserFunc) (datastore.Revision, error) {
func (rd roDatastore) ReadWriteTx(
context.Context,
datastore.TxUserFunc,
...options.RWTOptionsOption,
) (datastore.Revision, error) {
return datastore.NoRevision, errReadOnly
}
3 changes: 2 additions & 1 deletion internal/testfixtures/validating.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (vd validatingDatastore) SnapshotReader(revision datastore.Revision) datast
func (vd validatingDatastore) ReadWriteTx(
ctx context.Context,
f datastore.TxUserFunc,
opts ...options.RWTOptionsOption,
) (datastore.Revision, error) {
if f == nil {
return datastore.NoRevision, fmt.Errorf("nil delegate function")
Expand All @@ -39,7 +40,7 @@ func (vd validatingDatastore) ReadWriteTx(
return vd.Datastore.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error {
txDelegate := validatingReadWriteTransaction{validatingSnapshotReader{rwt}, rwt}
return f(txDelegate)
})
}, opts...)
}

func (vd validatingDatastore) Unwrap() datastore.Datastore {
Expand Down
2 changes: 1 addition & 1 deletion pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ type Datastore interface {

// ReadWriteTx tarts a read/write transaction, which will be committed if no error is
// returned and rolled back if an error is returned.
ReadWriteTx(context.Context, TxUserFunc) (Revision, error)
ReadWriteTx(context.Context, TxUserFunc, ...options.RWTOptionsOption) (Revision, error)

// OptimizedRevision gets a revision that will likely already be replicated
// and will likely be shared amongst many queries.
Expand Down
8 changes: 7 additions & 1 deletion pkg/datastore/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

//go:generate go run github.com/ecordell/optgen -output zz_generated.query_options.go . QueryOptions ReverseQueryOptions
//go:generate go run github.com/ecordell/optgen -output zz_generated.query_options.go . QueryOptions ReverseQueryOptions RWTOptions

// SortOrder is an enum which represents the order in which the caller would like
// the data returned.
Expand Down Expand Up @@ -47,6 +47,12 @@ type ResourceRelation struct {
Relation string
}

// RWTOptions are options that can affect the way a read-write transaction is
// executed.
type RWTOptions struct {
DisableRetries bool
}

var (
one = uint64(1)

Expand Down
33 changes: 33 additions & 0 deletions pkg/datastore/options/zz_generated.query_options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/datastore/test/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ func (f DatastoreTesterFunc) New(revisionQuantization, gcInterval, gcWindow time
return f(revisionQuantization, gcInterval, gcWindow, watchBufferLength)
}

type TestableDatastore interface {
datastore.Datastore

ExampleRetryableError() error
}

// AllExceptWatch runs all generic datastore tests on a DatastoreTester, except
// those invoking the watch API.
func AllExceptWatch(t *testing.T, tester DatastoreTester) {
Expand Down Expand Up @@ -71,6 +77,8 @@ func AllExceptWatch(t *testing.T, tester DatastoreTester) {

t.Run("TestStats", func(t *testing.T) { StatsTest(t, tester) })

t.Run("TestRetries", func(t *testing.T) { RetryTest(t, tester) })

t.Run("TestCaveatNotFound", func(t *testing.T) { CaveatNotFoundTest(t, tester) })
t.Run("TestWriteReadDeleteCaveat", func(t *testing.T) { WriteReadDeleteCaveatTest(t, tester) })
t.Run("TestWriteCaveatedRelationship", func(t *testing.T) { WriteCaveatedRelationshipTest(t, tester) })
Expand Down
56 changes: 56 additions & 0 deletions pkg/datastore/test/transactions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package test

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
)

func RetryTest(t *testing.T, tester DatastoreTester) {
disableRetriesOptions := []options.RWTOptionsOption{options.WithDisableRetries(true)}

testCases := []struct {
name string
returnRetryableError bool
txOptions []options.RWTOptionsOption
countAssertion func(require.TestingT, interface{}, ...interface{})
}{
{"retryable with retries", true, nil, require.Positive},
{"non-retryable with retries", false, nil, require.Zero},
{"retryable retries disabled", true, disableRetriesOptions, require.Zero},
{"non-retryable retries disabled", false, disableRetriesOptions, require.Zero},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require := require.New(t)

rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(err)

ds := rawDS.(TestableDatastore)

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

var attempts int
_, err = ds.ReadWriteTx(ctx, func(rwt datastore.ReadWriteTransaction) error {
attempts++

if tc.returnRetryableError {
return ds.ExampleRetryableError()
}
return errors.New("not retryable")
}, tc.txOptions...)
require.Error(err)
retries := attempts - 1
tc.countAssertion(t, retries)
})
}
}

0 comments on commit 83890d3

Please sign in to comment.