diff --git a/ChangeLog b/ChangeLog index dba3147a..65c3629e 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,9 @@ +Version 11.3.0 +-------------- + * Add api server pubsub opts + * Add metrics for failed publish requests + * Add no execution in progress metric + Version 11.3.0 -------------- * Add a way to specify the batch size for the response subscription in diff --git a/VERSION b/VERSION index f628d2ea..72773deb 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -11.3.0 +11.4.0 diff --git a/mettle/api/api.go b/mettle/api/api.go index 358ff308..893b7f32 100644 --- a/mettle/api/api.go +++ b/mettle/api/api.go @@ -70,12 +70,33 @@ var totalFailedPubSubMessages = prometheus.NewCounter(prometheus.CounterOpts{ Help: "Number of times the Pub/Sub pool has failed", }) +var noExecutionInProgress = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "mettle", + Name: "no_execution_in_progress", +}) + +var requestPublishFailure = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "mettle", + Name: "request_publish_failures", +}) + +var responsePublishFailure = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "mettle", + Name: "response_publish_failures", +}) + var timeToComplete = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "mettle", Name: "time_to_complete_action_secs", Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600, 900, 1200}, }) +var preResponsePublishDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "mettle", + Name: "publish_durations", + Buckets: prometheus.DefBuckets, +}) + var metrics = []prometheus.Collector{ totalRequests, currentRequests, @@ -83,6 +104,10 @@ var metrics = []prometheus.Collector{ totalSuccessfulActions, timeToComplete, totalFailedPubSubMessages, + noExecutionInProgress, + requestPublishFailure, + responsePublishFailure, + preResponsePublishDurations, } func init() { @@ -91,36 +116,48 @@ func init() { } } +// PubSubOpts holds information to configure queue options in the api server +type PubSubOpts struct { + RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"` + ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"` + ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"` + PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"` + NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10"` + SubscriptionBatchSize uint `long:"subscription_batch_size" env:"API_SUBSCRIPTION" default:"100"` +} + // ServeForever serves on the given port until terminated. -func ServeForever(opts grpcutil.Opts, name, requestQueue, responseQueue, preResponseQueue, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool, numPollers int, responseBatchSize uint) { - s, lis, err := serve(opts, name, requestQueue, responseQueue, preResponseQueue, apiURL, connTLS, allowedPlatform, storageURL, storageTLS, numPollers, responseBatchSize) +func ServeForever(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) { + s, lis, err := serve(opts, name, queueOpts, apiURL, connTLS, allowedPlatform, storageURL, storageTLS) if err != nil { log.Fatalf("%s", err) } grpcutil.ServeForever(lis, s) } -func serve(opts grpcutil.Opts, name, requestQueue, responseQueue, preResponseQueue, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool, numPollers int, responseBatchSize uint) (*grpc.Server, net.Listener, error) { +func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) (*grpc.Server, net.Listener, error) { if name == "" { name = "mettle API server" } + log.Notice("Contacting CAS server on %s...", storageURL) client, err := rexclient.New(name, storageURL, storageTLS, "") if err != nil { return nil, nil, err } - if numPollers < 1 { - return nil, nil, fmt.Errorf("too few pollers specified: %d", numPollers) + if queueOpts.NumPollers < 1 { + return nil, nil, fmt.Errorf("too few pollers specified: %d", queueOpts.NumPollers) } + preResponseURL := queueOpts.ResponseQueueSuffix + queueOpts.PreResponseQueue srv := &server{ name: name, - requests: common.MustOpenTopic(requestQueue), - responses: common.MustOpenSubscription(responseQueue, responseBatchSize), - preResponses: common.MustOpenTopic(preResponseQueue), + requests: common.MustOpenTopic(queueOpts.RequestQueue), + responses: common.MustOpenSubscription(preResponseURL, queueOpts.SubscriptionBatchSize), + preResponses: common.MustOpenTopic(queueOpts.ResponseQueue), jobs: map[string]*job{}, platform: allowedPlatform, client: client, - numPollers: numPollers, + numPollers: queueOpts.NumPollers, } log.Notice("Allowed platform values:") for k, v := range allowedPlatform { @@ -275,9 +312,12 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ b := common.MarshalOperation(pb.ExecutionStage_QUEUED, req.ActionDigest, nil) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() + preResponseStartTime := time.Now() if err := common.PublishWithOrderingKey(ctx, s.preResponses, b, req.ActionDigest.Hash, s.name); err != nil { + responsePublishFailure.Inc() log.Error("Failed to communicate pre-response message: %s", err) } + preResponsePublishDurations.Observe(time.Since(preResponseStartTime).Seconds()) b, _ = proto.Marshal(req) ctx, cancel = context.WithTimeout(context.Background(), timeout) defer cancel() @@ -285,6 +325,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ Body: b, Metadata: platform, }); err != nil { + requestPublishFailure.Inc() log.Error("Failed to submit work to stream: %s", err) return err } @@ -313,7 +354,8 @@ func (s *server) WaitExecution(req *pb.WaitExecutionRequest, stream pb.Execution digest := &pb.Digest{Hash: req.Name} ch, _ := s.eventStream(digest, false) if ch == nil { - log.Warning("Request for execution %s which is not in progress", req.Name) + log.Error("Request for execution %s which is not in progress", req.Name) + noExecutionInProgress.Inc() return status.Errorf(codes.NotFound, "No execution in progress for %s", req.Name) } return s.streamEvents(digest, ch, stream) diff --git a/mettle/api/api_test.go b/mettle/api/api_test.go index 8ccc2502..a5936dfb 100644 --- a/mettle/api/api_test.go +++ b/mettle/api/api_test.go @@ -239,13 +239,20 @@ func setupServers(t *testing.T) (pb.ExecutionClient, *executor, *grpc.Server) { casaddr := setupCASServer() requests := fmt.Sprintf("mem://requests%d", queueID) responses := fmt.Sprintf("mem://responses%d", queueID) + queues := PubSubOpts{ + RequestQueue: requests, + ResponseQueue: responses, + PreResponseQueue: responses, + NumPollers: 1, + SubscriptionBatchSize: 1, + } queueID++ common.MustOpenTopic(requests) // Ensure these are created before anything tries common.MustOpenTopic(responses) // to open a subscription to either. s, lis, err := serve(grpcutil.Opts{ Host: "127.0.0.1", Port: 0, - }, "", requests, responses, responses, "", true, map[string][]string{}, casaddr, false, 1, 1) + }, "", queues, "", true, map[string][]string{}, casaddr, false) require.NoError(t, err) go s.Serve(lis) conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) diff --git a/mettle/main.go b/mettle/main.go index 86f3b655..b70793bb 100644 --- a/mettle/main.go +++ b/mettle/main.go @@ -52,16 +52,9 @@ var opts = struct { URL string `long:"url" description:"URL for communicating with other API servers"` TLS bool `long:"tls" description:"Use TLS for communication between api servers"` } `group:"Options controlling communication with other API servers for bootstrapping zero-downtime deployments." namespace:"api"` - Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"` - GRPC grpcutil.Opts `group:"Options controlling the gRPC server"` - Queues struct { - RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"` - ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"` - ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"` - PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"` - NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10"` - ResponseBatchSize uint `long:"response_batch_size" env:"API_RESPONSE_BATCH_SIZE" default:"100"` - } `group:"Options controlling the pub/sub queues"` + Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"` + GRPC grpcutil.Opts `group:"Options controlling the gRPC server"` + Queues api.PubSubOpts `group:"Options controlling the pub/sub queues"` AllowedPlatform map[string][]string `long:"allowed_platform" description:"Allowed values for platform properties"` } `command:"api" description:"Start as an API server"` Worker struct { @@ -170,6 +163,12 @@ func main() { if cmd == "dual" { const requests = "mem://requests" const responses = "mem://responses" + queues := api.PubSubOpts{ + RequestQueue: requests, + ResponseQueue: responses, + NumPollers: 1, + } + // Must ensure the topics are created ahead of time. common.MustOpenTopic(requests) common.MustOpenTopic(responses) @@ -180,11 +179,11 @@ func main() { storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)] go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.CAFile, opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown) } - api.ServeForever(opts.Dual.GRPC, "", requests, responses, responses, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS, opts.API.Queues.NumPollers, opts.API.Queues.ResponseBatchSize) + api.ServeForever(opts.Dual.GRPC, "", queues, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS) } else if cmd == "worker" { worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.CAFile, opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown) } else if cmd == "api" { - api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.RequestQueue, opts.API.Queues.ResponseQueue+opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.PreResponseQueue, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS, opts.API.Queues.NumPollers, opts.API.Queues.ResponseBatchSize) + api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS) } else if err := one(); err != nil { log.Fatalf("%s", err) }