Skip to content
This repository has been archived by the owner on Sep 17, 2023. It is now read-only.

Commit

Permalink
Adds Catch Rx Observable and CatchAny and Catch aliases (#60)
Browse files Browse the repository at this point in the history
* Adds Catch Rx Observable and CatchAny and Catch aliases

* Updates docs with info about Subscribe, SubscribeSafe, Catch and CatchAny
  • Loading branch information
danielwertheim authored Sep 19, 2020
1 parent fa9616b commit cf27133
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 9 deletions.
21 changes: 19 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,35 @@ await client.SubAsync("tickItem", stream => stream.Subscribe(msg => {
```

### Stream.Subscribe vs Stream.SubscribeSafe
If you subscribe to e.g. the `MessageOpStream` using `Stream.Subscribe` and your handler is throwing an exception. That handler will get `OnError` invoked and then removed.
The initial behavior was to call `OnError` when a handler was throwing an exception and had specified a `OnError` handler. **This has changed**. The motivation around this is, that it's not the producer side that is causing the exception [(read more)](https://docs.microsoft.com/en-us/dotnet/api/system.iobserver-1.onerror?view=netcore-3.1).

If you subscribe to e.g. the `MessageOpStream` using `Stream.Subscribe` and your handler is throwing an exception. That in-process handler **will be removed**.

```csharp
await client.SubAsync("mySubject", stream => stream.Subscribe(msg => DoSomething(msg)));
```

If you instead subscribe using `Stream.SubscribeSafe` any unhandled exception will get swallowed.
If you instead subscribe using `Stream.SubscribeSafe` any unhandled exception will get swallowed and the in-process handler will still be around.

```csharp
await client.SubAsync("mySubject", stream => stream.SubscribeSafe(msg => DoSomething(msg)));
```

### Catch & CatchAny
If you want a generic way to handle exceptions in your handlers, you can use a `CatchObserver` e.g via the aliases `stream.Catch` or `stream.CatchAny`.

```csharp
await client.SubAsync("mySubject", stream => stream
.Catch((FooException ex) => {})
.Subscribe(msg => DoSomething(msg)));
```

```csharp
await client.SubAsync("mySubject", stream => stream
.CatchAny(ex => {})
.Subscribe(msg => DoSomething(msg)));
```

## Request-Response sample
Simple request-response sample. This sample also makes use of two clients. It can of course be the same client requesting and responding, you can also have more responders forming a queue group. Where one will be giving the answer.

Expand Down
57 changes: 53 additions & 4 deletions src/MyNatsClient/Internals/Observables.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ internal sealed class OfTypeObservable<TResult> : INatsObservable<TResult> where
{
private readonly INatsObservable<object> _src;

public OfTypeObservable(INatsObservable<object> src)
public OfTypeObservable(INatsObservable<object> src)
=> _src = src ?? throw new ArgumentNullException(nameof(src));

public void Dispose() => _src.Dispose();
Expand Down Expand Up @@ -41,7 +41,7 @@ internal sealed class CastObservable<TFrom, TTo> : INatsObservable<TTo> where TF
{
private readonly INatsObservable<TFrom> _src;

public CastObservable(INatsObservable<TFrom> src)
public CastObservable(INatsObservable<TFrom> src)
=> _src = src ?? throw new ArgumentNullException(nameof(src));

public void Dispose() => _src.Dispose();
Expand All @@ -53,7 +53,7 @@ private sealed class CastObserver : IObserver<TFrom>
{
private readonly IObserver<TTo> _observer;

public CastObserver(IObserver<TTo> observer)
public CastObserver(IObserver<TTo> observer)
=> _observer = observer;

public void OnNext(TFrom value)
Expand Down Expand Up @@ -108,6 +108,55 @@ public void OnCompleted()
}
}

internal sealed class CatchObservable<T, TException> : INatsObservable<T>
where T : class
where TException : Exception
{
private readonly INatsObservable<T> _src;
private readonly Action<TException> _handler;

public CatchObservable(INatsObservable<T> src, Action<TException> handler)
{
_src = src ?? throw new ArgumentNullException(nameof(src));
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public void Dispose() => _src.Dispose();

public IDisposable Subscribe(IObserver<T> observer)
=> _src.SubscribeSafe(new CatchObserver(observer, _handler));

private sealed class CatchObserver : IObserver<T>
{
private readonly IObserver<T> _observer;
private readonly Action<TException> _handler;

public CatchObserver(IObserver<T> observer, Action<TException> handler)
{
_observer = observer ?? throw new ArgumentNullException(nameof(observer));
_handler = handler;
}

public void OnNext(T value)
{
try
{
_observer.OnNext(value);
}
catch (TException ex)
{
_handler(ex);
}
}

public void OnError(Exception error)
=> _observer.OnError(error);

public void OnCompleted()
=> _observer.OnCompleted();
}
}

internal sealed class WhereSubjectMatchObservable : INatsObservable<MsgOp>
{
private readonly INatsObservable<MsgOp> _src;
Expand Down Expand Up @@ -187,4 +236,4 @@ public void OnCompleted()
=> _observer.OnCompleted();
}
}
}
}
11 changes: 10 additions & 1 deletion src/MyNatsClient/Rx/NatsObservableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ namespace MyNatsClient.Rx
{
public static class NatsObservableExtensions
{
public static INatsObservable<T> Catch<TException, T>(this INatsObservable<T> ob, Action<TException> handler)
where TException : Exception
where T : class
=> new CatchObservable<T, TException>(ob, handler);

public static INatsObservable<T> CatchAny<T>(this INatsObservable<T> ob, Action<Exception> handler)
where T : class
=> ob.Catch(handler);

public static INatsObservable<TResult> OfType<TResult>(this INatsObservable<object> ob) where TResult : class
=> new OfTypeObservable<TResult>(ob);

Expand All @@ -30,4 +39,4 @@ public static IDisposable SubscribeSafe<T>(this INatsObservable<T> ob, Action<T>
public static IDisposable SubscribeSafe<T>(this INatsObservable<T> ob, IObserver<T> observer) where T : class
=> ob.Subscribe(NatsObserver.Safe<T>(observer.OnNext, observer.OnError, observer.OnCompleted));
}
}
}
124 changes: 122 additions & 2 deletions src/UnitTests/NatsObservableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void Emitting_Should_not_dispatch_to_a_disposed_observer()
{
Interlocked.Increment(ref s1CallCount);
});
var s2 = UnitUnderTest.Subscribe(ev =>
var _ = UnitUnderTest.Subscribe(ev =>
{
Interlocked.Increment(ref s2CallCount);
});
Expand Down Expand Up @@ -235,5 +235,125 @@ public void Should_be_able_filter_by_type()
observer2.Verify(f => f.OnNext(It.IsAny<ExtendedData>()), Times.Exactly(1));
observer2.Verify(f => f.OnNext(It.Is<ExtendedData>(d => d.Value == 1 && d.OtherValue == 2)), Times.Once);
}

[Fact]
public void Emitting_Should_invoke_handler_of_failing_observer_When_using_catch_and_a_more_specific_exception_is_thrown()
{
var failingCatchWasInvoked = false;
var failingExHandlerWasInvoked = false;
var nonFailingCatchWasInvoked = false;
var nonFailingExHandlerWasInvoked = false;

UnitUnderTest.Catch((NotSupportedException ex) => failingCatchWasInvoked = true).Subscribe(
ev => throw new NotSupportedException("I FAILED!"),
ex => failingExHandlerWasInvoked = true);

UnitUnderTest.Catch((NotSupportedException ex) => nonFailingCatchWasInvoked = true).Subscribe(
ev => { },
ex => nonFailingExHandlerWasInvoked = true);

UnitUnderTest.Emit(Mock.Of<Data>());

failingCatchWasInvoked.Should().BeTrue();
failingExHandlerWasInvoked.Should().BeFalse();
nonFailingCatchWasInvoked.Should().BeFalse();
nonFailingExHandlerWasInvoked.Should().BeFalse();
}

[Fact]
public void Emitting_Should_invoke_handler_of_failing_observer_When_using_catch_and_an_exception_is_thrown()
{
var failingCatchWasInvoked = false;
var failingExHandlerWasInvoked = false;
var nonFailingCatchWasInvoked = false;
var nonFailingExHandlerWasInvoked = false;

UnitUnderTest.Catch((Exception ex) => failingCatchWasInvoked = true).Subscribe(
ev => throw new Exception("I FAILED!"),
ex => failingExHandlerWasInvoked = true);

UnitUnderTest.Catch((Exception ex) => nonFailingCatchWasInvoked = true).Subscribe(
ev => { },
ex => nonFailingExHandlerWasInvoked = true);

UnitUnderTest.Emit(Mock.Of<Data>());

failingCatchWasInvoked.Should().BeTrue();
failingExHandlerWasInvoked.Should().BeFalse();
nonFailingCatchWasInvoked.Should().BeFalse();
nonFailingExHandlerWasInvoked.Should().BeFalse();
}

[Fact]
public void Emitting_Should_not_invoke_handler_of_failing_observer_When_using_catch_and_a_more_generic_exception_is_thrown()
{
var failingCatchWasInvoked = false;
var failingExHandlerWasInvoked = false;
var nonFailingCatchWasInvoked = false;
var nonFailingExHandlerWasInvoked = false;

UnitUnderTest.Catch((NotSupportedException ex) => failingCatchWasInvoked = true).Subscribe(
ev => throw new Exception("I FAILED!"),
ex => failingExHandlerWasInvoked = true);

UnitUnderTest.Catch((NotSupportedException ex) => nonFailingCatchWasInvoked = true).Subscribe(
ev => { },
ex => nonFailingExHandlerWasInvoked = true);

UnitUnderTest.Emit(Mock.Of<Data>());

failingCatchWasInvoked.Should().BeFalse();
failingExHandlerWasInvoked.Should().BeFalse();
nonFailingCatchWasInvoked.Should().BeFalse();
nonFailingExHandlerWasInvoked.Should().BeFalse();
}

[Fact]
public void Emitting_Should_invoke_handler_of_failing_observer_When_using_catchAny_and_an_exception_is_thrown()
{
var failingCatchWasInvoked = false;
var failingExHandlerWasInvoked = false;
var nonFailingCatchWasInvoked = false;
var nonFailingExHandlerWasInvoked = false;

UnitUnderTest.CatchAny(ex => failingCatchWasInvoked = true).Subscribe(
ev => throw new Exception("I FAILED!"),
ex => failingExHandlerWasInvoked = true);

UnitUnderTest.CatchAny(ex => nonFailingCatchWasInvoked = true).Subscribe(
ev => { },
ex => nonFailingExHandlerWasInvoked = true);

UnitUnderTest.Emit(Mock.Of<Data>());

failingCatchWasInvoked.Should().BeTrue();
failingExHandlerWasInvoked.Should().BeFalse();
nonFailingCatchWasInvoked.Should().BeFalse();
nonFailingExHandlerWasInvoked.Should().BeFalse();
}

[Fact]
public void Emitting_Should_invoke_handler_of_failing_observer_When_using_catchAny_and_a_more_specific_exception_is_thrown()
{
var failingCatchWasInvoked = false;
var failingExHandlerWasInvoked = false;
var nonFailingCatchWasInvoked = false;
var nonFailingExHandlerWasInvoked = false;

UnitUnderTest.CatchAny(ex => failingCatchWasInvoked = true).Subscribe(
ev => throw new NotSupportedException("I FAILED!"),
ex => failingExHandlerWasInvoked = true);

UnitUnderTest.CatchAny(ex => nonFailingCatchWasInvoked = true).Subscribe(
ev => { },
ex => nonFailingExHandlerWasInvoked = true);

UnitUnderTest.Emit(Mock.Of<Data>());

failingCatchWasInvoked.Should().BeTrue();
failingExHandlerWasInvoked.Should().BeFalse();
nonFailingCatchWasInvoked.Should().BeFalse();
nonFailingExHandlerWasInvoked.Should().BeFalse();
}
}
}
}

0 comments on commit cf27133

Please sign in to comment.