diff --git a/CHANGELOG.md b/CHANGELOG.md index a53d9b93a..b302d51e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,3 +23,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Documentation ### Maintenance ### Refactoring +- Improve error messages for workflow states other than NOT_STARTED ([#642](https://github.com/opensearch-project/flow-framework/pull/642)) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 0547c875a..9e7dd9433 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -49,6 +49,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -419,8 +420,8 @@ public void updateTemplateInGlobalContext( } doesTemplateExist(documentId, templateExists -> { if (templateExists) { - isWorkflowNotStarted(documentId, workflowIsNotStarted -> { - if (workflowIsNotStarted || ignoreNotStartedCheck) { + getProvisioningProgress(documentId, progress -> { + if (ignoreNotStartedCheck || ProvisioningProgress.NOT_STARTED.equals(progress.orElse(null))) { IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId); try ( XContentBuilder builder = XContentFactory.jsonBuilder(); @@ -436,7 +437,9 @@ public void updateTemplateInGlobalContext( listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); } } else { - String errorMessage = "The template has already been provisioned so it can't be updated: " + documentId; + String errorMessage = "The template can not be updated unless its provisioning state is NOT_STARTED: " + + documentId + + ". Deprovision the workflow to reset the state."; logger.error(errorMessage); listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); } @@ -474,20 +477,24 @@ public void doesTemplateExist(String documentId, Consumer booleanRe } /** - * Check if the workflow has been provisioned and executes the consumer by passing a boolean + * Check workflow provisioning state and executes the consumer * * @param documentId document id - * @param booleanResultConsumer boolean consumer function based on if workflow is provisioned or not + * @param provisioningProgressConsumer consumer function based on if workflow is provisioned. * @param listener action listener * @param action listener response type */ - public void isWorkflowNotStarted(String documentId, Consumer booleanResultConsumer, ActionListener listener) { + public void getProvisioningProgress( + String documentId, + Consumer> provisioningProgressConsumer, + ActionListener listener + ) { GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { client.get(getRequest, ActionListener.wrap(response -> { context.restore(); if (!response.isExists()) { - booleanResultConsumer.accept(false); + provisioningProgressConsumer.accept(Optional.empty()); return; } try ( @@ -495,7 +502,7 @@ public void isWorkflowNotStarted(String documentId, Consumer boolea ) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); WorkflowState workflowState = WorkflowState.parse(parser); - booleanResultConsumer.accept(workflowState.getProvisioningProgress().equals(ProvisioningProgress.NOT_STARTED.name())); + provisioningProgressConsumer.accept(Optional.of(ProvisioningProgress.valueOf(workflowState.getProvisioningProgress()))); } catch (Exception e) { String errorMessage = "Failed to parse workflow state " + documentId; logger.error(errorMessage, e); @@ -503,7 +510,7 @@ public void isWorkflowNotStarted(String documentId, Consumer boolea } }, exception -> { logger.error("Failed to get workflow state for {} ", documentId); - booleanResultConsumer.accept(false); + provisioningProgressConsumer.accept(Optional.empty()); })); } catch (Exception e) { String errorMessage = "Failed to retrieve workflow state to check provisioning status"; diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 18ca73f26..add83edfe 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -42,7 +42,6 @@ import java.util.Map; import java.util.stream.Collectors; -import static java.lang.Boolean.TRUE; import static org.opensearch.flowframework.common.CommonValue.ERROR_FIELD; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD; @@ -132,8 +131,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { - if (TRUE.equals(workflowIsNotStarted)) { + flowFrameworkIndicesHandler.getProvisioningProgress(workflowId, progress -> { + if (ProvisioningProgress.NOT_STARTED.equals(progress.orElse(null))) { // update state index flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( workflowId, @@ -174,7 +173,11 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener function = mock(Consumer.class); + Consumer> function = mock(Consumer.class); ActionListener listener = mock(ActionListener.class); WorkflowState workFlowState = new WorkflowState( documentId, @@ -277,7 +279,7 @@ public void testIsWorkflowProvisionedFailedParsing() { responseListener.onResponse(new GetResponse(getResult)); return null; }).when(client).get(any(GetRequest.class), any()); - flowFrameworkIndicesHandler.isWorkflowNotStarted(documentId, function, listener); + flowFrameworkIndicesHandler.getProvisioningProgress(documentId, function, listener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); assertTrue(exceptionCaptor.getValue().getMessage().contains("Failed to parse workflow state")); diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 7e006d693..ada8d5513 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -110,7 +110,9 @@ public void testFailedUpdateWorkflow() throws Exception { ResponseException.class, () -> updateWorkflow(client(), workflowId, template) ); - assertTrue(exceptionProvisioned.getMessage().contains("The template has already been provisioned so it can't be updated")); + assertTrue( + exceptionProvisioned.getMessage().contains("The template can not be updated unless its provisioning state is NOT_STARTED") + ); } diff --git a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java index 9e60c0407..5cc11a92d 100644 --- a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java @@ -24,6 +24,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.TestHelpers; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.flowframework.model.ProvisioningProgress; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.model.Workflow; import org.opensearch.flowframework.model.WorkflowEdge; @@ -40,6 +41,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Consumer; import org.mockito.ArgumentCaptor; @@ -138,10 +140,10 @@ public void testProvisionWorkflow() { // Bypass isWorkflowNotStarted and force true response doAnswer(invocation -> { - Consumer boolConsumer = invocation.getArgument(1); - boolConsumer.accept(true); + Consumer> progressConsumer = invocation.getArgument(1); + progressConsumer.accept(Optional.of(ProvisioningProgress.NOT_STARTED)); return null; - }).when(flowFrameworkIndicesHandler).isWorkflowNotStarted(any(), any(), any()); + }).when(flowFrameworkIndicesHandler).getProvisioningProgress(any(), any(), any()); // Bypass updateFlowFrameworkSystemIndexDoc and stub on response doAnswer(invocation -> { @@ -185,10 +187,10 @@ public void testProvisionWorkflowTwice() { // Bypass isWorkflowNotStarted and force false response doAnswer(invocation -> { - Consumer boolConsumer = invocation.getArgument(1); - boolConsumer.accept(false); + Consumer> progressConsumer = invocation.getArgument(1); + progressConsumer.accept(Optional.of(ProvisioningProgress.DONE)); return null; - }).when(flowFrameworkIndicesHandler).isWorkflowNotStarted(any(), any(), any()); + }).when(flowFrameworkIndicesHandler).getProvisioningProgress(any(), any(), any()); // Bypass updateFlowFrameworkSystemIndexDoc and stub on response doAnswer(invocation -> { @@ -200,7 +202,10 @@ public void testProvisionWorkflowTwice() { provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); - assertEquals("The template has already been provisioned: 2", exceptionCaptor.getValue().getMessage()); + assertEquals( + "The workflow provisioning state is DONE and can not be provisioned unless its state is NOT_STARTED: 2. Deprovision the workflow to reset the state.", + exceptionCaptor.getValue().getMessage() + ); } public void testFailedToRetrieveTemplateFromGlobalContext() {