Skip to content

Commit

Permalink
Register error handlers before subscribers.
Browse files Browse the repository at this point in the history
  • Loading branch information
TomasMikula committed Jul 6, 2014
1 parent c0c39f7 commit e8e1ea9
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 12 deletions.
2 changes: 1 addition & 1 deletion reactfx/src/main/java/org/reactfx/BiEventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ default Subscription subscribe(Consumer<? super Tuple2<A, B>> subscriber) {
default Subscription watch(
BiConsumer<? super A, ? super B> subscriber,
Consumer<? super Throwable> monitor) {
return subscribe(subscriber).and(monitor(monitor));
return monitor(monitor).and(subscribe(subscriber));
}

default Subscription feedTo2(BiEventSink<? super A, ? super B> sink) {
Expand Down
8 changes: 4 additions & 4 deletions reactfx/src/main/java/org/reactfx/EventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ public interface EventStream<T> {

/**
* Convenient method to subscribe to and monitor this stream. Is equivalent
* to {@code subscribe(subscriber).and(monitor(monitor))}.
* to {@code monitor(monitor).and(subscribe(subscriber))}.
* @see #subscribe(Consumer)
* @see #monitor(Consumer)
*/
default Subscription watch(
Consumer<? super T> subscriber,
Consumer<? super Throwable> monitor) {
return subscribe(subscriber).and(monitor(monitor));
return monitor(monitor).and(subscribe(subscriber));
}

/**
Expand Down Expand Up @@ -715,8 +715,8 @@ default EventStream<Try<T>> materializeErrors() {
return new LazilyBoundStream<Try<T>>() {
@Override
protected Subscription subscribeToInputs() {
Subscription s1 = EventStream.this.subscribe(t -> emit(Try.success(t)));
Subscription s2 = EventStream.this.monitor(er -> emit(Try.failure(er)));
Subscription s1 = EventStream.this.subscribe(t -> emit(Try.success(t)));
return s1.and(s2);
}
};
Expand All @@ -734,8 +734,8 @@ default EventStream<T> handleErrors(Consumer<? super Throwable> handler) {
return new LazilyBoundStream<T>() {
@Override
protected Subscription subscribeToInputs() {
Subscription s1 = EventStream.this.subscribe(this::emit);
Subscription s2 = EventStream.this.monitor(handler);
Subscription s1 = EventStream.this.subscribe(this::emit);
return s1.and(s2);
}
};
Expand Down
9 changes: 6 additions & 3 deletions reactfx/src/main/java/org/reactfx/EventStreamBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ protected final void reportError(Throwable thrown) {
* Overriding this method is a convenient way for subclasses
* to handle this event.
*
* <p>This method is called after the {@link #newSubscriber(Object)}
* method.</p>
* <p>This method is called <em>before</em> the
* {@link #newSubscriber(Object)} method.</p>
*/
protected void firstSubscriber() {
// default implementation is empty
Expand All @@ -57,6 +57,9 @@ protected void firstSubscriber() {
* Called for each new subscriber.
* Overriding this method is a convenient way for subclasses
* to handle this event, for example to publish some initial events.
*
* <p>This method is called <em>after</em> the
* {@link #firstSubscriber()} method.</p>
*/
protected void newSubscriber(S subscriber) {
// default implementation is empty
Expand Down Expand Up @@ -117,10 +120,10 @@ protected final <A, B, C> Subscription subscribeToTri(

public final Subscription subscribe(S subscriber) {
subscribers = ListHelper.add(subscribers, subscriber);
newSubscriber(subscriber);
if(ListHelper.size(subscribers) == 1) {
firstSubscriber();
}
newSubscriber(subscriber);
return () -> unsubscribe(subscriber);
}

Expand Down
4 changes: 1 addition & 3 deletions reactfx/src/main/java/org/reactfx/EventStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ protected Subscription subscribeToInputs() {
for(Observable dep: dependencies) {
dep.addListener(listener);
}
previousValue = computeValue.get();

return () -> {
for(Observable dep: dependencies) {
Expand All @@ -178,9 +179,6 @@ protected Subscription subscribeToInputs() {

@Override
protected void newSubscriber(Consumer<? super T> subscriber) {
if(!isBound()) { // this is the first subscriber
previousValue = computeValue.get();
}
subscriber.accept(previousValue);
}
};
Expand Down
2 changes: 1 addition & 1 deletion reactfx/src/main/java/org/reactfx/TriEventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ default Subscription subscribe(Consumer<? super Tuple3<A, B, C>> subscriber) {
default Subscription watch(
TriConsumer<? super A, ? super B, ? super C> subscriber,
Consumer<? super Throwable> monitor) {
return subscribe(subscriber).and(monitor(monitor));
return monitor(monitor).and(subscribe(subscriber));
}

default Subscription feedTo3(TriEventSink<? super A, ? super B, ? super C> sink) {
Expand Down
61 changes: 61 additions & 0 deletions reactfx/src/test/java/org/reactfx/ErrorReportingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;
import org.reactfx.inhibeans.property.SimpleIntegerProperty;

public class ErrorReportingTest {

Expand Down Expand Up @@ -48,4 +51,62 @@ public void threadBridgeTest() throws InterruptedException, ExecutionException {
ex1.shutdown();
ex2.shutdown();
}
/**
* Stream of property values should work normally even when the first value
* in the stream (property value at the moment of subscription) produces an
* exception. The catch is the first value produced at the very moment of
* establishing subscription, so order of initialization matters.
*/
@Test
public void property_values_stream_with_faulty_first_value_test() {
SimpleIntegerProperty intProperty = new SimpleIntegerProperty(-1);
List<String> emitted = new LinkedList<>();
List<Throwable> errors = new LinkedList<>();

EventStreams.valuesOf(intProperty)
.map(i -> {
if (i.intValue() < 0) {
throw new IllegalArgumentException("Accepting only positive numbers");
}
return String.valueOf(i);
})
.handleErrors(errors::add)
.subscribe(emitted::add);

intProperty.set(10);
intProperty.set(-2);
intProperty.set(0);

assertEquals(Arrays.asList("10", "0"), emitted);
assertEquals(2, errors.size());
}

/**
* Variation on
* {@link #property_values_stream_with_faulty_first_value_test()} using the
* {@link EventStream#watch(java.util.function.Consumer, java.util.function.Consumer)}
* method.
*/
@Test
public void property_values_stream_with_faulty_first_value_test2() {
SimpleIntegerProperty intProperty = new SimpleIntegerProperty(-1);
List<String> emitted = new LinkedList<>();
List<Throwable> errors = new LinkedList<>();

EventStreams.valuesOf(intProperty)
.map(i -> {
if (i.intValue() < 0) {
throw new IllegalArgumentException("Accepting only positive numbers");
}
return String.valueOf(i);
})
.watch(emitted::add, errors::add);

intProperty.set(10);
intProperty.set(-2);
intProperty.set(0);

assertEquals(Arrays.asList("10", "0"), emitted);
assertEquals(2, errors.size());
}
}

0 comments on commit e8e1ea9

Please sign in to comment.