Skip to content

Commit

Permalink
add unlimited
Browse files Browse the repository at this point in the history
  • Loading branch information
RyougiNevermore committed Nov 18, 2024
1 parent 853796f commit f2b1849
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 10 deletions.
22 changes: 20 additions & 2 deletions async/future.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,14 @@ func (f *futureImpl[R]) clean() {
}

func (f *futureImpl[R]) OnComplete(handler ResultHandler[R]) {
f.locker.Lock()
f.handler = handler
f.submitter.Submit(f.handle)
if ok := f.submitter.Submit(f.handle); !ok {
handler(f.ctx, *(new(R)), rxp.ErrClosed)
f.closed = true
close(f.rch)
}
f.locker.Unlock()
}

func (f *futureImpl[R]) Await() (v R, err error) {
Expand All @@ -146,8 +152,20 @@ func (f *futureImpl[R]) Await() (v R, err error) {
}
close(ch)
}

f.locker.Lock()
f.handler = handler
f.submitter.Submit(f.handle)
if ok := f.submitter.Submit(f.handle); !ok {
ch <- result[R]{
entry: *(new(R)),
cause: rxp.ErrClosed,
}
close(ch)
f.closed = true
close(f.rch)
}
f.locker.Unlock()

ar := <-ch
v = ar.entry
err = ar.cause
Expand Down
34 changes: 34 additions & 0 deletions async/unlimited.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package async

import (
"context"
"github.com/brickingsoft/rxp"
)

type unlimitedSubmitter struct {
exec rxp.Executors
}

func (submitter *unlimitedSubmitter) Submit(task rxp.Task) (ok bool) {
err := submitter.exec.UnlimitedExecute(task)
ok = err == nil
return
}

// UnlimitedPromise
// 不受 rxp.Executors 最大协程限制的许诺
func UnlimitedPromise[R any](ctx context.Context) (promise Promise[R]) {
exec := rxp.From(ctx)
submitter := &unlimitedSubmitter{exec: exec}
promise = newPromise[R](ctx, submitter)
return
}

// UnlimitedStreamPromise
// 不受 rxp.Executors 最大协程限制的流式许诺
func UnlimitedStreamPromise[R any](ctx context.Context, buf int) (promise Promise[R]) {
exec := rxp.From(ctx)
submitter := &unlimitedSubmitter{exec: exec}
promise = newStreamPromise[R](ctx, submitter, buf)
return
}
51 changes: 51 additions & 0 deletions async/unlimited_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package async_test

import (
"context"
"github.com/brickingsoft/rxp/async"
"testing"
)

func TestUnlimitedPromise(t *testing.T) {
ctx, closer := prepare()
defer func() {
err := closer()
if err != nil {
t.Error(err)
}
}()
promise := async.UnlimitedPromise[int](ctx)
promise.Succeed(1)
future := promise.Future()
future.OnComplete(func(ctx context.Context, result int, err error) {
t.Log("future entry:", result, err)
})
}

func TestUnlimitedStreamPromise(t *testing.T) {
ctx, closer := prepare()
defer func() {
err := closer()
if err != nil {
t.Error(err)
}
}()
promise := async.UnlimitedStreamPromise[*Closer](ctx, 8)

future := promise.Future()
future.OnComplete(func(ctx context.Context, result *Closer, err error) {
t.Log("future entry:", result, err)
if err != nil {
t.Log("is closed:", async.IsCanceled(err))
return
}
return
})
for i := 0; i < 10; i++ {
promise.Succeed(&Closer{N: i, t: t})
}
promise.Cancel()
for i := 0; i < 10; i++ {
promise.Succeed(&Closer{N: i, t: t})
}
}
42 changes: 34 additions & 8 deletions executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type Executors interface {
//
// 当 context.Context 有错误或者 Executors.Close、Executors.CloseGracefully,则返回错误。
Execute(ctx context.Context, task Task) (err error)
// UnlimitedExecute
// 执行一个不受限型任务。它不受最大协程数限制,但会消耗协程数,即 TryExecute 可能会得不到协程而失败。
UnlimitedExecute(task Task) (err error)
// Goroutines
// 当前 goroutine 数量
Goroutines() (n int64)
Expand Down Expand Up @@ -105,7 +108,7 @@ func New(options ...Option) Executors {
maxGoroutines: int64(opts.MaxGoroutines),
maxReadyGoroutinesIdleDuration: opts.MaxReadyGoroutinesIdleDuration,
locker: spin.New(),
running: atomic.Bool{},
running: new(atomic.Bool),
ready: nil,
submitters: sync.Pool{},
tasks: new(atomic.Int64),
Expand All @@ -122,7 +125,7 @@ type executors struct {
maxGoroutines int64
maxReadyGoroutinesIdleDuration time.Duration
locker sync.Locker
running atomic.Bool
running *atomic.Bool
ready []*submitterImpl
submitters sync.Pool
tasks *atomic.Int64
Expand Down Expand Up @@ -154,7 +157,12 @@ func (exec *executors) TryExecute(ctx context.Context, task Task) (ok bool) {
}

func (exec *executors) Execute(ctx context.Context, task Task) (err error) {
if task == nil || !exec.running.Load() {
if task == nil {
err = errors.New("rxp: task is nil")
return
}
if !exec.running.Load() {
err = ErrClosed
return
}
times := 10
Expand All @@ -180,6 +188,25 @@ func (exec *executors) Execute(ctx context.Context, task Task) (err error) {
return
}

func (exec *executors) UnlimitedExecute(task Task) (err error) {
if task == nil {
err = errors.New("rxp: task is nil")
return
}
if !exec.running.Load() {
err = ErrClosed
return
}

exec.goroutines.Incr()
go func(task Task, exec *executors) {
task()
exec.goroutines.Decr()
}(task, exec)

return err
}

func (exec *executors) Goroutines() (n int64) {
n = exec.goroutines.Value()
return
Expand Down Expand Up @@ -240,8 +267,10 @@ func (exec *executors) start() {
exec.stopCh = make(chan struct{})
exec.submitters.New = func() interface{} {
return &submitterImpl{
ch: make(chan Task, 1),
tasks: exec.tasks,
lastUseTime: time.Time{},
ch: make(chan Task, 1),
running: exec.running,
tasks: exec.tasks,
}
}
go func(exec *executors) {
Expand All @@ -268,9 +297,6 @@ func (exec *executors) start() {
}

func (exec *executors) clean(scratch *[]*submitterImpl) {
if !exec.running.Load() {
return
}
maxExecutorIdleDuration := exec.maxReadyGoroutinesIdleDuration
criticalTime := time.Now().Add(-maxExecutorIdleDuration)
exec.locker.Lock()
Expand Down

0 comments on commit f2b1849

Please sign in to comment.