Skip to content

Commit

Permalink
Addressed initial comments
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 authored and dbwiddis committed Jan 21, 2024
1 parent 760b152 commit 09ed85b
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void executeDeprovisionSequence(
String resourceNameAndId = getResourceNameAndId(resource);
PlainActionFuture<WorkflowData> deprovisionFuture = deprovisionNode.execute();
try {
deprovisionFuture.actionGet();
deprovisionFuture.get();
logger.info("Successful {} for {}", deprovisionNode.id(), resourceNameAndId);
// Remove from list so we don't try again
iter.remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
workflowFutureList.add(processNode.execute());
}

// Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally
// Attempt to join each workflow step future, may throw a ExecutionException if any step completes exceptionally
workflowFutureList.forEach(PlainActionFuture::actionGet);

logger.info("Provisioning completed successfully for workflow {}", workflowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ public PlainActionFuture<WorkflowData> execute(

if (configuration == null) {
// Required workflow data not found
createIngestPipelineFuture.onFailure(new Exception("Failed to create ingest pipeline, required inputs not found"));
createIngestPipelineFuture.onFailure(
new IllegalArgumentException("Failed to create ingest pipeline, required inputs not found")
);
} else {
// Create PutPipelineRequest and execute
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configuration, XContentType.JSON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
import org.opensearch.threadpool.Scheduler.ScheduledCancellable;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;

Expand Down Expand Up @@ -147,20 +145,12 @@ public PlainActionFuture<WorkflowData> execute() {
}

CompletableFuture.runAsync(() -> {
List<PlainActionFuture<WorkflowData>> predFutures = predecessors.stream().map(p -> p.future()).collect(Collectors.toList());
List<WorkflowData> waitForPredecessors = new ArrayList<>(predFutures.size());
try {
if (!predecessors.isEmpty()) {
for (PlainActionFuture<WorkflowData> future : predFutures) {
waitForPredecessors.add(future.get());
}

}
logger.info("Starting {}.", this.id);
// get the input data from predecessor(s)
Map<String, WorkflowData> inputMap = new HashMap<>();
for (PlainActionFuture<WorkflowData> cf : predFutures) {
WorkflowData wd = cf.actionGet();
for (ProcessNode node : predecessors) {
WorkflowData wd = node.future().actionGet();
inputMap.put(wd.getNodeId(), wd);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,8 @@ public String getName() {
assertEquals("E", nodeE.toString());

PlainActionFuture<WorkflowData> f = nodeE.execute();
UncategorizedExecutionException exception = assertThrows(UncategorizedExecutionException.class, () -> f.actionGet());
RuntimeException exception = assertThrows(RuntimeException.class, () -> f.actionGet());
assertTrue(f.isDone());
assertEquals(
"java.util.concurrent.ExecutionException: java.lang.RuntimeException: Test exception",
exception.getCause().getMessage()
);

// Tests where we already called execute
assertThrows(IllegalStateException.class, () -> nodeE.execute());
}
Expand Down

0 comments on commit 09ed85b

Please sign in to comment.