diff --git a/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java index f3cb55950..b95da30bb 100644 --- a/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java +++ b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java @@ -9,11 +9,15 @@ package org.opensearch.flowframework.exception; import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; /** * Representation of Flow Framework Exceptions */ -public class FlowFrameworkException extends RuntimeException { +public class FlowFrameworkException extends RuntimeException implements ToXContentObject { private static final long serialVersionUID = 1L; @@ -60,4 +64,11 @@ public FlowFrameworkException(String message, Throwable cause, RestStatus restSt public RestStatus getRestStatus() { return restStatus; } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + + return builder.startObject().field("error", "Request failed with exception: [" + this.getMessage() + "]").endObject(); + + } } diff --git a/src/main/java/org/opensearch/flowframework/indices/GlobalContextHandler.java b/src/main/java/org/opensearch/flowframework/indices/GlobalContextHandler.java index 53037d7ce..56ced6dce 100644 --- a/src/main/java/org/opensearch/flowframework/indices/GlobalContextHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/GlobalContextHandler.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; @@ -19,6 +20,7 @@ import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.exception.FlowFrameworkException; @@ -29,7 +31,6 @@ import java.util.HashMap; import java.util.Map; -import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING; import static org.opensearch.flowframework.workflow.CreateIndexStep.getIndexMappings; @@ -73,7 +74,9 @@ private void initGlobalContextIndexIfAbsent(ActionListener listener) { public void putTemplateToGlobalContext(Template template, ActionListener listener) { initGlobalContextIndexIfAbsent(ActionListener.wrap(indexCreated -> { if (!indexCreated) { - listener.onFailure(new FlowFrameworkException("No response to create global_context index", INTERNAL_SERVER_ERROR)); + listener.onFailure( + new FlowFrameworkException("No response to create global_context index", RestStatus.INTERNAL_SERVER_ERROR) + ); return; } IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX); @@ -81,12 +84,12 @@ public void putTemplateToGlobalContext(Template template, ActionListener context.restore())); } catch (Exception e) { logger.error("Failed to index global_context index"); - listener.onFailure(e); + listener.onFailure(new FlowFrameworkException("Failed to index global_context index", e, ExceptionsHelper.status(e))); } }, e -> { logger.error("Failed to create global_context index", e); @@ -113,7 +116,7 @@ public void updateTemplateInGlobalContext(String documentId, Template template, XContentBuilder builder = XContentFactory.jsonBuilder(); ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext() ) { - request.source(template.toDocumentSource(builder, ToXContent.EMPTY_PARAMS)) + request.source(template.toXContent(builder, ToXContent.EMPTY_PARAMS)) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); client.index(request, ActionListener.runBefore(listener, () -> context.restore())); } catch (Exception e) { diff --git a/src/main/java/org/opensearch/flowframework/model/Template.java b/src/main/java/org/opensearch/flowframework/model/Template.java index 7d08ef240..0fd06aea3 100644 --- a/src/main/java/org/opensearch/flowframework/model/Template.java +++ b/src/main/java/org/opensearch/flowframework/model/Template.java @@ -25,7 +25,6 @@ import java.util.Map.Entry; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; -import static org.opensearch.flowframework.common.TemplateUtil.jsonToParser; import static org.opensearch.flowframework.common.TemplateUtil.parseStringToStringMap; /** @@ -161,228 +160,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return xContentBuilder.endObject(); } - /** - * Converts a template object into a Global Context document - * @param builder the XContentBuilder - * @param params the params - * @return the XContentBuilder - * @throws IOException if the document source fails to be generated - */ - public XContentBuilder toDocumentSource(XContentBuilder builder, Params params) throws IOException { - XContentBuilder xContentBuilder = builder.startObject(); - xContentBuilder.field(NAME_FIELD, this.name); - xContentBuilder.field(DESCRIPTION_FIELD, this.description); - xContentBuilder.field(USE_CASE_FIELD, this.useCase); - xContentBuilder.startArray(OPERATIONS_FIELD); - for (String op : this.operations) { - xContentBuilder.value(op); - } - xContentBuilder.endArray(); - - if (this.templateVersion != null || !this.compatibilityVersion.isEmpty()) { - xContentBuilder.startObject(VERSION_FIELD); - if (this.templateVersion != null) { - xContentBuilder.field(TEMPLATE_FIELD, this.templateVersion); - } - if (!this.compatibilityVersion.isEmpty()) { - xContentBuilder.startArray(COMPATIBILITY_FIELD); - for (Version v : this.compatibilityVersion) { - xContentBuilder.value(v); - } - xContentBuilder.endArray(); - } - xContentBuilder.endObject(); - } - - if (!this.userInputs.isEmpty()) { - xContentBuilder.startObject(USER_INPUTS_FIELD); - for (Entry e : userInputs.entrySet()) { - xContentBuilder.field(e.getKey(), e.getValue()); - } - xContentBuilder.endObject(); - } - - try (XContentBuilder workflowBuilder = JsonXContent.contentBuilder()) { - workflowBuilder.startObject(); - for (Entry e : workflows.entrySet()) { - workflowBuilder.field(e.getKey(), e.getValue()); - } - workflowBuilder.endObject(); - xContentBuilder.field(WORKFLOWS_FIELD, workflowBuilder.toString()); - } - - try (XContentBuilder userOutputsBuilder = JsonXContent.contentBuilder()) { - userOutputsBuilder.startObject(); - for (Entry e : userOutputs.entrySet()) { - userOutputsBuilder.field(e.getKey(), e.getValue()); - } - userOutputsBuilder.endObject(); - xContentBuilder.field(USER_OUTPUTS_FIELD, userOutputsBuilder.toString()); - } - - try (XContentBuilder resourcesCreatedBuilder = JsonXContent.contentBuilder()) { - resourcesCreatedBuilder.startObject(); - for (Entry e : resourcesCreated.entrySet()) { - resourcesCreatedBuilder.field(e.getKey(), e.getValue()); - } - resourcesCreatedBuilder.endObject(); - xContentBuilder.field(RESOURCES_CREATED_FIELD, resourcesCreatedBuilder.toString()); - } - - xContentBuilder.endObject(); - - return xContentBuilder; - - } - - /** - * Parse global context document source into a Template instance - * - * @param documentSource the document source string - * @return an instance of the template - * @throws IOException if content can't be parsed correctly - */ - public static Template parseFromDocumentSource(String documentSource) throws IOException { - XContentParser parser = jsonToParser(documentSource); - - String name = null; - String description = ""; - String useCase = ""; - List operations = new ArrayList<>(); - Version templateVersion = null; - List compatibilityVersion = new ArrayList<>(); - Map userInputs = new HashMap<>(); - Map workflows = new HashMap<>(); - Map userOutputs = new HashMap<>(); - Map resourcesCreated = new HashMap<>(); - - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case NAME_FIELD: - name = parser.text(); - break; - case DESCRIPTION_FIELD: - description = parser.text(); - break; - case USE_CASE_FIELD: - useCase = parser.text(); - break; - case OPERATIONS_FIELD: - ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - operations.add(parser.text()); - } - break; - case VERSION_FIELD: - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String versionFieldName = parser.currentName(); - parser.nextToken(); - switch (versionFieldName) { - case TEMPLATE_FIELD: - templateVersion = Version.fromString(parser.text()); - break; - case COMPATIBILITY_FIELD: - ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - compatibilityVersion.add(Version.fromString(parser.text())); - } - break; - default: - throw new IOException("Unable to parse field [" + fieldName + "] in a version object."); - } - } - break; - case USER_INPUTS_FIELD: - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String inputFieldName = parser.currentName(); - switch (parser.nextToken()) { - case VALUE_STRING: - userInputs.put(inputFieldName, parser.text()); - break; - case START_OBJECT: - userInputs.put(inputFieldName, parseStringToStringMap(parser)); - break; - default: - throw new IOException("Unable to parse field [" + inputFieldName + "] in a user inputs object."); - } - } - break; - case WORKFLOWS_FIELD: - String workflowsJson = parser.text(); - XContentParser workflowsParser = jsonToParser(workflowsJson); - while (workflowsParser.nextToken() != XContentParser.Token.END_OBJECT) { - String workflowFieldName = workflowsParser.currentName(); - workflowsParser.nextToken(); - workflows.put(workflowFieldName, Workflow.parse(workflowsParser)); - } - break; - case USER_OUTPUTS_FIELD: - - String userOutputsJson = parser.text(); - XContentParser userOuputsParser = jsonToParser(userOutputsJson); - while (userOuputsParser.nextToken() != XContentParser.Token.END_OBJECT) { - String userOutputsFieldName = userOuputsParser.currentName(); - switch (userOuputsParser.nextToken()) { - case VALUE_STRING: - userOutputs.put(userOutputsFieldName, userOuputsParser.text()); - break; - case START_OBJECT: - userOutputs.put(userOutputsFieldName, parseStringToStringMap(userOuputsParser)); - break; - default: - throw new IOException("Unable to parse field [" + userOutputsFieldName + "] in a user_outputs object."); - } - } - break; - - case RESOURCES_CREATED_FIELD: - - String resourcesCreatedJson = parser.text(); - XContentParser resourcesCreatedParser = jsonToParser(resourcesCreatedJson); - while (resourcesCreatedParser.nextToken() != XContentParser.Token.END_OBJECT) { - String resourcesCreatedField = resourcesCreatedParser.currentName(); - switch (resourcesCreatedParser.nextToken()) { - case VALUE_STRING: - resourcesCreated.put(resourcesCreatedField, resourcesCreatedParser.text()); - break; - case START_OBJECT: - resourcesCreated.put(resourcesCreatedField, parseStringToStringMap(resourcesCreatedParser)); - break; - default: - throw new IOException( - "Unable to parse field [" + resourcesCreatedField + "] in a resources_created object." - ); - } - } - break; - - default: - throw new IOException("Unable to parse field [" + fieldName + "] in a template object."); - } - } - if (name == null) { - throw new IOException("An template object requires a name."); - } - - return new Template( - name, - description, - useCase, - operations, - templateVersion, - compatibilityVersion, - userInputs, - workflows, - userOutputs, - resourcesCreated - ); - } - /** * Parse raw json content into a Template instance. * @@ -507,7 +284,7 @@ public static Template parse(XContentParser parser) throws IOException { } } if (name == null) { - throw new IOException("An template object requires a name."); + throw new IOException("A template object requires a name."); } return new Template( diff --git a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java index ace440f75..48401a611 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java @@ -9,13 +9,20 @@ package org.opensearch.flowframework.rest; import com.google.common.collect.ImmutableList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.client.node.NodeClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.transport.CreateWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest; -import org.opensearch.rest.action.RestToXContentListener; import java.io.IOException; import java.util.List; @@ -29,6 +36,8 @@ */ public class RestCreateWorkflowAction extends BaseRestHandler { + private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class); + private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action"; /** @@ -53,11 +62,29 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + try { + String workflowId = request.param(WORKFLOW_ID); + Template template = Template.parse(request.content().utf8ToString()); + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template); + return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> { + XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); + channel.sendResponse(new BytesRestResponse(RestStatus.CREATED, builder)); + }, exception -> { + try { + FlowFrameworkException ex = (FlowFrameworkException) exception; + XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); + channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); + } catch (IOException e) { + logger.error("Failed to send back create workflow exception", e); + } + })); + } catch (Exception e) { + FlowFrameworkException ex = new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST); + return channel -> channel.sendResponse( + new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } - String workflowId = request.param(WORKFLOW_ID); - Template template = Template.parse(request.content().utf8ToString()); - WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template); - return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, new RestToXContentListener<>(channel)); } } diff --git a/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java index 89471ee00..482679b88 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java @@ -9,14 +9,19 @@ package org.opensearch.flowframework.rest; import com.google.common.collect.ImmutableList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.client.node.NodeClient; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.transport.ProvisionWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest; -import org.opensearch.rest.action.RestToXContentListener; import java.io.IOException; import java.util.List; @@ -30,6 +35,8 @@ */ public class RestProvisionWorkflowAction extends BaseRestHandler { + private static final Logger logger = LogManager.getLogger(RestProvisionWorkflowAction.class); + private static final String PROVISION_WORKFLOW_ACTION = "provision_workflow_action"; /** @@ -53,20 +60,35 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - // Validate content - if (request.hasContent()) { - throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST); - } - - // Validate params String workflowId = request.param(WORKFLOW_ID); - if (workflowId == null) { - throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST); + try { + // Validate content + if (request.hasContent()) { + throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST); + } + // Validate params + if (workflowId == null) { + throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST); + } + // Create request and provision + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> { + XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + }, exception -> { + try { + FlowFrameworkException ex = (FlowFrameworkException) exception; + XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); + channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); + } catch (IOException e) { + logger.error("Failed to send back provision workflow exception", e); + } + })); + } catch (FlowFrameworkException ex) { + return channel -> channel.sendResponse( + new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)) + ); } - - // Create request and provision - WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); - return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, new RestToXContentListener<>(channel)); } } diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index f4147b144..a754e0135 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -14,8 +14,6 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.rest.RestStatus; -import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.GlobalContextHandler; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -54,8 +52,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { - logger.error("Failed to save use case template : {}", exception.getMessage()); - listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); + logger.error("Failed to save use case template", exception); + listener.onFailure(exception); })); } else { // Update existing entry, full document replacement @@ -66,8 +64,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { - logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage()); - listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); + logger.error("Failed to update use case template", exception); + listener.onFailure(exception); }) ); } diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index e03a1b4d8..22ab8d9d7 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; import org.opensearch.action.get.GetRequest; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; @@ -95,7 +96,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { logger.error("Failed to retrieve template from global context.", exception); - listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); + listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); })); } catch (Exception e) { logger.error("Failed to retrieve template from global context.", e); - listener.onFailure(new FlowFrameworkException(e.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); + listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); } } @@ -131,7 +132,7 @@ private void executeWorkflowAsync(String workflowId, Workflow workflow) { try { threadPool.executor(PROVISION_THREAD_POOL).execute(() -> { executeWorkflow(workflow, provisionWorkflowListener); }); } catch (Exception exception) { - provisionWorkflowListener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); + provisionWorkflowListener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); } } @@ -171,10 +172,10 @@ private void executeWorkflow(Workflow workflow, ActionListener workflowL // TODO : Create State Index request with provisioning state, start time, end time, etc, pending implementation. String for now workflowListener.onResponse("READY"); - } catch (IllegalArgumentException e) { - workflowListener.onFailure(new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST)); + } catch (FlowFrameworkException e) { + workflowListener.onFailure(e); } catch (CancellationException | CompletionException ex) { - workflowListener.onFailure(new FlowFrameworkException(ex.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); + workflowListener.onFailure(new FlowFrameworkException(ex.getCause().getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java index 2b2f7338d..5fd5dc8bf 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java @@ -12,6 +12,7 @@ import com.google.common.io.Resources; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -24,6 +25,7 @@ import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndex; @@ -35,7 +37,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; -import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.opensearch.flowframework.common.CommonValue.META; import static org.opensearch.flowframework.common.CommonValue.NO_SCHEMA_VERSION; import static org.opensearch.flowframework.common.CommonValue.SCHEMA_VERSION_FIELD; @@ -79,7 +80,7 @@ public void onResponse(CreateIndexResponse createIndexResponse) { @Override public void onFailure(Exception e) { logger.error("Failed to create an index", e); - future.completeExceptionally(e); + future.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); } }; @@ -149,7 +150,9 @@ public void initIndexIfAbsent(FlowFrameworkIndex index, ActionListener } }, e -> { logger.error("Failed to create index " + indexName, e); - internalListener.onFailure(e); + internalListener.onFailure( + new FlowFrameworkException("Failed to create index " + indexName, e, ExceptionsHelper.status(e)) + ); }); CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings); client.admin().indices().create(request, actionListener); @@ -177,22 +180,37 @@ public void initIndexIfAbsent(FlowFrameworkIndex index, ActionListener internalListener.onFailure( new FlowFrameworkException( "Failed to update index setting for: " + indexName, - INTERNAL_SERVER_ERROR + RestStatus.INTERNAL_SERVER_ERROR ) ); } }, exception -> { logger.error("Failed to update index setting for: " + indexName, exception); - internalListener.onFailure(exception); + internalListener.onFailure( + new FlowFrameworkException( + "Failed to udpate index setting for: " + indexName, + exception, + ExceptionsHelper.status(exception) + ) + ); })); } else { internalListener.onFailure( - new FlowFrameworkException("Failed to update index: " + indexName, INTERNAL_SERVER_ERROR) + new FlowFrameworkException( + "Failed to update index: " + indexName, + RestStatus.INTERNAL_SERVER_ERROR + ) ); } }, exception -> { - logger.error("Failed to update index " + indexName, exception); - internalListener.onFailure(exception); + logger.error("Failed to update index: " + indexName, exception); + internalListener.onFailure( + new FlowFrameworkException( + "Failed to update index: " + indexName, + exception, + ExceptionsHelper.status(exception) + ) + ); }) ); } else { @@ -202,7 +220,9 @@ public void initIndexIfAbsent(FlowFrameworkIndex index, ActionListener } }, e -> { logger.error("Failed to update index mapping", e); - internalListener.onFailure(e); + internalListener.onFailure( + new FlowFrameworkException("Failed to update index mapping", e, ExceptionsHelper.status(e)) + ); })); } else { // No need to update index if it's already updated. @@ -210,8 +230,8 @@ public void initIndexIfAbsent(FlowFrameworkIndex index, ActionListener } } } catch (Exception e) { - logger.error("Failed to init index " + indexName, e); - listener.onFailure(e); + logger.error("Failed to init index: " + indexName, e); + listener.onFailure(new FlowFrameworkException("Failed to init index: " + indexName, e, ExceptionsHelper.status(e))); } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java b/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java index 2f902755c..4b10ba4e6 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java @@ -11,13 +11,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.threadpool.Scheduler.ScheduledCancellable; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; /** @@ -126,7 +127,7 @@ public TimeValue nodeTimeout() { */ public CompletableFuture execute() { if (this.future.isDone()) { - throw new IllegalStateException("Process Node [" + this.id + "] already executed."); + throw new FlowFrameworkException("Process Node [" + this.id + "] already executed.", RestStatus.BAD_REQUEST); } CompletableFuture.runAsync(() -> { List> predFutures = predecessors.stream().map(p -> p.future()).collect(Collectors.toList()); @@ -148,7 +149,9 @@ public CompletableFuture execute() { if (this.nodeTimeout.compareTo(TimeValue.ZERO) > 0) { delayExec = threadPool.schedule(() -> { if (!future.isDone()) { - future.completeExceptionally(new TimeoutException("Execute timed out for " + this.id)); + future.completeExceptionally( + new FlowFrameworkException("Execute timed out for " + this.id, RestStatus.REQUEST_TIMEOUT) + ); } }, this.nodeTimeout, ThreadPool.Names.SAME); } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index 71c44514e..5a42fc839 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -11,6 +11,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.model.Workflow; import org.opensearch.flowframework.model.WorkflowEdge; import org.opensearch.flowframework.model.WorkflowNode; @@ -87,8 +89,9 @@ private TimeValue parseTimeout(WorkflowNode node) { String fieldName = String.join(".", node.id(), INPUTS_FIELD, NODE_TIMEOUT_FIELD); TimeValue timeValue = TimeValue.parseTimeValue(timeoutValue, fieldName); if (timeValue.millis() < 0) { - throw new IllegalArgumentException( - "Failed to parse timeout value [" + timeoutValue + "] for field [" + fieldName + "]. Must be positive" + throw new FlowFrameworkException( + "Failed to parse timeout value [" + timeoutValue + "] for field [" + fieldName + "]. Must be positive", + RestStatus.BAD_REQUEST ); } return timeValue; @@ -100,14 +103,14 @@ private static List topologicalSort(List workflowNod for (WorkflowEdge edge : workflowEdges) { String source = edge.source(); if (!nodeIds.contains(source)) { - throw new IllegalArgumentException("Edge source " + source + " does not correspond to a node."); + throw new FlowFrameworkException("Edge source " + source + " does not correspond to a node.", RestStatus.BAD_REQUEST); } String dest = edge.destination(); if (!nodeIds.contains(dest)) { - throw new IllegalArgumentException("Edge destination " + dest + " does not correspond to a node."); + throw new FlowFrameworkException("Edge destination " + dest + " does not correspond to a node.", RestStatus.BAD_REQUEST); } if (source.equals(dest)) { - throw new IllegalArgumentException("Edge connects node " + source + " to itself."); + throw new FlowFrameworkException("Edge connects node " + source + " to itself.", RestStatus.BAD_REQUEST); } } @@ -130,7 +133,7 @@ private static List topologicalSort(List workflowNod Queue sourceNodes = new ArrayDeque<>(); workflowNodes.stream().filter(n -> !predecessorEdges.containsKey(n)).forEach(n -> sourceNodes.add(n)); if (sourceNodes.isEmpty()) { - throw new IllegalArgumentException("No start node detected: all nodes have a predecessor."); + throw new FlowFrameworkException("No start node detected: all nodes have a predecessor.", RestStatus.BAD_REQUEST); } logger.debug("Start node(s): {}", sourceNodes); @@ -153,7 +156,7 @@ private static List topologicalSort(List workflowNod } } if (!graph.isEmpty()) { - throw new IllegalArgumentException("Cycle detected: " + graph); + throw new FlowFrameworkException("Cycle detected: " + graph, RestStatus.BAD_REQUEST); } logger.debug("Execution sequence: {}", sortedNodes); return sortedNodes; diff --git a/src/main/resources/mappings/global-context.json b/src/main/resources/mappings/global-context.json index cd3ce4e6b..e4e5443c2 100644 --- a/src/main/resources/mappings/global-context.json +++ b/src/main/resources/mappings/global-context.json @@ -48,13 +48,13 @@ } }, "workflows": { - "type": "text" + "type": "object" }, "user_outputs": { - "type": "text" + "type": "object" }, "resources_created": { - "type": "text" + "type": "object" } } } diff --git a/src/test/java/org/opensearch/flowframework/indices/GlobalContextHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/GlobalContextHandlerTests.java index 78f0fc618..f177f51fb 100644 --- a/src/test/java/org/opensearch/flowframework/indices/GlobalContextHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/GlobalContextHandlerTests.java @@ -73,7 +73,7 @@ public void setUp() throws Exception { public void testPutTemplateToGlobalContext() throws IOException { Template template = mock(Template.class); - when(template.toDocumentSource(any(XContentBuilder.class), eq(ToXContent.EMPTY_PARAMS))).thenAnswer(invocation -> { + when(template.toXContent(any(XContentBuilder.class), eq(ToXContent.EMPTY_PARAMS))).thenAnswer(invocation -> { XContentBuilder builder = invocation.getArgument(0); return builder; }); @@ -112,7 +112,7 @@ public void testStoreResponseToGlobalContext() { public void testUpdateTemplateInGlobalContext() throws IOException { Template template = mock(Template.class); - when(template.toDocumentSource(any(XContentBuilder.class), eq(ToXContent.EMPTY_PARAMS))).thenAnswer(invocation -> { + when(template.toXContent(any(XContentBuilder.class), eq(ToXContent.EMPTY_PARAMS))).thenAnswer(invocation -> { XContentBuilder builder = invocation.getArgument(0); return builder; }); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java index fd07a91eb..2d98db7fc 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java @@ -11,6 +11,7 @@ import org.opensearch.Version; import org.opensearch.client.node.NodeClient; import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.model.Workflow; @@ -19,9 +20,9 @@ import org.opensearch.rest.RestHandler.Route; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestChannel; import org.opensearch.test.rest.FakeRestRequest; -import java.io.IOException; import java.util.List; import java.util.Locale; import java.util.Map; @@ -87,13 +88,15 @@ public void testRestCreateWorkflowActionRoutes() { } - public void testInvalidCreateWorkflowRequest() throws IOException { + public void testInvalidCreateWorkflowRequest() throws Exception { RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) .withPath(this.createWorkflowPath) .withContent(new BytesArray(invalidTemplate), MediaTypeRegistry.JSON) .build(); - IOException ex = expectThrows(IOException.class, () -> { createWorkflowRestAction.prepareRequest(request, nodeClient); }); - assertEquals("Unable to parse field [invalid] in a template object.", ex.getMessage()); + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + createWorkflowRestAction.handleRequest(request, channel, nodeClient); + assertEquals(RestStatus.BAD_REQUEST, channel.capturedResponse().status()); + assertTrue(channel.capturedResponse().content().utf8ToString().contains("Unable to parse field [invalid] in a template object.")); } } diff --git a/src/test/java/org/opensearch/flowframework/rest/RestProvisionWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestProvisionWorkflowActionTests.java index a44817cec..a04dc027b 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestProvisionWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestProvisionWorkflowActionTests.java @@ -12,13 +12,12 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.rest.RestHandler.Route; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestChannel; import org.opensearch.test.rest.FakeRestRequest; -import java.io.IOException; import java.util.List; import java.util.Locale; @@ -51,31 +50,34 @@ public void testRestProvisiionWorkflowActionRoutes() { assertEquals(this.provisionWorkflowPath, routes.get(0).getPath()); } - public void testNullWorkflowIdAndTemplate() throws IOException { + public void testNullWorkflowId() throws Exception { - // Request with no content or params + // Request with no params RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) .withPath(this.provisionWorkflowPath) .build(); - FlowFrameworkException ex = expectThrows(FlowFrameworkException.class, () -> { - provisionWorkflowRestAction.prepareRequest(request, nodeClient); - }); - assertEquals("workflow_id cannot be null", ex.getMessage()); - assertEquals(RestStatus.BAD_REQUEST, ex.getRestStatus()); + FakeRestChannel channel = new FakeRestChannel(request, true, 1); + provisionWorkflowRestAction.handleRequest(request, channel, nodeClient); + + assertEquals(1, channel.errors().get()); + assertEquals(RestStatus.BAD_REQUEST, channel.capturedResponse().status()); + assertTrue(channel.capturedResponse().content().utf8ToString().contains("workflow_id cannot be null")); } - public void testInvalidRequestWithContent() throws IOException { + public void testInvalidRequestWithContent() { RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) .withPath(this.provisionWorkflowPath) .withContent(new BytesArray("request body"), MediaTypeRegistry.JSON) .build(); - FlowFrameworkException ex = expectThrows(FlowFrameworkException.class, () -> { - provisionWorkflowRestAction.prepareRequest(request, nodeClient); + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> { + provisionWorkflowRestAction.handleRequest(request, channel, nodeClient); }); - assertEquals("Invalid request format", ex.getMessage()); - assertEquals(RestStatus.BAD_REQUEST, ex.getRestStatus()); + assertEquals( + "request [POST /_plugins/_flow_framework/workflow/{workflow_id}/_provision] does not support having a body", + ex.getMessage() + ); } - } diff --git a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java index f8b2d8490..a821c1068 100644 --- a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java @@ -108,7 +108,7 @@ public void testProvisionWorkflow() { ActionListener responseListener = invocation.getArgument(1); XContentBuilder builder = XContentFactory.jsonBuilder(); - this.template.toDocumentSource(builder, ToXContent.EMPTY_PARAMS); + this.template.toXContent(builder, ToXContent.EMPTY_PARAMS); BytesReference templateBytesRef = BytesReference.bytes(builder); GetResult getResult = new GetResult(GLOBAL_CONTEXT_INDEX, workflowId, 1, 1, 1, true, templateBytesRef, null, null); responseListener.onResponse(new GetResponse(getResult)); diff --git a/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java b/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java index 1e421c58c..fbf036b16 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java @@ -9,6 +9,7 @@ package org.opensearch.flowframework.workflow; import org.opensearch.common.unit.TimeValue; +import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -22,7 +23,6 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -139,7 +139,7 @@ public String getName() { CompletableFuture f = nodeZ.execute(); CompletionException exception = assertThrows(CompletionException.class, () -> f.join()); assertTrue(f.isCompletedExceptionally()); - assertEquals(TimeoutException.class, exception.getCause().getClass()); + assertEquals(FlowFrameworkException.class, exception.getCause().getClass()); } public void testExceptions() { @@ -169,6 +169,6 @@ public String getName() { assertEquals("Test exception", exception.getCause().getMessage()); // Tests where we already called execute - assertThrows(IllegalStateException.class, () -> nodeE.execute()); + assertThrows(FlowFrameworkException.class, () -> nodeE.execute()); } } diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index e8ada0e15..6a85a7bf5 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -12,6 +12,7 @@ import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.model.TemplateTestJsonUtil; import org.opensearch.flowframework.model.Workflow; import org.opensearch.test.OpenSearchTestCase; @@ -134,23 +135,23 @@ public void testOrdering() throws IOException { public void testCycles() { Exception ex; - ex = assertThrows(IllegalArgumentException.class, () -> parse(workflow(List.of(node("A")), List.of(edge("A", "A"))))); + ex = assertThrows(FlowFrameworkException.class, () -> parse(workflow(List.of(node("A")), List.of(edge("A", "A"))))); assertEquals("Edge connects node A to itself.", ex.getMessage()); ex = assertThrows( - IllegalArgumentException.class, + FlowFrameworkException.class, () -> parse(workflow(List.of(node("A"), node("B")), List.of(edge("A", "B"), edge("B", "B")))) ); assertEquals("Edge connects node B to itself.", ex.getMessage()); ex = assertThrows( - IllegalArgumentException.class, + FlowFrameworkException.class, () -> parse(workflow(List.of(node("A"), node("B")), List.of(edge("A", "B"), edge("B", "A")))) ); assertEquals(NO_START_NODE_DETECTED, ex.getMessage()); ex = assertThrows( - IllegalArgumentException.class, + FlowFrameworkException.class, () -> parse(workflow(List.of(node("A"), node("B"), node("C")), List.of(edge("A", "B"), edge("B", "C"), edge("C", "B")))) ); assertTrue(ex.getMessage().startsWith(CYCLE_DETECTED)); @@ -158,7 +159,7 @@ public void testCycles() { assertTrue(ex.getMessage().contains("C->B")); ex = assertThrows( - IllegalArgumentException.class, + FlowFrameworkException.class, () -> parse( workflow( List.of(node("A"), node("B"), node("C"), node("D")), @@ -188,13 +189,13 @@ public void testNoEdges() throws IOException { } public void testExceptions() throws IOException { - Exception ex = assertThrows( - IllegalArgumentException.class, + FlowFrameworkException ex = assertThrows( + FlowFrameworkException.class, () -> parse(workflow(List.of(node("A"), node("B")), List.of(edge("C", "B")))) ); assertEquals("Edge source C does not correspond to a node.", ex.getMessage()); - ex = assertThrows(IllegalArgumentException.class, () -> parse(workflow(List.of(node("A"), node("B")), List.of(edge("A", "C"))))); + ex = assertThrows(FlowFrameworkException.class, () -> parse(workflow(List.of(node("A"), node("B")), List.of(edge("A", "C"))))); assertEquals("Edge destination C does not correspond to a node.", ex.getMessage()); } }