From 37796c127f7893952799a55bd8c1c84cc0560e1f Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Wed, 19 Jul 2023 14:07:44 +0530 Subject: [PATCH] BaseGatewayShardAllocator changes for Assigning the batch of shards Signed-off-by: Gaurav Chandani --- .../gateway/BaseGatewayShardAllocator.java | 29 ++++++++++++++++++- .../gateway/PrimaryShardAllocator.java | 7 +++++ .../gateway/ReplicaShardAllocator.java | 8 +++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index 59ef894958cbe..d085be66e032d 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationDecision; @@ -46,6 +47,8 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; /** * An abstract class that implements basic functionality for allocating @@ -74,7 +77,26 @@ public void allocateUnassigned( ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler ) { final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger); + executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler); + } + + public void allocateUnassignedBatch(Set shards, RoutingAllocation allocation) { + // make Allocation Decisions for all shards + ConcurrentMap decisionMap = makeAllocationDecision(shards, allocation, logger); + // get all unassigned shards + RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); + while (iterator.hasNext()){ + ShardRouting shard = iterator.next(); + if (shards.contains(shard)) { + executeDecision(shard, decisionMap.get(shard), allocation, iterator); + } + } + } + private void executeDecision(ShardRouting shardRouting, + AllocateUnassignedDecision allocateUnassignedDecision, + RoutingAllocation allocation, + ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) { if (allocateUnassignedDecision.isDecisionTaken() == false) { // no decision was taken by this allocator return; @@ -91,7 +113,6 @@ public void allocateUnassigned( unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes()); } } - protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) { if (shardRouting.primary()) { if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) { @@ -120,6 +141,12 @@ public abstract AllocateUnassignedDecision makeAllocationDecision( Logger logger ); + public abstract ConcurrentMap makeAllocationDecision( + Set shards, + RoutingAllocation allocation, + Logger logger + ); + /** * Builds decisions for all nodes in the cluster, so that the explain API can provide information on * allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data). diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 4dc9396751fc9..dbc94775c730a 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -59,6 +59,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -457,6 +458,12 @@ private static NodesToAllocate buildNodesToAllocate( protected abstract FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); + @Override + // to be override + public ConcurrentMap makeAllocationDecision(Set shards, RoutingAllocation allocation, Logger logger) { + return null; + } + private static class NodeShardsResult { final List orderedAllocationCandidates; final int allocationsFound; diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index c0b831b6fe4d0..0cc70c6c39c0f 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -62,6 +62,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; @@ -495,6 +496,13 @@ private static boolean canPerformOperationBasedRecovery( protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); + @Override + // to be override + public ConcurrentMap makeAllocationDecision(Set shards, RoutingAllocation allocation, Logger logger) { + return null; + } + + /** * Returns a boolean indicating whether fetching shard data has been triggered at any point for the given shard. */