Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to set connection pool size in redis clients. #298

Merged
merged 3 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
Version 11.9.6
--------------
* Refactor of redis client initialisation so it's easier to add new
options.
* Add option to set connection pool size in redis clients.

Version 11.9.5
--------------
* Register the new redis_latency_ms metric so it can be scraped.
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.9.5
11.9.6
1 change: 1 addition & 0 deletions mettle/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ go_binary(
srcs = ["main.go"],
visibility = ["//package:all"],
deps = [
"///third_party/go/github.com_go-redis_redis_v8//:v8",
"///third_party/go/github.com_peterebden_go-cli-init_v4//flags",
"///third_party/go/github.com_peterebden_go-cli-init_v4//logging",
"//cli",
Expand Down
52 changes: 49 additions & 3 deletions mettle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
package main

import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"runtime"
"runtime/pprof"
"strings"
"time"

"github.com/go-redis/redis/v8"
"github.com/peterebden/go-cli-init/v4/flags"
"github.com/peterebden/go-cli-init/v4/logging"

Expand All @@ -32,6 +35,8 @@ type RedisOpts struct {
ReadURL string `long:"read_url" env:"REDIS_READ_URL" description:"host:port of a Redis read replica, if set any read operation will be routed to it"`
Password string `long:"password" description:"AUTH password"`
PasswordFile string `long:"password_file" env:"REDIS_PASSWORD_FILE" description:"File containing AUTH password"`
PoolSize int `long:"pool_size" env:"REDIS_POOL_SIZE" default:"10" description:"Size of connection pool on primary redis client"`
ReadPoolSize int `long:"read_pool_size" env:"REDIS_READ_POOL_SIZE" default:"10" description:"Size of connection pool on reading redis client"`
CAFile string `long:"ca_file" env:"REDIS_CA_FILE" description:"File containing the Redis instance CA cert"`
TLS bool `long:"tls" description:"Use TLS for connecting to Redis"`
}
Expand Down Expand Up @@ -170,6 +175,7 @@ func main() {
NumPollers: 1,
SubscriptionBatchSize: 100,
}
primaryRedis, readRedis := opts.Dual.Redis.Clients()

// Must ensure the topics are created ahead of time.
common.MustOpenTopic(requests)
Expand All @@ -179,11 +185,12 @@ func main() {
}
for i := 0; i < opts.Dual.NumWorkers; i++ {
storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)]
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.CAFile, opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, primaryRedis, readRedis, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
}
api.ServeForever(opts.Dual.GRPC, "", queues, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS)
} else if cmd == "worker" {
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.CAFile, opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
primaryRedis, readRedis := opts.Worker.Redis.Clients()
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, primaryRedis, readRedis, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
} else if cmd == "api" {
api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS)
} else if err := one(); err != nil {
Expand Down Expand Up @@ -211,14 +218,38 @@ func one() error {
defer f.Close()
defer pprof.WriteHeapProfile(f)
}
primaryRedis, readRedis := opts.One.Redis.Clients()
for _, action := range opts.One.Args.Actions {
if err := worker.RunOne(opts.InstanceName, "mettle-one", opts.One.Storage.Storage, opts.One.Dir, opts.One.Cache.Dir, opts.One.Sandbox, opts.One.AltSandbox, opts.One.Storage.TokenFile, opts.One.Redis.URL, opts.One.Redis.ReadURL, opts.One.Redis.ReadPassword(), opts.One.Redis.CAFile, opts.One.Redis.TLS, opts.One.Cache.Prefix, opts.One.Cache.Part, false, opts.One.Storage.TLS, action.ToProto()); err != nil {
if err := worker.RunOne(opts.InstanceName, "mettle-one", opts.One.Storage.Storage, opts.One.Dir, opts.One.Cache.Dir, opts.One.Sandbox, opts.One.AltSandbox, opts.One.Storage.TokenFile, primaryRedis, readRedis, opts.One.Cache.Prefix, opts.One.Cache.Part, false, opts.One.Storage.TLS, action.ToProto()); err != nil {
return err
}
}
return nil
}

// Clients builds the Redis clients described by these options.
//
// It returns the primary client and the client that should serve reads.
// When no primary URL is configured it returns (nil, nil), so that callers
// which gate Redis usage on a nil primary client leave Redis disabled;
// without this guard a client with an empty address would always be
// constructed. When ReadURL is empty the read client is the primary client
// itself, otherwise a separate client is created for the replica using
// ReadPoolSize. Both clients share the same password and TLS configuration.
func (r RedisOpts) Clients() (primary, read *redis.Client) {
	// Redis is optional: with no URL there is nothing to connect to, and we
	// must not touch the password or CA files either.
	if r.URL == "" {
		return nil, nil
	}

	password := r.ReadPassword()
	tlsConfig := r.ReadTLSConfig()

	primary = redis.NewClient(&redis.Options{
		Addr:      r.URL,
		Password:  password,
		TLSConfig: tlsConfig,
		PoolSize:  r.PoolSize,
	})
	if r.ReadURL == "" {
		// No replica configured: reads go to the primary as well.
		return primary, primary
	}
	read = redis.NewClient(&redis.Options{
		Addr:      r.ReadURL,
		Password:  password,
		TLSConfig: tlsConfig,
		PoolSize:  r.ReadPoolSize,
	})
	return primary, read
}

func (r RedisOpts) ReadPassword() string {
if r.Password != "" {
return r.Password
Expand All @@ -231,3 +262,18 @@ func (r RedisOpts) ReadPassword() string {
}
return strings.TrimSpace(string(b))
}

// ReadTLSConfig builds the TLS configuration used for Redis connections.
//
// It returns nil when TLS is not enabled. Otherwise it reads the PEM-encoded
// CA certificate(s) from CAFile and returns a config whose RootCAs contains
// them. An unreadable CA file, or one that holds no parseable certificate,
// is reported through log.Fatalf rather than returned to the caller.
func (r RedisOpts) ReadTLSConfig() *tls.Config {
	if !r.TLS {
		return nil
	}
	caCert, err := os.ReadFile(r.CAFile)
	if err != nil {
		log.Fatalf("Failed to read CA file at %s or load TLS config for Redis: %v", r.CAFile, err)
	}
	caCertPool := x509.NewCertPool()
	// AppendCertsFromPEM reports whether any certificate was parsed. An empty
	// pool would make every handshake fail later with an opaque verification
	// error, so surface the misconfiguration here instead.
	if !caCertPool.AppendCertsFromPEM(caCert) {
		log.Fatalf("Failed to load TLS config for Redis: no valid PEM certificates found in CA file at %s", r.CAFile)
	}
	return &tls.Config{
		RootCAs: caCertPool,
	}
}
47 changes: 8 additions & 39 deletions mettle/worker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package worker

import (
"context"
"crypto/tls"
"crypto/x509"
"os"
"time"

Expand Down Expand Up @@ -36,48 +34,19 @@ var redisLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Buckets: []float64{20, 50, 100, 200, 500, 1000, 2000, 5000, 10000},
}, []string{"command"})

// getTLSConfig loads the PEM-encoded CA certificate(s) in caFile and returns
// a TLS config that trusts them as root CAs.
//
// An error is returned if the file cannot be read or if it contains no
// parseable certificate. On error the returned config is an empty, non-nil
// *tls.Config and should not be used.
func getTLSConfig(caFile string) (*tls.Config, error) {
	caCert, err := os.ReadFile(caFile)
	if err != nil {
		return &tls.Config{}, err
	}
	caCertPool := x509.NewCertPool()
	// AppendCertsFromPEM reports whether at least one certificate was added;
	// an empty pool would only fail later, at handshake time, with a much
	// less helpful error.
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return &tls.Config{}, &os.PathError{Op: "parse CA certificates", Path: caFile, Err: os.ErrInvalid}
	}
	return &tls.Config{
		RootCAs: caCertPool,
	}, nil
}

// newRedisClient augments an existing elan.Client with a Redis connection.
// All usage of Redis is best-effort only.
// If readURL is set, all reads will happen on this URL. If not, everything
// will go to url.
func newRedisClient(client elan.Client, url, readURL, password, caFile string, useTLS bool) elan.Client {
primaryOpts := &redis.Options{
Addr: url,
Password: password,
}
readOpts := &redis.Options{
Addr: readURL,
Password: password,
}
if useTLS {
tlsConfig, err := getTLSConfig(caFile)
if err != nil {
log.Fatalf("Failed to read CA file at %s or load TLS config for Redis: %s", caFile, err)
}
primaryOpts.TLSConfig = tlsConfig
readOpts.TLSConfig = tlsConfig
}
primaryClient := redis.NewClient(primaryOpts)
readClient := primaryClient
if readURL != "" {
readClient = redis.NewClient(readOpts)
// If readRedis is set, all reads will happen on this client. If not, everything
// will go to the primary client.
func newRedisClient(client elan.Client, primaryRedis, readRedis *redis.Client) elan.Client {
// This is a safeguard in case the caller does not pass readRedis.
if readRedis == nil {
readRedis = primaryRedis
}
return &elanRedisWrapper{
elan: client,
redis: &monitoredRedisClient{primaryClient},
readRedis: &monitoredRedisClient{readClient},
redis: &monitoredRedisClient{primaryRedis},
readRedis: &monitoredRedisClient{readRedis},
timeout: 1 * time.Second,
maxSize: 200 * 1012, // 200 Kelly-Bootle standard units
limiter: rate.NewLimiter(rate.Every(time.Second*10), 10),
Expand Down
19 changes: 10 additions & 9 deletions mettle/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/dgraph-io/ristretto"
"github.com/dustin/go-humanize"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
Expand Down Expand Up @@ -147,16 +148,16 @@ func init() {
}

// RunForever runs the worker, receiving jobs until terminated.
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension, immediateShutdown)
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension, immediateShutdown)
log.Fatalf("Failed to run: %s", err)
}

// RunOne runs one single request, returning any error received.
func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error {
func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile string, primaryRedis, readRedis *redis.Client, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error {
// Must create this to submit on first
topic := common.MustOpenTopic("mem://requests")
w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0)
w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, primaryRedis, readRedis, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0)
if err != nil {
return err
}
Expand All @@ -183,8 +184,8 @@ func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tok
return nil
}

func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension)
func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension)
if err != nil {
return err
}
Expand Down Expand Up @@ -236,7 +237,7 @@ func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, c
}
}

func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration) (*worker, error) {
func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration) (*worker, error) {
// Make sure we have a directory to run in
if err := os.MkdirAll(dir, os.ModeDir|0755); err != nil {
return nil, fmt.Errorf("Failed to create working directory: %s", err)
Expand Down Expand Up @@ -312,8 +313,8 @@ func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage,
costs: map[string]*bbru.MonetaryResourceUsage_Expense{},
metricTicker: time.NewTicker(5 * time.Minute),
}
if redis != "" {
w.client = newRedisClient(client, redis, readRedis, redisPassword, redisCAFile, redisTLS)
if primaryRedis != nil {
w.client = newRedisClient(client, primaryRedis, readRedis)
}
if ackExtension > 0 {
if !strings.HasPrefix(requestQueue, "gcppubsub://") {
Expand Down
Loading