Skip to content

Commit

Permalink
Limit trigger handling concurrency
Browse files Browse the repository at this point in the history
- Change flatMap to use concurrency 1 in a trigger handling which
  should fix issues when events are sent fast.
- Fixes #942
  • Loading branch information
jvalkeal committed Apr 5, 2021
1 parent 5919b06 commit 4dcf6dc
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -115,7 +115,8 @@ public ReactiveStateMachineExecutor(StateMachine<S, E> stateMachine, StateMachin
@Override
protected void onInit() throws Exception {
triggerSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
triggerFlux = triggerSink.asFlux().flatMap(trigger -> handleTrigger(trigger));
// limit concurrency so that we get one by one handling
triggerFlux = triggerSink.asFlux().flatMap(trigger -> handleTrigger(trigger), 1);
}

@Override
Expand Down Expand Up @@ -328,10 +329,11 @@ private Mono<Void> handleTrigger(TriggerQueueItem queueItem) {
}
});
}
}))
.contextWrite(Context.of(
StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, new ExecutorExceptionHolder(),
REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder()));
})
)
.contextWrite(Context.of(
StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, new ExecutorExceptionHolder(),
REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder()));
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,8 @@
import java.lang.reflect.Method;
import java.time.Duration;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
Expand All @@ -44,6 +46,8 @@
*/
public class TestUtils {

private static Log log = LogFactory.getLog(TestUtils.class);

@SuppressWarnings("unchecked")
public static <S, E> StateMachine<S, E> resolveMachine(BeanFactory beanFactory) {
assertThat(beanFactory.containsBean(DEFAULT_ID_STATEMACHINE)).isTrue();
Expand Down Expand Up @@ -99,7 +103,10 @@ public static <T> Flux<Message<T>> eventsAsFlux(T... events) {

public static <S, E> void doSendEventAndConsumeAll(StateMachine<S, E> stateMachine, E event) {
StepVerifier.create(stateMachine.sendEvent(eventAsMono(event)))
.thenConsumeWhile(eventResult -> true)
.thenConsumeWhile(eventResult -> {
log.debug("Consume eventResult " + eventResult);
return true;
})
.expectComplete()
.verify(Duration.ofSeconds(5));
}
Expand All @@ -126,6 +133,30 @@ public static <S, E> void doSendEventAndConsumeResultAsDenied(StateMachine<S, E>
.verifyComplete();
}

@SafeVarargs
public static <S, E> void doSendEventsAndConsumeAll(StateMachine<S, E> stateMachine, E... events) {
StepVerifier.create(stateMachine.sendEvents(eventsAsFlux(events)))
.thenConsumeWhile(eventResult -> {
log.debug("Consume eventResult " + eventResult);
return true;
})
.expectComplete()
.verify(Duration.ofSeconds(5));
}

@SafeVarargs
public static <S, E> void doSendEventsAndConsumeAllWithComplete(StateMachine<S, E> stateMachine, E... events) {
Flux<Void> completions = stateMachine.sendEvents(eventsAsFlux(events))
.doOnNext(result -> {
log.debug("Consume eventResult " + result);
})
.flatMap(result -> result.complete());
StepVerifier.create(completions)
.thenConsumeWhile(complete -> true)
.expectComplete()
.verify(Duration.ofSeconds(10));
}

@SuppressWarnings("unchecked")
public static <T> T readField(String name, Object target) throws Exception {
Field field = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.statemachine.action;

import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.statemachine.TestUtils.doSendEventsAndConsumeAllWithComplete;
import static org.springframework.statemachine.TestUtils.doStartAndAssert;
import static org.springframework.statemachine.TestUtils.resolveMachine;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.AbstractStateMachineTests;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.config.EnableStateMachine;
import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;

import reactor.core.publisher.Mono;

/**
* Tests for state machine reactive actions.
*
* @author Janne Valkealahti
*
*/
public class ReactiveAction2Tests extends AbstractStateMachineTests {

@Test
public void testSimpleReactiveActions() throws Exception {
context.register(Config1.class);
context.refresh();
StateMachine<TestStates, TestEvents> machine = resolveMachine(context);
doStartAndAssert(machine);

TestCountAction testAction3 = context.getBean("testAction3", TestCountAction.class);
TestCountAction testAction4 = context.getBean("testAction4", TestCountAction.class);
doSendEventsAndConsumeAllWithComplete(machine, TestEvents.E1, TestEvents.E2);
assertThat(testAction3.latch.await(6, TimeUnit.SECONDS)).isTrue();
assertThat(testAction4.latch.await(6, TimeUnit.SECONDS)).isTrue();
assertThat(testAction4.time.get() - testAction3.time.get()).isGreaterThan(1000);
}

@Configuration
@EnableStateMachine
static class Config1 extends EnumStateMachineConfigurerAdapter<TestStates, TestEvents> {

@Override
public void configure(StateMachineStateConfigurer<TestStates, TestEvents> states) throws Exception {
states
.withStates()
.initial(TestStates.S1)
.stateEntryFunction(TestStates.S2, testAction3())
.stateEntryFunction(TestStates.S3, testAction4());
}

@Override
public void configure(StateMachineTransitionConfigurer<TestStates, TestEvents> transitions) throws Exception {
transitions
.withExternal()
.source(TestStates.S1)
.target(TestStates.S2)
.event(TestEvents.E1)
.and()
.withExternal()
.source(TestStates.S2)
.target(TestStates.S3)
.event(TestEvents.E2);
}

@Bean
public TestCountAction testAction3() {
return new TestCountAction("ACTION3");
}

@Bean
public TestCountAction testAction4() {
return new TestCountAction("ACTION4");
}
}

@Override
protected AnnotationConfigApplicationContext buildContext() {
return new AnnotationConfigApplicationContext();
}

private static class TestCountAction implements ReactiveAction<TestStates, TestEvents> {

private final String id;
int count = 0;
CountDownLatch latch = new CountDownLatch(1);
AtomicLong time = new AtomicLong();

TestCountAction(String id) {
this.id = id;
}

@Override
public Mono<Void> apply(StateContext<TestStates, TestEvents> context) {
return Mono.delay(Duration.ofMillis(2000))
.doFinally(x -> {
count++;
time.set(System.currentTimeMillis());
latch.countDown();
})
.then()
.log(id);
}
}
}

0 comments on commit 4dcf6dc

Please sign in to comment.