Skip to content

Commit

Permalink
feat(nav): add With to session runner (#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Oct 7, 2023
1 parent 360b292 commit 5e7eb80
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 19 deletions.
19 changes: 19 additions & 0 deletions xfs/nav/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,25 @@ func universalCallback(name string, extended bool) nav.LabelledTraverseCallback
}
}

func universalCallbackNoAssert(name string, extended bool) nav.LabelledTraverseCallback {
ex := lo.Ternary(extended, "-EX", "")

return nav.LabelledTraverseCallback{
Label: "test universal callback",
Fn: func(item *nav.TraverseItem) error {
depth := lo.TernaryF(extended,
func() int { return item.Extension.Depth },
func() int { return 9999 },
)
GinkgoWriter.Printf(
"---> 🌊 UNIVERSAL//%v-CALLBACK%v: (depth:%v) '%v'\n", name, ex, depth, item.Path,
)

return nil
},
}
}

func foldersCallback(name string, extended bool) nav.LabelledTraverseCallback {
ex := lo.Ternary(extended, "-EX", "")

Expand Down
5 changes: 3 additions & 2 deletions xfs/nav/navigation-async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

var (
navigatorRoutineName = boost.GoRoutineName("✨ observable-navigator")
outputChTimeout = time.Second
)

type (
Expand Down Expand Up @@ -279,7 +280,7 @@ var _ = Describe("navigation", Ordered, func() {
given: "Primary Session Consume",
should: "enable output to be consumed externally",
operator: func(op nav.AccelerationOperators) nav.AccelerationOperators {
return op.NoW(4).Consume(outputCh)
return op.NoW(4).Consume(outputCh, outputChTimeout)
},
},
}, SpecTimeout(time.Second*2)),
Expand All @@ -290,7 +291,7 @@ var _ = Describe("navigation", Ordered, func() {
should: "enable output to be consumed externally",
operator: func(op nav.AccelerationOperators) nav.AccelerationOperators {
outputCh = nav.CreateTraverseOutputCh(3)
return op.NoW(4).Consume(outputCh)
return op.NoW(4).Consume(outputCh, outputChTimeout)
},
// 🔥 panic: send on closed channel;
//
Expand Down
98 changes: 96 additions & 2 deletions xfs/nav/navigation-runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,35 @@ package nav
import (
"fmt"
"runtime"
"time"

"github.com/samber/lo"
"github.com/snivilised/lorax/boost"
)

type CreateNewRunnerWith int

const (
RunnerDefault CreateNewRunnerWith = 0
RunnerWithResume CreateNewRunnerWith = 1
RunnerWithPool CreateNewRunnerWith = 2
)

type Acceleration struct {
WgAn boost.WaitGroupAn
RoutineName boost.GoRoutineName
NoW int
JobsChOut TraverseItemJobStream
JobResultsCh boost.JobOutputStream[TraverseOutput]
OutputChTimeout time.Duration
}

type RunnerInfo struct {
ResumeInfo *Resumption
PrimeInfo *Prime
AccelerationInfo *Acceleration
}

const (
MinNoWorkers = 1
MaxNoWorkers = 100
Expand All @@ -31,10 +55,11 @@ type Runnable interface {
type AccelerationOperators interface {
Runnable
NoW(now int) AccelerationOperators
Consume(outputCh boost.JobOutputStream[TraverseOutput]) AccelerationOperators
Consume(outputCh boost.JobOutputStream[TraverseOutput], timeout time.Duration) AccelerationOperators
}

type SessionRunner interface {
With(with CreateNewRunnerWith, info *RunnerInfo) NavigationRunner
Primary(info *Prime) NavigationRunner
Resume(info *Resumption) NavigationRunner
}
Expand All @@ -54,6 +79,71 @@ type runner struct {
sync *acceleratedSync
}

func (r *runner) With(with CreateNewRunnerWith, info *RunnerInfo) NavigationRunner {
lo.TernaryF(with&RunnerWithResume == 0,
func() NavigationRunner {
return r.Primary(&Prime{
Path: info.PrimeInfo.Path,
OptionsFn: info.PrimeInfo.OptionsFn,
})
},
func() NavigationRunner {
return r.Resume(&Resumption{
RestorePath: info.ResumeInfo.RestorePath,
Restorer: info.ResumeInfo.Restorer,
Strategy: info.ResumeInfo.Strategy,
})
},
)

lo.TernaryF(with&RunnerWithPool == 0,
func() AccelerationOperators {
return r
},
func() AccelerationOperators {
if info.AccelerationInfo == nil {
// As this is not a user facing issue (ie programming error),
// it does not have to be i18n error
//
panic("internal: acceleration info missing from runner info")
}

if info.AccelerationInfo.JobsChOut == nil {
panic("internal: job channel not set on acceleration info")
}

return r.WithPool(&AsyncInfo{
NavigatorRoutineName: info.AccelerationInfo.RoutineName,
WaitAQ: info.AccelerationInfo.WgAn,
JobsChanOut: info.AccelerationInfo.JobsChOut,
})
},
)

if info.AccelerationInfo != nil && with&RunnerWithPool > 0 {
if info.AccelerationInfo.JobResultsCh != nil {
r.Consume(info.AccelerationInfo.JobResultsCh, info.AccelerationInfo.OutputChTimeout)
}

if info.AccelerationInfo.NoW > 0 {
r.NoW(info.AccelerationInfo.NoW)
}
}

return r
}

func IfWithPoolUseContext(with CreateNewRunnerWith, args ...any) []any {
return lo.TernaryF(with&RunnerWithPool > 0,
func() []any {
return args
},
func() []any {
return []any{}
},
)
}

func (r *runner) Primary(info *Prime) NavigationRunner {
r.session = &Primary{
Path: info.Path,
Expand Down Expand Up @@ -99,8 +189,12 @@ func (r *runner) NoW(now int) AccelerationOperators {
return r
}

func (r *runner) Consume(outputCh boost.JobOutputStream[TraverseOutput]) AccelerationOperators {
func (r *runner) Consume(
outputCh boost.JobOutputStream[TraverseOutput],
timeout time.Duration,
) AccelerationOperators {
r.sync.outputChOut = outputCh
r.sync.outputChTimeout = timeout

return r
}
Expand Down
19 changes: 11 additions & 8 deletions xfs/nav/navigation-sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"time"

"github.com/snivilised/lorax/boost"
)
Expand Down Expand Up @@ -59,10 +60,11 @@ func (s *inlineSync) Run(callback sessionCallback, _ syncable, _ ...any) (*Trave

type acceleratedSync struct {
baseSync
ai *AsyncInfo
noWorkers int
outputChOut boost.JobOutputStream[TraverseOutput]
pool *boost.WorkerPool[TraverseItemInput, TraverseOutput]
ai *AsyncInfo
noWorkers int
outputChOut boost.JobOutputStream[TraverseOutput]
outputChTimeout time.Duration
pool *boost.WorkerPool[TraverseItemInput, TraverseOutput]
}

func (s *acceleratedSync) Run(callback sessionCallback, nc syncable, args ...any) (*TraverseResult, error) {
Expand All @@ -85,10 +87,11 @@ func (s *acceleratedSync) Run(callback sessionCallback, nc syncable, args ...any
func (s *acceleratedSync) start(ctx context.Context, cancel context.CancelFunc) {
s.pool = boost.NewWorkerPool[TraverseItemInput, TraverseOutput](
&boost.NewWorkerPoolParams[TraverseItemInput, TraverseOutput]{
NoWorkers: s.noWorkers,
Exec: workerExecutive,
JobsCh: s.ai.JobsChanOut,
WaitAQ: s.ai.WaitAQ,
NoWorkers: s.noWorkers,
OutputChTimeout: s.outputChTimeout,
Exec: workerExecutive,
JobsCh: s.ai.JobsChanOut,
WaitAQ: s.ai.WaitAQ,
})

// We are handing over ownership of this channel (ai.OutputsChIn) to the pool as
Expand Down
105 changes: 105 additions & 0 deletions xfs/nav/navigation-with-runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package nav_test

import (
"context"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/snivilised/extendio/internal/helpers"
"github.com/snivilised/extendio/xfs/nav"
"github.com/snivilised/lorax/boost"

. "github.com/snivilised/extendio/i18n"
)

var _ = Describe("NavigationWithRunner", Ordered, func() {
var (
root string
jroot string
fromJSONPath string
)

BeforeAll(func() {
root = musico()
jroot = helpers.JoinCwd("Test", "json")
fromJSONPath = helpers.Path(jroot, "resume-state.json")
})

BeforeEach(func() {
if err := Use(func(o *UseOptions) {
o.Tag = DefaultLanguage.Get()
}); err != nil {
Fail(err.Error())
}
})

Context("resume and worker pool acceleration", func() {
When("universal: listen pending(logged)", func() {
It("should: ...", SpecTimeout(time.Second*5), func(ctxSpec SpecContext) {
ctx, cancel := context.WithCancel(ctxSpec)
path := helpers.Path(root, "RETRO-WAVE")
restorer := func(o *nav.TraverseOptions, active *nav.ActiveState) {
// synthetic assignments
//
active.Root = helpers.Path(root, "RETRO-WAVE")
active.NodePath = helpers.Path(root, ResumeAtTeenageColor)
active.Listen = nav.ListenPending
o.Store.Subscription = nav.SubscribeAny
//
// end of synthetic assignments
o.Callback = universalCallbackNoAssert(
"universal: listen pending(Resume-WithRunner)",
NotExtended,
)
}

wgan := boost.NewAnnotatedWaitGroup("🍂 traversal")
wgan.Add(1, navigatorRoutineName)
createWith := nav.RunnerWithResume | nav.RunnerWithPool
now := 3
JobsChOut := make(boost.JobStream[nav.TraverseItemInput], DefaultJobsChSize)
outputChTimeout := time.Second
jobsOutputChOut := make(boost.JobOutputStream[nav.TraverseOutput], DefaultJobsChSize)

result, err := nav.New().With(createWith, &nav.RunnerInfo{
PrimeInfo: &nav.Prime{
Path: path,
OptionsFn: func(o *nav.TraverseOptions) {
o.Notify.OnBegin = begin("🛡️")
o.Store.Subscription = nav.SubscribeAny
o.Callback = universalCallbackNoAssert(
"universal: Path contains folders(Prime-WithRunner)",
NotExtended,
)
},
},
ResumeInfo: &nav.Resumption{
RestorePath: fromJSONPath,
Restorer: restorer,
Strategy: nav.ResumeStrategySpawnEn,
},
AccelerationInfo: &nav.Acceleration{
WgAn: wgan,
RoutineName: navigatorRoutineName,
NoW: now,
JobsChOut: JobsChOut,
JobResultsCh: jobsOutputChOut,
OutputChTimeout: outputChTimeout,
},
}).Run(
nav.IfWithPoolUseContext(createWith, ctx, cancel)...,
)

if createWith&nav.RunnerWithPool > 0 {
wgan.Wait("👾 test-main")
}

Expect(err).Error().To(BeNil())
_ = result.Session.StartedAt()
_ = result.Session.Elapsed()
})
})
})
})
10 changes: 6 additions & 4 deletions xfs/nav/resume-strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,12 @@ var _ = Describe("Resume", Ordered, func() {
}
}

result, _ := nav.New().Resume(&nav.Resumption{
RestorePath: fromJSONPath,
Restorer: restorer,
Strategy: strategyEn,
result, _ := nav.New().With(nav.RunnerWithResume, &nav.RunnerInfo{
ResumeInfo: &nav.Resumption{
RestorePath: fromJSONPath,
Restorer: restorer,
Strategy: strategyEn,
},
}).Run()

if profile.mandatory != nil {
Expand Down
10 changes: 7 additions & 3 deletions xfs/nav/traverse-navigator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,13 @@ var _ = Describe("TraverseNavigator(logged)", Ordered, func() {
o.Store.DoExtend = entry.extended
o.Callback = once
}
result, _ := nav.New().Primary(&nav.Prime{
Path: path,
OptionsFn: optionFn,

createWith := nav.RunnerDefault
result, _ := nav.New().With(createWith, &nav.RunnerInfo{
PrimeInfo: &nav.Prime{
Path: path,
OptionsFn: optionFn,
},
}).Run()

if entry.visit {
Expand Down

0 comments on commit 5e7eb80

Please sign in to comment.