Skip to content

Commit

Permalink
Merge pull request #8 from Koralix-Studios/dev
Browse files Browse the repository at this point in the history
v1.1.0
  • Loading branch information
JohanVonElectrum authored Apr 11, 2023
2 parents 0355cb4 + 1dbbc3f commit bfff7a7
Show file tree
Hide file tree
Showing 11 changed files with 507 additions and 85 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

group 'com.koralix.stepfn'
version '1.0.1'
version '1.1.0'

repositories {
mavenCentral()
Expand Down
56 changes: 35 additions & 21 deletions src/main/java/com/koralix/stepfn/AsyncStepFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.*;
import java.util.function.Supplier;

/**
* A {@link StepFunction} that returns a {@link CompletableFuture}.
Expand All @@ -24,47 +24,55 @@
*/
public class AsyncStepFunction<T, R> extends StepFunction<T, R, CompletableFuture<R>> {

private final Executor executor;
private final Supplier<ExecutorService> executorSupplier;
private ExecutorService executor;

/**
* Creates a new {@link AsyncStepFunction} with the given initial step and transitions.
*
* @param initialStep the initial step
* @param transitions the transitions
* @param executor the executor to use for asynchronous computation
* @param initialStep the initial step
* @param transitions the transitions
* @param executorSupplier the supplier of the executor to use for asynchronous computation
* @deprecated use {@link #AsyncStepFunction(Step, Supplier)} instead - this constructor will be removed in 1.2.0
* @since 1.0.0
*/
@Deprecated
public AsyncStepFunction(
Step<T, ?> initialStep,
Map<Step<?, ?>, Set<Transition<?>>> transitions,
Executor executor
Map<Step<?, ?>, Set<Transition<?, ?>>> transitions,
Supplier<ExecutorService> executorSupplier
) {
super(initialStep, transitions);
this.executor = executor;
this.executorSupplier = executorSupplier;
}

/**
* Creates a new {@link AsyncStepFunction} with the given initial step.
*
* @param initialStep the initial step
* @param executor the executor to use for asynchronous computation
* @param initialStep the initial step
* @param executorSupplier the supplier of the executor to use for asynchronous computation
* @since 1.0.0
*/
public AsyncStepFunction(Step<T, ?> initialStep, Executor executor) {
public AsyncStepFunction(Step<T, ?> initialStep, Supplier<ExecutorService> executorSupplier) {
super(initialStep);
this.executor = executor;
this.executorSupplier = executorSupplier;
}

/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result in a {@link CompletableFuture}
* @since 1.0.0
*/
@Override
public CompletableFuture<R> apply(T t) {
CompletableFuture<R> future = new CompletableFuture<>();
// TODO: check that null is stored in the aggregation map
// may be a problem to read the value from the map
this.executor = this.executorSupplier.get();
this.apply(this.firstStep(), null, t, future);
future.whenComplete((r, e) -> {
this.executor.shutdownNow();
});
return future;
}

Expand All @@ -78,17 +86,23 @@ public CompletableFuture<R> apply(T t) {
* @param step the step to apply
* @param from the step from which the input was received
* @param input the input to the step
* @param <A> the type of the input to the step
* @param <B> the type of the result of the step
* @param <A> the input type of the step
* @param <B> the output type of the step
* @return the result of the step in a {@link CompletableFuture} if the step is complete,
* an empty {@link Optional} otherwise
* @since 1.0.0
*/
@Override
protected <A, B> Optional<CompletableFuture<B>> step(Step<A, B> step, Step<?, A> from, A input) {
protected <A, B> Optional<CompletableFuture<B>> step(Step<A, B> step, Step<?, ?> from, A input) {
step.aggregate(from, input);
if (step.isComplete())
return Optional.of(CompletableFuture.supplyAsync(() -> step.apply(input), this.executor));
else
if (step.isComplete()) {
try {
return Optional.of(CompletableFuture.supplyAsync(() -> step.apply(input), this.executor));
} catch (RejectedExecutionException ignored) {
return Optional.empty();
}
} else {
return Optional.empty();
}
}
}
8 changes: 6 additions & 2 deletions src/main/java/com/koralix/stepfn/Step.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ public abstract class Step<T, R> implements Function<T, R> {

/**
* The aggregation of the inputs from the previous steps.
* @since 1.0.0
*/
protected final Map<Step<?, T>, T> aggregation = new HashMap<>();
protected final Map<Step<?, ?>, T> aggregation = new HashMap<>();

/**
* The input from the step function.
* <p>
* This is only used for the first step.
* @since 1.0.0
*/
protected T stepFunctionInput;

Expand All @@ -32,8 +34,9 @@ public abstract class Step<T, R> implements Function<T, R> {
*
* @param from the step that produced the input
* @param input the input
* @since 1.0.0
*/
public void aggregate(Step<?, T> from, T input) {
public void aggregate(Step<?, ?> from, T input) {
if (from == null)
this.stepFunctionInput = input;
else
Expand All @@ -44,6 +47,7 @@ public void aggregate(Step<?, T> from, T input) {
* Checks if the step has all the required inputs to be executed.
*
* @return true if the step is complete, false otherwise
* @since 1.0.0
*/
public abstract boolean isComplete();

Expand Down
88 changes: 63 additions & 25 deletions src/main/java/com/koralix/stepfn/StepFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,20 @@
public abstract class StepFunction<T, V, R> implements Function<T, R> {

private final Step<T, ?> initialStep;
private final Map<Step<?, ?>, Set<Transition<?>>> transitions = new HashMap<>();
private final Map<Step<?, ?>, Set<Transition<?, ?>>> transitions = new HashMap<>();

/**
* Creates a new {@link StepFunction} with the given initial {@link Step} and {@link Transition}s.
*
* @param initialStep the initial step
* @param transitions the transitions
* @deprecated use {@link #StepFunction(Step)} instead - this constructor will be removed in 1.2.0
* @since 1.0.0
*/
@Deprecated
public StepFunction(
Step<T, ?> initialStep,
Map<Step<?, ?>, Set<Transition<?>>> transitions
Map<Step<?, ?>, Set<Transition<?, ?>>> transitions
) {
this.initialStep = initialStep;
this.transitions.putAll(transitions.entrySet().stream().map(entry -> Map.entry(
Expand All @@ -87,34 +90,62 @@ public StepFunction(
* Creates a new {@link StepFunction} with the given initial {@link Step}.
*
* @param initialStep the initial step
* @since 1.0.0
*/
public StepFunction(Step<T, ?> initialStep) {
this.initialStep = initialStep;
}

/**
* Adds a {@link Transition} from the first given {@link Step} to the second given {@link Step}
* with the given predicate.
* with the given predicate and mapper.
*
* @param from the step to transition from
* @param to the step to transition to
* @param predicate the predicate to determine if the transition is applicable
* @param mapper the mapper to map the input to the next step
* @param <A> the output type of the from step
* @param <B> the input type of the to step
* @since 1.1.0
*/
public <A> void addTransition(Step<?, A> from, Step<A, ?> to, Function<A, Boolean> predicate) {
this.transitions.computeIfAbsent(from, step -> new HashSet<>()).add(new Transition<A>() {
public <A, B> void addTransition(
Step<?, A> from,
Step<B, ?> to,
Function<A, Boolean> predicate,
Function<A, B> mapper
) {
this.transitions.computeIfAbsent(from, step -> new HashSet<>()).add(new Transition<A, B>() {
@Override
public boolean isApplicable(A input) {
return predicate.apply(input);
}

@Override
public Step<A, ?> get() {
public Step<B, ?> get() {
return to;
}

@Override
public B map(A input) {
return mapper.apply(input);
}
});
}

/**
* Adds a {@link Transition} from the first given {@link Step} to the second given {@link Step}
* with the given predicate.
*
* @param from the step to transition from
* @param to the step to transition to
* @param predicate the predicate to determine if the transition is applicable
* @param <A> the output type of the from step
* @since 1.0.0
*/
public <A> void addTransition(Step<?, A> from, Step<A, ?> to, Function<A, Boolean> predicate) {
this.addTransition(from, to, predicate, input -> input);
}

/**
* Recursive method that applies the given {@link Step} to the given input.
* <p>
Expand All @@ -129,29 +160,27 @@ public boolean isApplicable(A input) {
* @param future the future to complete
* @param <A> the input type of the step
* @param <B> the output type of the step
* @since 1.0.0
*/
@SuppressWarnings("unchecked")
protected <A, B> void apply(Step<A, B> step, Step<?, A> from, A input, CompletableFuture<V> future) {
protected <A, B> void apply(Step<A, B> step, Step<?, ?> from, A input, CompletableFuture<V> future) {
this.step(step, from, input).ifPresent(completableFuture -> {
completableFuture.thenAccept(b -> {
completableFuture.thenAccept(stepResult -> {
step.aggregation.clear();
step.stepFunctionInput = null;

Set<Transition<?>> transitions = this.transitions(step);
if (transitions == null) {
future.complete((V) b);
return;
if (this.transitions(step).stream()
.filter(transition -> transition.isApplicable(stepResult))
.peek(transition -> {
this.apply(transition.get(), step, transition.map(stepResult), future);
})
.count() == 0
) {
future.complete((V) stepResult);
}
List<? extends Step<B, ?>> nextSteps = transitions.stream()
.map(transition -> (Transition<B>) transition)
.filter(transition -> transition.isApplicable(b))
.map(Transition::get)
.map(next -> (Step<B, ?>) next)
.toList();
if (nextSteps.isEmpty())
future.complete((V) b);
else
nextSteps.forEach(next -> this.apply(next, step, b, future));
}).exceptionally(throwable -> {
future.completeExceptionally(throwable);
return null;
});
});
}
Expand All @@ -168,13 +197,15 @@ protected <A, B> void apply(Step<A, B> step, Step<?, A> from, A input, Completab
* @param <A> the input type of the step
* @param <B> the output type of the step
* @return the result of the step
* @since 1.0.0
*/
protected abstract <A, B> Optional<CompletableFuture<B>> step(Step<A, B> step, Step<?, A> from, A input);
protected abstract <A, B> Optional<CompletableFuture<B>> step(Step<A, B> step, Step<?, ?> from, A input);

/**
* Returns the initial {@link Step} of this {@link StepFunction}.
*
* @return the initial step
* @since 1.0.0
*/
protected Step<T, ?> firstStep() {
return this.initialStep;
Expand All @@ -184,10 +215,17 @@ protected <A, B> void apply(Step<A, B> step, Step<?, A> from, A input, Completab
* Returns the {@link Transition}s that transition from the given {@link Step}.
*
* @param step the step
* @param <A> the output type of the step
* @param <B> the output type of the transitions
* @return the transitions
* @since 1.0.0
*/
protected Set<Transition<?>> transitions(Step<?, ?> step) {
return this.transitions.get(step);
@SuppressWarnings("unchecked")
protected <A, B> Set<Transition<A, B>> transitions(Step<?, A> step) {
Set<Transition<?, ?>> returnSet = this.transitions.get(step);
return returnSet == null ? Set.of() : returnSet.parallelStream()
.map(transition -> (Transition<A, B>) transition)
.collect(Collectors.toSet());
}

}
17 changes: 10 additions & 7 deletions src/main/java/com/koralix/stepfn/SyncStepFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* A {@link StepFunction} that computes its result synchronously.
Expand All @@ -26,15 +25,19 @@ public class SyncStepFunction<T, R> extends StepFunction<T, R, R> {
*
* @param initialStep the initial step
* @param transitions the transitions
* @deprecated use {@link #SyncStepFunction(Step)} instead - this constructor will be removed in 1.2.0
* @since 1.0.0
*/
public SyncStepFunction(Step<T, ?> initialStep, Map<Step<?, ?>, Set<Transition<?>>> transitions) {
@Deprecated
public SyncStepFunction(Step<T, ?> initialStep, Map<Step<?, ?>, Set<Transition<?, ?>>> transitions) {
super(initialStep, transitions);
}

/**
* Creates a new {@link SyncStepFunction} with the given initial step.
*
* @param initialStep the initial step
* @since 1.0.0
*/
public SyncStepFunction(Step<T, ?> initialStep) {
super(initialStep);
Expand All @@ -45,12 +48,11 @@ public SyncStepFunction(Step<T, ?> initialStep) {
*
* @param t the function argument
* @return the function result
* @since 1.0.0
*/
@Override
public R apply(T t) {
CompletableFuture<R> future = new CompletableFuture<>();
// TODO: check that null is stored in the aggregation map
// may be a problem to read the value from the map
this.apply(this.firstStep(), null, t, future);
return future.join();
}
Expand All @@ -65,13 +67,14 @@ public R apply(T t) {
* @param step the step to apply
* @param from the step from which the input was received
* @param input the input to the step
* @param <A> the type of the input to the step
* @param <B> the type of the result of the step
* @param <A> the input type of the step
* @param <B> the output type of the step
* @return the result of the step in a completed {@link CompletableFuture} if the step is complete,
* an empty {@link Optional} otherwise
* @since 1.0.0
*/
@Override
protected <A, B> Optional<CompletableFuture<B>> step(Step<A, B> step, Step<?, A> from, A input) {
protected <A, B> Optional<CompletableFuture<B>> step(Step<A, B> step, Step<?, ?> from, A input) {
step.aggregate(from, input);
if (step.isComplete())
return Optional.of(CompletableFuture.completedFuture(step.apply(input)));
Expand Down
Loading

0 comments on commit bfff7a7

Please sign in to comment.