Skip to content

Commit

Permalink
Merge pull request #6 from flipkart-incubator/async
Browse files Browse the repository at this point in the history
Support for Async Data Adapters
  • Loading branch information
bageshwar authored Jun 20, 2022
2 parents ed3c116 + d50baa2 commit 144cf3b
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
</build>

<properties>
<revision>0.1.0</revision>
<revision>0.1.1</revision>
<guava.version>19.0</guava.version>
<guice.version>4.2.3</guice.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright [2021] [The Original Author]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package flipkart.tef.bizlogics;

import flipkart.tef.exception.TefExecutionException;

import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
* A DataAdapter which emits a future. During the injection phase the caller can either
* 1) do a blocking get (thread will stall till the result is available)
* 2) do a done and get check (to consume the result only if its available)
* 3) do a get with timeout (to wait for the result before consuming them)
* <p>
* The semantics of the Generic Type parameter expect a `Future<Optional<X>>` explicitly
* instead of just <X> because during the flow building phase, the complete signature of the
* generic type interface needs to be present at impl or superclass.
* Only taking the final result type as input from implementation classes (for the value of generic parameter)
* will break that contract.
* <p>
* Since the flow builder uses reflection to get generic params to know what will the return type of data adapter,
* taking only final result type (say X) from implementation class, will appear to TEF as if
* the implementation classes return 'X' rather than the Future.
* <p>
* <p>
* Date: 18/06/22
* Time: 7:42 PM
*/
public abstract class AsyncDataAdapterBizlogic<T extends Future<Optional<U>>, U> extends DataAdapterBizlogic<T> {

private final ThreadPoolExecutor threadPoolExecutor;
private final boolean bubbleException;

/**
* @param threadPoolExecutor Threadpool executor to which to task will be submitted
*/
public AsyncDataAdapterBizlogic(ThreadPoolExecutor threadPoolExecutor) {
this(threadPoolExecutor, false);
}

/**
* @param threadPoolExecutor Threadpool executor to which to task will be submitted
* @param bubbleException if true, any exception thrown as part of computing the result will be rethrown,
* else `Optional.empty` will be returned.
*/
public AsyncDataAdapterBizlogic(ThreadPoolExecutor threadPoolExecutor, boolean bubbleException) {
this.threadPoolExecutor = threadPoolExecutor;
this.bubbleException = bubbleException;
}

@Override
public final T adapt(final TefContext tefContext) {
/*
Submit the task to the threadpool
The `bubbleException` flag will be used to decide the behavior in case of an exception,
either to return an empty value, or rethrow the exception
*/
return (T) threadPoolExecutor.submit(() -> getResultImpl(tefContext));
}

/**
* This method should compute and return the result. The flow execution will not be blocked on this method.
* i.e. This method will run in async
*
* @param tefContext TefContext for callers
* @return The result
* @throws TefExecutionException The retriability error codes are not honoured when executing this bizlogic
*/
public abstract U getResult(TefContext tefContext) throws TefExecutionException;

private Optional<U> getResultImpl(TefContext tefContext) throws Exception {
try {
U result = getResult(tefContext);
return Optional.ofNullable(result);
} catch (Exception e) {
// Catch-all block to minimize side effects
if (bubbleException) {
throw e;
} else {
return Optional.empty();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private void convertDataDependencyToBizLogicDependency() {
}
Class<? extends IDataBizlogic<?>> adapter = dataAdapterMap.get(injectedData);
Preconditions.checkArgument(adapter != null, String.format(Messages.DATA_ADAPTER_NOT_RESOLVED_FOR,
injectedData.getResultClass().getCanonicalName()));
injectedData.getResultClass().getCanonicalName(), entry.getKey().getName()));
addDependencies(entry.getKey(), adapter);
}
}
Expand Down Expand Up @@ -244,7 +244,7 @@ private void populateDataAdapterMap(Class<? extends IBizlogic> bizlogic) {
throw new AdapterConflictRuntimeException(returnType, bizlogic, dataAdapterMap.get(key));
}
}
} catch (AdapterConflictRuntimeException|UnableToResolveDataFromAdapterRuntimeException e) {
} catch (AdapterConflictRuntimeException | UnableToResolveDataFromAdapterRuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -266,7 +266,7 @@ private Class<?> getReturnTypeFromBizlogicUsingSunApi(Class<? extends DataAdapte
// This could be a case of a data adapter being a subclass of another
// data adapter where type parameter is specified on the superclass
if(DataAdapterBizlogic.class.isAssignableFrom(dataAdapterBizLogic.getSuperclass())) {
return getReturnTypeFromBizlogicUsingSunApi((Class<? extends DataAdapterBizlogic<?>>)dataAdapterBizLogic.getSuperclass(), classHierarchy);
return getReturnTypeFromBizlogicUsingSunApi((Class<? extends DataAdapterBizlogic<?>>) dataAdapterBizLogic.getSuperclass(), classHierarchy);
}
}

Expand All @@ -278,7 +278,7 @@ static class Messages {
public static final String A_BIZLOGIC_CANNOT_DEPEND_ON_SELF = "A bizlogic cannot depend on Self";
public static final String MORE_THAN_1_DEPENDS_ON_ANNOTATIONS_FOUND = "More than 1 @DependsOn annotations found";
public static final String COULD_NOT_DEDUCE_THE_STARTING_STEP = "Could not deduce the starting step";
public static final String DATA_ADAPTER_NOT_RESOLVED_FOR = "Data Adapter not resolved for %s";
public static final String DATA_ADAPTER_NOT_RESOLVED_FOR = "Data Adapter not resolved for %s in bizlogic %s";
}

private void addDependencies(Class<? extends IBizlogic> bizlogic, Class<? extends IBizlogic>... dependencies) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package flipkart.tef.bizlogics;

import com.google.common.collect.Queues;
import flipkart.tef.TestTefContext;
import flipkart.tef.annotations.EmitData;
import flipkart.tef.annotations.InjectData;
import flipkart.tef.exception.TefExecutionException;
import flipkart.tef.execution.DataContext;
import flipkart.tef.execution.DataDependencyException;
import flipkart.tef.execution.FlowExecutor;
import flipkart.tef.execution.FluentCapabilityBuilder;
import flipkart.tef.execution.MyFlowExecutionListener;
import flipkart.tef.flow.SimpleFlow;
import org.junit.Before;
import org.junit.Test;

import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class AsyncDataAdapterBizlogicTest {

private static ThreadPoolExecutor threadPoolExecutor;

private FluentCapabilityBuilder flowBuilder;

@Before
public void setUp() {
flowBuilder = new FluentCapabilityBuilder();
BlockingQueue<Runnable> blockingQueue = Queues.newArrayBlockingQueue(10);
threadPoolExecutor = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, blockingQueue, (r, executor) -> {
r.run();
});
}

@Test
public void testAsyncDataInjection() throws TefExecutionException, DataDependencyException, IllegalAccessException, InstantiationException {


flowBuilder.withAdapter(Sample1AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample2AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample3AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample4AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample5AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample6AsyncDataAdapterBizlogic.class);
flowBuilder.withBizlogic(SimpleBizlogic.class);
SimpleFlow dataflow = flowBuilder.dataflow();

assertEquals(7, dataflow.getBizlogics().size());
assertTrue(dataflow.getBizlogics().contains(Sample1AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample2AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample3AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample4AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample5AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample6AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(SimpleBizlogic.class));

DataContext dataContext = new DataContext();
FlowExecutor executor = new FlowExecutor(dataflow, dataContext, new TestTefContext());

MyFlowExecutionListener listener = new MyFlowExecutionListener();
executor.addListener(listener);
executor.execute();
}

static class SimpleBizlogic implements IBizlogic {

// Sleeps and returns
@InjectData(name = "1")
Future<Optional<String>> asyncResult1;

// Sleeps and returns with bubbleException=true, but does not throw an exception
@InjectData(name = "2")
Future<Optional<String>> asyncResult2;

// throws exception with bubbleException=true
@InjectData(name = "3")
Future<Optional<String>> asyncResult3;

// throws exception without bubbleException=true
@InjectData(name = "4")
Future<Optional<String>> asyncResult4;

// returns null with bubbleException=true
@InjectData(name = "5")
Future<Optional<String>> asyncResult5;

// returns null with bubbleException=false
@InjectData(name = "6")
Future<Optional<String>> asyncResult6;

@Override
public void execute(TefContext tefContext) throws TefExecutionException {
// 100ms sleep in workers ensures result is not ready instantly
assertFalse(asyncResult1.isDone());
assertFalse(asyncResult2.isDone());
assertFalse(asyncResult3.isDone());
assertFalse(asyncResult4.isDone());
assertFalse(asyncResult5.isDone());
assertFalse(asyncResult6.isDone());

// 200ms sleep ensures results are ready
sleep(200);
assertTrue(asyncResult1.isDone());
assertTrue(asyncResult2.isDone());
assertTrue(asyncResult3.isDone());
assertTrue(asyncResult4.isDone());
assertTrue(asyncResult5.isDone());
assertTrue(asyncResult6.isDone());

try {
// assert on results
assertEquals("1", asyncResult1.get().get());
assertEquals("2", asyncResult2.get().get());
// #4 does not return a result since it throws an exception
assertFalse(asyncResult4.get().isPresent());
} catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}

try {
// #3 throws an exception with bubbleException=true
asyncResult3.get();
fail("Runtime exception was expected");
} catch (RuntimeException | InterruptedException | ExecutionException e) {
assertEquals("java.lang.RuntimeException: 3", e.getMessage());
}

try {
// irrespective of bubbleException state,
// data adapters returning nulls, should land as empty data injections
assertFalse(asyncResult5.get().isPresent());
assertFalse(asyncResult6.get().isPresent());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
fail("Unexpected exception");
}
}
}

static void sleep(long t) {
try {
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

static void sleep() {
sleep(100);
}


@EmitData(name = "1")
static class Sample1AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample1AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
return "1";
}
}

@EmitData(name = "2")
static class Sample2AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample2AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
return "2";
}
}

@EmitData(name = "3")
static class Sample3AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample3AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
throw new RuntimeException("3");
}
}

@EmitData(name = "4")
static class Sample4AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample4AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
throw new RuntimeException("4");
}
}

@EmitData(name = "5")
static class Sample5AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample5AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
return null;
}
}

@EmitData(name = "6")
static class Sample6AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample6AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
return null;
}
}
}
Loading

0 comments on commit 144cf3b

Please sign in to comment.