Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates exception handling with FlowFrameworkException and fix Global Context mapping #106

Closed
wants to merge 8 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
package org.opensearch.flowframework.exception;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Representation of Flow Framework Exceptions
*/
public class FlowFrameworkException extends RuntimeException {
public class FlowFrameworkException extends RuntimeException implements ToXContentObject {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -60,4 +64,11 @@ public FlowFrameworkException(String message, Throwable cause, RestStatus restSt
public RestStatus getRestStatus() {
return restStatus;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {

return builder.startObject().field("error", "Request failed with exception: [" + this.getMessage() + "]").endObject();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
Expand All @@ -19,6 +20,7 @@
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand All @@ -29,7 +31,6 @@
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING;
import static org.opensearch.flowframework.workflow.CreateIndexStep.getIndexMappings;
Expand Down Expand Up @@ -73,20 +74,22 @@ private void initGlobalContextIndexIfAbsent(ActionListener<Boolean> listener) {
public void putTemplateToGlobalContext(Template template, ActionListener<IndexResponse> listener) {
initGlobalContextIndexIfAbsent(ActionListener.wrap(indexCreated -> {
if (!indexCreated) {
listener.onFailure(new FlowFrameworkException("No response to create global_context index", INTERNAL_SERVER_ERROR));
listener.onFailure(
new FlowFrameworkException("No response to create global_context index", RestStatus.INTERNAL_SERVER_ERROR)
);
return;
}
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX);
try (
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
request.source(template.toDocumentSource(builder, ToXContent.EMPTY_PARAMS))
request.source(template.toXContent(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to index global_context index");
listener.onFailure(e);
listener.onFailure(new FlowFrameworkException("Failed to index global_context index", e, ExceptionsHelper.status(e)));
}
}, e -> {
logger.error("Failed to create global_context index", e);
Expand All @@ -113,7 +116,7 @@ public void updateTemplateInGlobalContext(String documentId, Template template,
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
request.source(template.toDocumentSource(builder, ToXContent.EMPTY_PARAMS))
request.source(template.toXContent(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
Expand Down
225 changes: 1 addition & 224 deletions src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Map.Entry;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.TemplateUtil.jsonToParser;
import static org.opensearch.flowframework.common.TemplateUtil.parseStringToStringMap;

/**
Expand Down Expand Up @@ -161,228 +160,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return xContentBuilder.endObject();
}

/**
* Converts a template object into a Global Context document
* @param builder the XContentBuilder
* @param params the params
* @return the XContentBuilder
* @throws IOException if the document source fails to be generated
*/
public XContentBuilder toDocumentSource(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field(NAME_FIELD, this.name);
xContentBuilder.field(DESCRIPTION_FIELD, this.description);
xContentBuilder.field(USE_CASE_FIELD, this.useCase);
xContentBuilder.startArray(OPERATIONS_FIELD);
for (String op : this.operations) {
xContentBuilder.value(op);
}
xContentBuilder.endArray();

if (this.templateVersion != null || !this.compatibilityVersion.isEmpty()) {
xContentBuilder.startObject(VERSION_FIELD);
if (this.templateVersion != null) {
xContentBuilder.field(TEMPLATE_FIELD, this.templateVersion);
}
if (!this.compatibilityVersion.isEmpty()) {
xContentBuilder.startArray(COMPATIBILITY_FIELD);
for (Version v : this.compatibilityVersion) {
xContentBuilder.value(v);
}
xContentBuilder.endArray();
}
xContentBuilder.endObject();
}

if (!this.userInputs.isEmpty()) {
xContentBuilder.startObject(USER_INPUTS_FIELD);
for (Entry<String, Object> e : userInputs.entrySet()) {
xContentBuilder.field(e.getKey(), e.getValue());
}
xContentBuilder.endObject();
}

try (XContentBuilder workflowBuilder = JsonXContent.contentBuilder()) {
workflowBuilder.startObject();
for (Entry<String, Workflow> e : workflows.entrySet()) {
workflowBuilder.field(e.getKey(), e.getValue());
}
workflowBuilder.endObject();
xContentBuilder.field(WORKFLOWS_FIELD, workflowBuilder.toString());
}

try (XContentBuilder userOutputsBuilder = JsonXContent.contentBuilder()) {
userOutputsBuilder.startObject();
for (Entry<String, Object> e : userOutputs.entrySet()) {
userOutputsBuilder.field(e.getKey(), e.getValue());
}
userOutputsBuilder.endObject();
xContentBuilder.field(USER_OUTPUTS_FIELD, userOutputsBuilder.toString());
}

try (XContentBuilder resourcesCreatedBuilder = JsonXContent.contentBuilder()) {
resourcesCreatedBuilder.startObject();
for (Entry<String, Object> e : resourcesCreated.entrySet()) {
resourcesCreatedBuilder.field(e.getKey(), e.getValue());
}
resourcesCreatedBuilder.endObject();
xContentBuilder.field(RESOURCES_CREATED_FIELD, resourcesCreatedBuilder.toString());
}

xContentBuilder.endObject();

return xContentBuilder;

}

/**
* Parse global context document source into a Template instance
*
* @param documentSource the document source string
* @return an instance of the template
* @throws IOException if content can't be parsed correctly
*/
public static Template parseFromDocumentSource(String documentSource) throws IOException {
XContentParser parser = jsonToParser(documentSource);

String name = null;
String description = "";
String useCase = "";
List<String> operations = new ArrayList<>();
Version templateVersion = null;
List<Version> compatibilityVersion = new ArrayList<>();
Map<String, Object> userInputs = new HashMap<>();
Map<String, Workflow> workflows = new HashMap<>();
Map<String, Object> userOutputs = new HashMap<>();
Map<String, Object> resourcesCreated = new HashMap<>();

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case NAME_FIELD:
name = parser.text();
break;
case DESCRIPTION_FIELD:
description = parser.text();
break;
case USE_CASE_FIELD:
useCase = parser.text();
break;
case OPERATIONS_FIELD:
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
operations.add(parser.text());
}
break;
case VERSION_FIELD:
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String versionFieldName = parser.currentName();
parser.nextToken();
switch (versionFieldName) {
case TEMPLATE_FIELD:
templateVersion = Version.fromString(parser.text());
break;
case COMPATIBILITY_FIELD:
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
compatibilityVersion.add(Version.fromString(parser.text()));
}
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a version object.");
}
}
break;
case USER_INPUTS_FIELD:
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String inputFieldName = parser.currentName();
switch (parser.nextToken()) {
case VALUE_STRING:
userInputs.put(inputFieldName, parser.text());
break;
case START_OBJECT:
userInputs.put(inputFieldName, parseStringToStringMap(parser));
break;
default:
throw new IOException("Unable to parse field [" + inputFieldName + "] in a user inputs object.");
}
}
break;
case WORKFLOWS_FIELD:
String workflowsJson = parser.text();
XContentParser workflowsParser = jsonToParser(workflowsJson);
while (workflowsParser.nextToken() != XContentParser.Token.END_OBJECT) {
String workflowFieldName = workflowsParser.currentName();
workflowsParser.nextToken();
workflows.put(workflowFieldName, Workflow.parse(workflowsParser));
}
break;
case USER_OUTPUTS_FIELD:

String userOutputsJson = parser.text();
XContentParser userOuputsParser = jsonToParser(userOutputsJson);
while (userOuputsParser.nextToken() != XContentParser.Token.END_OBJECT) {
String userOutputsFieldName = userOuputsParser.currentName();
switch (userOuputsParser.nextToken()) {
case VALUE_STRING:
userOutputs.put(userOutputsFieldName, userOuputsParser.text());
break;
case START_OBJECT:
userOutputs.put(userOutputsFieldName, parseStringToStringMap(userOuputsParser));
break;
default:
throw new IOException("Unable to parse field [" + userOutputsFieldName + "] in a user_outputs object.");
}
}
break;

case RESOURCES_CREATED_FIELD:

String resourcesCreatedJson = parser.text();
XContentParser resourcesCreatedParser = jsonToParser(resourcesCreatedJson);
while (resourcesCreatedParser.nextToken() != XContentParser.Token.END_OBJECT) {
String resourcesCreatedField = resourcesCreatedParser.currentName();
switch (resourcesCreatedParser.nextToken()) {
case VALUE_STRING:
resourcesCreated.put(resourcesCreatedField, resourcesCreatedParser.text());
break;
case START_OBJECT:
resourcesCreated.put(resourcesCreatedField, parseStringToStringMap(resourcesCreatedParser));
break;
default:
throw new IOException(
"Unable to parse field [" + resourcesCreatedField + "] in a resources_created object."
);
}
}
break;

default:
throw new IOException("Unable to parse field [" + fieldName + "] in a template object.");
}
}
if (name == null) {
throw new IOException("An template object requires a name.");
}

return new Template(
name,
description,
useCase,
operations,
templateVersion,
compatibilityVersion,
userInputs,
workflows,
userOutputs,
resourcesCreated
);
}

/**
* Parse raw json content into a Template instance.
*
Expand Down Expand Up @@ -507,7 +284,7 @@ public static Template parse(XContentParser parser) throws IOException {
}
}
if (name == null) {
throw new IOException("An template object requires a name.");
throw new IOException("A template object requires a name.");
}

return new Template(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.io.IOException;
import java.util.List;
Expand All @@ -29,6 +36,8 @@
*/
public class RestCreateWorkflowAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);

private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";

/**
Expand All @@ -53,11 +62,29 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
try {
String workflowId = request.param(WORKFLOW_ID);
Template template = Template.parse(request.content().utf8ToString());
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template);
return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.CREATED, builder));
}, exception -> {
try {
FlowFrameworkException ex = (FlowFrameworkException) exception;
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
logger.error("Failed to send back create workflow exception", e);
}
}));
} catch (Exception e) {
FlowFrameworkException ex = new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST);
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}

String workflowId = request.param(WORKFLOW_ID);
Template template = Template.parse(request.content().utf8ToString());
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template);
return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, new RestToXContentListener<>(channel));
}

}
Loading