-
Notifications
You must be signed in to change notification settings - Fork 284
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
internal/services: move bulk load to experimental service
- Loading branch information
Showing
10 changed files
with
355 additions
and
204 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
package v1 | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"io" | ||
|
||
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" | ||
grpcvalidate "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator" | ||
"github.com/samber/lo" | ||
|
||
"github.com/authzed/spicedb/internal/middleware" | ||
datastoremw "github.com/authzed/spicedb/internal/middleware/datastore" | ||
"github.com/authzed/spicedb/internal/middleware/handwrittenvalidation" | ||
"github.com/authzed/spicedb/internal/middleware/streamtimeout" | ||
"github.com/authzed/spicedb/internal/middleware/usagemetrics" | ||
"github.com/authzed/spicedb/internal/namespace" | ||
"github.com/authzed/spicedb/internal/relationships" | ||
"github.com/authzed/spicedb/internal/services/shared" | ||
options "github.com/authzed/spicedb/internal/services/v1/options" | ||
"github.com/authzed/spicedb/pkg/datastore" | ||
dsoptions "github.com/authzed/spicedb/pkg/datastore/options" | ||
core "github.com/authzed/spicedb/pkg/proto/core/v1" | ||
dispatchv1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1" | ||
"github.com/authzed/spicedb/pkg/tuple" | ||
) | ||
|
||
// NewExperimentalServer creates a ExperimentalServiceServer instance. | ||
func NewExperimentalServer(opts ...options.ExperimentalServerOptionsOption) v1.ExperimentalServiceServer { | ||
config := options.NewExperimentalServerOptionsWithOptionsAndDefaults(opts...) | ||
|
||
return &experimentalServer{ | ||
WithServiceSpecificInterceptors: shared.WithServiceSpecificInterceptors{ | ||
Unary: middleware.ChainUnaryServer( | ||
grpcvalidate.UnaryServerInterceptor(true), | ||
handwrittenvalidation.UnaryServerInterceptor, | ||
usagemetrics.UnaryServerInterceptor(), | ||
), | ||
Stream: middleware.ChainStreamServer( | ||
grpcvalidate.StreamServerInterceptor(true), | ||
handwrittenvalidation.StreamServerInterceptor, | ||
usagemetrics.StreamServerInterceptor(), | ||
streamtimeout.MustStreamServerInterceptor(config.StreamReadTimeount), | ||
), | ||
}, | ||
} | ||
} | ||
|
||
type experimentalServer struct { | ||
v1.UnimplementedExperimentalServiceServer | ||
shared.WithServiceSpecificInterceptors | ||
} | ||
|
||
type bulkLoadAdapter struct { | ||
stream v1.ExperimentalService_BulkLoadRelationshipsServer | ||
referencedNamespaceMap map[string]*namespace.TypeSystem | ||
referencedCaveatMap map[string]*core.CaveatDefinition | ||
current core.RelationTuple | ||
caveat core.ContextualizedCaveat | ||
|
||
awaitingNamespaces []string | ||
awaitingCaveats []string | ||
|
||
currentBatch []*v1.Relationship | ||
numSent int | ||
err error | ||
} | ||
|
||
func (a *bulkLoadAdapter) Next(_ context.Context) (*core.RelationTuple, error) { | ||
for a.err == nil && a.numSent == len(a.currentBatch) { | ||
// Load a new batch | ||
batch, err := a.stream.Recv() | ||
if err != nil { | ||
a.err = err | ||
if errors.Is(a.err, io.EOF) { | ||
return nil, nil | ||
} | ||
return nil, a.err | ||
} | ||
|
||
a.currentBatch = batch.Relationships | ||
a.numSent = 0 | ||
|
||
a.awaitingNamespaces, a.awaitingCaveats = extractBatchNewReferencedNamespacesAndCaveats( | ||
a.currentBatch, | ||
a.referencedNamespaceMap, | ||
a.referencedCaveatMap, | ||
) | ||
} | ||
|
||
if len(a.awaitingNamespaces) > 0 || len(a.awaitingCaveats) > 0 { | ||
// Shut down the stream to give our caller a chance to fill in this information | ||
return nil, nil | ||
} | ||
|
||
a.current.Caveat = &a.caveat | ||
tuple.CopyRelationshipToRelationTuple[ | ||
*v1.ObjectReference, | ||
*v1.SubjectReference, | ||
*v1.ContextualizedCaveat, | ||
](a.currentBatch[a.numSent], &a.current) | ||
|
||
if err := relationships.ValidateOneRelationship( | ||
a.referencedNamespaceMap, | ||
a.referencedCaveatMap, | ||
&a.current, | ||
); err != nil { | ||
return nil, err | ||
} | ||
|
||
a.numSent++ | ||
return &a.current, nil | ||
} | ||
|
||
func extractBatchNewReferencedNamespacesAndCaveats( | ||
batch []*v1.Relationship, | ||
existingNamespaces map[string]*namespace.TypeSystem, | ||
existingCaveats map[string]*core.CaveatDefinition, | ||
) ([]string, []string) { | ||
newNamespaces := make(map[string]struct{}) | ||
newCaveats := make(map[string]struct{}) | ||
for _, rel := range batch { | ||
if _, ok := existingNamespaces[rel.Resource.ObjectType]; !ok { | ||
newNamespaces[rel.Resource.ObjectType] = struct{}{} | ||
} | ||
if _, ok := existingNamespaces[rel.Subject.Object.ObjectType]; !ok { | ||
newNamespaces[rel.Subject.Object.ObjectType] = struct{}{} | ||
} | ||
if rel.OptionalCaveat != nil { | ||
if _, ok := existingCaveats[rel.OptionalCaveat.CaveatName]; !ok { | ||
newCaveats[rel.OptionalCaveat.CaveatName] = struct{}{} | ||
} | ||
} | ||
} | ||
|
||
return lo.Keys(newNamespaces), lo.Keys(newCaveats) | ||
} | ||
|
||
func (es *experimentalServer) BulkLoadRelationships(stream v1.ExperimentalService_BulkLoadRelationshipsServer) error { | ||
ds := datastoremw.MustFromContext(stream.Context()) | ||
|
||
var numWritten uint64 | ||
if _, err := ds.ReadWriteTx(stream.Context(), func(rwt datastore.ReadWriteTransaction) error { | ||
loadedNamespaces := make(map[string]*namespace.TypeSystem) | ||
loadedCaveats := make(map[string]*core.CaveatDefinition) | ||
|
||
adapter := &bulkLoadAdapter{ | ||
stream: stream, | ||
referencedNamespaceMap: loadedNamespaces, | ||
referencedCaveatMap: loadedCaveats, | ||
current: core.RelationTuple{ | ||
ResourceAndRelation: &core.ObjectAndRelation{}, | ||
Subject: &core.ObjectAndRelation{}, | ||
}, | ||
caveat: core.ContextualizedCaveat{}, | ||
} | ||
|
||
var streamWritten uint64 | ||
var err error | ||
for ; adapter.err == nil && err == nil; streamWritten, err = rwt.BulkLoad(stream.Context(), adapter) { | ||
numWritten += streamWritten | ||
|
||
// The stream has terminated because we're awaiting namespace and caveat information | ||
if len(adapter.awaitingNamespaces) > 0 { | ||
nsDefs, err := rwt.LookupNamespacesWithNames(stream.Context(), adapter.awaitingNamespaces) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, nsDef := range nsDefs { | ||
nts, err := namespace.NewNamespaceTypeSystem(nsDef.Definition, namespace.ResolverForDatastoreReader(rwt)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
loadedNamespaces[nsDef.Definition.Name] = nts | ||
} | ||
adapter.awaitingNamespaces = nil | ||
} | ||
|
||
if len(adapter.awaitingCaveats) > 0 { | ||
caveats, err := rwt.LookupCaveatsWithNames(stream.Context(), adapter.awaitingCaveats) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, caveat := range caveats { | ||
loadedCaveats[caveat.Definition.Name] = caveat.Definition | ||
} | ||
adapter.awaitingCaveats = nil | ||
} | ||
} | ||
numWritten += streamWritten | ||
|
||
return err | ||
}, dsoptions.WithDisableRetries(true)); err != nil { | ||
return shared.RewriteError(stream.Context(), err) | ||
} | ||
|
||
usagemetrics.SetInContext(stream.Context(), &dispatchv1.ResponseMeta{ | ||
// One request for the whole load | ||
DispatchCount: 1, | ||
}) | ||
|
||
return stream.SendAndClose(&v1.BulkLoadRelationshipsResponse{ | ||
NumLoaded: numWritten, | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
package options | ||
|
||
import "time" | ||
|
||
//go:generate go run github.com/ecordell/optgen -output zz_generated.query_options.go . ExperimentalServerOptions | ||
|
||
type ExperimentalServerOptions struct { | ||
StreamReadTimeount time.Duration `debugmap:"visible" default:"600s"` | ||
} |
Oops, something went wrong.