diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index cdcf813d9ede0..86906d2703be8 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -55,9 +55,15 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; ++import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch; + import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -78,11 +84,13 @@ public class GatewayAllocator implements ExistingShardsAllocator { private final PrimaryShardAllocator primaryShardAllocator; private final ReplicaShardAllocator replicaShardAllocator; + private final PrimaryShardBatchAllocator primaryBatchShardAllocator; + private final ReplicaShardBatchAllocator replicaBatchShardAllocator; private final ConcurrentMap< ShardId, AsyncShardFetch> asyncFetchStarted = ConcurrentCollections - .newConcurrentMap(); + .newConcurrentMap(); private final ConcurrentMap> asyncFetchStore = ConcurrentCollections.newConcurrentMap(); private Set lastSeenEphemeralIds = Collections.emptySet(); @@ -96,6 +104,8 @@ public GatewayAllocator( this.rerouteService = rerouteService; this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction); this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); + this.primaryBatchShardAllocator = new InternalPrimaryBatchShardAllocator(); + this.replicaBatchShardAllocator = new InternalReplicaBatchShardAllocator(); } @Override @@ -303,6 +313,57 @@ protected AsyncShardFetch.FetchResult fetchData(Set shardsEligibleForFetch, + Set inEligibleShards, + RoutingAllocation allocation) { + ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null; + shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting; + if (shardRouting == null) { + return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); + } + + String batchId = startedShardBatchLookup.getOrDefault(shardRouting.shardId(), null); + if (batchId == null) { + logger.debug("Shard {} has no batch id", shardRouting); + throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching"); + } + + + if (batchIdToStartedShardBatch.containsKey(batchId) == false) { + logger.debug("Batch {} has no started shard batch", batchId); + throw new IllegalStateException("Batch " + batchId + " has no started shard batch"); + } + + ShardsBatch shardsBatch = batchIdToStartedShardBatch.get(batchId); + // remove in eligible shards which allocator is not responsible for + inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch); + + if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) { + logger.debug("Batch {} is empty", batchId); + return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); + } + + Map> shardToIgnoreNodes = new HashMap<>(); + + for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) { + shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); + } + AsyncBatchShardFetch asyncFetcher = shardsBatch.getAsyncFetcher(); + AsyncBatchShardFetch.FetchResult shardBatchState = asyncFetcher.fetchData( + allocation.nodes(), + shardToIgnoreNodes + ); + + if (shardBatchState.hasData()) { + shardBatchState.processAllocation(allocation); + } + return (AsyncBatchShardFetch.FetchResult) shardBatchState; + } + } + class InternalReplicaShardAllocator extends ReplicaShardAllocator { private final TransportNodesListShardStoreMetadata storeAction; @@ -335,10 +396,54 @@ protected AsyncShardFetch.FetchResult fetchData(Set shardsEligibleForFetch, + Set inEligibleShards, + RoutingAllocation allocation) { + + ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null; + shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting; + if (shardRouting == null) { + return new AsyncBatchShardFetch<>.FetchResult<>(null, Collections.emptyMap()); + } + + String batchId = storeShardBatchLookup.getOrDefault(shardRouting.shardId(), null); + if (batchId == null) { + logger.debug("Shard {} has no batch id", shardRouting); + throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching"); + } + + if (batchIdToStoreShardBatch.containsKey(batchId) == false) { + logger.debug("Batch {} has no store shard batch", batchId); + throw new IllegalStateException("Batch " + batchId + " has no shard store batch"); + } + + ShardsBatch shardsBatch = batchIdToStoreShardBatch.get(batchId); + // remove in eligible shards which allocator is not responsible for + inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch); + + if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) { + logger.debug("Batch {} is empty", batchId); + return new AsyncBatchShardFetch<>.FetchResult<>(null, Collections.emptyMap()); + } + Map> shardToIgnoreNodes = new HashMap<>(); + for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) { + shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); + } + AsyncBatchShardFetch asyncFetcher = shardsBatch.getAsyncFetcher(); + AsyncBatchShardFetch.FetchResult shardBatchState = asyncFetcher.fetchData( + allocation.nodes(), + shardToIgnoreNodes + ); + if (shardBatchState.hasData()) { + shardBatchState.processAllocation(allocation); + } + return (AsyncBatchShardFetch.FetchResult) shardBatchState; } } }