From eb430be0c0d3994d622fcc63ec1de3d07a5b2d2e Mon Sep 17 00:00:00 2001 From: Amit Galitzky Date: Mon, 20 Nov 2023 19:31:20 +0000 Subject: [PATCH 1/3] adding check to init GC index if absent before max workflow check Signed-off-by: Amit Galitzky --- .../CreateWorkflowTransportAction.java | 142 ++++++++++-------- 1 file changed, 80 insertions(+), 62 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index d4f6b3b3f..1b8220873 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -39,6 +39,7 @@ import java.util.List; +import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD; import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD; import static org.opensearch.flowframework.util.ParseUtils.getUserContext; @@ -109,83 +110,100 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { - if (!max) { - String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows(); - logger.error(errorMessage); - FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST); - listener.onFailure(ffe); - return; - } else { - // Create new global context and state index entries - flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> { - flowFrameworkIndicesHandler.putInitialStateToWorkflowState( - globalContextResponse.getId(), - user, - ActionListener.wrap(stateResponse -> { - logger.info("create state workflow doc"); - listener.onResponse(new WorkflowResponse(globalContextResponse.getId())); + flowFrameworkIndicesHandler.initGlobalContextIndexIfAbsent(ActionListener.wrap(indexCreated -> { + if (!indexCreated) { + listener.onFailure(new FlowFrameworkException("No response to create global_context index", INTERNAL_SERVER_ERROR)); + return; + } + if (request.getWorkflowId() == null) { + // Throttle incoming requests + checkMaxWorkflows(request.getRequestTimeout(), request.getMaxWorkflows(), ActionListener.wrap(max -> { + if (!max) { + String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows(); + logger.error(errorMessage); + FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST); + listener.onFailure(ffe); + return; + } else { + // Create new global context and state index entries + flowFrameworkIndicesHandler.putTemplateToGlobalContext( + templateWithUser, + ActionListener.wrap(globalContextResponse -> { + flowFrameworkIndicesHandler.putInitialStateToWorkflowState( + globalContextResponse.getId(), + user, + ActionListener.wrap(stateResponse -> { + logger.info("create state workflow doc"); + listener.onResponse(new WorkflowResponse(globalContextResponse.getId())); + }, exception -> { + logger.error("Failed to save workflow state : {}", exception.getMessage()); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST)); + } + }) + ); }, exception -> { - logger.error("Failed to save workflow state : {}", exception.getMessage()); + logger.error("Failed to save use case template : {}", exception.getMessage()); if (exception instanceof FlowFrameworkException) { listener.onFailure(exception); } else { - listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST)); + listener.onFailure( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + } + + }) + ); + } + }, e -> { + logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage()); + if (e instanceof FlowFrameworkException) { + listener.onFailure(e); + } else { + listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); + } + })); + } else { + // Update existing entry, full document replacement + flowFrameworkIndicesHandler.updateTemplateInGlobalContext( + request.getWorkflowId(), + request.getTemplate(), + ActionListener.wrap(response -> { + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( + request.getWorkflowId(), + ImmutableMap.of(STATE_FIELD, State.NOT_STARTED, PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED), + ActionListener.wrap(updateResponse -> { + logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name()); + listener.onResponse(new WorkflowResponse(request.getWorkflowId())); + }, exception -> { + logger.error("Failed to update workflow state : {}", exception.getMessage()); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); } }) ); }, exception -> { - logger.error("Failed to save use case template : {}", exception.getMessage()); + logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage()); if (exception instanceof FlowFrameworkException) { listener.onFailure(exception); } else { listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); } - })); - } - }, e -> { - logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage()); - if (e instanceof FlowFrameworkException) { - listener.onFailure(e); - } else { - listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); - } - })); - } else { - // Update existing entry, full document replacement - flowFrameworkIndicesHandler.updateTemplateInGlobalContext( - request.getWorkflowId(), - request.getTemplate(), - ActionListener.wrap(response -> { - flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( - request.getWorkflowId(), - ImmutableMap.of(STATE_FIELD, State.NOT_STARTED, PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED), - ActionListener.wrap(updateResponse -> { - logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name()); - listener.onResponse(new WorkflowResponse(request.getWorkflowId())); - }, exception -> { - logger.error("Failed to update workflow state : {}", exception.getMessage()); - if (exception instanceof FlowFrameworkException) { - listener.onFailure(exception); - } else { - listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); - } - }) - ); - }, exception -> { - logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage()); - if (exception instanceof FlowFrameworkException) { - listener.onFailure(exception); - } else { - listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); - } + }) + ); + } + }, e -> { + logger.error("Failed to create global_context index", e); + listener.onFailure(new FlowFrameworkException("Failed to create global_context index", INTERNAL_SERVER_ERROR)); + })); - }) - ); - } } /** From f6927db8fc0c6419470bdba54f3da68d5dfea646 Mon Sep 17 00:00:00 2001 From: Amit Galitzky Date: Mon, 20 Nov 2023 19:54:36 +0000 Subject: [PATCH 2/3] moved check if index exists earlier Signed-off-by: Amit Galitzky --- .../CreateWorkflowTransportAction.java | 146 ++++++++---------- 1 file changed, 66 insertions(+), 80 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index 1b8220873..2d3915ed6 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -39,7 +39,6 @@ import java.util.List; -import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD; import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD; import static org.opensearch.flowframework.util.ParseUtils.getUserContext; @@ -110,100 +109,83 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { - if (!indexCreated) { - listener.onFailure(new FlowFrameworkException("No response to create global_context index", INTERNAL_SERVER_ERROR)); - return; - } - if (request.getWorkflowId() == null) { - // Throttle incoming requests - checkMaxWorkflows(request.getRequestTimeout(), request.getMaxWorkflows(), ActionListener.wrap(max -> { - if (!max) { - String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows(); - logger.error(errorMessage); - FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST); - listener.onFailure(ffe); - return; - } else { - // Create new global context and state index entries - flowFrameworkIndicesHandler.putTemplateToGlobalContext( - templateWithUser, - ActionListener.wrap(globalContextResponse -> { - flowFrameworkIndicesHandler.putInitialStateToWorkflowState( - globalContextResponse.getId(), - user, - ActionListener.wrap(stateResponse -> { - logger.info("create state workflow doc"); - listener.onResponse(new WorkflowResponse(globalContextResponse.getId())); - }, exception -> { - logger.error("Failed to save workflow state : {}", exception.getMessage()); - if (exception instanceof FlowFrameworkException) { - listener.onFailure(exception); - } else { - listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST)); - } - }) - ); - }, exception -> { - logger.error("Failed to save use case template : {}", exception.getMessage()); - if (exception instanceof FlowFrameworkException) { - listener.onFailure(exception); - } else { - listener.onFailure( - new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) - ); - } - - }) - ); - } - }, e -> { - logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage()); - if (e instanceof FlowFrameworkException) { - listener.onFailure(e); - } else { - listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); - } - })); - } else { - // Update existing entry, full document replacement - flowFrameworkIndicesHandler.updateTemplateInGlobalContext( - request.getWorkflowId(), - request.getTemplate(), - ActionListener.wrap(response -> { - flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( - request.getWorkflowId(), - ImmutableMap.of(STATE_FIELD, State.NOT_STARTED, PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED), - ActionListener.wrap(updateResponse -> { - logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name()); - listener.onResponse(new WorkflowResponse(request.getWorkflowId())); + if (request.getWorkflowId() == null) { + // Throttle incoming requests + checkMaxWorkflows(request.getRequestTimeout(), request.getMaxWorkflows(), ActionListener.wrap(max -> { + if (!max) { + String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows(); + logger.error(errorMessage); + FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST); + listener.onFailure(ffe); + return; + } else { + // Create new global context and state index entries + flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> { + flowFrameworkIndicesHandler.putInitialStateToWorkflowState( + globalContextResponse.getId(), + user, + ActionListener.wrap(stateResponse -> { + logger.info("create state workflow doc"); + listener.onResponse(new WorkflowResponse(globalContextResponse.getId())); }, exception -> { - logger.error("Failed to update workflow state : {}", exception.getMessage()); + logger.error("Failed to save workflow state : {}", exception.getMessage()); if (exception instanceof FlowFrameworkException) { listener.onFailure(exception); } else { - listener.onFailure( - new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) - ); + listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST)); } }) ); }, exception -> { - logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage()); + logger.error("Failed to save use case template : {}", exception.getMessage()); if (exception instanceof FlowFrameworkException) { listener.onFailure(exception); } else { listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); } - }) - ); - } - }, e -> { - logger.error("Failed to create global_context index", e); - listener.onFailure(new FlowFrameworkException("Failed to create global_context index", INTERNAL_SERVER_ERROR)); - })); + })); + } + }, e -> { + logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage()); + if (e instanceof FlowFrameworkException) { + listener.onFailure(e); + } else { + listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); + } + })); + } else { + // Update existing entry, full document replacement + flowFrameworkIndicesHandler.updateTemplateInGlobalContext( + request.getWorkflowId(), + request.getTemplate(), + ActionListener.wrap(response -> { + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( + request.getWorkflowId(), + ImmutableMap.of(STATE_FIELD, State.NOT_STARTED, PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED), + ActionListener.wrap(updateResponse -> { + logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name()); + listener.onResponse(new WorkflowResponse(request.getWorkflowId())); + }, exception -> { + logger.error("Failed to update workflow state : {}", exception.getMessage()); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); + } + }) + ); + }, exception -> { + logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage()); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); + } + }) + ); + } } /** @@ -213,6 +195,10 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener internalListener) { + if (!flowFrameworkIndicesHandler.doesIndexExist(CommonValue.GLOBAL_CONTEXT_INDEX)) { + internalListener.onResponse(true); + return; + } QueryBuilder query = QueryBuilders.matchAllQuery(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeOut); From 4ecbef3fb5623ae33f4d543c19a7c8b5a696d5f1 Mon Sep 17 00:00:00 2001 From: Amit Galitzky Date: Mon, 20 Nov 2023 20:37:21 +0000 Subject: [PATCH 3/3] added test and switched to if/else Signed-off-by: Amit Galitzky --- .../CreateWorkflowTransportAction.java | 30 +++++++++---------- .../CreateWorkflowTransportActionTests.java | 15 ++++++++++ 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index 2d3915ed6..a77e7dd79 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -197,23 +197,23 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener internalListener) { if (!flowFrameworkIndicesHandler.doesIndexExist(CommonValue.GLOBAL_CONTEXT_INDEX)) { internalListener.onResponse(true); - return; - } - QueryBuilder query = QueryBuilders.matchAllQuery(); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeOut); + } else { + QueryBuilder query = QueryBuilders.matchAllQuery(); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeOut); - SearchRequest searchRequest = new SearchRequest(CommonValue.GLOBAL_CONTEXT_INDEX).source(searchSourceBuilder); + SearchRequest searchRequest = new SearchRequest(CommonValue.GLOBAL_CONTEXT_INDEX).source(searchSourceBuilder); - client.search(searchRequest, ActionListener.wrap(searchResponse -> { - if (searchResponse.getHits().getTotalHits().value >= maxWorkflow) { - internalListener.onResponse(false); - } else { - internalListener.onResponse(true); - } - }, exception -> { - logger.error("Unable to fetch the workflows {}", exception); - internalListener.onFailure(new FlowFrameworkException("Unable to fetch the workflows", RestStatus.BAD_REQUEST)); - })); + client.search(searchRequest, ActionListener.wrap(searchResponse -> { + if (searchResponse.getHits().getTotalHits().value >= maxWorkflow) { + internalListener.onResponse(false); + } else { + internalListener.onResponse(true); + } + }, exception -> { + logger.error("Unable to fetch the workflows {}", exception); + internalListener.onFailure(new FlowFrameworkException("Unable to fetch the workflows", RestStatus.BAD_REQUEST)); + })); + } } private void validateWorkflows(Template template) throws Exception { diff --git a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java index 5492ad822..22d831f2b 100644 --- a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.mockito.ArgumentCaptor; @@ -198,6 +199,20 @@ public void testMaxWorkflow() { assertEquals(("Maximum workflows limit reached 1000"), exceptionCaptor.getValue().getMessage()); } + public void testMaxWorkflowWithNoIndex() { + @SuppressWarnings("unchecked") + ActionListener listener = new ActionListener() { + @Override + public void onResponse(Boolean booleanResponse) { + assertTrue(booleanResponse); + } + + @Override + public void onFailure(Exception e) {} + }; + createWorkflowTransportAction.checkMaxWorkflows(new TimeValue(10, TimeUnit.SECONDS), 10, listener); + } + public void testFailedToCreateNewWorkflow() { @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class);