Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group pubsub opts for api server and add metrics related to failing to publish #262

Merged
merged 6 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
Version 11.4.0
--------------
* Add api server pubsub opts
* Add metrics for failed publish requests
* Add no execution in progress metric

Version 11.3.0
--------------
* Add a way to specify the batch size for the response subscription in
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.3.0
11.4.0
55 changes: 45 additions & 10 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ var totalFailedPubSubMessages = prometheus.NewCounter(prometheus.CounterOpts{
Help: "Number of times the Pub/Sub pool has failed",
})

// noExecutionInProgress counts WaitExecution requests that name an execution
// for which no event stream is currently known to this server.
var noExecutionInProgress = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "mettle",
	Name:      "no_execution_in_progress",
	Help:      "Number of WaitExecution requests for an execution that is not in progress",
})

// requestPublishFailure counts Execute calls that failed to submit their
// request message to the request queue.
var requestPublishFailure = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "mettle",
	Name:      "request_publish_failures",
	Help:      "Number of times publishing a request to the request queue has failed",
})

// responsePublishFailure counts failures to publish the pre-response message
// that Execute sends before submitting the request itself.
var responsePublishFailure = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "mettle",
	Name:      "response_publish_failures",
	Help:      "Number of times publishing a pre-response message has failed",
})

fische marked this conversation as resolved.
Show resolved Hide resolved
var timeToComplete = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "mettle",
Name: "time_to_complete_action_secs",
Expand All @@ -83,6 +98,9 @@ var metrics = []prometheus.Collector{
totalSuccessfulActions,
timeToComplete,
totalFailedPubSubMessages,
noExecutionInProgress,
requestPublishFailure,
responsePublishFailure,
}

func init() {
Expand All @@ -91,36 +109,50 @@ func init() {
}
}

// PubSubOpts holds information to configure queue options in the api server.
// The struct tags declare the command-line flag, environment variable and
// default that each field is populated from.
type PubSubOpts struct {
	// RequestQueue is the URL of the pub/sub topic requests are sent to.
	RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"`
	// ResponseQueue is the URL of the pub/sub subscription responses arrive on.
	ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"`
	// ResponseQueueSuffix is an optional suffix for the response queue name.
	ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"`
	// PreResponseQueue is the URL of the topic used to preload responses to other servers.
	PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"`
	// NumPollers is the number of pollers to run; the server refuses to start if it is below 1.
	NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10" description:"Number of pollers to run; must be at least 1"`
	// SubscriptionBatchSize is the batch size passed when opening the response subscription.
	// NOTE(review): the env var name looks truncated compared with the flag name
	// (it was API_RESPONSE_BATCH_SIZE before this option moved here) - confirm
	// before relying on it; it is left unchanged so existing deployments keep working.
	SubscriptionBatchSize uint `long:"subscription_batch_size" env:"API_SUBSCRIPTION" default:"100" description:"Batch size to use for the response subscription"`
}

// ServeForever serves on the given port until terminated.
func ServeForever(opts grpcutil.Opts, name, requestQueue, responseQueue, preResponseQueue, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool, numPollers int, responseBatchSize uint) {
s, lis, err := serve(opts, name, requestQueue, responseQueue, preResponseQueue, apiURL, connTLS, allowedPlatform, storageURL, storageTLS, numPollers, responseBatchSize)
func ServeForever(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) {
s, lis, err := serve(opts, name, queueOpts, apiURL, connTLS, allowedPlatform, storageURL, storageTLS)
if err != nil {
log.Fatalf("%s", err)
}
grpcutil.ServeForever(lis, s)
}

func serve(opts grpcutil.Opts, name, requestQueue, responseQueue, preResponseQueue, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool, numPollers int, responseBatchSize uint) (*grpc.Server, net.Listener, error) {
func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) (*grpc.Server, net.Listener, error) {
responseSubscription := queueOpts.ResponseQueue
if name == "" {
name = "mettle API server"
} else {
responseSubscription += name
Hamishpk marked this conversation as resolved.
Show resolved Hide resolved
}

log.Notice("Contacting CAS server on %s...", storageURL)
client, err := rexclient.New(name, storageURL, storageTLS, "")
if err != nil {
return nil, nil, err
}
if numPollers < 1 {
return nil, nil, fmt.Errorf("too few pollers specified: %d", numPollers)
if queueOpts.NumPollers < 1 {
return nil, nil, fmt.Errorf("too few pollers specified: %d", queueOpts.NumPollers)
}
srv := &server{
name: name,
requests: common.MustOpenTopic(requestQueue),
responses: common.MustOpenSubscription(responseQueue, responseBatchSize),
preResponses: common.MustOpenTopic(preResponseQueue),
requests: common.MustOpenTopic(queueOpts.RequestQueue),
responses: common.MustOpenSubscription(responseSubscription, queueOpts.SubscriptionBatchSize),
preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue),
jobs: map[string]*job{},
platform: allowedPlatform,
client: client,
numPollers: numPollers,
numPollers: queueOpts.NumPollers,
}
log.Notice("Allowed platform values:")
for k, v := range allowedPlatform {
Expand Down Expand Up @@ -276,6 +308,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := common.PublishWithOrderingKey(ctx, s.preResponses, b, req.ActionDigest.Hash, s.name); err != nil {
responsePublishFailure.Inc()
log.Error("Failed to communicate pre-response message: %s", err)
}
b, _ = proto.Marshal(req)
Expand All @@ -285,6 +318,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ
Body: b,
Metadata: platform,
}); err != nil {
requestPublishFailure.Inc()
log.Error("Failed to submit work to stream: %s", err)
return err
}
Expand Down Expand Up @@ -313,7 +347,8 @@ func (s *server) WaitExecution(req *pb.WaitExecutionRequest, stream pb.Execution
digest := &pb.Digest{Hash: req.Name}
ch, _ := s.eventStream(digest, false)
if ch == nil {
log.Warning("Request for execution %s which is not in progress", req.Name)
log.Error("Request for execution %s which is not in progress", req.Name)
noExecutionInProgress.Inc()
return status.Errorf(codes.NotFound, "No execution in progress for %s", req.Name)
}
return s.streamEvents(digest, ch, stream)
Expand Down
9 changes: 8 additions & 1 deletion mettle/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,20 @@ func setupServers(t *testing.T) (pb.ExecutionClient, *executor, *grpc.Server) {
casaddr := setupCASServer()
requests := fmt.Sprintf("mem://requests%d", queueID)
responses := fmt.Sprintf("mem://responses%d", queueID)
queues := PubSubOpts{
RequestQueue: requests,
ResponseQueue: responses,
PreResponseQueue: responses,
NumPollers: 1,
SubscriptionBatchSize: 1,
}
queueID++
common.MustOpenTopic(requests) // Ensure these are created before anything tries
common.MustOpenTopic(responses) // to open a subscription to either.
s, lis, err := serve(grpcutil.Opts{
Host: "127.0.0.1",
Port: 0,
}, "", requests, responses, responses, "", true, map[string][]string{}, casaddr, false, 1, 1)
}, "", queues, "", true, map[string][]string{}, casaddr, false)
require.NoError(t, err)
go s.Serve(lis)
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
Expand Down
1 change: 1 addition & 0 deletions mettle/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"///third_party/go/github.com_peterebden_go-cli-init_v4//logging",
"///third_party/go/gocloud.dev//pubsub",
"///third_party/go/gocloud.dev//pubsub/gcppubsub",
"///third_party/go/gocloud.dev//gcp",
"///third_party/go/google.golang.org_genproto//googleapis/longrunning",
"///third_party/go/google.golang.org_genproto//googleapis/pubsub/v1",
"///third_party/go/google.golang.org_grpc//codes",
Expand Down
23 changes: 11 additions & 12 deletions mettle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,9 @@ var opts = struct {
URL string `long:"url" description:"URL for communicating with other API servers"`
TLS bool `long:"tls" description:"Use TLS for communication between api servers"`
} `group:"Options controlling communication with other API servers for bootstrapping zero-downtime deployments." namespace:"api"`
Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"`
GRPC grpcutil.Opts `group:"Options controlling the gRPC server"`
Queues struct {
RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"`
ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"`
ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"`
PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"`
NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10"`
ResponseBatchSize uint `long:"response_batch_size" env:"API_RESPONSE_BATCH_SIZE" default:"100"`
} `group:"Options controlling the pub/sub queues"`
Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"`
GRPC grpcutil.Opts `group:"Options controlling the gRPC server"`
Queues api.PubSubOpts `group:"Options controlling the pub/sub queues"`
AllowedPlatform map[string][]string `long:"allowed_platform" description:"Allowed values for platform properties"`
} `command:"api" description:"Start as an API server"`
Worker struct {
Expand Down Expand Up @@ -170,6 +163,12 @@ func main() {
if cmd == "dual" {
const requests = "mem://requests"
const responses = "mem://responses"
// Dual mode runs the API server and workers in-process over in-memory queues.
// Pre-responses share the response topic, as they did when the queue names
// were passed to ServeForever individually; leaving PreResponseQueue empty
// would make the server try to open a topic with an empty URL.
queues := api.PubSubOpts{
	RequestQueue:     requests,
	ResponseQueue:    responses,
	PreResponseQueue: responses,
	NumPollers:       1,
	// Keep passing the API batch-size option, as the previous call did.
	SubscriptionBatchSize: opts.API.Queues.SubscriptionBatchSize,
}

// Must ensure the topics are created ahead of time.
common.MustOpenTopic(requests)
common.MustOpenTopic(responses)
Expand All @@ -180,11 +179,11 @@ func main() {
storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)]
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.CAFile, opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
}
api.ServeForever(opts.Dual.GRPC, "", requests, responses, responses, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS, opts.API.Queues.NumPollers, opts.API.Queues.ResponseBatchSize)
api.ServeForever(opts.Dual.GRPC, "", queues, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS)
} else if cmd == "worker" {
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.CAFile, opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
} else if cmd == "api" {
api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.RequestQueue, opts.API.Queues.ResponseQueue+opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.PreResponseQueue, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS, opts.API.Queues.NumPollers, opts.API.Queues.ResponseBatchSize)
api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS)
} else if err := one(); err != nil {
log.Fatalf("%s", err)
}
Expand Down
Loading