Skip to content

Commit

Permalink
add runner
Browse files Browse the repository at this point in the history
  • Loading branch information
ktong committed Mar 13, 2024
1 parent 4cce161 commit acb0cb5
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 0 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,5 @@ issues:
- gochecknoinits
- goconst
- goerr113
- lll
- wrapcheck
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
module github.com/nil-go/nilgo

go 1.21

require (
github.com/nil-go/konf v0.9.2
github.com/nil-go/sloth v0.3.0
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/nil-go/konf v0.9.2 h1:pfwtRmo2qces/xo2YFxE9yWP1eUAsIec3pH4eD8VV0s=
github.com/nil-go/konf v0.9.2/go.mod h1:ULW6PmJzWMd0F4KKNJQPhWD6Zu5eoX9U1C9SW2BPAr4=
github.com/nil-go/sloth v0.3.0 h1:lAqd8/pH6psoXZDpScCefY+3V9PVfJnIyOqMK1GSvwo=
github.com/nil-go/sloth v0.3.0/go.mod h1:SE8dLU9DLYeuLtu3kHp9PUEyj0OwUGKvTjSpx8tPdwo=
36 changes: 36 additions & 0 deletions internal/assert/assert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2024 The nilgo authors
// Use of this source code is governed by a MIT license found in the LICENSE file.

package assert

import (
"reflect"
"testing"
)

func Equal[T any](tb testing.TB, expected, actual T) {
tb.Helper()

if !reflect.DeepEqual(expected, actual) {
tb.Errorf("\nexpected: %v\n actual: %v", expected, actual)
}
}

func NoError(tb testing.TB, err error) {
tb.Helper()

if err != nil {
tb.Errorf("unexpected error: %v", err)
}
}

func EqualError(tb testing.TB, err error, message string) {
tb.Helper()

switch {
case err == nil:
tb.Errorf("\n actual: <nil>\nexpected: %v", message)
case err.Error() != message:
tb.Errorf("\n actual: %v\nexpected: %v", err.Error(), message)
}
}
24 changes: 24 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) 2024 The nilgo authors
// Use of this source code is governed by a MIT license found in the LICENSE file.

package nilgo

import "context"

func WithPreRun(runs func(context.Context) error) Option {
return func(opts *options) {
opts.preRuns = append(opts.preRuns, runs)
}
}

func WithGate(gate func(context.Context) error) Option {
return func(opts *options) {
opts.gates = append(opts.gates, gate)
}
}

type (
// Option configures the Runner with specific options.
Option func(*options)
options Runner
)
92 changes: 92 additions & 0 deletions run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2024 The nilgo authors
// Use of this source code is governed by a MIT license found in the LICENSE file.

package nilgo

import (
"context"
"errors"
"sync"
)

// Parallel executes the given run in parallel.
//
// It blocks until all run complete or ctx is done, then
// returns the first non-nil error (if any) received from run.
func Parallel(ctx context.Context, runs []func(context.Context) error) error {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)

var waitGroup sync.WaitGroup
for _, run := range runs {
waitGroup.Add(1)

run := run
go func() {
defer waitGroup.Done()

if err := run(ctx); err != nil {
cancel(err)
}
}()
}
waitGroup.Wait()

if err := context.Cause(ctx); err != nil && !errors.Is(err, ctx.Err()) {
return err //nolint:wrapcheck
}

return nil
}

// WithCloser wraps the given run with a dedicated close function,
// which should cause the run returns when close function executes.
//
// It guarantees both run and close functions complete
// and return the first non-nil error if any, so close function executes
// even if run function returns non-nil error.
func WithCloser(run func(context.Context) error, closer func() error) func(context.Context) error {
if run == nil {
run = func(context.Context) error { return nil }
}
if closer == nil {
closer = func() error { return nil }
}

return func(ctx context.Context) error {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)

var waitGroup sync.WaitGroup
waitGroup.Add(2) //nolint:gomnd
go func() {
defer waitGroup.Done()

if err := run(ctx); err != nil {
cancel(err)
}
}()
<-ctx.Done()

// Create a new context for closer function as the original context is done.
cctx, ccancel := context.WithCancelCause(context.WithoutCancel(ctx))
defer ccancel(nil)
go func() {
defer waitGroup.Done()

if err := closer(); err != nil {
ccancel(err)
}
}()
waitGroup.Wait()

if err := context.Cause(ctx); err != nil && !errors.Is(err, ctx.Err()) {
return err //nolint:wrapcheck
}
if err := context.Cause(cctx); err != nil && !errors.Is(err, cctx.Err()) {
return err //nolint:wrapcheck
}

return nil
}
}
62 changes: 62 additions & 0 deletions run_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2024 The nilgo authors
// Use of this source code is governed by a MIT license found in the LICENSE file.

package nilgo_test

import (
"context"
"errors"
"testing"
"time"

"github.com/nil-go/nilgo"
"github.com/nil-go/nilgo/internal/assert"
)

func TestWithCloser(t *testing.T) {
testcases := []struct {
description string
run func(ctx context.Context) error
closer func() error
err string
}{
{
description: "no error",
run: func(context.Context) error {
return nil
},
closer: func() error {
return nil
},
},
{
description: "run error",
run: func(context.Context) error {
return errors.New("run error")
},
err: "run error",
},
{
description: "closer error",
closer: func() error {
return errors.New("closer error")
},
err: "closer error",
},
}

for _, testcase := range testcases {
testcase := testcase

t.Run(testcase.description, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
err := nilgo.WithCloser(testcase.run, testcase.closer)(ctx)
if testcase.err == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, testcase.err)
}
})
}
}
91 changes: 91 additions & 0 deletions runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2024 The nilgo authors
// Use of this source code is governed by a MIT license found in the LICENSE file.

package nilgo

import (
"context"
"errors"
"os/signal"
"sync"
"sync/atomic"
"syscall"
)

// Runner is a pre-configured runtime for executing runs in parallel.
//
// To create an Runner, use [New].
type Runner struct {
preRuns []func(context.Context) error
gates []func(context.Context) error
ran atomic.Bool
}

// New creates a new Runner with the given Option(s).
func New(opts ...Option) *Runner {
option := &options{}
for _, opt := range opts {
opt(option)
}

return (*Runner)(option)
}

// Run executes the given run in parallel with well-configured runtime,
// which includes logging, configuration, and telemetry.
//
// The run ran in parallel without any explicit order,
// which means it should not have temporal dependencies between each other.
//
// The execution can be interrupted if any run returns non-nil error,
// or it receives an OS signal syscall.SIGINT or syscall.SIGTERM.
// It waits all run return unless it's forcefully terminated by OS.
func (e *Runner) Run(ctx context.Context, runs ...func(context.Context) error) error {
if e == nil {
e = &Runner{}
}

if e.ran.Swap(true) {
return errors.New("runner is already ran") //nolint:goerr113
}

// Add gate to wait for all pre-runs start.
var waitGroup sync.WaitGroup
waitGroup.Add(len(e.preRuns))
for i, run := range e.preRuns {
run := run
e.preRuns[i] = func(ctx context.Context) error {
defer waitGroup.Done()

return run(ctx)
}
}
e.gates = append(e.gates,
func(context.Context) error {
waitGroup.Wait()

return nil
},
)

ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)

return Parallel(ctx, append(e.preRuns,
func(ctx context.Context) (err error) { //nolint:nonamedreturns
defer func() {
cancel(err)
}()

// Terminate signals apply to the runs, then cancel the root context for pre-runs.
nctx, ncancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer ncancel()

if err = Parallel(nctx, e.gates); err != nil {
return err
}

return Parallel(nctx, runs)
},
))
}
Loading

0 comments on commit acb0cb5

Please sign in to comment.