diff --git a/README.md b/README.md index d3613f0..a7c0207 100644 --- a/README.md +++ b/README.md @@ -96,18 +96,35 @@ await client.SubAsync("tickItem", stream => stream.Subscribe(msg => { ``` ### Stream.Subscribe vs Stream.SubscribeSafe -If you subscribe to e.g. the `MessageOpStream` using `Stream.Subscribe` and your handler is throwing an exception. That handler will get `OnError` invoked and then removed. +The initial behavior was to call `OnError` when a handler was throwing an exception and had specified a `OnError` handler. **This has changed**. The motivation around this is, that it's not the producer side that is causing the exception [(read more)](https://docs.microsoft.com/en-us/dotnet/api/system.iobserver-1.onerror?view=netcore-3.1). + +If you subscribe to e.g. the `MessageOpStream` using `Stream.Subscribe` and your handler is throwing an exception. That in-process handler **will be removed**. ```csharp await client.SubAsync("mySubject", stream => stream.Subscribe(msg => DoSomething(msg))); ``` -If you instead subscribe using `Stream.SubscribeSafe` any unhandled exception will get swallowed. +If you instead subscribe using `Stream.SubscribeSafe` any unhandled exception will get swallowed and the in-process handler will still be around. ```csharp await client.SubAsync("mySubject", stream => stream.SubscribeSafe(msg => DoSomething(msg))); ``` +### Catch & CatchAny +If you want a generic way to handle exceptions in your handlers, you can use a `CatchObserver` e.g via the aliases `stream.Catch` or `stream.CatchAny`. + +```csharp +await client.SubAsync("mySubject", stream => stream + .Catch((FooException ex) => {}) + .Subscribe(msg => DoSomething(msg))); +``` + +```csharp +await client.SubAsync("mySubject", stream => stream + .CatchAny(ex => {}) + .Subscribe(msg => DoSomething(msg))); +``` + ## Request-Response sample Simple request-response sample. This sample also makes use of two clients. It can of course be the same client requesting and responding, you can also have more responders forming a queue group. Where one will be giving the answer. diff --git a/src/MyNatsClient/Internals/Observables.cs b/src/MyNatsClient/Internals/Observables.cs index ae894db..0d7573a 100644 --- a/src/MyNatsClient/Internals/Observables.cs +++ b/src/MyNatsClient/Internals/Observables.cs @@ -8,7 +8,7 @@ internal sealed class OfTypeObservable : INatsObservable where { private readonly INatsObservable _src; - public OfTypeObservable(INatsObservable src) + public OfTypeObservable(INatsObservable src) => _src = src ?? throw new ArgumentNullException(nameof(src)); public void Dispose() => _src.Dispose(); @@ -41,7 +41,7 @@ internal sealed class CastObservable : INatsObservable where TF { private readonly INatsObservable _src; - public CastObservable(INatsObservable src) + public CastObservable(INatsObservable src) => _src = src ?? throw new ArgumentNullException(nameof(src)); public void Dispose() => _src.Dispose(); @@ -53,7 +53,7 @@ private sealed class CastObserver : IObserver { private readonly IObserver _observer; - public CastObserver(IObserver observer) + public CastObserver(IObserver observer) => _observer = observer; public void OnNext(TFrom value) @@ -108,6 +108,55 @@ public void OnCompleted() } } + internal sealed class CatchObservable : INatsObservable + where T : class + where TException : Exception + { + private readonly INatsObservable _src; + private readonly Action _handler; + + public CatchObservable(INatsObservable src, Action handler) + { + _src = src ?? throw new ArgumentNullException(nameof(src)); + _handler = handler ?? throw new ArgumentNullException(nameof(handler)); + } + + public void Dispose() => _src.Dispose(); + + public IDisposable Subscribe(IObserver observer) + => _src.SubscribeSafe(new CatchObserver(observer, _handler)); + + private sealed class CatchObserver : IObserver + { + private readonly IObserver _observer; + private readonly Action _handler; + + public CatchObserver(IObserver observer, Action handler) + { + _observer = observer ?? throw new ArgumentNullException(nameof(observer)); + _handler = handler; + } + + public void OnNext(T value) + { + try + { + _observer.OnNext(value); + } + catch (TException ex) + { + _handler(ex); + } + } + + public void OnError(Exception error) + => _observer.OnError(error); + + public void OnCompleted() + => _observer.OnCompleted(); + } + } + internal sealed class WhereSubjectMatchObservable : INatsObservable { private readonly INatsObservable _src; @@ -187,4 +236,4 @@ public void OnCompleted() => _observer.OnCompleted(); } } -} \ No newline at end of file +} diff --git a/src/MyNatsClient/Rx/NatsObservableExtensions.cs b/src/MyNatsClient/Rx/NatsObservableExtensions.cs index 08c22e2..c56a414 100644 --- a/src/MyNatsClient/Rx/NatsObservableExtensions.cs +++ b/src/MyNatsClient/Rx/NatsObservableExtensions.cs @@ -6,6 +6,15 @@ namespace MyNatsClient.Rx { public static class NatsObservableExtensions { + public static INatsObservable Catch(this INatsObservable ob, Action handler) + where TException : Exception + where T : class + => new CatchObservable(ob, handler); + + public static INatsObservable CatchAny(this INatsObservable ob, Action handler) + where T : class + => ob.Catch(handler); + public static INatsObservable OfType(this INatsObservable ob) where TResult : class => new OfTypeObservable(ob); @@ -30,4 +39,4 @@ public static IDisposable SubscribeSafe(this INatsObservable ob, Action public static IDisposable SubscribeSafe(this INatsObservable ob, IObserver observer) where T : class => ob.Subscribe(NatsObserver.Safe(observer.OnNext, observer.OnError, observer.OnCompleted)); } -} \ No newline at end of file +} diff --git a/src/UnitTests/NatsObservableTests.cs b/src/UnitTests/NatsObservableTests.cs index 01c6576..9c81b16 100644 --- a/src/UnitTests/NatsObservableTests.cs +++ b/src/UnitTests/NatsObservableTests.cs @@ -165,7 +165,7 @@ public void Emitting_Should_not_dispatch_to_a_disposed_observer() { Interlocked.Increment(ref s1CallCount); }); - var s2 = UnitUnderTest.Subscribe(ev => + var _ = UnitUnderTest.Subscribe(ev => { Interlocked.Increment(ref s2CallCount); }); @@ -235,5 +235,125 @@ public void Should_be_able_filter_by_type() observer2.Verify(f => f.OnNext(It.IsAny()), Times.Exactly(1)); observer2.Verify(f => f.OnNext(It.Is(d => d.Value == 1 && d.OtherValue == 2)), Times.Once); } + + [Fact] + public void Emitting_Should_invoke_handler_of_failing_observer_When_using_catch_and_a_more_specific_exception_is_thrown() + { + var failingCatchWasInvoked = false; + var failingExHandlerWasInvoked = false; + var nonFailingCatchWasInvoked = false; + var nonFailingExHandlerWasInvoked = false; + + UnitUnderTest.Catch((NotSupportedException ex) => failingCatchWasInvoked = true).Subscribe( + ev => throw new NotSupportedException("I FAILED!"), + ex => failingExHandlerWasInvoked = true); + + UnitUnderTest.Catch((NotSupportedException ex) => nonFailingCatchWasInvoked = true).Subscribe( + ev => { }, + ex => nonFailingExHandlerWasInvoked = true); + + UnitUnderTest.Emit(Mock.Of()); + + failingCatchWasInvoked.Should().BeTrue(); + failingExHandlerWasInvoked.Should().BeFalse(); + nonFailingCatchWasInvoked.Should().BeFalse(); + nonFailingExHandlerWasInvoked.Should().BeFalse(); + } + + [Fact] + public void Emitting_Should_invoke_handler_of_failing_observer_When_using_catch_and_an_exception_is_thrown() + { + var failingCatchWasInvoked = false; + var failingExHandlerWasInvoked = false; + var nonFailingCatchWasInvoked = false; + var nonFailingExHandlerWasInvoked = false; + + UnitUnderTest.Catch((Exception ex) => failingCatchWasInvoked = true).Subscribe( + ev => throw new Exception("I FAILED!"), + ex => failingExHandlerWasInvoked = true); + + UnitUnderTest.Catch((Exception ex) => nonFailingCatchWasInvoked = true).Subscribe( + ev => { }, + ex => nonFailingExHandlerWasInvoked = true); + + UnitUnderTest.Emit(Mock.Of()); + + failingCatchWasInvoked.Should().BeTrue(); + failingExHandlerWasInvoked.Should().BeFalse(); + nonFailingCatchWasInvoked.Should().BeFalse(); + nonFailingExHandlerWasInvoked.Should().BeFalse(); + } + + [Fact] + public void Emitting_Should_not_invoke_handler_of_failing_observer_When_using_catch_and_a_more_generic_exception_is_thrown() + { + var failingCatchWasInvoked = false; + var failingExHandlerWasInvoked = false; + var nonFailingCatchWasInvoked = false; + var nonFailingExHandlerWasInvoked = false; + + UnitUnderTest.Catch((NotSupportedException ex) => failingCatchWasInvoked = true).Subscribe( + ev => throw new Exception("I FAILED!"), + ex => failingExHandlerWasInvoked = true); + + UnitUnderTest.Catch((NotSupportedException ex) => nonFailingCatchWasInvoked = true).Subscribe( + ev => { }, + ex => nonFailingExHandlerWasInvoked = true); + + UnitUnderTest.Emit(Mock.Of()); + + failingCatchWasInvoked.Should().BeFalse(); + failingExHandlerWasInvoked.Should().BeFalse(); + nonFailingCatchWasInvoked.Should().BeFalse(); + nonFailingExHandlerWasInvoked.Should().BeFalse(); + } + + [Fact] + public void Emitting_Should_invoke_handler_of_failing_observer_When_using_catchAny_and_an_exception_is_thrown() + { + var failingCatchWasInvoked = false; + var failingExHandlerWasInvoked = false; + var nonFailingCatchWasInvoked = false; + var nonFailingExHandlerWasInvoked = false; + + UnitUnderTest.CatchAny(ex => failingCatchWasInvoked = true).Subscribe( + ev => throw new Exception("I FAILED!"), + ex => failingExHandlerWasInvoked = true); + + UnitUnderTest.CatchAny(ex => nonFailingCatchWasInvoked = true).Subscribe( + ev => { }, + ex => nonFailingExHandlerWasInvoked = true); + + UnitUnderTest.Emit(Mock.Of()); + + failingCatchWasInvoked.Should().BeTrue(); + failingExHandlerWasInvoked.Should().BeFalse(); + nonFailingCatchWasInvoked.Should().BeFalse(); + nonFailingExHandlerWasInvoked.Should().BeFalse(); + } + + [Fact] + public void Emitting_Should_invoke_handler_of_failing_observer_When_using_catchAny_and_a_more_specific_exception_is_thrown() + { + var failingCatchWasInvoked = false; + var failingExHandlerWasInvoked = false; + var nonFailingCatchWasInvoked = false; + var nonFailingExHandlerWasInvoked = false; + + UnitUnderTest.CatchAny(ex => failingCatchWasInvoked = true).Subscribe( + ev => throw new NotSupportedException("I FAILED!"), + ex => failingExHandlerWasInvoked = true); + + UnitUnderTest.CatchAny(ex => nonFailingCatchWasInvoked = true).Subscribe( + ev => { }, + ex => nonFailingExHandlerWasInvoked = true); + + UnitUnderTest.Emit(Mock.Of()); + + failingCatchWasInvoked.Should().BeTrue(); + failingExHandlerWasInvoked.Should().BeFalse(); + nonFailingCatchWasInvoked.Should().BeFalse(); + nonFailingExHandlerWasInvoked.Should().BeFalse(); + } } -} \ No newline at end of file +}