Skip to content

Commit

Permalink
Improve error messages for workflow states other than NOT_STARTED (op…
Browse files Browse the repository at this point in the history
…ensearch-project#642)

* Improve error messages for workflow states other than NOT_STARTED

Signed-off-by: Daniel Widdis <[email protected]>

* Update CHANGELOG

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Apr 6, 2024
1 parent f6e9036 commit 452aec5
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Documentation
### Maintenance
### Refactoring
- Improve error messages for workflow states other than NOT_STARTED ([#642](https://github.com/opensearch-project/flow-framework/pull/642))
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -419,8 +420,8 @@ public void updateTemplateInGlobalContext(
}
doesTemplateExist(documentId, templateExists -> {
if (templateExists) {
isWorkflowNotStarted(documentId, workflowIsNotStarted -> {
if (workflowIsNotStarted || ignoreNotStartedCheck) {
getProvisioningProgress(documentId, progress -> {
if (ignoreNotStartedCheck || ProvisioningProgress.NOT_STARTED.equals(progress.orElse(null))) {
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId);
try (
XContentBuilder builder = XContentFactory.jsonBuilder();
Expand All @@ -436,7 +437,9 @@ public void updateTemplateInGlobalContext(
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
} else {
String errorMessage = "The template has already been provisioned so it can't be updated: " + documentId;
String errorMessage = "The template can not be updated unless its provisioning state is NOT_STARTED: "
+ documentId
+ ". Deprovision the workflow to reset the state.";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
Expand Down Expand Up @@ -474,36 +477,40 @@ public <T> void doesTemplateExist(String documentId, Consumer<Boolean> booleanRe
}

/**
* Check if the workflow has been provisioned and executes the consumer by passing a boolean
* Check workflow provisioning state and executes the consumer
*
* @param documentId document id
* @param booleanResultConsumer boolean consumer function based on if workflow is provisioned or not
* @param provisioningProgressConsumer consumer function based on if workflow is provisioned.
* @param listener action listener
* @param <T> action listener response type
*/
public <T> void isWorkflowNotStarted(String documentId, Consumer<Boolean> booleanResultConsumer, ActionListener<T> listener) {
public <T> void getProvisioningProgress(
String documentId,
Consumer<Optional<ProvisioningProgress>> provisioningProgressConsumer,
ActionListener<T> listener
) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, ActionListener.wrap(response -> {
context.restore();
if (!response.isExists()) {
booleanResultConsumer.accept(false);
provisioningProgressConsumer.accept(Optional.empty());
return;
}
try (
XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);
booleanResultConsumer.accept(workflowState.getProvisioningProgress().equals(ProvisioningProgress.NOT_STARTED.name()));
provisioningProgressConsumer.accept(Optional.of(ProvisioningProgress.valueOf(workflowState.getProvisioningProgress())));
} catch (Exception e) {
String errorMessage = "Failed to parse workflow state " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
}
}, exception -> {
logger.error("Failed to get workflow state for {} ", documentId);
booleanResultConsumer.accept(false);
provisioningProgressConsumer.accept(Optional.empty());
}));
} catch (Exception e) {
String errorMessage = "Failed to retrieve workflow state to check provisioning status";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import static java.lang.Boolean.TRUE;
import static org.opensearch.flowframework.common.CommonValue.ERROR_FIELD;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
Expand Down Expand Up @@ -132,8 +131,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
);
workflowProcessSorter.validate(provisionProcessSequence, pluginsService);

flowFrameworkIndicesHandler.isWorkflowNotStarted(workflowId, workflowIsNotStarted -> {
if (TRUE.equals(workflowIsNotStarted)) {
flowFrameworkIndicesHandler.getProvisioningProgress(workflowId, progress -> {
if (ProvisioningProgress.NOT_STARTED.equals(progress.orElse(null))) {
// update state index
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Expand Down Expand Up @@ -174,7 +173,11 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
})
);
} else {
String errorMessage = "The template has already been provisioned: " + workflowId;
String errorMessage = "The workflow provisioning state is "
+ (progress.isPresent() ? progress.get().toString() : "unknown")
+ " and can not be provisioned unless its state is NOT_STARTED: "
+ workflowId
+ ". Deprovision the workflow to reset the state.";
logger.info(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowState;
Expand All @@ -46,6 +47,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -253,7 +255,7 @@ public void testInitIndexIfAbsent_IndexNotPresent() {

public void testIsWorkflowProvisionedFailedParsing() {
String documentId = randomAlphaOfLength(5);
Consumer<Boolean> function = mock(Consumer.class);
Consumer<Optional<ProvisioningProgress>> function = mock(Consumer.class);
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
Expand All @@ -277,7 +279,7 @@ public void testIsWorkflowProvisionedFailedParsing() {
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());
flowFrameworkIndicesHandler.isWorkflowNotStarted(documentId, function, listener);
flowFrameworkIndicesHandler.getProvisioningProgress(documentId, function, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertTrue(exceptionCaptor.getValue().getMessage().contains("Failed to parse workflow state"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ public void testFailedUpdateWorkflow() throws Exception {
ResponseException.class,
() -> updateWorkflow(client(), workflowId, template)
);
assertTrue(exceptionProvisioned.getMessage().contains("The template has already been provisioned so it can't be updated"));
assertTrue(
exceptionProvisioned.getMessage().contains("The template can not be updated unless its provisioning state is NOT_STARTED")
);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowEdge;
Expand All @@ -40,6 +41,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -138,10 +140,10 @@ public void testProvisionWorkflow() {

// Bypass isWorkflowNotStarted and force true response
doAnswer(invocation -> {
Consumer<Boolean> boolConsumer = invocation.getArgument(1);
boolConsumer.accept(true);
Consumer<Optional<ProvisioningProgress>> progressConsumer = invocation.getArgument(1);
progressConsumer.accept(Optional.of(ProvisioningProgress.NOT_STARTED));
return null;
}).when(flowFrameworkIndicesHandler).isWorkflowNotStarted(any(), any(), any());
}).when(flowFrameworkIndicesHandler).getProvisioningProgress(any(), any(), any());

// Bypass updateFlowFrameworkSystemIndexDoc and stub on response
doAnswer(invocation -> {
Expand Down Expand Up @@ -185,10 +187,10 @@ public void testProvisionWorkflowTwice() {

// Bypass isWorkflowNotStarted and force false response
doAnswer(invocation -> {
Consumer<Boolean> boolConsumer = invocation.getArgument(1);
boolConsumer.accept(false);
Consumer<Optional<ProvisioningProgress>> progressConsumer = invocation.getArgument(1);
progressConsumer.accept(Optional.of(ProvisioningProgress.DONE));
return null;
}).when(flowFrameworkIndicesHandler).isWorkflowNotStarted(any(), any(), any());
}).when(flowFrameworkIndicesHandler).getProvisioningProgress(any(), any(), any());

// Bypass updateFlowFrameworkSystemIndexDoc and stub on response
doAnswer(invocation -> {
Expand All @@ -200,7 +202,10 @@ public void testProvisionWorkflowTwice() {
provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("The template has already been provisioned: 2", exceptionCaptor.getValue().getMessage());
assertEquals(
"The workflow provisioning state is DONE and can not be provisioned unless its state is NOT_STARTED: 2. Deprovision the workflow to reset the state.",
exceptionCaptor.getValue().getMessage()
);
}

public void testFailedToRetrieveTemplateFromGlobalContext() {
Expand Down

0 comments on commit 452aec5

Please sign in to comment.