From 0a58e636f165c405a5ce510cea49635bb9943468 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 3 Jan 2024 13:04:31 -0800 Subject: [PATCH] Get installed plugins from local node and add a timeout Signed-off-by: Daniel Widdis --- .../workflow/WorkflowProcessSorter.java | 62 ++++++++++++------- .../workflow/WorkflowProcessSorterTests.java | 40 ++++++++---- 2 files changed, 66 insertions(+), 36 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index b865786e2..46a895beb 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -10,9 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.action.admin.cluster.node.info.NodeInfo; import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; @@ -79,9 +79,9 @@ public WorkflowProcessSorter( this.workflowStepFactory = workflowStepFactory; this.threadPool = threadPool; this.maxWorkflowSteps = MAX_WORKFLOW_STEPS.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it); this.clusterService = clusterService; this.client = client; - clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it); } /** @@ -153,31 +153,45 @@ public void validate(List processNodes) throws Exception { * @throws Exception on validation failure */ public void validatePluginsInstalled(List processNodes, WorkflowValidator validator) throws Exception { - - // Retrieve node information to ascertain installed plugins - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); - nodesInfoRequest.clear().addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); - CompletableFuture> installedPluginsFuture = new CompletableFuture<>(); - client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(response -> { - List installedPlugins = new ArrayList<>(); - - // Retrieve installed plugin names from the local node - String localNodeId = clusterService.state().getNodes().getLocalNodeId(); - NodeInfo info = response.getNodesMap().get(localNodeId); - PluginsAndModules plugins = info.getInfo(PluginsAndModules.class); - for (PluginInfo pluginInfo : plugins.getPluginInfos()) { - installedPlugins.add(pluginInfo.getName()); - } - - installedPluginsFuture.complete(installedPlugins); - - }, exception -> { - logger.error("Failed to retrieve installed plugins"); - installedPluginsFuture.completeExceptionally(exception); + final CompletableFuture> installedPluginsFuture = new CompletableFuture<>(); + + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear().nodes(true).local(true); + client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(stateResponse -> { + final String localNodeId = stateResponse.getState().nodes().getLocalNodeId(); + + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.clear().addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); + client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(infoResponse -> { + // Retrieve installed plugin names from the local node + try { + installedPluginsFuture.complete( + infoResponse.getNodesMap() + .get(localNodeId) + .getInfo(PluginsAndModules.class) + .getPluginInfos() + .stream() + .map(PluginInfo::getName) + .collect(Collectors.toList()) + ); + } catch (Exception e) { + logger.error("Failed to retrieve installed plugins from local node"); + installedPluginsFuture.completeExceptionally(e); + } + }, infoException -> { + logger.error("Failed to retrieve installed plugins"); + installedPluginsFuture.completeExceptionally(infoException); + })); + }, stateException -> { + logger.error("Failed to retrieve cluster state"); + installedPluginsFuture.completeExceptionally(stateException); })); // Block execution until installed plugin list is returned - List installedPlugins = installedPluginsFuture.get(); + List installedPlugins = installedPluginsFuture.orTimeout( + NODE_TIMEOUT_DEFAULT_VALUE.duration(), + NODE_TIMEOUT_DEFAULT_VALUE.timeUnit() + ).get(); // Iterate through process nodes in graph for (ProcessNode processNode : processNodes) { diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index 41a65141f..bae7afd54 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -12,6 +12,8 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.client.ClusterAdminClient; @@ -426,12 +428,19 @@ public void testSuccessfulInstalledPluginValidation() throws Exception { when(client.admin()).thenReturn(adminClient); when(adminClient.cluster()).thenReturn(clusterAdminClient); - // Mock and stub the clusterservice to get the local node - ClusterState clusterState = mock(ClusterState.class); - DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); - when(clusterService.state()).thenReturn(clusterState); - when(clusterState.getNodes()).thenReturn(discoveryNodes); - when(discoveryNodes.getLocalNodeId()).thenReturn("123"); + // Stub cluster state request + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + + ClusterStateResponse response = mock(ClusterStateResponse.class); + ClusterState clusterState = mock(ClusterState.class); + DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); + when(response.getState()).thenReturn(clusterState); + when(clusterState.nodes()).thenReturn(discoveryNodes); + when(discoveryNodes.getLocalNodeId()).thenReturn("123"); + listener.onResponse(response); + return null; + }).when(clusterAdminClient).state(any(ClusterStateRequest.class), any()); // Stub cluster admin client's node info request doAnswer(invocation -> { @@ -510,12 +519,19 @@ public void testFailedInstalledPluginValidation() throws Exception { when(client.admin()).thenReturn(adminClient); when(adminClient.cluster()).thenReturn(clusterAdminClient); - // Mock and stub the clusterservice to get the local node - ClusterState clusterState = mock(ClusterState.class); - DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); - when(clusterService.state()).thenReturn(clusterState); - when(clusterState.getNodes()).thenReturn(discoveryNodes); - when(discoveryNodes.getLocalNodeId()).thenReturn("123"); + // Stub cluster state request + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + + ClusterStateResponse response = mock(ClusterStateResponse.class); + ClusterState clusterState = mock(ClusterState.class); + DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); + when(response.getState()).thenReturn(clusterState); + when(clusterState.nodes()).thenReturn(discoveryNodes); + when(discoveryNodes.getLocalNodeId()).thenReturn("123"); + listener.onResponse(response); + return null; + }).when(clusterAdminClient).state(any(ClusterStateRequest.class), any()); // Stub cluster admin client's node info request doAnswer(invocation -> {