Skip to content

Commit

Permalink
handle comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sumit Bansal <[email protected]>
  • Loading branch information
sumitasr committed Sep 25, 2024
1 parent 66c962b commit e0972d7
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.rest.RequestLimitSettings.BlockAction.CAT_SHARDS;
import java.util.Objects;

import static org.opensearch.rest.RequestLimitSettings.BlockEntity.SHARDS;

/**
* Perform cat shards action
Expand Down Expand Up @@ -85,10 +87,7 @@ protected void innerOnFailure(Exception e) {
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
if (shardsRequest.isRequestLimitCheckSupported()
&& requestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState(), CAT_SHARDS)) {
listener.onFailure(new CircuitBreakingException("Too many shards requested.", CircuitBreaker.Durability.TRANSIENT));
}
validateRequestLimit(shardsRequest, clusterStateResponse, listener);
catShardsResponse.setClusterStateResponse(clusterStateResponse);
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
Expand Down Expand Up @@ -123,4 +122,24 @@ public void onFailure(Exception e) {
}

}

private void validateRequestLimit(
final CatShardsRequest shardsRequest,
final ClusterStateResponse clusterStateResponse,
final ActionListener<CatShardsResponse> listener
) {
if (shardsRequest.isRequestLimitCheckSupported()
&& Objects.nonNull(clusterStateResponse)
&& Objects.nonNull(clusterStateResponse.getState())) {
int limit = requestLimitSettings.getCatShardsLimit();
if (RequestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState().getRoutingTable(), SHARDS, limit)) {
listener.onFailure(
new CircuitBreakingException(
"Too many shards requested. Can not request shards beyond {" + limit + "}",
CircuitBreaker.Durability.TRANSIENT
)
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ public void apply(Settings value, Settings current, Settings previous) {
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,

// Settings to be used for limiting rest requests
RequestLimitSettings.CAT_INDICES_LIMIT_SETTING,
RequestLimitSettings.CAT_SHARDS_LIMIT_SETTING,
RequestLimitSettings.CAT_SEGMENTS_LIMIT_SETTING
Expand Down
96 changes: 63 additions & 33 deletions server/src/main/java/org/opensearch/rest/RequestLimitSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

package org.opensearch.rest;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
Expand All @@ -29,12 +29,11 @@
public class RequestLimitSettings {

/**
* Enum to represent action names against whom we need to perform limit checks.
* Enum to represent entity against which we need to perform limit checks.
*/
public enum BlockAction {
CAT_INDICES,
CAT_SHARDS,
CAT_SEGMENTS
public enum BlockEntity {
INDICES,
SHARDS
}

private volatile int catIndicesLimit;
Expand Down Expand Up @@ -72,34 +71,58 @@ public enum BlockAction {
);

public RequestLimitSettings(ClusterSettings clusterSettings, Settings settings) {
setCatShardsLimitSetting(CAT_SHARDS_LIMIT_SETTING.get(settings));
setCatIndicesLimitSetting(CAT_INDICES_LIMIT_SETTING.get(settings));
setCatSegmentsLimitSetting(CAT_SEGMENTS_LIMIT_SETTING.get(settings));
setCatShardsLimit(CAT_SHARDS_LIMIT_SETTING.get(settings));
setCatIndicesLimit(CAT_INDICES_LIMIT_SETTING.get(settings));
setCatSegmentsLimit(CAT_SEGMENTS_LIMIT_SETTING.get(settings));

clusterSettings.addSettingsUpdateConsumer(CAT_SHARDS_LIMIT_SETTING, this::setCatShardsLimitSetting);
clusterSettings.addSettingsUpdateConsumer(CAT_INDICES_LIMIT_SETTING, this::setCatIndicesLimitSetting);
clusterSettings.addSettingsUpdateConsumer(CAT_SEGMENTS_LIMIT_SETTING, this::setCatSegmentsLimitSetting);
clusterSettings.addSettingsUpdateConsumer(CAT_SHARDS_LIMIT_SETTING, this::setCatShardsLimit);
clusterSettings.addSettingsUpdateConsumer(CAT_INDICES_LIMIT_SETTING, this::setCatIndicesLimit);
clusterSettings.addSettingsUpdateConsumer(CAT_SEGMENTS_LIMIT_SETTING, this::setCatSegmentsLimit);
}

/**
* Method to check if the circuit breaker limit has reached for an action.
* The limits are controlled via dynamic settings.
*
* @param clusterState {@link ClusterState}
* @param actionToCheck {@link BlockAction}
* @param metadata {@link Metadata}
* @param blockEntity {@link BlockEntity}
* @param limit Integer limit on block entity
* @return True/False
*/
public boolean isCircuitLimitBreached(final ClusterState clusterState, final BlockAction actionToCheck) {
if (Objects.isNull(clusterState)) return false;
switch (actionToCheck) {
case CAT_INDICES:
if (catIndicesLimit <= 0) return false;
int indicesCount = chainWalk(() -> clusterState.getMetadata().getIndices().size(), 0);
if (indicesCount > catIndicesLimit) return true;
public static boolean isCircuitLimitBreached(final Metadata metadata, final BlockEntity blockEntity, final int limit) {
if (Objects.isNull(metadata)) return false;
if (limit <= 0) return false;
switch (blockEntity) {
case INDICES:
int indicesCount = chainWalk(() -> metadata.getIndices().size(), 0);
if (indicesCount > limit) return true;
break;
case CAT_SHARDS:
if (catShardsLimit <= 0) return false;
final RoutingTable routingTable = clusterState.getRoutingTable();
case SHARDS:
int shardsCount = metadata.getTotalNumberOfShards();
if (shardsCount > limit) return true;
break;
}
return false;
}

/**
* Method to check if the circuit breaker limit has reached for an action.
* The limits are controlled via dynamic settings.
*
* @param routingTable {@link RoutingTable}
* @param blockEntity {@link BlockEntity}
* @param limit Integer limit on block entity
* @return True/False
*/
public static boolean isCircuitLimitBreached(final RoutingTable routingTable, final BlockEntity blockEntity, final int limit) {
if (Objects.isNull(routingTable)) return false;
if (limit <= 0) return false;
switch (blockEntity) {
case INDICES:
int indicesCount = chainWalk(() -> routingTable.getIndicesRouting().size(), 0);
if (indicesCount > limit) return true;
break;
case SHARDS:
final Map<String, IndexRoutingTable> indexRoutingTableMap = routingTable.getIndicesRouting();
int totalShards = 0;
for (final Map.Entry<String, IndexRoutingTable> entry : indexRoutingTableMap.entrySet()) {
Expand All @@ -108,31 +131,38 @@ public boolean isCircuitLimitBreached(final ClusterState clusterState, final Blo
.entrySet()) {
totalShards += indexShardRoutingTableEntry.getValue().getShards().size();
// Fail fast if catShardsLimit value is breached and avoid unnecessary computation.
if (totalShards > catShardsLimit) return true;
if (totalShards > limit) return true;
}
}
break;
case CAT_SEGMENTS:
if (catSegmentsLimit <= 0) return false;
int indicesCountForCatSegment = chainWalk(() -> clusterState.getRoutingTable().getIndicesRouting().size(), 0);
if (indicesCountForCatSegment > catSegmentsLimit) return true;
break;
}
return false;
}

private void setCatShardsLimitSetting(final int catShardsLimit) {
private void setCatShardsLimit(final int catShardsLimit) {
this.catShardsLimit = catShardsLimit;
}

private void setCatIndicesLimitSetting(final int catIndicesLimit) {
private void setCatIndicesLimit(final int catIndicesLimit) {
this.catIndicesLimit = catIndicesLimit;
}

private void setCatSegmentsLimitSetting(final int catSegmentsLimit) {
private void setCatSegmentsLimit(final int catSegmentsLimit) {
this.catSegmentsLimit = catSegmentsLimit;
}

public int getCatShardsLimit() {
return this.catShardsLimit;
}

public int getCatIndicesLimit() {
return this.catIndicesLimit;
}

public int getCatSegmentsLimit() {
return this.catSegmentsLimit;
}

// TODO: Evaluate if we can move this to common util.
private static <T> T chainWalk(Supplier<T> supplier, T defaultValue) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterators;
import java.util.function.Function;
Expand All @@ -83,7 +84,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
import static org.opensearch.rest.RequestLimitSettings.BlockAction.CAT_INDICES;
import static org.opensearch.rest.RequestLimitSettings.BlockEntity.INDICES;
import static org.opensearch.rest.RestRequest.Method.GET;

/**
Expand Down Expand Up @@ -190,12 +191,7 @@ public void onResponse(final GetSettingsResponse getSettingsResponse) {
new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
if (isRequestLimitCheckSupported()
&& requestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState(), CAT_INDICES)) {
listener.onFailure(
new CircuitBreakingException("Too many indices requested.", CircuitBreaker.Durability.TRANSIENT)
);
}
validateRequestLimit(clusterStateResponse, listener);
final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(
request,
4,
Expand Down Expand Up @@ -239,6 +235,20 @@ public void onFailure(final Exception e) {
};
}

private void validateRequestLimit(final ClusterStateResponse clusterStateResponse, final ActionListener<Table> listener) {
if (isRequestLimitCheckSupported() && Objects.nonNull(clusterStateResponse) && Objects.nonNull(clusterStateResponse.getState())) {
int limit = requestLimitSettings.getCatIndicesLimit();
if (RequestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState().getMetadata(), INDICES, limit)) {
listener.onFailure(
new CircuitBreakingException(
"Too many indices requested. Can not request indices beyond {" + limit + "}",
CircuitBreaker.Durability.TRANSIENT
)
);
}
}
}

/**
* We're using the Get Settings API here to resolve the authorized indices for the user.
* This is because the Cluster State and Cluster Health APIs do not filter output based
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.opensearch.rest.RequestLimitSettings.BlockAction.CAT_SEGMENTS;
import static org.opensearch.rest.RequestLimitSettings.BlockEntity.INDICES;
import static org.opensearch.rest.RestRequest.Method.GET;

/**
Expand Down Expand Up @@ -111,10 +112,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
@Override
public void processResponse(final ClusterStateResponse clusterStateResponse) {
if (isRequestLimitCheckSupported()
&& requestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState(), CAT_SEGMENTS)) {
throw new CircuitBreakingException("Segments from too many indices requested.", CircuitBreaker.Durability.TRANSIENT);
}
validateRequestLimit(clusterStateResponse);
final IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest();
indicesSegmentsRequest.indices(indices);
client.admin().indices().segments(indicesSegmentsRequest, new RestResponseListener<IndicesSegmentResponse>(channel) {
Expand All @@ -129,6 +127,18 @@ public RestResponse buildResponse(final IndicesSegmentResponse indicesSegmentRes
});
}

private void validateRequestLimit(final ClusterStateResponse clusterStateResponse) {
if (isRequestLimitCheckSupported() && Objects.nonNull(clusterStateResponse) && Objects.nonNull(clusterStateResponse.getState())) {
int limit = requestLimitSettings.getCatSegmentsLimit();
if (RequestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState().getRoutingTable(), INDICES, limit)) {
throw new CircuitBreakingException(
"Segments from too many indices requested. Can not request indices beyond {" + limit + "}",
CircuitBreaker.Durability.TRANSIENT
);
}
}
}

@Override
protected void documentation(StringBuilder sb) {
sb.append("/_cat/segments\n");
Expand Down
Loading

0 comments on commit e0972d7

Please sign in to comment.