Skip to content

Commit

Permalink
Add support for CDI events (#936)
Browse files Browse the repository at this point in the history
Resolves #912
  • Loading branch information
acoburn authored Jun 30, 2020
1 parent 67c9c58 commit d7c0009
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 26 deletions.
1 change: 1 addition & 0 deletions notifications/reactive/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ ext {

dependencies {
api "io.reactivex.rxjava2:rxjava:$rxjavaVersion"
api "jakarta.enterprise:jakarta.enterprise.cdi-api:$cdiApiVersion"
api "jakarta.inject:jakarta.inject-api:$injectApiVersion"
api "org.eclipse.microprofile.reactive.messaging:microprofile-reactive-messaging-api:$microprofileReactiveMessagingVersion"
api project(':trellis-api')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.trellisldp.reactive;

import static java.util.Objects.requireNonNull;
import static org.eclipse.microprofile.reactive.messaging.Message.of;
import static org.slf4j.LoggerFactory.getLogger;

Expand All @@ -41,31 +40,18 @@ public class ReactiveEventService implements EventService {

public static final String REACTIVE_DESTINATION = "trellis";

private final EventSerializationService serializer;
private final PublishSubject<Message<String>> subject = PublishSubject.create();

/**
* Create a new Reactive Stream Event Service with a no-op serializer.
*
* @apiNote This construtor is used by CDI runtimes that require a public, no-argument constructor.
* It should not be invoked directly in user code.
*/
public ReactiveEventService() {
this(new NoopEventSerializationService());
}
@Inject
EventSerializationService serializer;

/**
* Create a new Reactive Stream Event Service.
* @param serializer the event serializer
*/
@Inject
public ReactiveEventService(final EventSerializationService serializer) {
this.serializer = requireNonNull(serializer, "serializer may not be null!");
}
javax.enterprise.event.Event<Event> trellisEvent;

@Override
public void emit(final Event event) {
LOGGER.debug("Sending message to reactive destination: {}", event.getIdentifier());
trellisEvent.fireAsync(event);
subject.onNext(of(serializer.serialize(event)));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2020 Aaron Coburn and individual contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.trellisldp.reactive;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.ObservesAsync;

import org.trellisldp.api.Event;

@ApplicationScoped
public class EventCollector {

private final List<Event> events = new CopyOnWriteArrayList<>();

public void sink(@ObservesAsync final Event event) {
events.add(event);
}

public List<Event> getResults() {
return events;
}

public void clear() {
events.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class ReactiveEventServiceTest {
InternalChannelRegistry.class,
ConfiguredChannelFactory.class,
TestCollector.class,
EventCollector.class,
ReactiveEventService.class,
DefaultEventSerializationService.class,
ConfigProducer.class)
Expand All @@ -74,6 +75,9 @@ class ReactiveEventServiceTest {
@Inject
private TestCollector collector;

@Inject
private EventCollector events;

@Inject
private ReactiveEventService service;

Expand All @@ -91,12 +95,8 @@ void setUp() {
when(mockEvent.getTypes()).thenReturn(singleton(AS.Update));
when(mockEvent.getObjectTypes()).thenReturn(singleton(LDP.RDFSource));
when(mockEvent.getInbox()).thenReturn(empty());
}

@Test
void testNoargCtor() {
final ReactiveEventService svc = new ReactiveEventService();
assertDoesNotThrow(() -> svc.emit(mockEvent));
events.clear();
collector.clear();
}

@Test
Expand All @@ -112,4 +112,10 @@ void testReactiveStream() {
await().atMost(5, SECONDS).until(() -> collector.getResults().size() == 4);
assertEquals(4, collector.getResults().size(), "Incorrect number of messages!");
}

@Test
void testCdiEvent() {
service.emit(mockEvent);
await().atMost(5, SECONDS).until(() -> events.getResults().size() > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package org.trellisldp.reactive;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;

Expand All @@ -25,7 +25,7 @@
@ApplicationScoped
public class TestCollector {

private final List<String> list = new ArrayList<>();
private final List<String> list = new CopyOnWriteArrayList<>();

@Incoming(ReactiveEventService.REACTIVE_DESTINATION)
public void sink(final String message) {
Expand All @@ -35,4 +35,8 @@ public void sink(final String message) {
public List<String> getResults() {
return list;
}

public void clear() {
list.clear();
}
}

0 comments on commit d7c0009

Please sign in to comment.