From 9eda649a7b0980c0b2faeb61dd10db1ad04eabb3 Mon Sep 17 00:00:00 2001 From: hpitkeathly Date: Thu, 19 Oct 2023 14:14:53 +0100 Subject: [PATCH 1/6] Group pubsub opts for api server and add metrics related to failing to publish messages --- mettle/api/api.go | 57 ++++++++++++++++++++++++++++++++++-------- mettle/api/api_test.go | 9 ++++++- mettle/common/BUILD | 1 + mettle/main.go | 23 ++++++++--------- 4 files changed, 66 insertions(+), 24 deletions(-) diff --git a/mettle/api/api.go b/mettle/api/api.go index 358ff308..8a5f33ae 100644 --- a/mettle/api/api.go +++ b/mettle/api/api.go @@ -1,4 +1,4 @@ -// Package api implements the remote execution API server. +// Package api implements the remote execution API servere package api import ( @@ -70,6 +70,21 @@ var totalFailedPubSubMessages = prometheus.NewCounter(prometheus.CounterOpts{ Help: "Number of times the Pub/Sub pool has failed", }) +var noExecutionInProgress = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "mettle", + Name: "no_execution_in_progress", +}) + +var requestPublishFailure = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "mettle", + Name: "request_publish_failures", +}) + +var responsePublishFailure = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "mettle", + Name: "response_publish_failures", +}) + var timeToComplete = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "mettle", Name: "time_to_complete_action_secs", @@ -83,6 +98,9 @@ var metrics = []prometheus.Collector{ totalSuccessfulActions, timeToComplete, totalFailedPubSubMessages, + noExecutionInProgress, + requestPublishFailure, + responsePublishFailure, } func init() { @@ -91,36 +109,50 @@ func init() { } } +// PubSubOpts holds information to configure queue options in the api server +type PubSubOpts struct { + RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"` + ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"` + ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"` + PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"` + NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10"` + SubscriptionBatchSize uint `long:"subscription_batch_size" env:"API_SUBSCRIPTION" default:"100"` +} + // ServeForever serves on the given port until terminated. -func ServeForever(opts grpcutil.Opts, name, requestQueue, responseQueue, preResponseQueue, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool, numPollers int, responseBatchSize uint) { - s, lis, err := serve(opts, name, requestQueue, responseQueue, preResponseQueue, apiURL, connTLS, allowedPlatform, storageURL, storageTLS, numPollers, responseBatchSize) +func ServeForever(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) { + s, lis, err := serve(opts, name, queueOpts, apiURL, connTLS, allowedPlatform, storageURL, storageTLS) if err != nil { log.Fatalf("%s", err) } grpcutil.ServeForever(lis, s) } -func serve(opts grpcutil.Opts, name, requestQueue, responseQueue, preResponseQueue, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool, numPollers int, responseBatchSize uint) (*grpc.Server, net.Listener, error) { +func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) (*grpc.Server, net.Listener, error) { + responseSubscriptionName := queueOpts.ResponseQueue if name == "" { name = "mettle API server" + } else { + responseSubscriptionName = responseSubscriptionName + name } + log.Notice("Contacting CAS server on %s...", storageURL) client, err := rexclient.New(name, storageURL, storageTLS, "") if err != nil { return nil, nil, err } - if numPollers < 1 { - return nil, nil, fmt.Errorf("too few pollers specified: %d", numPollers) + if queueOpts.NumPollers < 1 { + return nil, nil, fmt.Errorf("too few pollers specified: %d", queueOpts.NumPollers) } srv := &server{ name: name, - requests: common.MustOpenTopic(requestQueue), - responses: common.MustOpenSubscription(responseQueue, responseBatchSize), - preResponses: common.MustOpenTopic(preResponseQueue), + requests: common.MustOpenTopic(queueOpts.RequestQueue), + responses: common.MustOpenSubscription(responseSubscriptionName, queueOpts.SubscriptionBatchSize), + preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue), jobs: map[string]*job{}, platform: allowedPlatform, client: client, - numPollers: numPollers, + numPollers: queueOpts.NumPollers, } log.Notice("Allowed platform values:") for k, v := range allowedPlatform { @@ -276,6 +308,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() if err := common.PublishWithOrderingKey(ctx, s.preResponses, b, req.ActionDigest.Hash, s.name); err != nil { + responsePublishFailure.Inc() log.Error("Failed to communicate pre-response message: %s", err) } b, _ = proto.Marshal(req) @@ -285,6 +318,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ Body: b, Metadata: platform, }); err != nil { + requestPublishFailure.Inc() log.Error("Failed to submit work to stream: %s", err) return err } @@ -313,7 +347,8 @@ func (s *server) WaitExecution(req *pb.WaitExecutionRequest, stream pb.Execution digest := &pb.Digest{Hash: req.Name} ch, _ := s.eventStream(digest, false) if ch == nil { - log.Warning("Request for execution %s which is not in progress", req.Name) + log.Error("Request for execution %s which is not in progress", req.Name) + noExecutionInProgress.Inc() return status.Errorf(codes.NotFound, "No execution in progress for %s", req.Name) } return s.streamEvents(digest, ch, stream) diff --git a/mettle/api/api_test.go b/mettle/api/api_test.go index 8ccc2502..a5936dfb 100644 --- a/mettle/api/api_test.go +++ b/mettle/api/api_test.go @@ -239,13 +239,20 @@ func setupServers(t *testing.T) (pb.ExecutionClient, *executor, *grpc.Server) { casaddr := setupCASServer() requests := fmt.Sprintf("mem://requests%d", queueID) responses := fmt.Sprintf("mem://responses%d", queueID) + queues := PubSubOpts{ + RequestQueue: requests, + ResponseQueue: responses, + PreResponseQueue: responses, + NumPollers: 1, + SubscriptionBatchSize: 1, + } queueID++ common.MustOpenTopic(requests) // Ensure these are created before anything tries common.MustOpenTopic(responses) // to open a subscription to either. s, lis, err := serve(grpcutil.Opts{ Host: "127.0.0.1", Port: 0, - }, "", requests, responses, responses, "", true, map[string][]string{}, casaddr, false, 1, 1) + }, "", queues, "", true, map[string][]string{}, casaddr, false) require.NoError(t, err) go s.Serve(lis) conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) diff --git a/mettle/common/BUILD b/mettle/common/BUILD index 8b30051a..2ce9e3d6 100644 --- a/mettle/common/BUILD +++ b/mettle/common/BUILD @@ -9,6 +9,7 @@ go_library( "///third_party/go/github.com_peterebden_go-cli-init_v4//logging", "///third_party/go/gocloud.dev//pubsub", "///third_party/go/gocloud.dev//pubsub/gcppubsub", + "///third_party/go/gocloud.dev//gcp", "///third_party/go/google.golang.org_genproto//googleapis/longrunning", "///third_party/go/google.golang.org_genproto//googleapis/pubsub/v1", "///third_party/go/google.golang.org_grpc//codes", diff --git a/mettle/main.go b/mettle/main.go index 86f3b655..b70793bb 100644 --- a/mettle/main.go +++ b/mettle/main.go @@ -52,16 +52,9 @@ var opts = struct { URL string `long:"url" description:"URL for communicating with other API servers"` TLS bool `long:"tls" description:"Use TLS for communication between api servers"` } `group:"Options controlling communication with other API servers for bootstrapping zero-downtime deployments." namespace:"api"` - Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"` - GRPC grpcutil.Opts `group:"Options controlling the gRPC server"` - Queues struct { - RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"` - ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"` - ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"` - PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"` - NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10"` - ResponseBatchSize uint `long:"response_batch_size" env:"API_RESPONSE_BATCH_SIZE" default:"100"` - } `group:"Options controlling the pub/sub queues"` + Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"` + GRPC grpcutil.Opts `group:"Options controlling the gRPC server"` + Queues api.PubSubOpts `group:"Options controlling the pub/sub queues"` AllowedPlatform map[string][]string `long:"allowed_platform" description:"Allowed values for platform properties"` } `command:"api" description:"Start as an API server"` Worker struct { @@ -170,6 +163,12 @@ func main() { if cmd == "dual" { const requests = "mem://requests" const responses = "mem://responses" + queues := api.PubSubOpts{ + RequestQueue: requests, + ResponseQueue: responses, + NumPollers: 1, + } + // Must ensure the topics are created ahead of time. common.MustOpenTopic(requests) common.MustOpenTopic(responses) @@ -180,11 +179,11 @@ func main() { storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)] go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.CAFile, opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown) } - api.ServeForever(opts.Dual.GRPC, "", requests, responses, responses, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS, opts.API.Queues.NumPollers, opts.API.Queues.ResponseBatchSize) + api.ServeForever(opts.Dual.GRPC, "", queues, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS) } else if cmd == "worker" { worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.CAFile, opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown) } else if cmd == "api" { - api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.RequestQueue, opts.API.Queues.ResponseQueue+opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.PreResponseQueue, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS, opts.API.Queues.NumPollers, opts.API.Queues.ResponseBatchSize) + api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS) } else if err := one(); err != nil { log.Fatalf("%s", err) } From 68b521387eb7cfb4cbffb48cb0164ddff775750c Mon Sep 17 00:00:00 2001 From: hpitkeathly Date: Wed, 25 Oct 2023 16:17:27 +0100 Subject: [PATCH 2/6] Clean up --- mettle/api/api.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mettle/api/api.go b/mettle/api/api.go index 8a5f33ae..9d2cfc64 100644 --- a/mettle/api/api.go +++ b/mettle/api/api.go @@ -1,4 +1,4 @@ -// Package api implements the remote execution API servere +// Package api implements the remote execution API server package api import ( @@ -129,11 +129,11 @@ func ServeForever(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL } func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) (*grpc.Server, net.Listener, error) { - responseSubscriptionName := queueOpts.ResponseQueue + responseSubscription := queueOpts.ResponseQueue if name == "" { name = "mettle API server" } else { - responseSubscriptionName = responseSubscriptionName + name + responseSubscription = responseSubscription + name } log.Notice("Contacting CAS server on %s...", storageURL) @@ -147,7 +147,7 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, srv := &server{ name: name, requests: common.MustOpenTopic(queueOpts.RequestQueue), - responses: common.MustOpenSubscription(responseSubscriptionName, queueOpts.SubscriptionBatchSize), + responses: common.MustOpenSubscription(responseSubscription, queueOpts.SubscriptionBatchSize), preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue), jobs: map[string]*job{}, platform: allowedPlatform, From 4dc5f686ba395e5b90ad19a40e4d9b55445a5ba5 Mon Sep 17 00:00:00 2001 From: hpitkeathly Date: Wed, 25 Oct 2023 16:26:14 +0100 Subject: [PATCH 3/6] lint --- ChangeLog | 6 ++++++ VERSION | 2 +- mettle/api/api.go | 4 ++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/ChangeLog b/ChangeLog index dba3147a..65c3629e 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,9 @@ +Version 11.3.0 +-------------- + * Add api server pubsub opts + * Add metrics for failed publish requests + * Add no execution in progress metric + Version 11.3.0 -------------- * Add a way to specify the batch size for the response subscription in diff --git a/VERSION b/VERSION index f628d2ea..72773deb 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -11.3.0 +11.4.0 diff --git a/mettle/api/api.go b/mettle/api/api.go index 9d2cfc64..f75d4a15 100644 --- a/mettle/api/api.go +++ b/mettle/api/api.go @@ -1,4 +1,4 @@ -// Package api implements the remote execution API server +// Package api implements the remote execution API server. package api import ( @@ -133,7 +133,7 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, if name == "" { name = "mettle API server" } else { - responseSubscription = responseSubscription + name + responseSubscription += name } log.Notice("Contacting CAS server on %s...", storageURL) From 172297971307cb51f30a304c5b9076af1395b9dc Mon Sep 17 00:00:00 2001 From: hpitkeathly Date: Wed, 25 Oct 2023 16:32:21 +0100 Subject: [PATCH 4/6] Remove unneeded import --- mettle/common/BUILD | 1 - 1 file changed, 1 deletion(-) diff --git a/mettle/common/BUILD b/mettle/common/BUILD index 2ce9e3d6..8b30051a 100644 --- a/mettle/common/BUILD +++ b/mettle/common/BUILD @@ -9,7 +9,6 @@ go_library( "///third_party/go/github.com_peterebden_go-cli-init_v4//logging", "///third_party/go/gocloud.dev//pubsub", "///third_party/go/gocloud.dev//pubsub/gcppubsub", - "///third_party/go/gocloud.dev//gcp", "///third_party/go/google.golang.org_genproto//googleapis/longrunning", "///third_party/go/google.golang.org_genproto//googleapis/pubsub/v1", "///third_party/go/google.golang.org_grpc//codes", From ada32ec558962b8eba9e2ab8d45335a02a2a387c Mon Sep 17 00:00:00 2001 From: hpitkeathly Date: Wed, 25 Oct 2023 16:59:37 +0100 Subject: [PATCH 5/6] correct subscription name --- mettle/api/api.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/mettle/api/api.go b/mettle/api/api.go index f75d4a15..fba9a816 100644 --- a/mettle/api/api.go +++ b/mettle/api/api.go @@ -129,11 +129,8 @@ func ServeForever(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL } func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) (*grpc.Server, net.Listener, error) { - responseSubscription := queueOpts.ResponseQueue if name == "" { name = "mettle API server" - } else { - responseSubscription += name } log.Notice("Contacting CAS server on %s...", storageURL) @@ -144,11 +141,12 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, if queueOpts.NumPollers < 1 { return nil, nil, fmt.Errorf("too few pollers specified: %d", queueOpts.NumPollers) } + preResponseURL := queueOpts.ResponseQueueSuffix + queueOpts.PreResponseQueue srv := &server{ name: name, requests: common.MustOpenTopic(queueOpts.RequestQueue), - responses: common.MustOpenSubscription(responseSubscription, queueOpts.SubscriptionBatchSize), - preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue), + responses: common.MustOpenSubscription(preResponseURL, queueOpts.SubscriptionBatchSize), + preResponses: common.MustOpenTopic(queueOpts.ResponseQueue), jobs: map[string]*job{}, platform: allowedPlatform, client: client, From 8bfdeabcf927191e13dfe0e4a54e9503ceb5fe09 Mon Sep 17 00:00:00 2001 From: hpitkeathly Date: Thu, 26 Oct 2023 09:14:20 +0100 Subject: [PATCH 6/6] Add preresponse publish duration metric --- mettle/api/api.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/mettle/api/api.go b/mettle/api/api.go index fba9a816..893b7f32 100644 --- a/mettle/api/api.go +++ b/mettle/api/api.go @@ -91,6 +91,12 @@ var timeToComplete = prometheus.NewHistogram(prometheus.HistogramOpts{ Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600, 900, 1200}, }) +var preResponsePublishDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "mettle", + Name: "publish_durations", + Buckets: prometheus.DefBuckets, +}) + var metrics = []prometheus.Collector{ totalRequests, currentRequests, @@ -101,6 +107,7 @@ var metrics = []prometheus.Collector{ noExecutionInProgress, requestPublishFailure, responsePublishFailure, + preResponsePublishDurations, } func init() { @@ -305,10 +312,12 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ b := common.MarshalOperation(pb.ExecutionStage_QUEUED, req.ActionDigest, nil) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() + preResponseStartTime := time.Now() if err := common.PublishWithOrderingKey(ctx, s.preResponses, b, req.ActionDigest.Hash, s.name); err != nil { responsePublishFailure.Inc() log.Error("Failed to communicate pre-response message: %s", err) } + preResponsePublishDurations.Observe(time.Since(preResponseStartTime).Seconds()) b, _ = proto.Marshal(req) ctx, cancel = context.WithTimeout(context.Background(), timeout) defer cancel()