From d2ef37d9eee123a20e6023cc61d3e58afce6bd2d Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Thu, 11 Jul 2024 19:56:03 +0530 Subject: [PATCH 01/13] Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call Signed-off-by: Pranshu Shukla --- .../node/info/TransportNodesInfoAction.java | 2 +- .../node/stats/TransportNodesStatsAction.java | 2 +- .../stats/TransportClusterStatsAction.java | 2 +- .../support/nodes/BaseNodesRequest.java | 16 +++ .../support/nodes/TransportNodesAction.java | 32 ++++- .../admin/cluster/RestClusterStatsAction.java | 1 + .../admin/cluster/RestNodesInfoAction.java | 2 +- .../admin/cluster/RestNodesStatsAction.java | 1 + .../rest/action/cat/RestNodesAction.java | 2 + .../TransportClusterStatsActionTests.java | 134 ++++++++++++++++++ .../nodes/TransportNodesActionTests.java | 13 +- .../nodes/TransportNodesInfoActionTests.java | 131 +++++++++++++++++ .../nodes/TransportNodesStatsActionTests.java | 130 +++++++++++++++++ 13 files changed, 454 insertions(+), 14 deletions(-) create mode 100644 server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java create mode 100644 server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java create mode 100644 server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index 2c4f8522a5a5c..dda54cce334ec 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -129,7 +129,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) { */ public static class NodeInfoRequest extends TransportRequest { - NodesInfoRequest request; + protected NodesInfoRequest request; public NodeInfoRequest(StreamInput in) throws IOException { super(in); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 2e93e5e7841cb..2c808adc97c7a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -140,7 +140,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { */ public static class NodeStatsRequest extends TransportRequest { - NodesStatsRequest request; + protected NodesStatsRequest request; public NodeStatsRequest(StreamInput in) throws IOException { super(in); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index e4f483f796f44..c7d03596a2a36 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -223,7 +223,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq */ public static class ClusterStatsNodeRequest extends TransportRequest { - ClusterStatsRequest request; + protected ClusterStatsRequest request; public ClusterStatsNodeRequest(StreamInput in) throws IOException { super(in); diff --git a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java index 4d54ce51c923c..ab660a81f83db 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java @@ -65,6 +65,14 @@ public abstract class BaseNodesRequest * will be ignored and this will be used. * */ private DiscoveryNode[] concreteNodes; + + /** + * Since do not use the discovery nodes coming from the request in all code paths following a request extended off from + * BaseNodeRequest, we do not require it to sent around across all nodes. + * + * Setting default behavior as `true` but can be explicitly changed in requests that do not require. + */ + private boolean populateDiscoveryNodesInTransportRequest = true; private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30); private TimeValue timeout; @@ -119,6 +127,14 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) { this.concreteNodes = concreteNodes; } + public void populateDiscoveryNodesInTransportRequest(boolean value) { + populateDiscoveryNodesInTransportRequest = value; + } + + public boolean populateDiscoveryNodesInTransportRequest() { + return populateDiscoveryNodesInTransportRequest; + } + @Override public ActionRequestValidationException validate() { return null; diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index 9a1a28dd70636..042374b772980 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -209,6 +209,15 @@ protected void resolveRequest(NodesRequest request, ClusterState clusterState) { request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new)); } + /** + * Return the concrete nodes from the request node ids which will be later used for routing requests to nodes. + **/ + protected DiscoveryNode[] resolveConcreteNodes(NodesRequest request, ClusterState clusterState) { + assert request.concreteNodes() == null : "request concreteNodes shouldn't be set"; + String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds()); + return Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new); + } + /** * Get a backwards compatible transport action name */ @@ -226,6 +235,7 @@ class AsyncAction { private final NodesRequest request; private final ActionListener listener; private final AtomicReferenceArray responses; + private final DiscoveryNode[] concreteNodes; private final AtomicInteger counter = new AtomicInteger(); private final Task task; @@ -234,14 +244,27 @@ class AsyncAction { this.request = request; this.listener = listener; if (request.concreteNodes() == null) { - resolveRequest(request, clusterService.state()); - assert request.concreteNodes() != null; + if (request.populateDiscoveryNodesInTransportRequest()) { + resolveRequest(request, clusterService.state()); + assert request.concreteNodes() != null; + this.concreteNodes = null; + } else { + this.concreteNodes = resolveConcreteNodes(request, clusterService.state()); + assert request.concreteNodes() == null; + } + } else { + this.concreteNodes = null; + } + if (request.concreteNodes() == null) { + assert concreteNodes != null; + this.responses = new AtomicReferenceArray<>(concreteNodes.length); + } else { + this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); } - this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); } void start() { - final DiscoveryNode[] nodes = request.concreteNodes(); + final DiscoveryNode[] nodes = request.concreteNodes() != null ? request.concreteNodes() : concreteNodes; if (nodes.length == 0) { // nothing to notify threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses))); @@ -260,7 +283,6 @@ void start() { if (task != null) { nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId()); } - transportService.sendRequest( node, getTransportNodeAction(node), diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java index 0766e838210fa..0ea651ba8b7d6 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java @@ -66,6 +66,7 @@ public String getName() { public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null)); clusterStatsRequest.timeout(request.param("timeout")); + clusterStatsRequest.populateDiscoveryNodesInTransportRequest(false); return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java index 3b83bf9d6f68c..e5e94f1cefc3b 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java @@ -88,7 +88,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final NodesInfoRequest nodesInfoRequest = prepareRequest(request); nodesInfoRequest.timeout(request.param("timeout")); settingsFilter.addFilterSettingParams(request); - + nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false); return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java index 267bfde576dec..a277b08e7a34c 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java @@ -232,6 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // If no levels are passed in this results in an empty array. String[] levels = Strings.splitStringByCommaToArray(request.param("level")); nodesStatsRequest.indices().setLevels(levels); + nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false); return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index e11012a23fce7..81c2e3b533d62 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -125,6 +125,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); nodesInfoRequest.timeout(request.param("timeout")); + nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false); nodesInfoRequest.clear() .addMetrics( NodesInfoRequest.Metric.JVM.metricName(), @@ -137,6 +138,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) { public void processResponse(final NodesInfoResponse nodesInfoResponse) { NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); nodesStatsRequest.timeout(request.param("timeout")); + nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false); nodesStatsRequest.clear() .indices(true) .addMetrics( diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java new file mode 100644 index 0000000000000..4c6c4154293f0 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support.nodes; + +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest; +import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.indices.IndicesService; +import org.opensearch.node.NodeService; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TransportClusterStatsActionTests extends TransportNodesActionTests { + + /** + * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This + * behavior is asserted in this test. + */ + public void testDefaultBehavior() { + ClusterStatsRequest request = new ClusterStatsRequest(); + request.populateDiscoveryNodesInTransportRequest(true); + Map> combinedSentRequest = performNodesInfoAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { + assertNotNull(sentRequest.getDiscoveryNodes()); + assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize()); + }); + }); + } + + /** + * In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is + * asserted in this test. + */ + public void testOptimizedBehavior() { + ClusterStatsRequest request = new ClusterStatsRequest(); + request.populateDiscoveryNodesInTransportRequest(false); + Map> combinedSentRequest = performNodesInfoAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); }); + }); + } + + private Map> performNodesInfoAction(ClusterStatsRequest request) { + TransportNodesAction action = getTestTransportClusterStatsAction(); + PlainActionFuture listener = new PlainActionFuture<>(); + action.new AsyncAction(null, request, listener).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + Map> combinedSentRequest = new HashMap<>(); + + capturedRequests.forEach((node, capturedRequestList) -> { + List sentRequestList = new ArrayList<>(); + + capturedRequestList.forEach(preSentRequest -> { + BytesStreamOutput out = new BytesStreamOutput(); + try { + TransportClusterStatsAction.ClusterStatsNodeRequest clusterStatsNodeRequestFromCoordinator = + (TransportClusterStatsAction.ClusterStatsNodeRequest) preSentRequest.request; + clusterStatsNodeRequestFromCoordinator.writeTo(out); + StreamInput in = out.bytes().streamInput(); + MockClusterStatsNodeRequest mockClusterStatsNodeRequest = new MockClusterStatsNodeRequest(in); + sentRequestList.add(mockClusterStatsNodeRequest); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + combinedSentRequest.put(node, sentRequestList); + }); + + return combinedSentRequest; + } + + public TestTransportClusterStatsAction getTestTransportClusterStatsAction() { + return new TestTransportClusterStatsAction( + THREAD_POOL, + clusterService, + transportService, + nodeService, + indicesService, + new ActionFilters(Collections.emptySet()) + ); + } + + private static class TestTransportClusterStatsAction extends TransportClusterStatsAction { + public TestTransportClusterStatsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + NodeService nodeService, + IndicesService indicesService, + ActionFilters actionFilters + ) { + super(threadPool, clusterService, transportService, nodeService, indicesService, actionFilters); + } + } + + private static class MockClusterStatsNodeRequest extends TransportClusterStatsAction.ClusterStatsNodeRequest { + + public MockClusterStatsNodeRequest(StreamInput in) throws IOException { + super(in); + } + + public DiscoveryNode[] getDiscoveryNodes() { + return this.request.concreteNodes(); + } + } +} diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java index 445934b0ccdfd..7e968aa8fb199 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java @@ -46,6 +46,8 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.indices.IndicesService; +import org.opensearch.node.NodeService; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; @@ -76,11 +78,12 @@ public class TransportNodesActionTests extends OpenSearchTestCase { - private static ThreadPool THREAD_POOL; - - private ClusterService clusterService; - private CapturingTransport transport; - private TransportService transportService; + protected static ThreadPool THREAD_POOL; + protected ClusterService clusterService; + protected CapturingTransport transport; + protected TransportService transportService; + protected NodeService nodeService; + protected IndicesService indicesService; public void testRequestIsSentToEachNode() throws Exception { TransportNodesAction action = getTestTransportNodesAction(); diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java new file mode 100644 index 0000000000000..3dffaeeb95dff --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java @@ -0,0 +1,131 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support.nodes; + +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.TransportNodesInfoAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.node.NodeService; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TransportNodesInfoActionTests extends TransportNodesActionTests { + + /** + * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This + * behavior is asserted in this test. + */ + public void testDefaultBehavior() { + NodesInfoRequest request = new NodesInfoRequest(); + request.populateDiscoveryNodesInTransportRequest(true); + Map> combinedSentRequest = performNodesInfoAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { + assertNotNull(sentRequest.getDiscoveryNodes()); + assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize()); + }); + }); + } + + /** + * In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is + * asserted in this test. + */ + public void testOptimizedBehavior() { + NodesInfoRequest request = new NodesInfoRequest(); + request.populateDiscoveryNodesInTransportRequest(false); + Map> combinedSentRequest = performNodesInfoAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); }); + }); + } + + private Map> performNodesInfoAction(NodesInfoRequest request) { + TransportNodesAction action = getTestTransportNodesInfoAction(); + PlainActionFuture listener = new PlainActionFuture<>(); + action.new AsyncAction(null, request, listener).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + Map> combinedSentRequest = new HashMap<>(); + + capturedRequests.forEach((node, capturedRequestList) -> { + List sentRequestList = new ArrayList<>(); + + capturedRequestList.forEach(preSentRequest -> { + BytesStreamOutput out = new BytesStreamOutput(); + try { + TransportNodesInfoAction.NodeInfoRequest nodesInfoRequestFromCoordinator = + (TransportNodesInfoAction.NodeInfoRequest) preSentRequest.request; + nodesInfoRequestFromCoordinator.writeTo(out); + StreamInput in = out.bytes().streamInput(); + MockNodesInfoRequest nodesStatsRequest = new MockNodesInfoRequest(in); + sentRequestList.add(nodesStatsRequest); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + combinedSentRequest.put(node, sentRequestList); + }); + + return combinedSentRequest; + } + + public TestTransportNodesInfoAction getTestTransportNodesInfoAction() { + return new TestTransportNodesInfoAction( + THREAD_POOL, + clusterService, + transportService, + nodeService, + new ActionFilters(Collections.emptySet()) + ); + } + + private static class TestTransportNodesInfoAction extends TransportNodesInfoAction { + public TestTransportNodesInfoAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + NodeService nodeService, + ActionFilters actionFilters + ) { + super(threadPool, clusterService, transportService, nodeService, actionFilters); + } + } + + private static class MockNodesInfoRequest extends TransportNodesInfoAction.NodeInfoRequest { + + public MockNodesInfoRequest(StreamInput in) throws IOException { + super(in); + } + + public DiscoveryNode[] getDiscoveryNodes() { + return this.request.concreteNodes(); + } + } +} diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java new file mode 100644 index 0000000000000..a409e7eddc61f --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java @@ -0,0 +1,130 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support.nodes; + +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.TransportNodesStatsAction; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.node.NodeService; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TransportNodesStatsActionTests extends TransportNodesActionTests { + + /** + * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This + * behavior is asserted in this test. + */ + public void testDefaultBehavior() { + NodesStatsRequest request = new NodesStatsRequest(); + request.populateDiscoveryNodesInTransportRequest(true); + Map> combinedSentRequest = performNodesStatsAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { + assertNotNull(sentRequest.getDiscoveryNodes()); + assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize()); + }); + }); + } + + /** + * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This + * behavior is asserted in this test. + */ + public void testOptimizedBehavior() { + NodesStatsRequest request = new NodesStatsRequest(); + request.populateDiscoveryNodesInTransportRequest(false); + Map> combinedSentRequest = performNodesStatsAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); }); + }); + } + + private Map> performNodesStatsAction(NodesStatsRequest request) { + TransportNodesAction action = getTestTransportNodesStatsAction(); + PlainActionFuture listener = new PlainActionFuture<>(); + action.new AsyncAction(null, request, listener).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + Map> combinedSentRequest = new HashMap<>(); + + capturedRequests.forEach((node, capturedRequestList) -> { + List sentRequestList = new ArrayList<>(); + + capturedRequestList.forEach(preSentRequest -> { + BytesStreamOutput out = new BytesStreamOutput(); + try { + TransportNodesStatsAction.NodeStatsRequest nodesStatsRequestFromCoordinator = + (TransportNodesStatsAction.NodeStatsRequest) preSentRequest.request; + nodesStatsRequestFromCoordinator.writeTo(out); + StreamInput in = out.bytes().streamInput(); + MockNodeStatsRequest nodesStatsRequest = new MockNodeStatsRequest(in); + sentRequestList.add(nodesStatsRequest); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + combinedSentRequest.put(node, sentRequestList); + }); + + return combinedSentRequest; + } + + public TestTransportNodesStatsAction getTestTransportNodesStatsAction() { + return new TestTransportNodesStatsAction( + THREAD_POOL, + clusterService, + transportService, + nodeService, + new ActionFilters(Collections.emptySet()) + ); + } + + private static class TestTransportNodesStatsAction extends TransportNodesStatsAction { + public TestTransportNodesStatsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + NodeService nodeService, + ActionFilters actionFilters + ) { + super(threadPool, clusterService, transportService, nodeService, actionFilters); + } + } + + private static class MockNodeStatsRequest extends TransportNodesStatsAction.NodeStatsRequest { + + public MockNodeStatsRequest(StreamInput in) throws IOException { + super(in); + } + + public DiscoveryNode[] getDiscoveryNodes() { + return this.request.concreteNodes(); + } + } +} From 33af363debebdf586c317800da6a2a2b7bd5c1c6 Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Mon, 15 Jul 2024 19:32:51 +0530 Subject: [PATCH 02/13] Retry Build Signed-off-by: Pranshu Shukla From 2de5f3090de5f75aafbb51a1df3328bec3e784c6 Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Mon, 15 Jul 2024 20:27:19 +0530 Subject: [PATCH 03/13] Retry Build Signed-off-by: Pranshu Shukla From 556da0ea41a477c8998aaafaeac42308c6ea1149 Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Tue, 16 Jul 2024 10:58:15 +0530 Subject: [PATCH 04/13] Add ChangeLogs Signed-off-by: Pranshu Shukla --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f666dbf3b8d5..4f07a93e31533 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749)) - Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724)) - [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/)) - Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865)) From bd2cc8f287acba80989d694eddf8958387ffb65d Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Wed, 17 Jul 2024 10:45:12 +0530 Subject: [PATCH 05/13] Fix test naming and refactoring Signed-off-by: Pranshu Shukla --- .../support/nodes/TransportNodesAction.java | 36 ++++++++++++------- .../TransportClusterStatsActionTests.java | 6 ++-- .../nodes/TransportNodesInfoActionTests.java | 6 ++-- .../nodes/TransportNodesStatsActionTests.java | 6 ++-- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index 042374b772980..445e2f1c18223 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -210,9 +210,12 @@ protected void resolveRequest(NodesRequest request, ClusterState clusterState) { } /** - * Return the concrete nodes from the request node ids which will be later used for routing requests to nodes. + * Returns the concrete nodes from the request node ids which will be later used for routing requests to nodes. + * @param request Requests extended from {@link NodesRequest} which contains target nodeIDs. + * @param clusterState Cluster State fetched from {@link ClusterService} + * @return DiscoveryNode[] which is the collection of {@link DiscoveryNode} representation of NodeIDs from the request **/ - protected DiscoveryNode[] resolveConcreteNodes(NodesRequest request, ClusterState clusterState) { + protected DiscoveryNode[] getConcreteNodes(NodesRequest request, ClusterState clusterState) { assert request.concreteNodes() == null : "request concreteNodes shouldn't be set"; String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds()); return Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new); @@ -243,23 +246,32 @@ class AsyncAction { this.task = task; this.request = request; this.listener = listener; - if (request.concreteNodes() == null) { + + // Check if concrete nodes are already available + if (request.concreteNodes() != null) { + this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); + if (request.populateDiscoveryNodesInTransportRequest()) { - resolveRequest(request, clusterService.state()); - assert request.concreteNodes() != null; this.concreteNodes = null; } else { - this.concreteNodes = resolveConcreteNodes(request, clusterService.state()); + this.concreteNodes = request.concreteNodes(); + request.setConcreteNodes(null); assert request.concreteNodes() == null; } - } else { - this.concreteNodes = null; + return; } - if (request.concreteNodes() == null) { - assert concreteNodes != null; - this.responses = new AtomicReferenceArray<>(concreteNodes.length); - } else { + + // Check if we want to populate the DiscoveryNodes in the transport Request and accordingly backfill the + // concrete nodes. + if (request.populateDiscoveryNodesInTransportRequest()) { + resolveRequest(request, clusterService.state()); + assert request.concreteNodes() != null; this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); + this.concreteNodes = null; + } else { + this.concreteNodes = getConcreteNodes(request, clusterService.state()); + assert request.concreteNodes() == null; + this.responses = new AtomicReferenceArray<>(concreteNodes.length); } } diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java index 4c6c4154293f0..d3bbb13f14243 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java @@ -36,7 +36,7 @@ public class TransportClusterStatsActionTests extends TransportNodesActionTests * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This * behavior is asserted in this test. */ - public void testDefaultBehavior() { + public void testClusterStatsActionWithDiscoveryNodesListPopulated() { ClusterStatsRequest request = new ClusterStatsRequest(); request.populateDiscoveryNodesInTransportRequest(true); Map> combinedSentRequest = performNodesInfoAction(request); @@ -55,7 +55,7 @@ public void testDefaultBehavior() { * In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is * asserted in this test. */ - public void testOptimizedBehavior() { + public void testClusterStatsActionWithDiscoveryNodesListNotPopulated() { ClusterStatsRequest request = new ClusterStatsRequest(); request.populateDiscoveryNodesInTransportRequest(false); Map> combinedSentRequest = performNodesInfoAction(request); @@ -97,7 +97,7 @@ private Map> performNodesInfoAction(Cl return combinedSentRequest; } - public TestTransportClusterStatsAction getTestTransportClusterStatsAction() { + private TestTransportClusterStatsAction getTestTransportClusterStatsAction() { return new TestTransportClusterStatsAction( THREAD_POOL, clusterService, diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java index 3dffaeeb95dff..5b0e3541742c5 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java @@ -35,7 +35,7 @@ public class TransportNodesInfoActionTests extends TransportNodesActionTests { * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This * behavior is asserted in this test. */ - public void testDefaultBehavior() { + public void testNodesInfoActionWithDiscoveryNodesListPopulated() { NodesInfoRequest request = new NodesInfoRequest(); request.populateDiscoveryNodesInTransportRequest(true); Map> combinedSentRequest = performNodesInfoAction(request); @@ -54,7 +54,7 @@ public void testDefaultBehavior() { * In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is * asserted in this test. */ - public void testOptimizedBehavior() { + public void testNodesInfoActionWithDiscoveryNodesListNotPopulated() { NodesInfoRequest request = new NodesInfoRequest(); request.populateDiscoveryNodesInTransportRequest(false); Map> combinedSentRequest = performNodesInfoAction(request); @@ -96,7 +96,7 @@ private Map> performNodesInfoAction(NodesInfo return combinedSentRequest; } - public TestTransportNodesInfoAction getTestTransportNodesInfoAction() { + private TestTransportNodesInfoAction getTestTransportNodesInfoAction() { return new TestTransportNodesInfoAction( THREAD_POOL, clusterService, diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java index a409e7eddc61f..b695641f979a1 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java @@ -34,7 +34,7 @@ public class TransportNodesStatsActionTests extends TransportNodesActionTests { * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This * behavior is asserted in this test. */ - public void testDefaultBehavior() { + public void testNodesStatsActionWithDiscoveryNodesListPopulated() { NodesStatsRequest request = new NodesStatsRequest(); request.populateDiscoveryNodesInTransportRequest(true); Map> combinedSentRequest = performNodesStatsAction(request); @@ -53,7 +53,7 @@ public void testDefaultBehavior() { * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This * behavior is asserted in this test. */ - public void testOptimizedBehavior() { + public void testNodesStatsActionWithDiscoveryNodesListNotPopulated() { NodesStatsRequest request = new NodesStatsRequest(); request.populateDiscoveryNodesInTransportRequest(false); Map> combinedSentRequest = performNodesStatsAction(request); @@ -95,7 +95,7 @@ private Map> performNodesStatsAction(NodesSta return combinedSentRequest; } - public TestTransportNodesStatsAction getTestTransportNodesStatsAction() { + private TestTransportNodesStatsAction getTestTransportNodesStatsAction() { return new TestTransportNodesStatsAction( THREAD_POOL, clusterService, From b53c772642c107a8f4aba9dd1962fac74c558ace Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Wed, 17 Jul 2024 14:33:37 +0530 Subject: [PATCH 06/13] Adding code coverage Signed-off-by: Pranshu Shukla --- .../action/RestStatsActionTests.java | 48 +++++++++++++++++++ .../TransportClusterStatsActionTests.java | 31 ++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 server/src/test/java/org/opensearch/action/RestStatsActionTests.java diff --git a/server/src/test/java/org/opensearch/action/RestStatsActionTests.java b/server/src/test/java/org/opensearch/action/RestStatsActionTests.java new file mode 100644 index 0000000000000..df38d601f8c92 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/RestStatsActionTests.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.rest.action.admin.cluster.RestClusterStatsAction; +import org.opensearch.rest.action.admin.cluster.RestNodesInfoAction; +import org.opensearch.rest.action.admin.cluster.RestNodesStatsAction; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.threadpool.TestThreadPool; +import org.junit.After; + +import java.io.IOException; +import java.util.Collections; + +public class RestStatsActionTests extends OpenSearchTestCase { + private final TestThreadPool threadPool = new TestThreadPool(RestStatsActionTests.class.getName()); + private final NodeClient client = new NodeClient(Settings.EMPTY, threadPool); + + @After + public void terminateThreadPool() { + terminate(threadPool); + } + + public void testClusterStatsActionPrepareRequestNoError() throws IOException { + RestClusterStatsAction action = new RestClusterStatsAction(); + action.prepareRequest(new FakeRestRequest(), client); + } + + public void testNodesStatsActionPrepareRequestNoError() throws IOException { + RestNodesStatsAction action = new RestNodesStatsAction(); + action.prepareRequest(new FakeRestRequest(), client); + } + + public void testNodesInfoActionPrepareRequestNoError() throws IOException { + RestNodesInfoAction action = new RestNodesInfoAction(new SettingsFilter(Collections.singleton("foo.filtered"))); + action.prepareRequest(new FakeRestRequest(), client); + } +} diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java index d3bbb13f14243..82a2d5e6b990d 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -51,6 +52,22 @@ public void testClusterStatsActionWithDiscoveryNodesListPopulated() { }); } + public void testClusterStatsActionWithDiscoveryNodesListInRestRequestPopulated() { + ClusterStatsRequest request = new ClusterStatsRequest(); + Collection discoveryNodes = clusterService.state().getNodes().getNodes().values(); + request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new)); + Map> combinedSentRequest = performNodesInfoAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { + assertNotNull(sentRequest.getDiscoveryNodes()); + assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize()); + }); + }); + } + /** * In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is * asserted in this test. @@ -67,6 +84,20 @@ public void testClusterStatsActionWithDiscoveryNodesListNotPopulated() { }); } + public void testClusterStatsWithDiscoveryNodesListInRestRequestNotPopulated() { + ClusterStatsRequest request = new ClusterStatsRequest(); + Collection discoveryNodes = clusterService.state().getNodes().getNodes().values(); + request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new)); + request.populateDiscoveryNodesInTransportRequest(false); + Map> combinedSentRequest = performNodesInfoAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); }); + }); + } + private Map> performNodesInfoAction(ClusterStatsRequest request) { TransportNodesAction action = getTestTransportClusterStatsAction(); PlainActionFuture listener = new PlainActionFuture<>(); From 5fffc47a8dbaed05e1b5c814e132ebb8927bac8d Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Thu, 18 Jul 2024 14:48:44 +0530 Subject: [PATCH 07/13] fix naming and simplifying code Signed-off-by: Pranshu Shukla --- .../support/nodes/BaseNodesRequest.java | 10 +++--- .../support/nodes/TransportNodesAction.java | 34 ++++++------------- .../admin/cluster/RestClusterStatsAction.java | 2 +- .../admin/cluster/RestNodesInfoAction.java | 2 +- .../admin/cluster/RestNodesStatsAction.java | 2 +- .../rest/action/cat/RestNodesAction.java | 4 +-- .../action/RestStatsActionTests.java | 25 ++++++++++---- .../TransportClusterStatsActionTests.java | 14 ++++---- .../nodes/TransportNodesInfoActionTests.java | 8 ++--- .../nodes/TransportNodesStatsActionTests.java | 8 ++--- 10 files changed, 54 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java index ab660a81f83db..37659bc7b3435 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java @@ -72,7 +72,7 @@ public abstract class BaseNodesRequest * * Setting default behavior as `true` but can be explicitly changed in requests that do not require. */ - private boolean populateDiscoveryNodesInTransportRequest = true; + private boolean retainDiscoveryNodes = true; private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30); private TimeValue timeout; @@ -127,12 +127,12 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) { this.concreteNodes = concreteNodes; } - public void populateDiscoveryNodesInTransportRequest(boolean value) { - populateDiscoveryNodesInTransportRequest = value; + public void retainDiscoveryNodes(boolean value) { + retainDiscoveryNodes = value; } - public boolean populateDiscoveryNodesInTransportRequest() { - return populateDiscoveryNodesInTransportRequest; + public boolean retainDiscoveryNodes() { + return retainDiscoveryNodes; } @Override diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index 445e2f1c18223..bafc847169c25 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -246,32 +246,20 @@ class AsyncAction { this.task = task; this.request = request; this.listener = listener; - - // Check if concrete nodes are already available - if (request.concreteNodes() != null) { - this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); - - if (request.populateDiscoveryNodesInTransportRequest()) { - this.concreteNodes = null; - } else { - this.concreteNodes = request.concreteNodes(); - request.setConcreteNodes(null); - assert request.concreteNodes() == null; - } - return; - } - - // Check if we want to populate the DiscoveryNodes in the transport Request and accordingly backfill the - // concrete nodes. - if (request.populateDiscoveryNodesInTransportRequest()) { + if (request.concreteNodes() == null) { resolveRequest(request, clusterService.state()); assert request.concreteNodes() != null; - this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); - this.concreteNodes = null; + } + this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); + + if (request.retainDiscoveryNodes() == false) { + // We transfer the ownership of discovery nodes to route the request to into the AsyncAction class. + // This reduces the payload of the request and improves the number of concrete nodes in the memory + this.concreteNodes = request.concreteNodes(); + request.setConcreteNodes(null); } else { - this.concreteNodes = getConcreteNodes(request, clusterService.state()); - assert request.concreteNodes() == null; - this.responses = new AtomicReferenceArray<>(concreteNodes.length); + // initializing it separately as we keep the `concreteNodes` as final since we want it to be immutable. + this.concreteNodes = null; } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java index 0ea651ba8b7d6..d136c733876e5 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java @@ -66,7 +66,7 @@ public String getName() { public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null)); clusterStatsRequest.timeout(request.param("timeout")); - clusterStatsRequest.populateDiscoveryNodesInTransportRequest(false); + clusterStatsRequest.retainDiscoveryNodes(false); return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java index e5e94f1cefc3b..f285add0e8189 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java @@ -88,7 +88,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final NodesInfoRequest nodesInfoRequest = prepareRequest(request); nodesInfoRequest.timeout(request.param("timeout")); settingsFilter.addFilterSettingParams(request); - nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false); + nodesInfoRequest.retainDiscoveryNodes(false); return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java index a277b08e7a34c..2cf10af94d694 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java @@ -232,7 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // If no levels are passed in this results in an empty array. String[] levels = Strings.splitStringByCommaToArray(request.param("level")); nodesStatsRequest.indices().setLevels(levels); - nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false); + nodesStatsRequest.retainDiscoveryNodes(false); return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index f95e464b6d649..1cc8bb111c65c 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -125,7 +125,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); nodesInfoRequest.timeout(request.param("timeout")); - nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false); + nodesInfoRequest.retainDiscoveryNodes(false); nodesInfoRequest.clear() .addMetrics( NodesInfoRequest.Metric.JVM.metricName(), @@ -138,7 +138,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) { public void processResponse(final NodesInfoResponse nodesInfoResponse) { NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); nodesStatsRequest.timeout(request.param("timeout")); - nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false); + nodesStatsRequest.retainDiscoveryNodes(false); nodesStatsRequest.clear() .indices(true) .addMetrics( diff --git a/server/src/test/java/org/opensearch/action/RestStatsActionTests.java b/server/src/test/java/org/opensearch/action/RestStatsActionTests.java index df38d601f8c92..9b8a0640ee343 100644 --- a/server/src/test/java/org/opensearch/action/RestStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/RestStatsActionTests.java @@ -19,7 +19,6 @@ import org.opensearch.threadpool.TestThreadPool; import org.junit.After; -import java.io.IOException; import java.util.Collections; public class RestStatsActionTests extends OpenSearchTestCase { @@ -31,18 +30,30 @@ public void terminateThreadPool() { terminate(threadPool); } - public void testClusterStatsActionPrepareRequestNoError() throws IOException { + public void testClusterStatsActionPrepareRequestNoError() { RestClusterStatsAction action = new RestClusterStatsAction(); - action.prepareRequest(new FakeRestRequest(), client); + try { + action.prepareRequest(new FakeRestRequest(), client); + } catch (Throwable t) { + fail(t.getMessage()); + } } - public void testNodesStatsActionPrepareRequestNoError() throws IOException { + public void testNodesStatsActionPrepareRequestNoError() { RestNodesStatsAction action = new RestNodesStatsAction(); - action.prepareRequest(new FakeRestRequest(), client); + try { + action.prepareRequest(new FakeRestRequest(), client); + } catch (Throwable t) { + fail(t.getMessage()); + } } - public void testNodesInfoActionPrepareRequestNoError() throws IOException { + public void testNodesInfoActionPrepareRequestNoError() { RestNodesInfoAction action = new RestNodesInfoAction(new SettingsFilter(Collections.singleton("foo.filtered"))); - action.prepareRequest(new FakeRestRequest(), client); + try { + action.prepareRequest(new FakeRestRequest(), client); + } catch (Throwable t) { + fail(t.getMessage()); + } } } diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java index 82a2d5e6b990d..1391e04664566 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java @@ -37,9 +37,9 @@ public class TransportClusterStatsActionTests extends TransportNodesActionTests * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This * behavior is asserted in this test. */ - public void testClusterStatsActionWithDiscoveryNodesListPopulated() { + public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() { ClusterStatsRequest request = new ClusterStatsRequest(); - request.populateDiscoveryNodesInTransportRequest(true); + request.retainDiscoveryNodes(true); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); @@ -52,7 +52,7 @@ public void testClusterStatsActionWithDiscoveryNodesListPopulated() { }); } - public void testClusterStatsActionWithDiscoveryNodesListInRestRequestPopulated() { + public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDiscoveryNodesList() { ClusterStatsRequest request = new ClusterStatsRequest(); Collection discoveryNodes = clusterService.state().getNodes().getNodes().values(); request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new)); @@ -72,9 +72,9 @@ public void testClusterStatsActionWithDiscoveryNodesListInRestRequestPopulated() * In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is * asserted in this test. */ - public void testClusterStatsActionWithDiscoveryNodesListNotPopulated() { + public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() { ClusterStatsRequest request = new ClusterStatsRequest(); - request.populateDiscoveryNodesInTransportRequest(false); + request.retainDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); @@ -84,11 +84,11 @@ public void testClusterStatsActionWithDiscoveryNodesListNotPopulated() { }); } - public void testClusterStatsWithDiscoveryNodesListInRestRequestNotPopulated() { + public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionOfDiscoveryNodesList() { ClusterStatsRequest request = new ClusterStatsRequest(); Collection discoveryNodes = clusterService.state().getNodes().getNodes().values(); request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new)); - request.populateDiscoveryNodesInTransportRequest(false); + request.retainDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java index 5b0e3541742c5..eaf4013f12b6a 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java @@ -35,9 +35,9 @@ public class TransportNodesInfoActionTests extends TransportNodesActionTests { * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This * behavior is asserted in this test. */ - public void testNodesInfoActionWithDiscoveryNodesListPopulated() { + public void testNodesInfoActionWithRetentionOfDiscoveryNodesList() { NodesInfoRequest request = new NodesInfoRequest(); - request.populateDiscoveryNodesInTransportRequest(true); + request.retainDiscoveryNodes(true); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); @@ -54,9 +54,9 @@ public void testNodesInfoActionWithDiscoveryNodesListPopulated() { * In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is * asserted in this test. */ - public void testNodesInfoActionWithDiscoveryNodesListNotPopulated() { + public void testNodesInfoActionWithoutRetentionOfDiscoveryNodesList() { NodesInfoRequest request = new NodesInfoRequest(); - request.populateDiscoveryNodesInTransportRequest(false); + request.retainDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java index b695641f979a1..3d192e2a35633 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java @@ -34,9 +34,9 @@ public class TransportNodesStatsActionTests extends TransportNodesActionTests { * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This * behavior is asserted in this test. */ - public void testNodesStatsActionWithDiscoveryNodesListPopulated() { + public void testNodesStatsActionWithRetentionOfDiscoveryNodesList() { NodesStatsRequest request = new NodesStatsRequest(); - request.populateDiscoveryNodesInTransportRequest(true); + request.retainDiscoveryNodes(true); Map> combinedSentRequest = performNodesStatsAction(request); assertNotNull(combinedSentRequest); @@ -53,9 +53,9 @@ public void testNodesStatsActionWithDiscoveryNodesListPopulated() { * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This * behavior is asserted in this test. */ - public void testNodesStatsActionWithDiscoveryNodesListNotPopulated() { + public void testNodesStatsActionWithoutRetentionOfDiscoveryNodesList() { NodesStatsRequest request = new NodesStatsRequest(); - request.populateDiscoveryNodesInTransportRequest(false); + request.retainDiscoveryNodes(false); Map> combinedSentRequest = performNodesStatsAction(request); assertNotNull(combinedSentRequest); From a43d7258a5a888d3dfcb162dde773a3fc4c50981 Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Thu, 18 Jul 2024 15:51:14 +0530 Subject: [PATCH 08/13] Removing unncessary methods after refactoring Signed-off-by: Pranshu Shukla --- .../action/support/nodes/TransportNodesAction.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index bafc847169c25..f4fe96a59554e 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -209,18 +209,6 @@ protected void resolveRequest(NodesRequest request, ClusterState clusterState) { request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new)); } - /** - * Returns the concrete nodes from the request node ids which will be later used for routing requests to nodes. - * @param request Requests extended from {@link NodesRequest} which contains target nodeIDs. - * @param clusterState Cluster State fetched from {@link ClusterService} - * @return DiscoveryNode[] which is the collection of {@link DiscoveryNode} representation of NodeIDs from the request - **/ - protected DiscoveryNode[] getConcreteNodes(NodesRequest request, ClusterState clusterState) { - assert request.concreteNodes() == null : "request concreteNodes shouldn't be set"; - String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds()); - return Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new); - } - /** * Get a backwards compatible transport action name */ From bfce85eb5b2e772348baa670783fe87af525d602 Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Sun, 21 Jul 2024 22:25:57 +0530 Subject: [PATCH 09/13] refactor naming Signed-off-by: Pranshu Shukla --- .../action/support/nodes/BaseNodesRequest.java | 10 +++++----- .../action/support/nodes/TransportNodesAction.java | 2 +- .../action/admin/cluster/RestClusterStatsAction.java | 2 +- .../rest/action/admin/cluster/RestNodesInfoAction.java | 2 +- .../action/admin/cluster/RestNodesStatsAction.java | 2 +- .../opensearch/rest/action/cat/RestNodesAction.java | 4 ++-- .../nodes/TransportClusterStatsActionTests.java | 6 +++--- .../support/nodes/TransportNodesInfoActionTests.java | 4 ++-- .../support/nodes/TransportNodesStatsActionTests.java | 4 ++-- 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java index 37659bc7b3435..0598268fd57d0 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java @@ -72,7 +72,7 @@ public abstract class BaseNodesRequest * * Setting default behavior as `true` but can be explicitly changed in requests that do not require. */ - private boolean retainDiscoveryNodes = true; + private boolean sendDiscoveryNodes = true; private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30); private TimeValue timeout; @@ -127,12 +127,12 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) { this.concreteNodes = concreteNodes; } - public void retainDiscoveryNodes(boolean value) { - retainDiscoveryNodes = value; + public void sendDiscoveryNodes(boolean value) { + sendDiscoveryNodes = value; } - public boolean retainDiscoveryNodes() { - return retainDiscoveryNodes; + public boolean sendDiscoveryNodes() { + return sendDiscoveryNodes; } @Override diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index f4fe96a59554e..9188c67728f6e 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -240,7 +240,7 @@ class AsyncAction { } this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); - if (request.retainDiscoveryNodes() == false) { + if (request.sendDiscoveryNodes() == false) { // We transfer the ownership of discovery nodes to route the request to into the AsyncAction class. // This reduces the payload of the request and improves the number of concrete nodes in the memory this.concreteNodes = request.concreteNodes(); diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java index d136c733876e5..56b90aac47b09 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java @@ -66,7 +66,7 @@ public String getName() { public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null)); clusterStatsRequest.timeout(request.param("timeout")); - clusterStatsRequest.retainDiscoveryNodes(false); + clusterStatsRequest.sendDiscoveryNodes(false); return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java index f285add0e8189..27be7c3f1eb21 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java @@ -88,7 +88,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final NodesInfoRequest nodesInfoRequest = prepareRequest(request); nodesInfoRequest.timeout(request.param("timeout")); settingsFilter.addFilterSettingParams(request); - nodesInfoRequest.retainDiscoveryNodes(false); + nodesInfoRequest.sendDiscoveryNodes(false); return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java index 2cf10af94d694..177b3faa51317 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java @@ -232,7 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // If no levels are passed in this results in an empty array. String[] levels = Strings.splitStringByCommaToArray(request.param("level")); nodesStatsRequest.indices().setLevels(levels); - nodesStatsRequest.retainDiscoveryNodes(false); + nodesStatsRequest.sendDiscoveryNodes(false); return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index 1cc8bb111c65c..f06cdd1b68dca 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -125,7 +125,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); nodesInfoRequest.timeout(request.param("timeout")); - nodesInfoRequest.retainDiscoveryNodes(false); + nodesInfoRequest.sendDiscoveryNodes(false); nodesInfoRequest.clear() .addMetrics( NodesInfoRequest.Metric.JVM.metricName(), @@ -138,7 +138,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) { public void processResponse(final NodesInfoResponse nodesInfoResponse) { NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); nodesStatsRequest.timeout(request.param("timeout")); - nodesStatsRequest.retainDiscoveryNodes(false); + nodesStatsRequest.sendDiscoveryNodes(false); nodesStatsRequest.clear() .indices(true) .addMetrics( diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java index 1391e04664566..6303d4944463e 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java @@ -39,7 +39,7 @@ public class TransportClusterStatsActionTests extends TransportNodesActionTests */ public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() { ClusterStatsRequest request = new ClusterStatsRequest(); - request.retainDiscoveryNodes(true); + request.sendDiscoveryNodes(true); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); @@ -74,7 +74,7 @@ public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDi */ public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() { ClusterStatsRequest request = new ClusterStatsRequest(); - request.retainDiscoveryNodes(false); + request.sendDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); @@ -88,7 +88,7 @@ public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionO ClusterStatsRequest request = new ClusterStatsRequest(); Collection discoveryNodes = clusterService.state().getNodes().getNodes().values(); request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new)); - request.retainDiscoveryNodes(false); + request.sendDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java index eaf4013f12b6a..245ae2bdc061b 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java @@ -37,7 +37,7 @@ public class TransportNodesInfoActionTests extends TransportNodesActionTests { */ public void testNodesInfoActionWithRetentionOfDiscoveryNodesList() { NodesInfoRequest request = new NodesInfoRequest(); - request.retainDiscoveryNodes(true); + request.sendDiscoveryNodes(true); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); @@ -56,7 +56,7 @@ public void testNodesInfoActionWithRetentionOfDiscoveryNodesList() { */ public void testNodesInfoActionWithoutRetentionOfDiscoveryNodesList() { NodesInfoRequest request = new NodesInfoRequest(); - request.retainDiscoveryNodes(false); + request.sendDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java index 3d192e2a35633..9af5b1ef33ac2 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java @@ -36,7 +36,7 @@ public class TransportNodesStatsActionTests extends TransportNodesActionTests { */ public void testNodesStatsActionWithRetentionOfDiscoveryNodesList() { NodesStatsRequest request = new NodesStatsRequest(); - request.retainDiscoveryNodes(true); + request.sendDiscoveryNodes(true); Map> combinedSentRequest = performNodesStatsAction(request); assertNotNull(combinedSentRequest); @@ -55,7 +55,7 @@ public void testNodesStatsActionWithRetentionOfDiscoveryNodesList() { */ public void testNodesStatsActionWithoutRetentionOfDiscoveryNodesList() { NodesStatsRequest request = new NodesStatsRequest(); - request.retainDiscoveryNodes(false); + request.sendDiscoveryNodes(false); Map> combinedSentRequest = performNodesStatsAction(request); assertNotNull(combinedSentRequest); From 9da507b5fabdf7c1589b22afd49b882568be1e4f Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Sun, 21 Jul 2024 22:28:42 +0530 Subject: [PATCH 10/13] Update ChangeLog Signed-off-by: Pranshu Shukla --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 34d1491233ad5..ab0c80e37e14c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added -- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749)) - Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724)) - [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/)) - Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865)) @@ -22,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659))) - Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668)) - Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790))) +- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) From 3f7bca13c2e618abe17a49e4960b91e376e3d12a Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Mon, 22 Jul 2024 09:47:09 +0530 Subject: [PATCH 11/13] minor refactoring of AsyncAction Signed-off-by: Pranshu Shukla --- .../action/support/nodes/TransportNodesAction.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index 9188c67728f6e..8471b01c0bfce 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -239,20 +239,18 @@ class AsyncAction { assert request.concreteNodes() != null; } this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); + this.concreteNodes = request.concreteNodes(); if (request.sendDiscoveryNodes() == false) { - // We transfer the ownership of discovery nodes to route the request to into the AsyncAction class. - // This reduces the payload of the request and improves the number of concrete nodes in the memory - this.concreteNodes = request.concreteNodes(); + // As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we + // remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves + // the number of concrete nodes in the memory. request.setConcreteNodes(null); - } else { - // initializing it separately as we keep the `concreteNodes` as final since we want it to be immutable. - this.concreteNodes = null; } } void start() { - final DiscoveryNode[] nodes = request.concreteNodes() != null ? request.concreteNodes() : concreteNodes; + final DiscoveryNode[] nodes = this.concreteNodes; if (nodes.length == 0) { // nothing to notify threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses))); From a69f7e78ed9097319f3061c12b347248721d190d Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Mon, 22 Jul 2024 12:01:48 +0530 Subject: [PATCH 12/13] change variable naming Signed-off-by: Pranshu Shukla --- .../action/support/nodes/BaseNodesRequest.java | 10 +++++----- .../action/support/nodes/TransportNodesAction.java | 2 +- .../action/admin/cluster/RestClusterStatsAction.java | 2 +- .../rest/action/admin/cluster/RestNodesInfoAction.java | 2 +- .../action/admin/cluster/RestNodesStatsAction.java | 2 +- .../opensearch/rest/action/cat/RestNodesAction.java | 4 ++-- .../nodes/TransportClusterStatsActionTests.java | 6 +++--- .../support/nodes/TransportNodesInfoActionTests.java | 4 ++-- .../support/nodes/TransportNodesStatsActionTests.java | 4 ++-- 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java index 0598268fd57d0..a4f6d8afeaf38 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java @@ -72,7 +72,7 @@ public abstract class BaseNodesRequest * * Setting default behavior as `true` but can be explicitly changed in requests that do not require. */ - private boolean sendDiscoveryNodes = true; + private boolean includeDiscoveryNodes = true; private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30); private TimeValue timeout; @@ -127,12 +127,12 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) { this.concreteNodes = concreteNodes; } - public void sendDiscoveryNodes(boolean value) { - sendDiscoveryNodes = value; + public void setIncludeDiscoveryNodes(boolean value) { + includeDiscoveryNodes = value; } - public boolean sendDiscoveryNodes() { - return sendDiscoveryNodes; + public boolean getIncludeDiscoveryNodes() { + return includeDiscoveryNodes; } @Override diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index 8471b01c0bfce..3acd12f632e0f 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -241,7 +241,7 @@ class AsyncAction { this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); this.concreteNodes = request.concreteNodes(); - if (request.sendDiscoveryNodes() == false) { + if (request.getIncludeDiscoveryNodes() == false) { // As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we // remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves // the number of concrete nodes in the memory. diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java index 56b90aac47b09..913db3c81e951 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java @@ -66,7 +66,7 @@ public String getName() { public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null)); clusterStatsRequest.timeout(request.param("timeout")); - clusterStatsRequest.sendDiscoveryNodes(false); + clusterStatsRequest.setIncludeDiscoveryNodes(false); return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java index 27be7c3f1eb21..4ac51933ea382 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java @@ -88,7 +88,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final NodesInfoRequest nodesInfoRequest = prepareRequest(request); nodesInfoRequest.timeout(request.param("timeout")); settingsFilter.addFilterSettingParams(request); - nodesInfoRequest.sendDiscoveryNodes(false); + nodesInfoRequest.setIncludeDiscoveryNodes(false); return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java index 177b3faa51317..ed9c0b171aa56 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java @@ -232,7 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // If no levels are passed in this results in an empty array. String[] levels = Strings.splitStringByCommaToArray(request.param("level")); nodesStatsRequest.indices().setLevels(levels); - nodesStatsRequest.sendDiscoveryNodes(false); + nodesStatsRequest.setIncludeDiscoveryNodes(false); return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index f06cdd1b68dca..0330fe627ccd0 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -125,7 +125,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); nodesInfoRequest.timeout(request.param("timeout")); - nodesInfoRequest.sendDiscoveryNodes(false); + nodesInfoRequest.setIncludeDiscoveryNodes(false); nodesInfoRequest.clear() .addMetrics( NodesInfoRequest.Metric.JVM.metricName(), @@ -138,7 +138,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) { public void processResponse(final NodesInfoResponse nodesInfoResponse) { NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); nodesStatsRequest.timeout(request.param("timeout")); - nodesStatsRequest.sendDiscoveryNodes(false); + nodesStatsRequest.setIncludeDiscoveryNodes(false); nodesStatsRequest.clear() .indices(true) .addMetrics( diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java index 6303d4944463e..f8e14b477b8ef 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java @@ -39,7 +39,7 @@ public class TransportClusterStatsActionTests extends TransportNodesActionTests */ public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() { ClusterStatsRequest request = new ClusterStatsRequest(); - request.sendDiscoveryNodes(true); + request.setIncludeDiscoveryNodes(true); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); @@ -74,7 +74,7 @@ public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDi */ public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() { ClusterStatsRequest request = new ClusterStatsRequest(); - request.sendDiscoveryNodes(false); + request.setIncludeDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); @@ -88,7 +88,7 @@ public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionO ClusterStatsRequest request = new ClusterStatsRequest(); Collection discoveryNodes = clusterService.state().getNodes().getNodes().values(); request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new)); - request.sendDiscoveryNodes(false); + request.setIncludeDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java index 245ae2bdc061b..e9e09d0dbbbf9 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java @@ -37,7 +37,7 @@ public class TransportNodesInfoActionTests extends TransportNodesActionTests { */ public void testNodesInfoActionWithRetentionOfDiscoveryNodesList() { NodesInfoRequest request = new NodesInfoRequest(); - request.sendDiscoveryNodes(true); + request.setIncludeDiscoveryNodes(true); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); @@ -56,7 +56,7 @@ public void testNodesInfoActionWithRetentionOfDiscoveryNodesList() { */ public void testNodesInfoActionWithoutRetentionOfDiscoveryNodesList() { NodesInfoRequest request = new NodesInfoRequest(); - request.sendDiscoveryNodes(false); + request.setIncludeDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java index 9af5b1ef33ac2..c7c420e353e1a 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java @@ -36,7 +36,7 @@ public class TransportNodesStatsActionTests extends TransportNodesActionTests { */ public void testNodesStatsActionWithRetentionOfDiscoveryNodesList() { NodesStatsRequest request = new NodesStatsRequest(); - request.sendDiscoveryNodes(true); + request.setIncludeDiscoveryNodes(true); Map> combinedSentRequest = performNodesStatsAction(request); assertNotNull(combinedSentRequest); @@ -55,7 +55,7 @@ public void testNodesStatsActionWithRetentionOfDiscoveryNodesList() { */ public void testNodesStatsActionWithoutRetentionOfDiscoveryNodesList() { NodesStatsRequest request = new NodesStatsRequest(); - request.sendDiscoveryNodes(false); + request.setIncludeDiscoveryNodes(false); Map> combinedSentRequest = performNodesStatsAction(request); assertNotNull(combinedSentRequest); From 3a8e7ef3bca08235508b74dcc45480e90cf0b275 Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Mon, 22 Jul 2024 13:00:54 +0530 Subject: [PATCH 13/13] Retry Build Signed-off-by: Pranshu Shukla