From f2b1849aaf501b172a83475734eed31869b196ba Mon Sep 17 00:00:00 2001 From: wangminxiang Date: Mon, 18 Nov 2024 18:42:25 +0800 Subject: [PATCH] add unlimited --- async/future.go | 22 ++++++++++++++++-- async/unlimited.go | 34 +++++++++++++++++++++++++++ async/unlimited_test.go | 51 +++++++++++++++++++++++++++++++++++++++++ executors.go | 42 ++++++++++++++++++++++++++------- 4 files changed, 139 insertions(+), 10 deletions(-) create mode 100644 async/unlimited.go create mode 100644 async/unlimited_test.go diff --git a/async/future.go b/async/future.go index 2bb58a4..cd3df01 100644 --- a/async/future.go +++ b/async/future.go @@ -133,8 +133,14 @@ func (f *futureImpl[R]) clean() { } func (f *futureImpl[R]) OnComplete(handler ResultHandler[R]) { + f.locker.Lock() f.handler = handler - f.submitter.Submit(f.handle) + if ok := f.submitter.Submit(f.handle); !ok { + handler(f.ctx, *(new(R)), rxp.ErrClosed) + f.closed = true + close(f.rch) + } + f.locker.Unlock() } func (f *futureImpl[R]) Await() (v R, err error) { @@ -146,8 +152,20 @@ func (f *futureImpl[R]) Await() (v R, err error) { } close(ch) } + + f.locker.Lock() f.handler = handler - f.submitter.Submit(f.handle) + if ok := f.submitter.Submit(f.handle); !ok { + ch <- result[R]{ + entry: *(new(R)), + cause: rxp.ErrClosed, + } + close(ch) + f.closed = true + close(f.rch) + } + f.locker.Unlock() + ar := <-ch v = ar.entry err = ar.cause diff --git a/async/unlimited.go b/async/unlimited.go new file mode 100644 index 0000000..97819e5 --- /dev/null +++ b/async/unlimited.go @@ -0,0 +1,34 @@ +package async + +import ( + "context" + "github.com/brickingsoft/rxp" +) + +type unlimitedSubmitter struct { + exec rxp.Executors +} + +func (submitter *unlimitedSubmitter) Submit(task rxp.Task) (ok bool) { + err := submitter.exec.UnlimitedExecute(task) + ok = err == nil + return +} + +// UnlimitedPromise +// 不受 rxp.Executors 最大协程限制的许诺 +func UnlimitedPromise[R any](ctx context.Context) (promise Promise[R]) { + exec := rxp.From(ctx) + submitter := &unlimitedSubmitter{exec: exec} + promise = newPromise[R](ctx, submitter) + return +} + +// UnlimitedStreamPromise +// 不受 rxp.Executors 最大协程限制的流式许诺 +func UnlimitedStreamPromise[R any](ctx context.Context, buf int) (promise Promise[R]) { + exec := rxp.From(ctx) + submitter := &unlimitedSubmitter{exec: exec} + promise = newStreamPromise[R](ctx, submitter, buf) + return +} diff --git a/async/unlimited_test.go b/async/unlimited_test.go new file mode 100644 index 0000000..9da55fe --- /dev/null +++ b/async/unlimited_test.go @@ -0,0 +1,51 @@ +package async_test + +import ( + "context" + "github.com/brickingsoft/rxp/async" + "testing" +) + +func TestUnlimitedPromise(t *testing.T) { + ctx, closer := prepare() + defer func() { + err := closer() + if err != nil { + t.Error(err) + } + }() + promise := async.UnlimitedPromise[int](ctx) + promise.Succeed(1) + future := promise.Future() + future.OnComplete(func(ctx context.Context, result int, err error) { + t.Log("future entry:", result, err) + }) +} + +func TestUnlimitedStreamPromise(t *testing.T) { + ctx, closer := prepare() + defer func() { + err := closer() + if err != nil { + t.Error(err) + } + }() + promise := async.UnlimitedStreamPromise[*Closer](ctx, 8) + + future := promise.Future() + future.OnComplete(func(ctx context.Context, result *Closer, err error) { + t.Log("future entry:", result, err) + if err != nil { + t.Log("is closed:", async.IsCanceled(err)) + return + } + return + }) + for i := 0; i < 10; i++ { + promise.Succeed(&Closer{N: i, t: t}) + } + promise.Cancel() + for i := 0; i < 10; i++ { + promise.Succeed(&Closer{N: i, t: t}) + } +} diff --git a/executors.go b/executors.go index 5a18527..443fd13 100644 --- a/executors.go +++ b/executors.go @@ -45,6 +45,9 @@ type Executors interface { // // 当 context.Context 有错误或者 Executors.Close、Executors.CloseGracefully,则返回错误。 Execute(ctx context.Context, task Task) (err error) + // UnlimitedExecute + // 执行一个不受限型任务。它不受最大协程数限制,但会消耗协程数,即 TryExecute 可能会得不到协程而失败。 + UnlimitedExecute(task Task) (err error) // Goroutines // 当前 goroutine 数量 Goroutines() (n int64) @@ -105,7 +108,7 @@ func New(options ...Option) Executors { maxGoroutines: int64(opts.MaxGoroutines), maxReadyGoroutinesIdleDuration: opts.MaxReadyGoroutinesIdleDuration, locker: spin.New(), - running: atomic.Bool{}, + running: new(atomic.Bool), ready: nil, submitters: sync.Pool{}, tasks: new(atomic.Int64), @@ -122,7 +125,7 @@ type executors struct { maxGoroutines int64 maxReadyGoroutinesIdleDuration time.Duration locker sync.Locker - running atomic.Bool + running *atomic.Bool ready []*submitterImpl submitters sync.Pool tasks *atomic.Int64 @@ -154,7 +157,12 @@ func (exec *executors) TryExecute(ctx context.Context, task Task) (ok bool) { } func (exec *executors) Execute(ctx context.Context, task Task) (err error) { - if task == nil || !exec.running.Load() { + if task == nil { + err = errors.New("rxp: task is nil") + return + } + if !exec.running.Load() { + err = ErrClosed return } times := 10 @@ -180,6 +188,25 @@ func (exec *executors) Execute(ctx context.Context, task Task) (err error) { return } +func (exec *executors) UnlimitedExecute(task Task) (err error) { + if task == nil { + err = errors.New("rxp: task is nil") + return + } + if !exec.running.Load() { + err = ErrClosed + return + } + + exec.goroutines.Incr() + go func(task Task, exec *executors) { + task() + exec.goroutines.Decr() + }(task, exec) + + return err +} + func (exec *executors) Goroutines() (n int64) { n = exec.goroutines.Value() return @@ -240,8 +267,10 @@ func (exec *executors) start() { exec.stopCh = make(chan struct{}) exec.submitters.New = func() interface{} { return &submitterImpl{ - ch: make(chan Task, 1), - tasks: exec.tasks, + lastUseTime: time.Time{}, + ch: make(chan Task, 1), + running: exec.running, + tasks: exec.tasks, } } go func(exec *executors) { @@ -268,9 +297,6 @@ func (exec *executors) start() { } func (exec *executors) clean(scratch *[]*submitterImpl) { - if !exec.running.Load() { - return - } maxExecutorIdleDuration := exec.maxReadyGoroutinesIdleDuration criticalTime := time.Now().Add(-maxExecutorIdleDuration) exec.locker.Lock()