From a0caf9cb747e27421735797faf8874cb3ea621bc Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 7 May 2024 16:46:14 -0700 Subject: [PATCH 01/16] Add user mapping to Workflow State index (#705) * Add user mapping to Workflow State index Signed-off-by: Daniel Widdis * Increment schema version Signed-off-by: Daniel Widdis --------- Signed-off-by: Daniel Widdis Signed-off-by: martinpkr --- CHANGELOG.md | 18 +-------- .../resources/mappings/workflow-state.json | 40 ++++++++++++++++++- 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98c31175f..465053a1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,27 +12,13 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Maintenance ### Refactoring -## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.13...2.x) +## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x) ### Features ### Enhancements -- Add guardrails to default use case params ([#658](https://github.com/opensearch-project/flow-framework/pull/658)) -- Allow strings for boolean workflow step parameters ([#671](https://github.com/opensearch-project/flow-framework/pull/671)) -- Add optional delay parameter to no-op step ([#674](https://github.com/opensearch-project/flow-framework/pull/674)) -- Add model interface support for remote and local custom models ([#701](https://github.com/opensearch-project/flow-framework/pull/701)) - ### Bug Fixes -- Reset workflow state to initial state after successful deprovision ([#635](https://github.com/opensearch-project/flow-framework/pull/635)) -- Silently ignore content on APIs that don't require it ([#639](https://github.com/opensearch-project/flow-framework/pull/639)) -- Hide user and credential field from search response ([#680](https://github.com/opensearch-project/flow-framework/pull/680)) -- Throw the correct error message in status API for WorkflowSteps ([#676](https://github.com/opensearch-project/flow-framework/pull/676)) -- Delete workflow state when template is deleted and no resources exist ([#689](https://github.com/opensearch-project/flow-framework/pull/689)) -- Fixing model group parsing and restoring context ([#695] (https://github.com/opensearch-project/flow-framework/pull/695)) +- Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705)) ### Infrastructure -- Switch macos runner to macos-13 from macos-latest since macos-latest is now arm64 ([#686](https://github.com/opensearch-project/flow-framework/pull/686)) - ### Documentation ### Maintenance ### Refactoring -- Improve error messages for workflow states other than NOT_STARTED ([#642](https://github.com/opensearch-project/flow-framework/pull/642)) -- Create a Config XContent model for Config index ([#679](https://github.com/opensearch-project/flow-framework/pull/679)) diff --git a/src/main/resources/mappings/workflow-state.json b/src/main/resources/mappings/workflow-state.json index a42e3b749..ecb635413 100644 --- a/src/main/resources/mappings/workflow-state.json +++ b/src/main/resources/mappings/workflow-state.json @@ -1,7 +1,7 @@ { "dynamic": false, "_meta": { - "schema_version": 2 + "schema_version": 3 }, "properties": { "schema_version": { @@ -30,6 +30,44 @@ "user_outputs": { "type": "object" }, + "user": { + "type": "nested", + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } + }, "resources_created": { "type": "nested", "properties": { From c537129ae969fa9cd02ced91dab82dc857d6a7b9 Mon Sep 17 00:00:00 2001 From: martinpkr Date: Thu, 30 May 2024 15:51:20 +0300 Subject: [PATCH 02/16] Added a new parse util method to avoid repetition / refactored code with new method Signed-off-by: martinpkr --- .../java/org/opensearch/flowframework/util/ParseUtils.java | 5 +++++ .../workflow/AbstractRegisterLocalModelStep.java | 2 +- .../flowframework/workflow/RegisterModelGroupStep.java | 4 +--- .../flowframework/workflow/RegisterRemoteModelStep.java | 2 +- .../java/org/opensearch/flowframework/workflow/ToolStep.java | 4 +--- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index b4b908aeb..3d509cb35 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; +import org.opensearch.common.Booleans; import org.opensearch.common.io.Streams; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentHelper; @@ -472,4 +473,8 @@ public static Map convertStringToObjectMapToStringToStringMap(Ma return stringToStringMap; } } + + public static Boolean checkIfInputsContainsKey(Map inputs, String enumKey){ + return inputs.containsKey(enumKey) ? Booleans.parseBoolean(inputs.get(enumKey).toString()) : null; + } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java index 80294a7cd..39d5b19c4 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java @@ -123,7 +123,7 @@ public PlainActionFuture execute( String modelGroupId = (String) inputs.get(MODEL_GROUP_ID); String allConfig = (String) inputs.get(ALL_CONFIG); String modelInterface = (String) inputs.get(INTERFACE_FIELD); - final Boolean deploy = inputs.containsKey(DEPLOY_FIELD) ? Booleans.parseBoolean(inputs.get(DEPLOY_FIELD).toString()) : null; + final Boolean deploy = ParseUtils.checkIfInputsContainsKey(inputs, DEPLOY_FIELD); // Build register model input MLRegisterModelInputBuilder mlInputBuilder = MLRegisterModelInput.builder() diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java index 9771d7c77..09b52ac9b 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java @@ -147,9 +147,7 @@ public void onFailure(Exception ex) { if (inputs.containsKey(MODEL_ACCESS_MODE)) { modelAccessMode = AccessMode.from((inputs.get(MODEL_ACCESS_MODE)).toString().toLowerCase(Locale.ROOT)); } - Boolean isAddAllBackendRoles = inputs.containsKey(ADD_ALL_BACKEND_ROLES) - ? Booleans.parseBoolean(inputs.get(ADD_ALL_BACKEND_ROLES).toString()) - : null; + Boolean isAddAllBackendRoles = ParseUtils.checkIfInputsContainsKey(inputs, ADD_ALL_BACKEND_ROLES); MLRegisterModelGroupInputBuilder builder = MLRegisterModelGroupInput.builder(); builder.name(modelGroupName); diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java index db5d290ca..b18103381 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java @@ -100,7 +100,7 @@ public PlainActionFuture execute( String connectorId = (String) inputs.get(CONNECTOR_ID); Guardrails guardRails = (Guardrails) inputs.get(GUARDRAILS_FIELD); String modelInterface = (String) inputs.get(INTERFACE_FIELD); - final Boolean deploy = inputs.containsKey(DEPLOY_FIELD) ? Booleans.parseBoolean(inputs.get(DEPLOY_FIELD).toString()) : null; + final Boolean deploy = ParseUtils.checkIfInputsContainsKey(inputs, DEPLOY_FIELD); MLRegisterModelInputBuilder builder = MLRegisterModelInput.builder() .functionName(FunctionName.REMOTE) diff --git a/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java b/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java index 87ecf762e..5a5d4b703 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java @@ -64,9 +64,7 @@ public PlainActionFuture execute( String type = (String) inputs.get(TYPE); String name = (String) inputs.get(NAME_FIELD); String description = (String) inputs.get(DESCRIPTION_FIELD); - Boolean includeOutputInAgentResponse = inputs.containsKey(INCLUDE_OUTPUT_IN_AGENT_RESPONSE) - ? Booleans.parseBoolean(inputs.get(INCLUDE_OUTPUT_IN_AGENT_RESPONSE).toString()) - : null; + Boolean includeOutputInAgentResponse = ParseUtils.checkIfInputsContainsKey(inputs ,INCLUDE_OUTPUT_IN_AGENT_RESPONSE); Map parameters = getToolsParametersMap(inputs.get(PARAMETERS_FIELD), previousNodeInputs, outputs); MLToolSpec.MLToolSpecBuilder builder = MLToolSpec.builder(); From 7e3a612a43c0070c30ca29b4979a98c78192e3b2 Mon Sep 17 00:00:00 2001 From: martinpkr Date: Thu, 30 May 2024 20:01:53 +0300 Subject: [PATCH 03/16] refactored method name and added unit test Signed-off-by: martinpkr --- .../opensearch/flowframework/util/ParseUtils.java | 11 +++++++++-- .../workflow/AbstractRegisterLocalModelStep.java | 3 +-- .../workflow/RegisterModelGroupStep.java | 3 +-- .../workflow/RegisterRemoteModelStep.java | 3 +-- .../opensearch/flowframework/workflow/ToolStep.java | 3 +-- .../flowframework/util/ParseUtilsTests.java | 12 ++++++++++++ 6 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index 3d509cb35..89397fb2b 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -474,7 +474,14 @@ public static Map convertStringToObjectMapToStringToStringMap(Ma } } - public static Boolean checkIfInputsContainsKey(Map inputs, String enumKey){ - return inputs.containsKey(enumKey) ? Booleans.parseBoolean(inputs.get(enumKey).toString()) : null; + /** + * Checks if the inputs map contains the specified key and parses the associated value to a Boolean. + * + * @param inputs the map containing the input data + * @param key the key to check in the map + * @return the Boolean value associated with the key if present, or null if the key is not found + */ + public static Boolean parseBooleanIfExists(Map inputs, String key) { + return inputs.containsKey(key) ? Booleans.parseBoolean(inputs.get(key).toString()) : null; } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java index 39d5b19c4..58d0b5578 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; import org.opensearch.action.support.PlainActionFuture; -import org.opensearch.common.Booleans; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; @@ -123,7 +122,7 @@ public PlainActionFuture execute( String modelGroupId = (String) inputs.get(MODEL_GROUP_ID); String allConfig = (String) inputs.get(ALL_CONFIG); String modelInterface = (String) inputs.get(INTERFACE_FIELD); - final Boolean deploy = ParseUtils.checkIfInputsContainsKey(inputs, DEPLOY_FIELD); + final Boolean deploy = ParseUtils.parseBooleanIfExists(inputs, DEPLOY_FIELD); // Build register model input MLRegisterModelInputBuilder mlInputBuilder = MLRegisterModelInput.builder() diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java index 09b52ac9b..782632f1b 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; import org.opensearch.action.support.PlainActionFuture; -import org.opensearch.common.Booleans; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.core.rest.RestStatus; @@ -147,7 +146,7 @@ public void onFailure(Exception ex) { if (inputs.containsKey(MODEL_ACCESS_MODE)) { modelAccessMode = AccessMode.from((inputs.get(MODEL_ACCESS_MODE)).toString().toLowerCase(Locale.ROOT)); } - Boolean isAddAllBackendRoles = ParseUtils.checkIfInputsContainsKey(inputs, ADD_ALL_BACKEND_ROLES); + Boolean isAddAllBackendRoles = ParseUtils.parseBooleanIfExists(inputs, ADD_ALL_BACKEND_ROLES); MLRegisterModelGroupInputBuilder builder = MLRegisterModelGroupInput.builder(); builder.name(modelGroupName); diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java index b18103381..660f7e037 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java @@ -13,7 +13,6 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.update.UpdateResponse; -import org.opensearch.common.Booleans; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; @@ -100,7 +99,7 @@ public PlainActionFuture execute( String connectorId = (String) inputs.get(CONNECTOR_ID); Guardrails guardRails = (Guardrails) inputs.get(GUARDRAILS_FIELD); String modelInterface = (String) inputs.get(INTERFACE_FIELD); - final Boolean deploy = ParseUtils.checkIfInputsContainsKey(inputs, DEPLOY_FIELD); + final Boolean deploy = ParseUtils.parseBooleanIfExists(inputs, DEPLOY_FIELD); MLRegisterModelInputBuilder builder = MLRegisterModelInput.builder() .functionName(FunctionName.REMOTE) diff --git a/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java b/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java index 5a5d4b703..9f50fcf11 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.support.PlainActionFuture; -import org.opensearch.common.Booleans; import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.exception.WorkflowStepException; @@ -64,7 +63,7 @@ public PlainActionFuture execute( String type = (String) inputs.get(TYPE); String name = (String) inputs.get(NAME_FIELD); String description = (String) inputs.get(DESCRIPTION_FIELD); - Boolean includeOutputInAgentResponse = ParseUtils.checkIfInputsContainsKey(inputs ,INCLUDE_OUTPUT_IN_AGENT_RESPONSE); + Boolean includeOutputInAgentResponse = ParseUtils.parseBooleanIfExists(inputs ,INCLUDE_OUTPUT_IN_AGENT_RESPONSE); Map parameters = getToolsParametersMap(inputs.get(PARAMETERS_FIELD), previousNodeInputs, outputs); MLToolSpec.MLToolSpecBuilder builder = MLToolSpec.builder(); diff --git a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java index 29167c740..b755be6f7 100644 --- a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java @@ -221,4 +221,16 @@ public void testGetInputsFromPreviousSteps() { assertEquals("Missing required inputs [not-here] in workflow [workflowId] node [nodeId]", e.getMessage()); assertEquals(RestStatus.BAD_REQUEST, e.getRestStatus()); } + + public void testParseBooleanIfExists() { + Map inputs = new HashMap<>(); + inputs.put("key1", "true"); + inputs.put("key2", "false"); + inputs.put("key3", "true"); + + assertEquals(Boolean.TRUE, ParseUtils.parseBooleanIfExists(inputs, "key1")); + assertEquals(Boolean.FALSE, ParseUtils.parseBooleanIfExists(inputs, "key2")); + assertNull(ParseUtils.parseBooleanIfExists(inputs, "keyThatDoesntExist")); + + } } From 7a37deaf9c4eeb59b59a52d323588fe4b69dd2a4 Mon Sep 17 00:00:00 2001 From: martinpkr Date: Thu, 30 May 2024 20:44:01 +0300 Subject: [PATCH 04/16] made method use generics + added test Signed-off-by: martinpkr --- .../flowframework/util/ParseUtils.java | 16 ++++++++-- .../AbstractRegisterLocalModelStep.java | 2 +- .../workflow/RegisterModelGroupStep.java | 2 +- .../workflow/RegisterRemoteModelStep.java | 2 +- .../flowframework/workflow/ToolStep.java | 2 +- .../flowframework/util/ParseUtilsTests.java | 30 ++++++++++++++++--- 6 files changed, 43 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index 89397fb2b..a27cbb2e0 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; -import org.opensearch.common.Booleans; import org.opensearch.common.io.Streams; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentHelper; @@ -481,7 +480,18 @@ public static Map convertStringToObjectMapToStringToStringMap(Ma * @param key the key to check in the map * @return the Boolean value associated with the key if present, or null if the key is not found */ - public static Boolean parseBooleanIfExists(Map inputs, String key) { - return inputs.containsKey(key) ? Booleans.parseBoolean(inputs.get(key).toString()) : null; + public static T parseIfExists(Map inputs, String key, Class type) { + if (!inputs.containsKey(key)) { + return null; + } + + Object value = inputs.get(key); + if (type == Boolean.class) { + return type.cast(Boolean.valueOf(value.toString())); + } else if (type == Float.class) { + return type.cast(Float.valueOf(value.toString())); + } else { + throw new IllegalArgumentException("Unsupported type: " + type); + } } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java index 58d0b5578..10c6a884b 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java @@ -122,7 +122,7 @@ public PlainActionFuture execute( String modelGroupId = (String) inputs.get(MODEL_GROUP_ID); String allConfig = (String) inputs.get(ALL_CONFIG); String modelInterface = (String) inputs.get(INTERFACE_FIELD); - final Boolean deploy = ParseUtils.parseBooleanIfExists(inputs, DEPLOY_FIELD); + final Boolean deploy = ParseUtils.parseIfExists(inputs, DEPLOY_FIELD, Boolean.class); // Build register model input MLRegisterModelInputBuilder mlInputBuilder = MLRegisterModelInput.builder() diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java index 782632f1b..8fe1271df 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java @@ -146,7 +146,7 @@ public void onFailure(Exception ex) { if (inputs.containsKey(MODEL_ACCESS_MODE)) { modelAccessMode = AccessMode.from((inputs.get(MODEL_ACCESS_MODE)).toString().toLowerCase(Locale.ROOT)); } - Boolean isAddAllBackendRoles = ParseUtils.parseBooleanIfExists(inputs, ADD_ALL_BACKEND_ROLES); + Boolean isAddAllBackendRoles = ParseUtils.parseIfExists(inputs, ADD_ALL_BACKEND_ROLES, Boolean.class); MLRegisterModelGroupInputBuilder builder = MLRegisterModelGroupInput.builder(); builder.name(modelGroupName); diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java index 660f7e037..b585ad92d 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java @@ -99,7 +99,7 @@ public PlainActionFuture execute( String connectorId = (String) inputs.get(CONNECTOR_ID); Guardrails guardRails = (Guardrails) inputs.get(GUARDRAILS_FIELD); String modelInterface = (String) inputs.get(INTERFACE_FIELD); - final Boolean deploy = ParseUtils.parseBooleanIfExists(inputs, DEPLOY_FIELD); + final Boolean deploy = ParseUtils.parseIfExists(inputs, DEPLOY_FIELD,Boolean.class); MLRegisterModelInputBuilder builder = MLRegisterModelInput.builder() .functionName(FunctionName.REMOTE) diff --git a/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java b/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java index 9f50fcf11..909e6b0ad 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java @@ -63,7 +63,7 @@ public PlainActionFuture execute( String type = (String) inputs.get(TYPE); String name = (String) inputs.get(NAME_FIELD); String description = (String) inputs.get(DESCRIPTION_FIELD); - Boolean includeOutputInAgentResponse = ParseUtils.parseBooleanIfExists(inputs ,INCLUDE_OUTPUT_IN_AGENT_RESPONSE); + Boolean includeOutputInAgentResponse = ParseUtils.parseIfExists(inputs ,INCLUDE_OUTPUT_IN_AGENT_RESPONSE, Boolean.class); Map parameters = getToolsParametersMap(inputs.get(PARAMETERS_FIELD), previousNodeInputs, outputs); MLToolSpec.MLToolSpecBuilder builder = MLToolSpec.builder(); diff --git a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java index b755be6f7..2cf1e9ea3 100644 --- a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java @@ -8,6 +8,8 @@ */ package org.opensearch.flowframework.util; +import org.junit.Test; +import org.junit.rules.ExpectedException; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentBuilder; @@ -222,15 +224,35 @@ public void testGetInputsFromPreviousSteps() { assertEquals(RestStatus.BAD_REQUEST, e.getRestStatus()); } - public void testParseBooleanIfExists() { + public void testParseIfExistsWithBooleanClass() { Map inputs = new HashMap<>(); inputs.put("key1", "true"); inputs.put("key2", "false"); inputs.put("key3", "true"); - assertEquals(Boolean.TRUE, ParseUtils.parseBooleanIfExists(inputs, "key1")); - assertEquals(Boolean.FALSE, ParseUtils.parseBooleanIfExists(inputs, "key2")); - assertNull(ParseUtils.parseBooleanIfExists(inputs, "keyThatDoesntExist")); + assertEquals(Boolean.TRUE, ParseUtils.parseIfExists(inputs, "key1", Boolean.class)); + assertEquals(Boolean.FALSE, ParseUtils.parseIfExists(inputs, "key2", Boolean.class)); + assertNull(ParseUtils.parseIfExists(inputs, "keyThatDoesntExist", Boolean.class)); } + + public void testParseIfExistsWithFloatClass() { + Map inputs = new HashMap<>(); + inputs.put("key1", "3.14"); + inputs.put("key2", "0.01"); + inputs.put("key3", "90.22"); + + assertEquals(Float.valueOf("3.14"), ParseUtils.parseIfExists(inputs, "key1", Float.class)); + assertEquals(Float.valueOf("0.01"), ParseUtils.parseIfExists(inputs, "key2", Float.class)); + assertNull(ParseUtils.parseIfExists(inputs, "keyThatDoesntExist", Float.class)); + + } + + public void testParseIfExistWhenWrongTypeIsPassed() { + + Map inputs = new HashMap<>(); + inputs.put("key1", "3.14"); + + assertThrows(IllegalArgumentException.class, () -> ParseUtils.parseIfExists(inputs, "key1", Integer.class)); + } } From 00bd9c758993bc9b7fb187eb44eea300e3e6c5a6 Mon Sep 17 00:00:00 2001 From: martinpkr Date: Thu, 30 May 2024 22:58:08 +0300 Subject: [PATCH 05/16] fixed javadoc Signed-off-by: martinpkr --- .../java/org/opensearch/flowframework/util/ParseUtils.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index a27cbb2e0..97b74ae31 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -474,11 +474,12 @@ public static Map convertStringToObjectMapToStringToStringMap(Ma } /** - * Checks if the inputs map contains the specified key and parses the associated value to a Boolean. + * Checks if the inputs map contains the specified key and parses the associated value to a generic class. * * @param inputs the map containing the input data * @param key the key to check in the map - * @return the Boolean value associated with the key if present, or null if the key is not found + * @param type the class to parse the value to + * @return the generic type value associated with the key if present, or null if the key is not found */ public static T parseIfExists(Map inputs, String key, Class type) { if (!inputs.containsKey(key)) { From 2d9c0acb1511210ed837395f4a21dad5723a712f Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Thu, 30 May 2024 14:20:15 -0700 Subject: [PATCH 06/16] Added workflow step for ReIndex Step (#718) * Initial commit for reindex workflow step with extra params Signed-off-by: owaiskazi19 * Addressed PR comments Signed-off-by: owaiskazi19 * Changed request per second to Float Signed-off-by: owaiskazi19 * Addressed string array for source indices and removed state index entry Signed-off-by: owaiskazi19 * Minor comments Signed-off-by: owaiskazi19 --------- Signed-off-by: owaiskazi19 Signed-off-by: martinpkr --- CHANGELOG.md | 2 + .../flowframework/common/CommonValue.java | 5 +- .../common/WorkflowResources.java | 3 + .../flowframework/workflow/ReindexStep.java | 176 +++++++++++++++ .../workflow/WorkflowStepFactory.java | 6 + .../model/WorkflowValidatorTests.java | 2 +- .../workflow/ReindexStepTests.java | 201 ++++++++++++++++++ 7 files changed, 393 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java create mode 100644 src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 465053a1e..9bc658575 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x) ### Features ### Enhancements +- Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718)) + ### Bug Fixes - Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705)) diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index ac0291687..2a835b852 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -174,7 +174,10 @@ private CommonValue() {} public static final String DELAY_FIELD = "delay"; /** Model Interface Field */ public static final String INTERFACE_FIELD = "interface"; - + /** The source index field for reindex */ + public static final String SOURCE_INDEX = "source_index"; + /** The destination index field for reindex */ + public static final String DESTINATION_INDEX = "destination_index"; /* * Constants associated with resource provisioning / state */ diff --git a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java index a024ec3b8..e349e57e0 100644 --- a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java +++ b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java @@ -27,6 +27,7 @@ import org.opensearch.flowframework.workflow.RegisterLocalSparseEncodingModelStep; import org.opensearch.flowframework.workflow.RegisterModelGroupStep; import org.opensearch.flowframework.workflow.RegisterRemoteModelStep; +import org.opensearch.flowframework.workflow.ReindexStep; import org.opensearch.flowframework.workflow.UndeployModelStep; import java.util.Set; @@ -58,6 +59,8 @@ public enum WorkflowResources { CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step /** Workflow steps for creating an index and associated created resource */ CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME), + /** Workflow steps for reindex a source index to destination index and associated created resource */ + REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME), /** Workflow steps for registering/deleting an agent and the associated created resource */ REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME); diff --git a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java new file mode 100644 index 000000000..bc335db97 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java @@ -0,0 +1,176 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.Client; +import org.opensearch.common.Booleans; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.Strings; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; +import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.flowframework.util.ParseUtils; +import org.opensearch.index.reindex.BulkByScrollResponse; +import org.opensearch.index.reindex.ReindexAction; +import org.opensearch.index.reindex.ReindexRequest; + +import java.util.Map; +import java.util.Set; + +import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX; +import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX; + +/** + * Step to reindex + */ +public class ReindexStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(ReindexStep.class); + private final Client client; + private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + + /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ + public static final String NAME = "reindex"; + /** The refresh field for reindex */ + private static final String REFRESH = "refresh"; + /** The requests_per_second field for reindex */ + private static final String REQUESTS_PER_SECOND = "requests_per_second"; + /** The require_alias field for reindex */ + private static final String REQUIRE_ALIAS = "require_alias"; + /** The slices field for reindex */ + private static final String SLICES = "slices"; + /** The max_docs field for reindex */ + private static final String MAX_DOCS = "max_docs"; + + /** + * Instantiate this class + * + * @param client Client to create an index + * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices + */ + public ReindexStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) { + this.client = client; + this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; + } + + @Override + public PlainActionFuture execute( + String currentNodeId, + WorkflowData currentNodeInputs, + Map outputs, + Map previousNodeInputs, + Map params + ) { + + PlainActionFuture reIndexFuture = PlainActionFuture.newFuture(); + + Set requiredKeys = Set.of(SOURCE_INDEX, DESTINATION_INDEX); + + Set optionalKeys = Set.of(REFRESH, REQUESTS_PER_SECOND, REQUIRE_ALIAS, SLICES, MAX_DOCS); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + requiredKeys, + optionalKeys, + currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + + String sourceIndices = (String) inputs.get(SOURCE_INDEX); + String destinationIndex = (String) inputs.get(DESTINATION_INDEX); + Boolean refresh = inputs.containsKey(REFRESH) ? Booleans.parseBoolean(inputs.get(REFRESH).toString()) : null; + Float requestsPerSecond = inputs.containsKey(REQUESTS_PER_SECOND) + ? Float.parseFloat(inputs.get(REQUESTS_PER_SECOND).toString()) + : null; + Boolean requireAlias = inputs.containsKey(REQUIRE_ALIAS) ? Booleans.parseBoolean(inputs.get(REQUIRE_ALIAS).toString()) : null; + Integer slices = (Integer) inputs.get(SLICES); + Integer maxDocs = (Integer) inputs.get(MAX_DOCS); + + ReindexRequest reindexRequest = new ReindexRequest().setSourceIndices(Strings.splitStringByCommaToArray(sourceIndices)) + .setDestIndex(destinationIndex); + + if (refresh != null) { + reindexRequest.setRefresh(refresh); + } + if (requestsPerSecond != null) { + reindexRequest.setRequestsPerSecond(requestsPerSecond); + } + if (requireAlias != null) { + reindexRequest.setRequireAlias(requireAlias); + } + if (maxDocs != null) { + reindexRequest.setMaxDocs(maxDocs); + } + if (slices != null) { + reindexRequest.setSlices(slices); + } + + ActionListener actionListener = new ActionListener<>() { + + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + logger.info("Reindex from source: {} to destination {}", sourceIndices, destinationIndex); + try { + if (bulkByScrollResponse.getBulkFailures().isEmpty() && bulkByScrollResponse.getSearchFailures().isEmpty()) { + reIndexFuture.onResponse( + new WorkflowData( + Map.of( + NAME, + Map.ofEntries( + Map.entry(DESTINATION_INDEX, destinationIndex), + Map.entry(SOURCE_INDEX, sourceIndices) + ) + ), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + } else { + String errorMessage = "Failed to get bulk response " + bulkByScrollResponse.getBulkFailures(); + reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); + } + } catch (Exception e) { + String errorMessage = "Failed to parse and update new created resource"; + logger.error(errorMessage, e); + reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + } + } + + @Override + public void onFailure(Exception e) { + String errorMessage = "Failed to reindex from source " + sourceIndices + " to " + destinationIndex; + logger.error(errorMessage, e); + reIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); + } + }; + + client.execute(ReindexAction.INSTANCE, reindexRequest, actionListener); + + } catch (IllegalArgumentException iae) { + String error = "Failed to reindex " + iae.getMessage(); + reIndexFuture.onFailure(new WorkflowStepException(error, RestStatus.BAD_REQUEST)); + } catch (Exception e) { + reIndexFuture.onFailure(e); + } + + return reIndexFuture; + } + + @Override + public String getName() { + return NAME; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index 224cbf1eb..7ab5c1061 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -34,6 +34,7 @@ import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD; import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD; +import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX; import static org.opensearch.flowframework.common.CommonValue.EMBEDDING_DIMENSION; import static org.opensearch.flowframework.common.CommonValue.FRAMEWORK_TYPE; import static org.opensearch.flowframework.common.CommonValue.FUNCTION_NAME; @@ -47,6 +48,7 @@ import static org.opensearch.flowframework.common.CommonValue.PIPELINE_ID; import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD; import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS; +import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX; import static org.opensearch.flowframework.common.CommonValue.SUCCESS; import static org.opensearch.flowframework.common.CommonValue.TOOLS_FIELD; import static org.opensearch.flowframework.common.CommonValue.TYPE; @@ -84,6 +86,7 @@ public WorkflowStepFactory( ) { stepMap.put(NoOpStep.NAME, NoOpStep::new); stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler)); + stepMap.put(ReindexStep.NAME, () -> new ReindexStep(client, flowFrameworkIndicesHandler)); stepMap.put( RegisterLocalCustomModelStep.NAME, () -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings) @@ -125,6 +128,9 @@ public enum WorkflowSteps { /** Create Index Step */ CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null), + /** Create ReIndex Step */ + REINDEX(ReindexStep.NAME, List.of(SOURCE_INDEX, DESTINATION_INDEX), List.of(ReindexStep.NAME), Collections.emptyList(), null), + /** Create Connector Step */ CREATE_CONNECTOR( CreateConnectorStep.NAME, diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java index 80a9788c2..19cb3d718 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException { WorkflowValidator validator = new WorkflowValidator(workflowStepValidators); - assertEquals(17, validator.getWorkflowStepValidators().size()); + assertEquals(18, validator.getWorkflowStepValidators().size()); assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector")); assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java new file mode 100644 index 000000000..97eff365a --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java @@ -0,0 +1,201 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.lucene.tests.util.LuceneTestCase; +import org.opensearch.OpenSearchException; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.Client; +import org.opensearch.common.Randomness; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.index.reindex.BulkByScrollResponse; +import org.opensearch.index.reindex.BulkByScrollTask; +import org.opensearch.index.reindex.ReindexRequest; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.mockito.ArgumentCaptor; +import org.mockito.MockitoAnnotations; + +import static java.lang.Math.abs; +import static java.util.stream.Collectors.toList; +import static org.opensearch.action.DocWriteResponse.Result.UPDATED; +import static org.opensearch.common.unit.TimeValue.timeValueMillis; +import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX; +import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; +import static org.opensearch.flowframework.workflow.ReindexStep.NAME; +import static org.apache.lucene.tests.util.TestUtil.randomSimpleString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class ReindexStepTests extends OpenSearchTestCase { + private WorkflowData inputData = WorkflowData.EMPTY; + private Client client; + private ReindexStep reIndexStep; + /** The refresh field for reindex */ + private static final String REFRESH = "refresh"; + /** The requests_per_second field for reindex */ + private static final String REQUESTS_PER_SECOND = "requests_per_second"; + /** The require_alias field for reindex */ + private static final String REQUIRE_ALIAS = "require_alias"; + /** The slices field for reindex */ + private static final String SLICES = "slices"; + /** The max_docs field for reindex */ + private static final String MAX_DOCS = "max_docs"; + + private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + + @Override + public void setUp() throws Exception { + super.setUp(); + this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); + MockitoAnnotations.openMocks(this); + + inputData = new WorkflowData( + Map.ofEntries( + Map.entry(SOURCE_INDEX, "demo"), + Map.entry(DESTINATION_INDEX, "dest"), + Map.entry(REFRESH, true), + Map.entry(REQUESTS_PER_SECOND, 2.0), + Map.entry(REQUIRE_ALIAS, false), + Map.entry(SLICES, 1), + Map.entry(MAX_DOCS, 2) + ), + "test-id", + "test-node-id" + ); + + client = mock(Client.class); + reIndexStep = new ReindexStep(client, flowFrameworkIndicesHandler); + } + + public void testReindexStep() throws ExecutionException, InterruptedException, IOException { + + @SuppressWarnings({ "unchecked" }) + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + + doAnswer(invocation -> { + ActionListener updateResponseListener = invocation.getArgument(4); + updateResponseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, UPDATED)); + return null; + }).when(flowFrameworkIndicesHandler).updateResourceInStateIndex(anyString(), anyString(), anyString(), anyString(), any()); + + PlainActionFuture future = reIndexStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + verify(client, times(1)).execute(any(), any(ReindexRequest.class), actionListenerCaptor.capture()); + actionListenerCaptor.getValue() + .onResponse( + new BulkByScrollResponse( + timeValueMillis(randomNonNegativeLong()), + randomStatus(), + Collections.emptyList(), + Collections.emptyList(), + randomBoolean() + ) + ); + + assertTrue(future.isDone()); + Map outputData = Map.of(NAME, Map.ofEntries(Map.entry(DESTINATION_INDEX, "dest"), Map.entry(SOURCE_INDEX, "demo"))); + assertEquals(outputData, future.get().getContent()); + + } + + public void testReindexStepFailure() throws ExecutionException, InterruptedException { + @SuppressWarnings({ "unchecked" }) + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + PlainActionFuture future = reIndexStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + assertFalse(future.isDone()); + verify(client, times(1)).execute(any(), any(ReindexRequest.class), actionListenerCaptor.capture()); + + actionListenerCaptor.getValue().onFailure(new Exception("Failed to reindex from source demo to dest")); + + assertTrue(future.isDone()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof Exception); + assertEquals("Failed to reindex from source demo to dest", ex.getCause().getMessage()); + } + + private static BulkByScrollTask.Status randomStatus() { + if (randomBoolean()) { + return randomWorkingStatus(null); + } + boolean canHaveNullStatues = randomBoolean(); + List statuses = IntStream.range(0, between(0, 10)).mapToObj(i -> { + if (canHaveNullStatues && LuceneTestCase.rarely()) { + return null; + } + if (randomBoolean()) { + return new BulkByScrollTask.StatusOrException(new OpenSearchException(randomAlphaOfLength(5))); + } + return new BulkByScrollTask.StatusOrException(randomWorkingStatus(i)); + }).collect(toList()); + return new BulkByScrollTask.Status(statuses, randomBoolean() ? "test" : null); + } + + private static BulkByScrollTask.Status randomWorkingStatus(Integer sliceId) { + // These all should be believably small because we sum them if we have multiple workers + int total = between(0, 10000000); + int updated = between(0, total); + int created = between(0, total - updated); + int deleted = between(0, total - updated - created); + int noops = total - updated - created - deleted; + int batches = between(0, 10000); + long versionConflicts = between(0, total); + long bulkRetries = between(0, 10000000); + long searchRetries = between(0, 100000); + // smallest unit of time during toXContent is Milliseconds + TimeUnit[] timeUnits = { TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS }; + TimeValue throttled = new TimeValue(randomIntBetween(0, 1000), randomFrom(timeUnits)); + TimeValue throttledUntil = new TimeValue(randomIntBetween(0, 1000), randomFrom(timeUnits)); + return new BulkByScrollTask.Status( + sliceId, + total, + updated, + created, + deleted, + batches, + versionConflicts, + noops, + bulkRetries, + searchRetries, + throttled, + abs(Randomness.get().nextFloat()), + randomBoolean() ? null : randomSimpleString(Randomness.get()), + throttledUntil + ); + } +} From aca39c61fc7881ddcffc4967388e166bb50f6365 Mon Sep 17 00:00:00 2001 From: martinpkr Date: Fri, 31 May 2024 10:15:43 +0300 Subject: [PATCH 07/16] Incorporating parseIfExist method into ReindexStep class Signed-off-by: martinpkr --- .../opensearch/flowframework/workflow/ReindexStep.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java index bc335db97..c3b7cc52d 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java @@ -91,11 +91,9 @@ public PlainActionFuture execute( String sourceIndices = (String) inputs.get(SOURCE_INDEX); String destinationIndex = (String) inputs.get(DESTINATION_INDEX); - Boolean refresh = inputs.containsKey(REFRESH) ? Booleans.parseBoolean(inputs.get(REFRESH).toString()) : null; - Float requestsPerSecond = inputs.containsKey(REQUESTS_PER_SECOND) - ? Float.parseFloat(inputs.get(REQUESTS_PER_SECOND).toString()) - : null; - Boolean requireAlias = inputs.containsKey(REQUIRE_ALIAS) ? Booleans.parseBoolean(inputs.get(REQUIRE_ALIAS).toString()) : null; + Boolean refresh = ParseUtils.parseIfExists(inputs, REFRESH, Boolean.class); + Float requestsPerSecond = ParseUtils.parseIfExists(inputs, REQUESTS_PER_SECOND, Float.class); + Boolean requireAlias = ParseUtils.parseIfExists(inputs, REQUIRE_ALIAS, Boolean.class); Integer slices = (Integer) inputs.get(SLICES); Integer maxDocs = (Integer) inputs.get(MAX_DOCS); From 348dee673684cdf1a6c3662f6d9a672599cc2e54 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 30 May 2024 18:00:51 -0700 Subject: [PATCH 08/16] Add param to delete workflow API to clear status even if resources exist (#719) Signed-off-by: Daniel Widdis Signed-off-by: martinpkr --- CHANGELOG.md | 1 + .../flowframework/common/CommonValue.java | 2 ++ .../indices/FlowFrameworkIndicesHandler.java | 12 ++++++-- .../flowframework/model/WorkflowState.java | 2 +- .../rest/RestDeleteWorkflowAction.java | 4 ++- .../rest/RestGetWorkflowStateAction.java | 2 +- .../DeleteWorkflowTransportAction.java | 6 +++- .../GetWorkflowStateTransportAction.java | 4 +-- .../FlowFrameworkRestTestCase.java | 29 +++++++++++++++++-- .../FlowFrameworkIndicesHandlerTests.java | 28 +++++++++--------- .../rest/FlowFrameworkRestApiIT.java | 21 ++++++++++++++ 11 files changed, 84 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bc658575..5021e3f14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Features ### Enhancements - Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718)) +- Add param to delete workflow API to clear status even if resources exist ([#719](https://github.com/opensearch-project/flow-framework/pull/719)) ### Bug Fixes - Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705)) diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index 2a835b852..87c2f2180 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -199,6 +199,8 @@ private CommonValue() {} public static final String USER_OUTPUTS_FIELD = "user_outputs"; /** The template field name for template resources created */ public static final String RESOURCES_CREATED_FIELD = "resources_created"; + /** The parameter to clear workflow state when deleting template */ + public static final String CLEAR_STATUS = "clear_status"; /** The field name for the step name where a resource is created */ public static final String WORKFLOW_STEP_NAME = "workflow_step_name"; /** The field name for the step ID where a resource is created */ diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 34cdf2b56..8fcfd1207 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -525,11 +525,17 @@ public void getProvisioningProgress( * Check workflow provisioning state and resources to see if state can be deleted with template * * @param documentId document id - * @param canDeleteStateConsumer consumer function which will be true if NOT_STARTED or COMPLETED and no resources + * @param clearStatus if set true, always deletes the state document unless status is IN_PROGRESS + * @param canDeleteStateConsumer consumer function which will be true if workflow state is not IN_PROGRESS and either no resources or true clearStatus * @param listener action listener from caller to fail on error * @param action listener response type */ - public void canDeleteWorkflowStateDoc(String documentId, Consumer canDeleteStateConsumer, ActionListener listener) { + public void canDeleteWorkflowStateDoc( + String documentId, + boolean clearStatus, + Consumer canDeleteStateConsumer, + ActionListener listener + ) { GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { client.get(getRequest, ActionListener.wrap(response -> { @@ -545,7 +551,7 @@ public void canDeleteWorkflowStateDoc(String documentId, Consumer c ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); WorkflowState workflowState = WorkflowState.parse(parser); canDeleteStateConsumer.accept( - workflowState.resourcesCreated().isEmpty() + (clearStatus || workflowState.resourcesCreated().isEmpty()) && !ProvisioningProgress.IN_PROGRESS.equals( ProvisioningProgress.valueOf(workflowState.getProvisioningProgress()) ) diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowState.java b/src/main/java/org/opensearch/flowframework/model/WorkflowState.java index 29d808fb7..6b1593b6e 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowState.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowState.java @@ -70,7 +70,7 @@ public class WorkflowState implements ToXContentObject, Writeable { * @param provisionEndTime Indicates the end time of the whole provisioning flow * @param user The user extracted from the thread context from the request * @param userOutputs A map of essential API responses for backend to use and lookup. - * @param resourcesCreated A map of all the resources created. + * @param resourcesCreated A list of all the resources created. */ public WorkflowState( String workflowId, diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java index a7c8711dd..ebef48c86 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Locale; +import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; @@ -62,6 +63,7 @@ public List routes() { @Override protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { String workflowId = request.param(WORKFLOW_ID); + request.param(CLEAR_STATUS); // consume and ignore, we will pass params to workflow try { if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) { throw new FlowFrameworkException( @@ -78,7 +80,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request if (workflowId == null) { throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST); } - WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, request.params()); return channel -> client.execute(DeleteWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> { XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); diff --git a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java index b95c75a56..bdf4df35f 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java @@ -83,7 +83,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request try { FlowFrameworkException ex = exception instanceof FlowFrameworkException ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception)); + : new FlowFrameworkException("Failed to get workflow status.", ExceptionsHelper.status(exception)); XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java index 0e4699f20..151c3da7c 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.util.Booleans; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.support.ActionFilters; @@ -24,6 +25,7 @@ import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; +import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; /** @@ -65,10 +67,12 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener stateListener = ActionListener.wrap(response -> { logger.info("Deleted workflow state doc: {}", workflowId); }, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); }); - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, canDelete -> { + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, clearStatus, canDelete -> { if (Boolean.TRUE.equals(canDelete)) { flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener); } diff --git a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java index f2303a606..9625ce731 100644 --- a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java @@ -83,11 +83,11 @@ protected void doExecute(Task task, GetWorkflowStateRequest request, ActionListe listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); } } else { - listener.onFailure(new FlowFrameworkException("Fail to find workflow " + workflowId, RestStatus.NOT_FOUND)); + listener.onFailure(new FlowFrameworkException("Fail to find workflow status of " + workflowId, RestStatus.NOT_FOUND)); } }, e -> { if (e instanceof IndexNotFoundException) { - listener.onFailure(new FlowFrameworkException("Fail to find workflow " + workflowId, RestStatus.NOT_FOUND)); + listener.onFailure(new FlowFrameworkException("Fail to find workflow status of " + workflowId, RestStatus.NOT_FOUND)); } else { String errorMessage = "Failed to get workflow status of: " + workflowId; logger.error(errorMessage, e); diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index 2eaff69b4..9a1d89c2e 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -28,6 +28,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Request; import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.common.settings.Settings; @@ -454,10 +455,22 @@ protected Response deprovisionWorkflow(RestClient client, String workflowId) thr * @throws Exception if the request fails */ protected Response deleteWorkflow(RestClient client, String workflowId) throws Exception { + return deleteWorkflow(client, workflowId, ""); + } + + /** + * Helper method to invoke the Delete Workflow Rest Action + * @param client the rest client + * @param workflowId the workflow ID to delete + * @param params a string adding any rest path params + * @return a rest response + * @throws Exception if the request fails + */ + protected Response deleteWorkflow(RestClient client, String workflowId, String params) throws Exception { return TestHelpers.makeRequest( client, "DELETE", - String.format(Locale.ROOT, "%s/%s", WORKFLOW_URI, workflowId), + String.format(Locale.ROOT, "%s/%s%s", WORKFLOW_URI, workflowId, params), Collections.emptyMap(), "", null @@ -481,7 +494,6 @@ protected Response getWorkflowStatus(RestClient client, String workflowId, boole "", null ); - } /** @@ -586,7 +598,7 @@ protected SearchResponse searchWorkflowState(RestClient client, String query) th } /** - * Helper method to invoke the Get Workflow Rest Action and assert the provisioning and state status + * Helper method to invoke the Get Workflow Status Rest Action and assert the provisioning and state status * @param client the rest client * @param workflowId the workflow ID to get the status * @param stateStatus the state status name @@ -607,6 +619,17 @@ protected void getAndAssertWorkflowStatus( assertEquals(provisioningStatus.name(), (String) responseMap.get(CommonValue.PROVISIONING_PROGRESS_FIELD)); } + /** + * Helper method to invoke the Get Workflow Status Rest Action and assert document is not found + * @param client the rest client + * @param workflowId the workflow ID to get the status + * @throws Exception if the request fails + */ + protected void getAndAssertWorkflowStatusNotFound(RestClient client, String workflowId) throws Exception { + ResponseException ex = assertThrows(ResponseException.class, () -> getWorkflowStatus(client, workflowId, true)); + assertEquals(RestStatus.NOT_FOUND.getStatus(), ex.getResponse().getStatusLine().getStatusCode()); + } + /** * Helper method to invoke the Get Workflow status Rest Action and get the error field * @param client the rest client diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index ad2ebca0e..f55cb3bc1 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -72,6 +72,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@SuppressWarnings("deprecation") public class FlowFrameworkIndicesHandlerTests extends OpenSearchTestCase { @Mock private Client client; @@ -262,19 +263,10 @@ public void testInitIndexIfAbsent_IndexNotPresent() { public void testIsWorkflowProvisionedFailedParsing() { String documentId = randomAlphaOfLength(5); + @SuppressWarnings("unchecked") Consumer> function = mock(Consumer.class); + @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); - WorkflowState workFlowState = new WorkflowState( - documentId, - "test", - "PROVISIONING", - "IN_PROGRESS", - Instant.now(), - Instant.now(), - TestHelpers.randomUser(), - Collections.emptyMap(), - Collections.emptyList() - ); doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(1); @@ -318,7 +310,7 @@ public void testCanDeleteWorkflowStateDoc() { return null; }).when(client).get(any(GetRequest.class), any()); - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertTrue(canDelete); }, listener); + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertTrue(canDelete); }, listener); } public void testCanNotDeleteWorkflowStateDocInProgress() { @@ -347,10 +339,10 @@ public void testCanNotDeleteWorkflowStateDocInProgress() { return null; }).when(client).get(any(GetRequest.class), any()); - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener); + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertFalse(canDelete); }, listener); } - public void testCanNotDeleteWorkflowStateDocResourcesExist() { + public void testDeleteWorkflowStateDocResourcesExist() { String documentId = randomAlphaOfLength(5); @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); @@ -376,12 +368,18 @@ public void testCanNotDeleteWorkflowStateDocResourcesExist() { return null; }).when(client).get(any(GetRequest.class), any()); - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener); + // Can't delete because resources exist + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertFalse(canDelete); }, listener); + + // But can delete if clearStatus set true + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertTrue(canDelete); }, listener); } public void testDoesTemplateExist() { String documentId = randomAlphaOfLength(5); + @SuppressWarnings("unchecked") Consumer function = mock(Consumer.class); + @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(1); diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index c516978f0..7f51cd276 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -175,6 +175,17 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception { assertNotNull(resourcesCreated.get(0).resourceId()); assertEquals("deploy_model", resourcesCreated.get(1).workflowStepName()); assertNotNull(resourcesCreated.get(1).resourceId()); + + // Delete the workflow without deleting the resources + Response deleteResponse = deleteWorkflow(client(), workflowId); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); + + // Verify state doc is not deleted + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.COMPLETED, ProvisioningProgress.DONE); }, + 30, + TimeUnit.SECONDS + ); } public void testCreateAndProvisionCyclicalTemplate() throws Exception { @@ -235,6 +246,13 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception { assertNotNull(resourcesCreated.get(1).resourceId()); assertEquals("deploy_model", resourcesCreated.get(2).workflowStepName()); assertNotNull(resourcesCreated.get(2).resourceId()); + + // Delete the workflow without deleting the resources + Response deleteResponse = deleteWorkflow(client(), workflowId, "?clear_status=true"); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); + + // Verify state doc is deleted + assertBusy(() -> { getAndAssertWorkflowStatusNotFound(client(), workflowId); }, 30, TimeUnit.SECONDS); } public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { @@ -305,6 +323,9 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { // Hit Delete API Response deleteResponse = deleteWorkflow(client(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); + + // Verify state doc is deleted + assertBusy(() -> { getAndAssertWorkflowStatusNotFound(client(), workflowId); }, 30, TimeUnit.SECONDS); } public void testTimestamps() throws Exception { From 4ca0b7102eada35cd2063a242abe01008028ab52 Mon Sep 17 00:00:00 2001 From: martinpkr Date: Sat, 1 Jun 2024 02:10:06 +0300 Subject: [PATCH 09/16] refactored method to use parseBoolean and parseFloat methods Signed-off-by: martinpkr --- .../java/org/opensearch/flowframework/util/ParseUtils.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index 97b74ae31..b257d2c3f 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; +import org.opensearch.common.Booleans; import org.opensearch.common.io.Streams; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentHelper; @@ -488,9 +489,9 @@ public static T parseIfExists(Map inputs, String key, Class< Object value = inputs.get(key); if (type == Boolean.class) { - return type.cast(Boolean.valueOf(value.toString())); + return type.cast(Booleans.parseBoolean(value.toString())); } else if (type == Float.class) { - return type.cast(Float.valueOf(value.toString())); + return type.cast(Float.parseFloat(value.toString())); } else { throw new IllegalArgumentException("Unsupported type: " + type); } From 7c62fd9b3af39f02e0b0f401e06781d929b45b4d Mon Sep 17 00:00:00 2001 From: martinpkr Date: Sat, 1 Jun 2024 13:18:05 +0300 Subject: [PATCH 10/16] Adding a missing param in javaDoc Signed-off-by: martinpkr --- src/main/java/org/opensearch/flowframework/util/ParseUtils.java | 2 ++ .../flowframework/workflow/RegisterRemoteModelStep.java | 2 +- .../java/org/opensearch/flowframework/workflow/ReindexStep.java | 1 - .../java/org/opensearch/flowframework/workflow/ToolStep.java | 2 +- .../java/org/opensearch/flowframework/util/ParseUtilsTests.java | 2 -- 5 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index b257d2c3f..7cd8645fe 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -477,9 +477,11 @@ public static Map convertStringToObjectMapToStringToStringMap(Ma /** * Checks if the inputs map contains the specified key and parses the associated value to a generic class. * + * @param the type to which the value should be parsed * @param inputs the map containing the input data * @param key the key to check in the map * @param type the class to parse the value to + * @throws IllegalArgumentException if the type is not supported * @return the generic type value associated with the key if present, or null if the key is not found */ public static T parseIfExists(Map inputs, String key, Class type) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java index b585ad92d..25c0de272 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java @@ -99,7 +99,7 @@ public PlainActionFuture execute( String connectorId = (String) inputs.get(CONNECTOR_ID); Guardrails guardRails = (Guardrails) inputs.get(GUARDRAILS_FIELD); String modelInterface = (String) inputs.get(INTERFACE_FIELD); - final Boolean deploy = ParseUtils.parseIfExists(inputs, DEPLOY_FIELD,Boolean.class); + final Boolean deploy = ParseUtils.parseIfExists(inputs, DEPLOY_FIELD, Boolean.class); MLRegisterModelInputBuilder builder = MLRegisterModelInput.builder() .functionName(FunctionName.REMOTE) diff --git a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java index c3b7cc52d..c0da6369c 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java @@ -13,7 +13,6 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; -import org.opensearch.common.Booleans; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.rest.RestStatus; diff --git a/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java b/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java index 909e6b0ad..7f9bd609d 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java @@ -63,7 +63,7 @@ public PlainActionFuture execute( String type = (String) inputs.get(TYPE); String name = (String) inputs.get(NAME_FIELD); String description = (String) inputs.get(DESCRIPTION_FIELD); - Boolean includeOutputInAgentResponse = ParseUtils.parseIfExists(inputs ,INCLUDE_OUTPUT_IN_AGENT_RESPONSE, Boolean.class); + Boolean includeOutputInAgentResponse = ParseUtils.parseIfExists(inputs, INCLUDE_OUTPUT_IN_AGENT_RESPONSE, Boolean.class); Map parameters = getToolsParametersMap(inputs.get(PARAMETERS_FIELD), previousNodeInputs, outputs); MLToolSpec.MLToolSpecBuilder builder = MLToolSpec.builder(); diff --git a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java index 2cf1e9ea3..193616b20 100644 --- a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java @@ -8,8 +8,6 @@ */ package org.opensearch.flowframework.util; -import org.junit.Test; -import org.junit.rules.ExpectedException; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentBuilder; From f8d9b2bb951d740118f0c9c931451ee480b9c479 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Thu, 30 May 2024 14:20:15 -0700 Subject: [PATCH 11/16] Added workflow step for ReIndex Step (#718) * Initial commit for reindex workflow step with extra params Signed-off-by: owaiskazi19 * Addressed PR comments Signed-off-by: owaiskazi19 * Changed request per second to Float Signed-off-by: owaiskazi19 * Addressed string array for source indices and removed state index entry Signed-off-by: owaiskazi19 * Minor comments Signed-off-by: owaiskazi19 --------- Signed-off-by: owaiskazi19 Signed-off-by: martinpkr --- CHANGELOG.md | 3 +++ .../flowframework/workflow/ReindexStep.java | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5021e3f14..6f4c568ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,10 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Features ### Enhancements - Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718)) +<<<<<<< HEAD - Add param to delete workflow API to clear status even if resources exist ([#719](https://github.com/opensearch-project/flow-framework/pull/719)) +======= +>>>>>>> 4ee2171 (Added workflow step for ReIndex Step (#718)) ### Bug Fixes - Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705)) diff --git a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java index c0da6369c..b57723db0 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java @@ -13,6 +13,10 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; +<<<<<<< HEAD +======= +import org.opensearch.common.Booleans; +>>>>>>> 4ee2171 (Added workflow step for ReIndex Step (#718)) import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.rest.RestStatus; @@ -90,9 +94,17 @@ public PlainActionFuture execute( String sourceIndices = (String) inputs.get(SOURCE_INDEX); String destinationIndex = (String) inputs.get(DESTINATION_INDEX); +<<<<<<< HEAD Boolean refresh = ParseUtils.parseIfExists(inputs, REFRESH, Boolean.class); Float requestsPerSecond = ParseUtils.parseIfExists(inputs, REQUESTS_PER_SECOND, Float.class); Boolean requireAlias = ParseUtils.parseIfExists(inputs, REQUIRE_ALIAS, Boolean.class); +======= + Boolean refresh = inputs.containsKey(REFRESH) ? Booleans.parseBoolean(inputs.get(REFRESH).toString()) : null; + Float requestsPerSecond = inputs.containsKey(REQUESTS_PER_SECOND) + ? Float.parseFloat(inputs.get(REQUESTS_PER_SECOND).toString()) + : null; + Boolean requireAlias = inputs.containsKey(REQUIRE_ALIAS) ? Booleans.parseBoolean(inputs.get(REQUIRE_ALIAS).toString()) : null; +>>>>>>> 4ee2171 (Added workflow step for ReIndex Step (#718)) Integer slices = (Integer) inputs.get(SLICES); Integer maxDocs = (Integer) inputs.get(MAX_DOCS); From d574dab2d150d051632ed974d34dabd105a063de Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 30 May 2024 18:00:51 -0700 Subject: [PATCH 12/16] Add param to delete workflow API to clear status even if resources exist (#719) Signed-off-by: Daniel Widdis Signed-off-by: martinpkr --- CHANGELOG.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f4c568ed..5021e3f14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,10 +16,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Features ### Enhancements - Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718)) -<<<<<<< HEAD - Add param to delete workflow API to clear status even if resources exist ([#719](https://github.com/opensearch-project/flow-framework/pull/719)) -======= ->>>>>>> 4ee2171 (Added workflow step for ReIndex Step (#718)) ### Bug Fixes - Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705)) From 84d33dfb62bdf3289257e3359bd63ae160daf2f7 Mon Sep 17 00:00:00 2001 From: martinpkr Date: Sat, 1 Jun 2024 13:30:47 +0300 Subject: [PATCH 13/16] Added a chagelog entry Signed-off-by: martinpkr --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5021e3f14..375525627 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Enhancements - Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718)) - Add param to delete workflow API to clear status even if resources exist ([#719](https://github.com/opensearch-project/flow-framework/pull/719)) - +- Add a utility method to parse Float and Boolean values if they exist ([#721](https://github.com/opensearch-project/flow-framework/pull/721)) ### Bug Fixes - Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705)) From ff536e684bd54f9f2a3ad38a88d8706506569d10 Mon Sep 17 00:00:00 2001 From: martinpkr Date: Sun, 2 Jun 2024 21:03:10 +0300 Subject: [PATCH 14/16] fixed failing spotless check Signed-off-by: martinpkr --- .../flowframework/workflow/ReindexStep.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java index b57723db0..c0da6369c 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java @@ -13,10 +13,6 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; -<<<<<<< HEAD -======= -import org.opensearch.common.Booleans; ->>>>>>> 4ee2171 (Added workflow step for ReIndex Step (#718)) import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.rest.RestStatus; @@ -94,17 +90,9 @@ public PlainActionFuture execute( String sourceIndices = (String) inputs.get(SOURCE_INDEX); String destinationIndex = (String) inputs.get(DESTINATION_INDEX); -<<<<<<< HEAD Boolean refresh = ParseUtils.parseIfExists(inputs, REFRESH, Boolean.class); Float requestsPerSecond = ParseUtils.parseIfExists(inputs, REQUESTS_PER_SECOND, Float.class); Boolean requireAlias = ParseUtils.parseIfExists(inputs, REQUIRE_ALIAS, Boolean.class); -======= - Boolean refresh = inputs.containsKey(REFRESH) ? Booleans.parseBoolean(inputs.get(REFRESH).toString()) : null; - Float requestsPerSecond = inputs.containsKey(REQUESTS_PER_SECOND) - ? Float.parseFloat(inputs.get(REQUESTS_PER_SECOND).toString()) - : null; - Boolean requireAlias = inputs.containsKey(REQUIRE_ALIAS) ? Booleans.parseBoolean(inputs.get(REQUIRE_ALIAS).toString()) : null; ->>>>>>> 4ee2171 (Added workflow step for ReIndex Step (#718)) Integer slices = (Integer) inputs.get(SLICES); Integer maxDocs = (Integer) inputs.get(MAX_DOCS); From 180cd1e420704a311a9ec754acb869439b9f85f8 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Thu, 30 May 2024 14:20:15 -0700 Subject: [PATCH 15/16] Added workflow step for ReIndex Step (#718) * Initial commit for reindex workflow step with extra params Signed-off-by: owaiskazi19 * Addressed PR comments Signed-off-by: owaiskazi19 * Changed request per second to Float Signed-off-by: owaiskazi19 * Addressed string array for source indices and removed state index entry Signed-off-by: owaiskazi19 * Minor comments Signed-off-by: owaiskazi19 --------- Signed-off-by: owaiskazi19 Signed-off-by: martinpkr --- .../opensearch/flowframework/workflow/ReindexStep.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java index c0da6369c..bc335db97 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java @@ -13,6 +13,7 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; +import org.opensearch.common.Booleans; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.rest.RestStatus; @@ -90,9 +91,11 @@ public PlainActionFuture execute( String sourceIndices = (String) inputs.get(SOURCE_INDEX); String destinationIndex = (String) inputs.get(DESTINATION_INDEX); - Boolean refresh = ParseUtils.parseIfExists(inputs, REFRESH, Boolean.class); - Float requestsPerSecond = ParseUtils.parseIfExists(inputs, REQUESTS_PER_SECOND, Float.class); - Boolean requireAlias = ParseUtils.parseIfExists(inputs, REQUIRE_ALIAS, Boolean.class); + Boolean refresh = inputs.containsKey(REFRESH) ? Booleans.parseBoolean(inputs.get(REFRESH).toString()) : null; + Float requestsPerSecond = inputs.containsKey(REQUESTS_PER_SECOND) + ? Float.parseFloat(inputs.get(REQUESTS_PER_SECOND).toString()) + : null; + Boolean requireAlias = inputs.containsKey(REQUIRE_ALIAS) ? Booleans.parseBoolean(inputs.get(REQUIRE_ALIAS).toString()) : null; Integer slices = (Integer) inputs.get(SLICES); Integer maxDocs = (Integer) inputs.get(MAX_DOCS); From ec25f9181baaf7e7aa2505727ab08f0951b36042 Mon Sep 17 00:00:00 2001 From: martinpkr Date: Sun, 2 Jun 2024 21:37:19 +0300 Subject: [PATCH 16/16] removed unnecessary changelog info Signed-off-by: martinpkr --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 375525627..080555235 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,6 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Enhancements - Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718)) - Add param to delete workflow API to clear status even if resources exist ([#719](https://github.com/opensearch-project/flow-framework/pull/719)) -- Add a utility method to parse Float and Boolean values if they exist ([#721](https://github.com/opensearch-project/flow-framework/pull/721)) ### Bug Fixes - Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705))