Skip to content

Commit

Permalink
Group pubsub opts for api server and add metrics related to failing to (
Browse files Browse the repository at this point in the history
#262)

* Group pubsub opts for api server and add metrics related to failing to
publish messages
  • Loading branch information
Hamishpk authored Oct 26, 2023
1 parent 302e821 commit 4376ad7
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 24 deletions.
6 changes: 6 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
Version 11.3.0
--------------
* Add api server pubsub opts
* Add metrics for failed publish requests
* Add no execution in progress metric

Version 11.3.0
--------------
* Add a way to specify the batch size for the response subscription in
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.3.0
11.4.0
62 changes: 52 additions & 10 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,44 @@ var totalFailedPubSubMessages = prometheus.NewCounter(prometheus.CounterOpts{
Help: "Number of times the Pub/Sub pool has failed",
})

var noExecutionInProgress = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "mettle",
Name: "no_execution_in_progress",
})

var requestPublishFailure = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "mettle",
Name: "request_publish_failures",
})

var responsePublishFailure = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "mettle",
Name: "response_publish_failures",
})

var timeToComplete = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "mettle",
Name: "time_to_complete_action_secs",
Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600, 900, 1200},
})

var preResponsePublishDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "mettle",
Name: "publish_durations",
Buckets: prometheus.DefBuckets,
})

var metrics = []prometheus.Collector{
totalRequests,
currentRequests,
totalFailedActions,
totalSuccessfulActions,
timeToComplete,
totalFailedPubSubMessages,
noExecutionInProgress,
requestPublishFailure,
responsePublishFailure,
preResponsePublishDurations,
}

func init() {
Expand All @@ -91,36 +116,48 @@ func init() {
}
}

// PubSubOpts holds information to configure queue options in the api server
type PubSubOpts struct {
RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"`
ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"`
ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"`
PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"`
NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10"`
SubscriptionBatchSize uint `long:"subscription_batch_size" env:"API_SUBSCRIPTION" default:"100"`
}

// ServeForever serves on the given port until terminated.
func ServeForever(opts grpcutil.Opts, name, requestQueue, responseQueue, preResponseQueue, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool, numPollers int, responseBatchSize uint) {
s, lis, err := serve(opts, name, requestQueue, responseQueue, preResponseQueue, apiURL, connTLS, allowedPlatform, storageURL, storageTLS, numPollers, responseBatchSize)
func ServeForever(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) {
s, lis, err := serve(opts, name, queueOpts, apiURL, connTLS, allowedPlatform, storageURL, storageTLS)
if err != nil {
log.Fatalf("%s", err)
}
grpcutil.ServeForever(lis, s)
}

func serve(opts grpcutil.Opts, name, requestQueue, responseQueue, preResponseQueue, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool, numPollers int, responseBatchSize uint) (*grpc.Server, net.Listener, error) {
func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) (*grpc.Server, net.Listener, error) {
if name == "" {
name = "mettle API server"
}

log.Notice("Contacting CAS server on %s...", storageURL)
client, err := rexclient.New(name, storageURL, storageTLS, "")
if err != nil {
return nil, nil, err
}
if numPollers < 1 {
return nil, nil, fmt.Errorf("too few pollers specified: %d", numPollers)
if queueOpts.NumPollers < 1 {
return nil, nil, fmt.Errorf("too few pollers specified: %d", queueOpts.NumPollers)
}
preResponseURL := queueOpts.ResponseQueueSuffix + queueOpts.PreResponseQueue
srv := &server{
name: name,
requests: common.MustOpenTopic(requestQueue),
responses: common.MustOpenSubscription(responseQueue, responseBatchSize),
preResponses: common.MustOpenTopic(preResponseQueue),
requests: common.MustOpenTopic(queueOpts.RequestQueue),
responses: common.MustOpenSubscription(preResponseURL, queueOpts.SubscriptionBatchSize),
preResponses: common.MustOpenTopic(queueOpts.ResponseQueue),
jobs: map[string]*job{},
platform: allowedPlatform,
client: client,
numPollers: numPollers,
numPollers: queueOpts.NumPollers,
}
log.Notice("Allowed platform values:")
for k, v := range allowedPlatform {
Expand Down Expand Up @@ -275,16 +312,20 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ
b := common.MarshalOperation(pb.ExecutionStage_QUEUED, req.ActionDigest, nil)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
preResponseStartTime := time.Now()
if err := common.PublishWithOrderingKey(ctx, s.preResponses, b, req.ActionDigest.Hash, s.name); err != nil {
responsePublishFailure.Inc()
log.Error("Failed to communicate pre-response message: %s", err)
}
preResponsePublishDurations.Observe(time.Since(preResponseStartTime).Seconds())
b, _ = proto.Marshal(req)
ctx, cancel = context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := s.requests.Send(ctx, &pubsub.Message{
Body: b,
Metadata: platform,
}); err != nil {
requestPublishFailure.Inc()
log.Error("Failed to submit work to stream: %s", err)
return err
}
Expand Down Expand Up @@ -313,7 +354,8 @@ func (s *server) WaitExecution(req *pb.WaitExecutionRequest, stream pb.Execution
digest := &pb.Digest{Hash: req.Name}
ch, _ := s.eventStream(digest, false)
if ch == nil {
log.Warning("Request for execution %s which is not in progress", req.Name)
log.Error("Request for execution %s which is not in progress", req.Name)
noExecutionInProgress.Inc()
return status.Errorf(codes.NotFound, "No execution in progress for %s", req.Name)
}
return s.streamEvents(digest, ch, stream)
Expand Down
9 changes: 8 additions & 1 deletion mettle/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,20 @@ func setupServers(t *testing.T) (pb.ExecutionClient, *executor, *grpc.Server) {
casaddr := setupCASServer()
requests := fmt.Sprintf("mem://requests%d", queueID)
responses := fmt.Sprintf("mem://responses%d", queueID)
queues := PubSubOpts{
RequestQueue: requests,
ResponseQueue: responses,
PreResponseQueue: responses,
NumPollers: 1,
SubscriptionBatchSize: 1,
}
queueID++
common.MustOpenTopic(requests) // Ensure these are created before anything tries
common.MustOpenTopic(responses) // to open a subscription to either.
s, lis, err := serve(grpcutil.Opts{
Host: "127.0.0.1",
Port: 0,
}, "", requests, responses, responses, "", true, map[string][]string{}, casaddr, false, 1, 1)
}, "", queues, "", true, map[string][]string{}, casaddr, false)
require.NoError(t, err)
go s.Serve(lis)
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
Expand Down
23 changes: 11 additions & 12 deletions mettle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,9 @@ var opts = struct {
URL string `long:"url" description:"URL for communicating with other API servers"`
TLS bool `long:"tls" description:"Use TLS for communication between api servers"`
} `group:"Options controlling communication with other API servers for bootstrapping zero-downtime deployments." namespace:"api"`
Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"`
GRPC grpcutil.Opts `group:"Options controlling the gRPC server"`
Queues struct {
RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"`
ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"`
ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"`
PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"`
NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10"`
ResponseBatchSize uint `long:"response_batch_size" env:"API_RESPONSE_BATCH_SIZE" default:"100"`
} `group:"Options controlling the pub/sub queues"`
Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"`
GRPC grpcutil.Opts `group:"Options controlling the gRPC server"`
Queues api.PubSubOpts `group:"Options controlling the pub/sub queues"`
AllowedPlatform map[string][]string `long:"allowed_platform" description:"Allowed values for platform properties"`
} `command:"api" description:"Start as an API server"`
Worker struct {
Expand Down Expand Up @@ -170,6 +163,12 @@ func main() {
if cmd == "dual" {
const requests = "mem://requests"
const responses = "mem://responses"
queues := api.PubSubOpts{
RequestQueue: requests,
ResponseQueue: responses,
NumPollers: 1,
}

// Must ensure the topics are created ahead of time.
common.MustOpenTopic(requests)
common.MustOpenTopic(responses)
Expand All @@ -180,11 +179,11 @@ func main() {
storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)]
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.CAFile, opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
}
api.ServeForever(opts.Dual.GRPC, "", requests, responses, responses, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS, opts.API.Queues.NumPollers, opts.API.Queues.ResponseBatchSize)
api.ServeForever(opts.Dual.GRPC, "", queues, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS)
} else if cmd == "worker" {
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.CAFile, opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
} else if cmd == "api" {
api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.RequestQueue, opts.API.Queues.ResponseQueue+opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.PreResponseQueue, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS, opts.API.Queues.NumPollers, opts.API.Queues.ResponseBatchSize)
api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS)
} else if err := one(); err != nil {
log.Fatalf("%s", err)
}
Expand Down

0 comments on commit 4376ad7

Please sign in to comment.