Skip to content

Commit

Permalink
adds ServiceContext method to allow Unit cancellation using context.C…
Browse files Browse the repository at this point in the history
…ontext (#6)
  • Loading branch information
basvanbeek authored Mar 11, 2022
1 parent e9a08c7 commit ba54268
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 69 deletions.
268 changes: 205 additions & 63 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@
// limitations under the License.

// Package run implements an actor-runner with deterministic teardown.
// It uses the https://github.com/oklog/run/ package as its basis and enhances
// it with configuration registration and validation as well as pre-run phase
// logic.
// It uses the concepts found in the https://github.com/oklog/run/ package as
// its basis and enhances it with configuration registration and validation as
// well as pre-run phase logic.
package run

import (
"context"
"errors"
"fmt"
"os"
"path"
"strings"

color "github.com/logrusorgru/aurora"
"github.com/oklog/run"
"github.com/spf13/pflag"
"github.com/tetratelabs/multierror"
"github.com/tetratelabs/telemetry"
Expand Down Expand Up @@ -149,6 +149,13 @@ func (p preRunner) PreRun() error {
//
// Since Service is managed by Group, it is considered a design flaw to call any
// of the Service methods directly in application code.
//
// An alternative to implementing Service can be found in the ServiceContext
// interface which allows the Group Unit to listen for the cancellation signal
// from the Group provided context.Context.
//
// Important: Service and ServiceContext are mutually exclusive and should never
// be implemented in the same Unit.
type Service interface {
// Unit is embedded for Group registration and identification
Unit
Expand All @@ -158,9 +165,27 @@ type Service interface {
GracefulStop()
}

// Group builds on https://github.com/oklog/run to provide a deterministic way
// to manage service lifecycles. It allows for easy composition of elegant
// monoliths as well as adding signal handlers, metrics services, etc.
// ServiceContext interface should be implemented by Group Unit objects that
// need to run a blocking service until an error occurs or the by Group provided
// context.Context sends a cancellation signal.
//
// An alternative to implementing ServiceContext can be found in the Service
// interface which has specific Serve and GracefulStop methods.
//
// Important: Service and ServiceContext are mutually exclusive and should never
// be implemented in the same Unit.
type ServiceContext interface {
// Unit is embedded for Group registration and identification
Unit
// ServeContext starts the GroupService and blocks until the provided
// context is cancelled.
ServeContext(ctx context.Context) error
}

// Group builds on concepts from https://github.com/oklog/run to provide a
// deterministic way to manage service lifecycles. It allows for easy
// composition of elegant monoliths as well as adding signal handlers, metrics
// services, etc.
type Group struct {
// Name of the Group managed service. If omitted, the binary name will be
// used as found at runtime.
Expand All @@ -171,12 +196,12 @@ type Group struct {
Logger telemetry.Logger

f *FlagSet
r run.Group
i []Initializer
n []Namer
c []Config
p []PreRunner
s []Service
x []ServiceContext

configured bool
hsRegistered bool
Expand All @@ -189,7 +214,15 @@ type Group struct {
// The returned array of booleans is of the same size as the amount of provided
// Units, signalling for each provided Unit if it successfully registered with
// Group for at least one of the bootstrap phases or if it was ignored.
//
// Important: It is a design flaw for a Unit implementation to adhere to both
// the Service and ServiceContext interfaces. Passing along such a Unit will
// cause Register to throw a panic!
func (g *Group) Register(units ...Unit) []bool {
type ambiguousService interface {
Service
ServiceContext
}
hasRegistered := make([]bool, len(units))
for idx := range units {
if i, ok := units[idx].(Initializer); ok {
Expand All @@ -212,10 +245,18 @@ func (g *Group) Register(units ...Unit) []bool {
g.p = append(g.p, p)
hasRegistered[idx] = true
}
if svc, ok := units[idx].(ambiguousService); ok {
panic("ambiguous service " + svc.Name() + " encountered: " +
"a Unit MUST NOT implement both Service and ServiceContext")
}
if s, ok := units[idx].(Service); ok {
g.s = append(g.s, s)
hasRegistered[idx] = true
}
if x, ok := units[idx].(ServiceContext); ok {
g.x = append(g.x, x)
hasRegistered[idx] = true
}
}
return hasRegistered
}
Expand Down Expand Up @@ -264,6 +305,12 @@ func (g *Group) Deregister(units ...Unit) []bool {
hasDeregistered[idx] = true
}
}
for i := range g.x {
if g.x[i] != nil && g.x[i].(Unit) == units[idx] {
g.x[i] = nil // can't resize slice during Run, so nil
hasDeregistered[idx] = true
}
}
}
return hasDeregistered
}
Expand Down Expand Up @@ -363,9 +410,13 @@ func (g *Group) RunConfig(args ...string) (err error) {
for idx := range g.c {
// a Config might have been de-registered
if g.c[idx] == nil {
g.Logger.Debug("flagset",
"name", "--deregistered--",
"item", fmt.Sprintf("(%d/%d)", idx+1, len(g.c)),
)
continue
}
g.Logger.Debug("registering flags",
g.Logger.Debug("flagset",
"name", g.c[idx].Name(),
"item", fmt.Sprintf("(%d/%d)", idx+1, len(g.c)),
)
Expand Down Expand Up @@ -413,19 +464,27 @@ func (g *Group) RunConfig(args ...string) (err error) {
}

// Validate Config inputs
for idx := range g.c {
// a Config might have been de-registered during Run
if g.c[idx] == nil {
g.Logger.Debug("skipping validate", "index", idx)
continue
}
g.Logger.Debug("validate config",
"name", g.c[idx].Name(),
"item", fmt.Sprintf("(%d/%d)", idx+1, len(g.c)),
)
if vErr := g.c[idx].Validate(); vErr != nil {
err = multierror.Append(err, vErr)
}
for idx, cfg := range g.c {
func(itemNr int, cfg Config) {
// a Config might have been de-registered during Run
if cfg == nil {
g.Logger.Debug("validate-skip",
"name", "--deregistered--",
"item", fmt.Sprintf("(%d/%d)", itemNr, len(g.c)),
)
return
}
var vErr error
l := g.Logger.With(
"name", cfg.Name(),
"item", fmt.Sprintf("(%d/%d)", itemNr, len(g.c)))
l.Debug("validate")
defer l.Debug("validate-exit", debugLogError(vErr)...)
vErr = cfg.Validate()
if vErr != nil {
err = multierror.Append(err, vErr)
}
}(idx+1, cfg)
}

// exit on at least one Validate error
Expand All @@ -451,26 +510,31 @@ func (g *Group) RunConfig(args ...string) (err error) {
//
// Config phase (serially, in order of Unit registration)
// - FlagSet() Get & register all FlagSets from Config Units.
// - Flag Parsing Using the provided args (os.Args if empty)
// - Flag Parsing Using the provided args (os.Args if empty).
// - Validate() Validate Config Units. Exit on first error.
//
// PreRunner phase (serially, in order of Unit registration)
// - PreRun() Execute PreRunner Units. Exit on first error.
//
// Service phase (concurrently)
// Service and ServiceContext phase (concurrently)
// - Serve() Execute all Service Units in separate Go routines.
// - Wait Block until one of the Serve() methods returns
// - GracefulStop() Call interrupt handlers of all Service Units.
// ServeContext() Execute all ServiceContext Units.
// - Wait Block until one of the Serve() or ServeContext()
// methods returns.
// - GracefulStop() Call interrupt handlers of all Service Units and
// cancel the context.Context provided to all the
// ServiceContext units registered.
//
// Run will return with the originating error on:
// - first Config.Validate() returning an error
// - first PreRunner.PreRun() returning an error
// - first Service.Serve() returning (error or nil)
// - first Service.Serve() or ServiceContext.ServeContext() returning
//
// Note: it is perfectly acceptable to use Group without Service units. In this
// case Run will just return immediately after having handled the Config and
// PreRunner phases of the registered Units. This is particularly convenient if
// using the common pkg middlewares in a CLI / ephemeral environment.
// Note: it is perfectly acceptable to use Group without Service and
// ServiceContext units. In this case Run will just return immediately after
// having handled the Config and PreRunner phases of the registered Units. This
// is particularly convenient if using the common pkg middlewares in a CLI,
// script, or other ephemeral environment.
func (g *Group) Run(args ...string) (err error) {
if !g.configured {
// run config registration and flag parsing stages
Expand Down Expand Up @@ -522,45 +586,106 @@ func (g *Group) Run(args ...string) (err error) {

// execute pre run stage and exit on error
for idx := range g.p {
// a PreRunner might have been de-registered during Run
if g.p[idx] == nil {
continue
}
g.Logger.Debug("pre-run",
"name", g.p[idx].Name(),
"item", fmt.Sprintf("(%d/%d)", idx+1, len(g.p)),
)
if err := g.p[idx].PreRun(); err != nil {
return fmt.Errorf("pre-run %s: %w", g.p[idx].Name(), err)
if err = func(itemNr int, pr PreRunner) error {
// a PreRunner might have been de-registered during Run
if pr == nil {
g.Logger.Debug("pre-run-skip",
"name", "--deregistered--",
"item", fmt.Sprintf("(%d/%d)", itemNr, len(g.p)),
)
return nil
}
var err error
l := g.Logger.With(
"name", pr.Name(),
"item", fmt.Sprintf("(%d/%d)", itemNr, len(g.p)))
l.Debug("pre-run")
defer l.Debug("pre-run-exit", debugLogError(err)...)
err = pr.PreRun()
if err != nil {
return fmt.Errorf("pre-run %s: %w", pr.Name(), err)
}
return nil
}(idx+1, g.p[idx]); err != nil {
return err
}
}

// feed our registered services to our internal run.Group
var (
s []Service
x []ServiceContext
)
for idx := range g.s {
// a Service might have been de-registered during Run
s := g.s[idx]
if s == nil {
continue
if g.s[idx] != nil {
s = append(s, g.s[idx])
}
hasServices = true
itemNr := idx + 1
g.Logger.Debug("serve",
"name", s.Name(),
"item", fmt.Sprintf("(%d/%d)", itemNr, len(g.s)),
)
g.r.Add(func() error {
return s.Serve()
}, func(_ error) {
g.Logger.Debug("stop",
"name", s.Name(),
"item", fmt.Sprintf("(%d/%d)", itemNr, len(g.s)),
)
s.GracefulStop()
})
}
for idx := range g.x {
// a ServiceContext might have been de-registered during Run
if g.x[idx] != nil {
x = append(x, g.x[idx])
}
}
if len(s)+len(x) == 0 {
// we have no Service or ServiceContext to run.
return nil
}

// setup our cancellable context and error channel
ctx, cancel := context.WithCancel(context.Background())
errs := make(chan error, len(s)+len(x))
hasServices = true

// run each Service
for idx, svc := range s {
go func(itemNr int, svc Service) {
l := g.Logger.With(
"name", svc.Name(),
"item", fmt.Sprintf("(%d/%d)", itemNr, len(s)))
l.Debug("serve")
defer l.Debug("serve-exit", debugLogError(err)...)
err := svc.Serve()
errs <- err
}(idx+1, svc)
}
// run each ServiceContext
for idx, svc := range x {
go func(itemNr int, svc ServiceContext) {
l := g.Logger.With(
"name", svc.Name(),
"item", fmt.Sprintf("(%d/%d)", itemNr, len(x)))
l.Debug("serve-context")
defer l.Debug("serve-context-exit", debugLogError(err)...)
err := svc.ServeContext(ctx)
errs <- err
}(idx+1, svc)
}

// wait for the first Service or ServiceContext to stop and special case
// its error as the originator
err = <-errs

// signal all Service and ServiceContext Units to stop
cancel()
for idx, svc := range s {
go func(itemNr int, svc Service) {
l := g.Logger.With(
"name", svc.Name(),
"item", fmt.Sprintf("(%d/%d)", itemNr, len(s)))
l.Debug("graceful-stop")
defer l.Debug("graceful-stop-exit")
svc.GracefulStop()
}(idx+1, svc)
}

// start registered services and block
return g.r.Run()
// wait for all Service and ServiceContext Units to have returned
for i := 1; i < cap(errs); i++ {
<-errs
}

// return the originating error
return err
}

// ListUnits returns a list of all Group phases and the Units registered to each
Expand Down Expand Up @@ -596,14 +721,31 @@ func (g Group) ListUnits() string {
}
}
if len(g.s) > 0 {
s += "\n- serve : "
s += "\n- serve: "
for _, u := range g.s {
if u != nil {
t = "svc"
s += u.Name() + " "
}
}
}
if len(g.x) > 0 {
s += "\n- serve-context: "
for _, u := range g.x {
if u != nil {
t = "svc"
s += u.Name() + " "
}
}
}

return fmt.Sprintf("Group: %s [%s]%s", g.Name, t, s)
}

func debugLogError(err error) (kv []interface{}) {
if err == nil {
return
}
kv = append(kv, "error", err.Error())
return
}
Loading

0 comments on commit ba54268

Please sign in to comment.