Skip to content

Commit

Permalink
add async closer
Browse files Browse the repository at this point in the history
  • Loading branch information
RyougiNevermore committed Dec 26, 2024
1 parent 78e490a commit 25b9fee
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
10 changes: 10 additions & 0 deletions async/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type ResultHandler[E any] func(ctx context.Context, entry E, cause error)

var DiscardVoidHandler ResultHandler[Void] = func(_ context.Context, _ Void, _ error) {}

type Closer interface {
Close() (future Future[Void])
}

func tryCloseResultWhenUnexpectedlyErrorOccur(v any) {
if v == nil {
return
Expand All @@ -68,6 +72,12 @@ func tryCloseResultWhenUnexpectedlyErrorOccur(v any) {
closer, isCloser := ri.(io.Closer)
if isCloser {
_ = closer.Close()
return
}
asyncCloser, isAsyncCloser := ri.(Closer)
if isAsyncCloser {
asyncCloser.Close().OnComplete(DiscardVoidHandler)
return
}
}

Expand Down
42 changes: 42 additions & 0 deletions async/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ func (c *Closer) Close() error {
return nil
}

type AsyncCloser struct {
N int
t *testing.T
ctx context.Context
}

func (c *AsyncCloser) Close() async.Future[async.Void] {
c.t.Log("close ", c.N)
return async.SucceedImmediately[async.Void](c.ctx, async.Void{})
}

func TestTryStreamPromise(t *testing.T) {
ctx, closer := prepare()
defer func() {
Expand Down Expand Up @@ -47,6 +58,37 @@ func TestTryStreamPromise(t *testing.T) {
}
}

func TestTryStreamPromiseWithAsyncCloser(t *testing.T) {
ctx, closer := prepare()
defer func() {
err := closer()
if err != nil {
t.Error(err)
}
}()
promise, promiseErr := async.Make[*AsyncCloser](ctx, async.WithStream())
if promiseErr != nil {
t.Errorf("try promise failed")
return
}
future := promise.Future()
future.OnComplete(func(ctx context.Context, result *AsyncCloser, err error) {
t.Log("future entry:", result, err)
if err != nil {
t.Log("is eof:", async.IsEOF(err))
return
}
return
})
for i := 0; i < 10; i++ {
promise.Succeed(&AsyncCloser{N: i, t: t, ctx: ctx})
}
promise.Cancel()
for i := 0; i < 10; i++ {
promise.Succeed(&AsyncCloser{N: i, t: t, ctx: ctx})
}
}

func TestMustStreamPromise(t *testing.T) {
ctx, closer := prepare()
defer func() {
Expand Down

0 comments on commit 25b9fee

Please sign in to comment.