From 25b9fee2eae9749b266707c0ba4360929a462819 Mon Sep 17 00:00:00 2001 From: wangminxiang Date: Thu, 26 Dec 2024 16:50:32 +0800 Subject: [PATCH] add async closer --- async/result.go | 10 ++++++++++ async/stream_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/async/result.go b/async/result.go index d5ca913..e3a2491 100644 --- a/async/result.go +++ b/async/result.go @@ -60,6 +60,10 @@ type ResultHandler[E any] func(ctx context.Context, entry E, cause error) var DiscardVoidHandler ResultHandler[Void] = func(_ context.Context, _ Void, _ error) {} +type Closer interface { + Close() (future Future[Void]) +} + func tryCloseResultWhenUnexpectedlyErrorOccur(v any) { if v == nil { return @@ -68,6 +72,12 @@ func tryCloseResultWhenUnexpectedlyErrorOccur(v any) { closer, isCloser := ri.(io.Closer) if isCloser { _ = closer.Close() + return + } + asyncCloser, isAsyncCloser := ri.(Closer) + if isAsyncCloser { + asyncCloser.Close().OnComplete(DiscardVoidHandler) + return } } diff --git a/async/stream_test.go b/async/stream_test.go index c78ea2c..71d2f97 100644 --- a/async/stream_test.go +++ b/async/stream_test.go @@ -16,6 +16,17 @@ func (c *Closer) Close() error { return nil } +type AsyncCloser struct { + N int + t *testing.T + ctx context.Context +} + +func (c *AsyncCloser) Close() async.Future[async.Void] { + c.t.Log("close ", c.N) + return async.SucceedImmediately[async.Void](c.ctx, async.Void{}) +} + func TestTryStreamPromise(t *testing.T) { ctx, closer := prepare() defer func() { @@ -47,6 +58,37 @@ func TestTryStreamPromise(t *testing.T) { } } +func TestTryStreamPromiseWithAsyncCloser(t *testing.T) { + ctx, closer := prepare() + defer func() { + err := closer() + if err != nil { + t.Error(err) + } + }() + promise, promiseErr := async.Make[*AsyncCloser](ctx, async.WithStream()) + if promiseErr != nil { + t.Errorf("try promise failed") + return + } + future := promise.Future() + future.OnComplete(func(ctx context.Context, result *AsyncCloser, err error) { + t.Log("future entry:", result, err) + if err != nil { + t.Log("is eof:", async.IsEOF(err)) + return + } + return + }) + for i := 0; i < 10; i++ { + promise.Succeed(&AsyncCloser{N: i, t: t, ctx: ctx}) + } + promise.Cancel() + for i := 0; i < 10; i++ { + promise.Succeed(&AsyncCloser{N: i, t: t, ctx: ctx}) + } +} + func TestMustStreamPromise(t *testing.T) { ctx, closer := prepare() defer func() {