Skip to content

Commit

Permalink
Merge pull request #1227 from authzed/configurable-replication-factor
Browse files Browse the repository at this point in the history
introduces configurable dispatch hashring replication factor
  • Loading branch information
ecordell authored Mar 29, 2023
2 parents 588f2c0 + 34a4202 commit 237072e
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 31 deletions.
15 changes: 0 additions & 15 deletions cmd/spicedb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,23 @@ import (
"errors"
"os"

"github.com/cespare/xxhash/v2"
"github.com/rs/zerolog"
"github.com/sercand/kuberesolver/v3"
"github.com/spf13/cobra"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/xds"

log "github.com/authzed/spicedb/internal/logging"
consistentbalancer "github.com/authzed/spicedb/pkg/balancer"
"github.com/authzed/spicedb/pkg/cmd"
cmdutil "github.com/authzed/spicedb/pkg/cmd/server"
"github.com/authzed/spicedb/pkg/cmd/testserver"
)

const (
hashringReplicationFactor = 20
backendsPerKey = 1
)

var errParsing = errors.New("parsing error")

func main() {
// Enable Kubernetes gRPC resolver
kuberesolver.RegisterInCluster()

// Enable consistent hashring gRPC load balancer
balancer.Register(consistentbalancer.NewConsistentHashringBuilder(
xxhash.Sum64,
hashringReplicationFactor,
backendsPerKey,
))

log.SetGlobalLogger(zerolog.New(os.Stdout))

// Create a root command
Expand Down
2 changes: 1 addition & 1 deletion internal/testserver/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestClusterWithDispatchAndCacheConfig(t testing.TB, size uint, ds datastore
combineddispatch.UpstreamAddr("test://" + prefix),
combineddispatch.PrometheusSubsystem(fmt.Sprintf("%s_%d_client_dispatch", prefix, i)),
combineddispatch.GrpcDialOpts(
grpc.WithDefaultServiceConfig(hashbalancer.BalancerServiceConfig),
grpc.WithDefaultServiceConfig(hashbalancer.ServiceConfigForBalancerName(hashbalancer.NameForReplicationFactor(100))),
grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
// it's possible grpc tries to dial before we have set the
// buffconn dialers, we have to return a "TempError" so that
Expand Down
22 changes: 17 additions & 5 deletions pkg/balancer/hashring.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package balancer

import (
"fmt"
"math/rand"
"sync"
"time"
Expand All @@ -15,12 +16,12 @@ import (
type ctxKey string

const (
// BalancerName is the name of consistent-hashring balancer.
BalancerName = "consistent-hashring"
// name is the name of consistent-hashring balancer.
name = "consistent-hashring"

// BalancerServiceConfig is a service config that sets the default balancer
// serviceConfig is a service config that sets the default balancer
// to the consistent-hashring balancer
BalancerServiceConfig = `{"loadBalancingPolicy":"consistent-hashring"}`
serviceConfig = `{"loadBalancingPolicy":"%s"}`

// CtxKey is the key for the grpc request's context.Context which points to
// the key to hash for the request. The value it points to must be []byte
Expand All @@ -35,12 +36,23 @@ var logger = grpclog.Component("consistenthashring")
// `balancer.Register(consistent.NewConsistentHashringBuilder(hasher, factor, spread))`
func NewConsistentHashringBuilder(hasher consistent.HasherFunc, replicationFactor uint16, spread uint8) balancer.Builder {
return base.NewBalancerBuilder(
BalancerName,
NameForReplicationFactor(replicationFactor),
&consistentHashringPickerBuilder{hasher: hasher, replicationFactor: replicationFactor, spread: spread},
base.Config{HealthCheck: true},
)
}

// NameForReplicationFactor returns the name of the balancer for a given replication factor
func NameForReplicationFactor(replicationFactor uint16) string {
return fmt.Sprintf(name+"-rf-%d", replicationFactor)
}

// ServiceConfigForBalancerName provides the gRPC service configuration string for
// a hashring balancer with a specific replication factor by its name
func ServiceConfigForBalancerName(balancerName string) string {
return fmt.Sprintf(serviceConfig, balancerName)
}

type subConnMember struct {
balancer.SubConn
key string
Expand Down
3 changes: 1 addition & 2 deletions pkg/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,12 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error {
cmd.Flags().StringVar(&config.DispatchUpstreamAddr, "dispatch-upstream-addr", "", "upstream grpc address to dispatch to")
cmd.Flags().StringVar(&config.DispatchUpstreamCAPath, "dispatch-upstream-ca-path", "", "local path to the TLS CA used when connecting to the dispatch cluster")
cmd.Flags().DurationVar(&config.DispatchUpstreamTimeout, "dispatch-upstream-timeout", 60*time.Second, "maximum duration of a dispatch call an upstream cluster before it times out")

cmd.Flags().Uint16Var(&config.GlobalDispatchConcurrencyLimit, "dispatch-concurrency-limit", 50, "maximum number of parallel goroutines to create for each request or subrequest")

cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.Check, "dispatch-check-permission-concurrency-limit", 0, "maximum number of parallel goroutines to create for each check request or subrequest. defaults to --dispatch-concurrency-limit")
cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.LookupResources, "dispatch-lookup-resources-concurrency-limit", 0, "maximum number of parallel goroutines to create for each lookup resources request or subrequest. defaults to --dispatch-concurrency-limit")
cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.LookupSubjects, "dispatch-lookup-subjects-concurrency-limit", 0, "maximum number of parallel goroutines to create for each lookup subjects request or subrequest. defaults to --dispatch-concurrency-limit")
cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.ReachableResources, "dispatch-reachable-resources-concurrency-limit", 0, "maximum number of parallel goroutines to create for each reachable resources request or subrequest. defaults to --dispatch-concurrency-limit")
cmd.Flags().Uint16Var(&config.DispatchHashringReplicationFactor, "dispatch-hashring-replication-factor", 100, "set the replication factor of the consistent hasher used for the dispatcher")

// Flags for configuring API behavior
cmd.Flags().BoolVar(&config.DisableV1SchemaAPI, "disable-v1-schema-api", false, "disables the V1 schema API")
Expand Down
36 changes: 29 additions & 7 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/authzed/grpcutil"
"github.com/cespare/xxhash/v2"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/hashicorp/go-multierror"
Expand All @@ -19,6 +20,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
grpcbalancer "google.golang.org/grpc/balancer"

"github.com/authzed/spicedb/internal/auth"
"github.com/authzed/spicedb/internal/dashboard"
Expand All @@ -43,11 +45,12 @@ import (
//go:generate go run github.com/ecordell/optgen -output zz_generated.options.go . Config
type Config struct {
// API config
GRPCServer util.GRPCServerConfig
GRPCAuthFunc grpc_auth.AuthFunc
PresharedKey []string
ShutdownGracePeriod time.Duration
DisableVersionResponse bool
GRPCServer util.GRPCServerConfig
GRPCAuthFunc grpc_auth.AuthFunc
DispatchHashringReplicationFactor uint16
PresharedKey []string
ShutdownGracePeriod time.Duration
DisableVersionResponse bool

// GRPC Gateway config
HTTPGateway util.HTTPServerConfig
Expand Down Expand Up @@ -108,6 +111,10 @@ type Config struct {
TelemetryInterval time.Duration
}

const defaultBackendsPerKey = 1

var balancerRegistryMutex = sync.Mutex{}

type closeableStack struct {
closers []func() error
}
Expand Down Expand Up @@ -160,6 +167,19 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {
}
}()

balancerRegistryMutex.Lock()
dispatchBalancerName := balancer.NameForReplicationFactor(c.DispatchHashringReplicationFactor)
if grpcbalancer.Get(dispatchBalancerName) == nil {
// Enable consistent hashring gRPC load balancer with a specific replication factor
grpcbalancer.Register(balancer.NewConsistentHashringBuilder(
xxhash.Sum64,
c.DispatchHashringReplicationFactor,
defaultBackendsPerKey,
))
log.Ctx(ctx).Debug().Uint16("replication-factor", c.DispatchHashringReplicationFactor).Msg("registered new grpc hashring balancer")
}
balancerRegistryMutex.Unlock()

if len(c.PresharedKey) < 1 && c.GRPCAuthFunc == nil {
return nil, fmt.Errorf("a preshared key must be provided to authenticate API requests")
}
Expand Down Expand Up @@ -217,15 +237,17 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {

specificConcurrencyLimits := c.DispatchConcurrencyLimits
concurrencyLimits := specificConcurrencyLimits.WithOverallDefaultLimit(c.GlobalDispatchConcurrencyLimit)
log.Ctx(ctx).Info().EmbedObject(concurrencyLimits).Msg("configured dispatch concurrency limits")
log.Ctx(ctx).Info().EmbedObject(concurrencyLimits).
Uint16("hashring-replication-factor", c.DispatchHashringReplicationFactor).
Msg("configured dispatch concurrency limits")

dispatcher, err = combineddispatch.NewDispatcher(
combineddispatch.UpstreamAddr(c.DispatchUpstreamAddr),
combineddispatch.UpstreamCAPath(c.DispatchUpstreamCAPath),
combineddispatch.GrpcPresharedKey(dispatchPresharedKey),
combineddispatch.GrpcDialOpts(
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithDefaultServiceConfig(balancer.BalancerServiceConfig),
grpc.WithDefaultServiceConfig(balancer.ServiceConfigForBalancerName(dispatchBalancerName)),
),
combineddispatch.MetricsEnabled(c.DispatchClientMetricsEnabled),
combineddispatch.PrometheusSubsystem(c.DispatchClientMetricsPrefix),
Expand Down
41 changes: 40 additions & 1 deletion pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (

"github.com/authzed/spicedb/internal/datastore/memdb"
"github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/balancer"
"github.com/authzed/spicedb/pkg/cmd/util"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"
grpcbalancer "google.golang.org/grpc/balancer"
)

func TestServerGracefulTermination(t *testing.T) {
Expand All @@ -20,7 +23,21 @@ func TestServerGracefulTermination(t *testing.T) {
ds, err := memdb.NewMemdbDatastore(0, 1*time.Second, 10*time.Second)
require.NoError(t, err)

c := ConfigWithOptions(&Config{}, WithPresharedKey("psk"), WithDatastore(ds))
c := ConfigWithOptions(
&Config{},
WithPresharedKey("psk"),
WithDatastore(ds),
WithGRPCServer(util.GRPCServerConfig{
Network: util.BufferedNetwork,
Enabled: true,
}),
WithNamespaceCacheConfig(CacheConfig{Enabled: true}),
WithDispatchCacheConfig(CacheConfig{Enabled: true}),
WithClusterDispatchCacheConfig(CacheConfig{Enabled: true}),
WithHTTPGateway(util.HTTPServerConfig{Enabled: true}),
WithDashboardAPI(util.HTTPServerConfig{Enabled: true}),
WithMetricsAPI(util.HTTPServerConfig{Enabled: true}),
)
rs, err := c.Complete(ctx)
require.NoError(t, err)

Expand All @@ -33,6 +50,28 @@ func TestServerGracefulTermination(t *testing.T) {
<-ch
}

func TestBalancerRegistration(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ds, err := memdb.NewMemdbDatastore(0, 1*time.Second, 10*time.Second)
require.NoError(t, err)

c := ConfigWithOptions(
&Config{},
WithPresharedKey("psk"),
WithDatastore(ds),
WithDispatchHashringReplicationFactor(1000),
)
srv, err := c.Complete(ctx)
require.NoError(t, err)
require.NoError(t, srv.(*completedServerConfig).closeFunc())

require.NotNil(t, grpcbalancer.Get(balancer.NameForReplicationFactor(1000)))
require.Nil(t, grpcbalancer.Get(balancer.NameForReplicationFactor(100)))
}

func TestServerGracefulTerminationOnError(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

Expand Down
8 changes: 8 additions & 0 deletions pkg/cmd/server/zz_generated.options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 237072e

Please sign in to comment.