Skip to content

Commit

Permalink
[feature/agent_framework] Changing resources created format (#231)
Browse files Browse the repository at this point in the history
* adding new resources created format and adding enum for resource types

Signed-off-by: Amit Galitzky <[email protected]>

* remove spotless from java 17

Signed-off-by: Amit Galitzky <[email protected]>

* add action listener to update resource created

Signed-off-by: Amit Galitzky <[email protected]>

* fixing UT

Signed-off-by: Amit Galitzky <[email protected]>

* changed exception type

Signed-off-by: Amit Galitzky <[email protected]>

---------

Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz authored and dbwiddis committed Dec 18, 2023
1 parent de8f151 commit feae94c
Show file tree
Hide file tree
Showing 22 changed files with 581 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ private CommonValue() {}
public static final String INDEX_NAME = "index_name";
/** Type field */
public static final String TYPE = "type";
/** default_mapping_option filed */
public static final String DEFAULT_MAPPING_OPTION = "default_mapping_option";
/** ID Field */
public static final String ID = "id";
/** Pipeline Id field */
Expand All @@ -103,6 +105,8 @@ private CommonValue() {}
public static final String MODEL_VERSION = "model_version";
/** Model Group Id field */
public static final String MODEL_GROUP_ID = "model_group_id";
/** Model Group Id field */
public static final String MODEL_GROUP_STATUS = "model_group_status";
/** Description field */
public static final String DESCRIPTION_FIELD = "description";
/** Connector Id field */
Expand Down Expand Up @@ -158,10 +162,10 @@ private CommonValue() {}
public static final String USER_OUTPUTS_FIELD = "user_outputs";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";
/** The field name for the ResourceCreated's resource ID */
public static final String RESOURCE_ID_FIELD = "resource_id";
/** The field name for the ResourceCreated's resource name */
/** The field name for the step name where a resource is created */
public static final String WORKFLOW_STEP_NAME = "workflow_step_name";
/** The field name for the step ID where a resource is created */
public static final String WORKFLOW_STEP_ID = "workflow_step_id";
/** LLM Name for registering an agent */
public static final String LLM_FIELD = "llm";
/** The tools' field for an agent */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Enum encapsulating the different step names and the resources they create
*/
public enum WorkflowResources {

/** official workflow step name for creating a connector and associated created resource */
CREATE_CONNECTOR("create_connector", "connector_id"),
/** official workflow step name for registering a remote model and associated created resource */
REGISTER_REMOTE_MODEL("register_remote_model", "model_id"),
/** official workflow step name for registering a local model and associated created resource */
REGISTER_LOCAL_MODEL("register_local_model", "model_id"),
/** official workflow step name for registering a model group and associated created resource */
REGISTER_MODEL_GROUP("register_model_group", "model_group_id"),
/** official workflow step name for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id"),
/** official workflow step name for creating an index and associated created resource */
CREATE_INDEX("create_index", "index_name");

private final String workflowStep;
private final String resourceCreated;
private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
}

/**
* Returns the workflowStep for the given enum Constant
* @return the workflowStep of this data.
*/
public String getWorkflowStep() {
return workflowStep;
}

/**
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
*/
public String getResourceCreated() {
return resourceCreated;
}

/**
* gets the resources created type based on the workflowStep
* @param workflowStep workflow step name
* @return the resource that will be created
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getResourceByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
return mapping.getResourceCreated();
}
}
}
logger.error("Unable to find resource type for step: " + workflowStep);
throw new FlowFrameworkException("Unable to find resource type for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Returns all the possible resource created types in enum
* @return a set of all the resource created types
*/
public static Set<String> getAllResourcesCreated() {
return allResources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.ResourceCreated;
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;

import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -435,6 +438,7 @@ public void updateFlowFrameworkSystemIndexDoc(
updatedContent.putAll(updatedFields);
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(3);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
Expand Down Expand Up @@ -468,7 +472,8 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
// TODO: Also add ability to change other fields at the same time when adding detailed provision progress
updateRequest.script(script);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
updateRequest.retryOnConflict(3);
// TODO: Implement our own concurrency control to improve on retry mechanism
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update {} entry : {}. {}", indexName, documentId, e.getMessage());
Expand All @@ -478,4 +483,38 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
}
}
}

/**
* Creates a new ResourceCreated object and a script to update the state index
* @param workflowId workflowId for the relevant step
* @param nodeId WorkflowData object with relevent step information
* @param workflowStepName the workflowstep name that created the resource
* @param resourceId the id of the newly created resource
* @param listener the ActionListener for this step to handle completing the future after update
* @throws IOException if parsing fails on new resource
*/
public void updateResourceInStateIndex(
String workflowId,
String nodeId,
String workflowStepName,
String resourceId,
ActionListener<UpdateResponse> listener
) throws IOException {
ResourceCreated newResource = new ResourceCreated(workflowStepName, nodeId, resourceId);
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
newResource.toXContent(builder, ToXContentObject.EMPTY_PARAMS);

// The script to append a new object to the resources_created array
Script script = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.resources_created.add(params.newResource)",
Collections.singletonMap("newResource", newResource)
);

updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> {
logger.info("updated resources created of {}", workflowId);
listener.onResponse(updateResponse);
}, exception -> { listener.onFailure(exception); }));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@
*/
package org.opensearch.flowframework.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.common.WorkflowResources;
import org.opensearch.flowframework.exception.FlowFrameworkException;

import java.io.IOException;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_NAME;

/**
Expand All @@ -27,16 +32,21 @@
// TODO: create an enum to add the resource name itself for each step example (create_connector_step -> connector)
public class ResourceCreated implements ToXContentObject, Writeable {

private static final Logger logger = LogManager.getLogger(ResourceCreated.class);

private final String workflowStepName;
private final String workflowStepId;
private final String resourceId;

/**
* Create this resources created object with given resource name and ID.
* Create this resources created object with given workflow step name, ID and resource ID.
* @param workflowStepName The workflow step name associating to the step where it was created
* @param workflowStepId The workflow step ID associating to the step where it was created
* @param resourceId The resources ID for relating to the created resource
*/
public ResourceCreated(String workflowStepName, String resourceId) {
public ResourceCreated(String workflowStepName, String workflowStepId, String resourceId) {
this.workflowStepName = workflowStepName;
this.workflowStepId = workflowStepId;
this.resourceId = resourceId;
}

Expand All @@ -47,20 +57,23 @@ public ResourceCreated(String workflowStepName, String resourceId) {
*/
public ResourceCreated(StreamInput input) throws IOException {
this.workflowStepName = input.readString();
this.workflowStepId = input.readString();
this.resourceId = input.readString();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject()
.field(WORKFLOW_STEP_NAME, workflowStepName)
.field(RESOURCE_ID_FIELD, resourceId);
.field(WORKFLOW_STEP_ID, workflowStepId)
.field(WorkflowResources.getResourceByWorkflowStep(workflowStepName), resourceId);
return xContentBuilder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(workflowStepName);
out.writeString(workflowStepId);
out.writeString(resourceId);
}

Expand All @@ -82,6 +95,15 @@ public String workflowStepName() {
return workflowStepName;
}

/**
* Gets the workflow step id associated to the created resource
*
* @return the workflowStepId.
*/
public String workflowStepId() {
return workflowStepId;
}

/**
* Parse raw JSON content into a ResourceCreated instance.
*
Expand All @@ -91,6 +113,7 @@ public String workflowStepName() {
*/
public static ResourceCreated parse(XContentParser parser) throws IOException {
String workflowStepName = null;
String workflowStepId = null;
String resourceId = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
Expand All @@ -102,22 +125,50 @@ public static ResourceCreated parse(XContentParser parser) throws IOException {
case WORKFLOW_STEP_NAME:
workflowStepName = parser.text();
break;
case RESOURCE_ID_FIELD:
resourceId = parser.text();
case WORKFLOW_STEP_ID:
workflowStepId = parser.text();
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object.");
if (!isValidFieldName(fieldName)) {
throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object.");
} else {
if (fieldName.equals(WorkflowResources.getResourceByWorkflowStep(workflowStepName))) {
resourceId = parser.text();
}
break;
}
}
}
if (workflowStepName == null || resourceId == null) {
throw new IOException("A ResourceCreated object requires both a workflowStepName and resourceId.");
if (workflowStepName == null) {
logger.error("Resource created object failed parsing: workflowStepName: {}", workflowStepName);
throw new FlowFrameworkException("A ResourceCreated object requires workflowStepName", RestStatus.BAD_REQUEST);
}
if (workflowStepId == null) {
logger.error("Resource created object failed parsing: workflowStepId: {}", workflowStepId);
throw new FlowFrameworkException("A ResourceCreated object requires workflowStepId", RestStatus.BAD_REQUEST);
}
if (resourceId == null) {
logger.error("Resource created object failed parsing: resourceId: {}", resourceId);
throw new FlowFrameworkException("A ResourceCreated object requires resourceId", RestStatus.BAD_REQUEST);
}
return new ResourceCreated(workflowStepName, resourceId);
return new ResourceCreated(workflowStepName, workflowStepId, resourceId);
}

private static boolean isValidFieldName(String fieldName) {
return (WORKFLOW_STEP_NAME.equals(fieldName)
|| WORKFLOW_STEP_ID.equals(fieldName)
|| WorkflowResources.getAllResourcesCreated().contains(fieldName));
}

@Override
public String toString() {
return "resources_Created [resource_name=" + workflowStepName + ", id=" + resourceId + "]";
return "resources_Created [workflow_step_name= "
+ workflowStepName
+ ", workflow_step_id= "
+ workflowStepName
+ ", resource_id= "
+ resourceId
+ "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* This represents the an object of workflow steps json which maps each step to expected inputs and outputs
* This represents an object of workflow steps json which maps each step to expected inputs and outputs
*/
public class WorkflowStepValidator {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED);
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage()); })
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage(), exception); })
);
} catch (Exception ex) {
logger.error("Provisioning failed for workflow {} : {}", workflowId, ex);
logger.error("Provisioning failed for workflow: {}", workflowId, ex);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
ImmutableMap.of(
Expand All @@ -235,7 +235,7 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED);
}, exceptionState -> { logger.error("Failed to update workflow state : {}", exceptionState.getMessage()); })
}, exceptionState -> { logger.error("Failed to update workflow state : {}", exceptionState.getMessage(), ex); })
);
}
}
Expand Down
Loading

0 comments on commit feae94c

Please sign in to comment.