Skip to content

Commit

Permalink
feat: add gRPC healthchecks (#863)
Browse files Browse the repository at this point in the history
* adds a gRPC healthcheck to the "metrics" port
* supports both gRPC and HTTP on "metrics" port
* adds relevant doc

Signed-off-by: Todd Baert <[email protected]>
Co-authored-by: Michael Beemer <[email protected]>
  • Loading branch information
toddbaert and beeme1mr authored Aug 29, 2023
1 parent 50df791 commit da30b7b
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 41 deletions.
57 changes: 37 additions & 20 deletions core/pkg/service/flag-evaluation/connect_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net"
"net/http"
"strings"
"sync"
"time"

Expand All @@ -21,7 +22,12 @@ import (
"github.com/open-feature/flagd/core/pkg/telemetry"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/protobuf/encoding/protojson"
)

Expand Down Expand Up @@ -127,18 +133,16 @@ func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listene
protojson.UnmarshalOptions{DiscardUnknown: true},
)

path, handler := schemaConnectV1.NewServiceHandler(fes, append(svcConf.Options, marshalOpts)...)
mux.Handle(path, handler)
mux.Handle(schemaConnectV1.NewServiceHandler(fes, append(svcConf.Options, marshalOpts)...))

s.serverMtx.Lock()
s.server = &http.Server{
ReadHeaderTimeout: time.Second,
Handler: handler,
Handler: mux,
}
s.serverMtx.Unlock()

// Add middlewares

metricsMiddleware := metricsmw.NewHTTPMetric(metricsmw.Config{
Service: svcConf.ServiceName,
MetricRecorder: s.metrics,
Expand Down Expand Up @@ -197,28 +201,41 @@ func (s *ConnectService) startServer(svcConf service.Configuration) error {

func (s *ConnectService) startMetricsServer(svcConf service.Configuration) error {
s.logger.Info(fmt.Sprintf("metrics and probes listening at %d", svcConf.MetricsPort))

grpc := grpc.NewServer()
grpc_health_v1.RegisterHealthServer(grpc, health.NewServer())

mux := http.NewServeMux()
mux.Handle("/healthz", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
mux.Handle("/readyz", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if s.readinessEnabled && svcConf.ReadinessProbe() {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusPreconditionFailed)
}
}))
mux.Handle("/metrics", promhttp.Handler())

handler := http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
// if this is 'application/grpc' and HTTP2, handle with gRPC, otherwise HTTP.
if request.ProtoMajor == 2 && strings.HasPrefix(request.Header.Get("Content-Type"), "application/grpc") {
grpc.ServeHTTP(writer, request)
} else {
mux.ServeHTTP(writer, request)
return
}
})

s.metricsServerMtx.Lock()
s.metricsServer = &http.Server{
Addr: fmt.Sprintf(":%d", svcConf.MetricsPort),
ReadHeaderTimeout: 3 * time.Second,
Handler: h2c.NewHandler(handler, &http2.Server{}), // we need to use h2c to support plaintext HTTP2
}
s.metricsServerMtx.Unlock()
s.metricsServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/healthz":
w.WriteHeader(http.StatusOK)
case "/readyz":
if s.readinessEnabled && svcConf.ReadinessProbe() {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusPreconditionFailed)
}
case "/metrics":
promhttp.Handler().ServeHTTP(w, r)
default:
w.WriteHeader(http.StatusNotFound)
}
})

if err := s.metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("error returned from metrics server: %w", err)
}
Expand Down
51 changes: 34 additions & 17 deletions core/pkg/service/sync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@ import (
"fmt"
"net"
"net/http"
"strings"
"time"

rpc "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc"
"github.com/open-feature/flagd/core/pkg/logger"
iservice "github.com/open-feature/flagd/core/pkg/service"
syncStore "github.com/open-feature/flagd/core/pkg/sync-store"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)

type Server struct {
Expand Down Expand Up @@ -101,26 +106,38 @@ func (s *Server) startServer() error {

func (s *Server) startMetricsServer() error {
s.Logger.Info(fmt.Sprintf("binding metrics to %d", s.config.MetricsPort))
s.metricsServer = &http.Server{
ReadHeaderTimeout: 3 * time.Second,
Addr: fmt.Sprintf(":%d", s.config.MetricsPort),
}
s.metricsServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/healthz":

grpc := grpc.NewServer()
grpc_health_v1.RegisterHealthServer(grpc, health.NewServer())

mux := http.NewServeMux()
mux.Handle("/healthz", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
mux.Handle("/readyz", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if s.metricServerReady && s.config.ReadinessProbe() {
w.WriteHeader(http.StatusOK)
case "/readyz":
if s.metricServerReady && s.config.ReadinessProbe() {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusPreconditionFailed)
}
case "/metrics":
promhttp.Handler().ServeHTTP(w, r)
default:
w.WriteHeader(http.StatusNotFound)
} else {
w.WriteHeader(http.StatusPreconditionFailed)
}
}))
mux.Handle("/metrics", promhttp.Handler())

handler := http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
// if this is 'application/grpc' and HTTP2, handle with gRPC, otherwise HTTP.
if request.ProtoMajor == 2 && strings.HasPrefix(request.Header.Get("Content-Type"), "application/grpc") {
grpc.ServeHTTP(writer, request)
} else {
mux.ServeHTTP(writer, request)
return
}
})

s.metricsServer = &http.Server{
Addr: fmt.Sprintf(":%d", s.config.MetricsPort),
ReadHeaderTimeout: 3 * time.Second,
Handler: h2c.NewHandler(handler, &http2.Server{}), // we need to use h2c to support plaintext HTTP2
}
if err := s.metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("error returned from metrics server: %w", err)
}
Expand Down
9 changes: 7 additions & 2 deletions docs/other_resources/high_level_architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,19 @@ process gets pushed to event subscribers.

## Readiness & Liveness probes

### HTTP

Flagd exposes HTTP liveness and readiness probes.
These probes can be used for K8s deployments.
With default
start-up configurations, these probes are exposed at the following URLs,
With default start-up configurations, these probes are exposed on the metrics port (default: 8014) at the following URLs:

- Liveness: <http://localhost:8014/healthz>
- Readiness: <http://localhost:8014/readyz>

### gRPC

Flagd exposes a [standard gRPC liveness check](https://github.com/grpc/grpc/blob/master/doc/health-checking.md) on the metrics port (default: 8014).

### Definition of Liveness

The liveness probe becomes active, and an HTTP 200 status is served, as soon as the Flagd service is up and running.
Expand Down
9 changes: 7 additions & 2 deletions web-docs/concepts/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,19 @@ process gets pushed to event subscribers.

## Readiness & Liveness probes

### HTTP

Flagd exposes HTTP liveness and readiness probes.
These probes can be used for K8s deployments.
With default
start-up configurations, these probes are exposed at the following URLs,
With default start-up configurations, these probes are exposed on the metrics port (default: 8014) at the following URLs:

- Liveness: <http://localhost:8014/healthz>
- Readiness: <http://localhost:8014/readyz>

### gRPC

Flagd exposes a [standard gRPC liveness check](https://github.com/grpc/grpc/blob/master/doc/health-checking.md) on the metrics port (default: 8014).

### Definition of Liveness

The liveness probe becomes active, and an HTTP 200 status is served, as soon as the Flagd service is up and running.
Expand Down

0 comments on commit da30b7b

Please sign in to comment.