Skip to content

Commit

Permalink
FetchData changes for primaries and replicas
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <[email protected]>
  • Loading branch information
Gaurav614 committed Jul 25, 2023
1 parent e1da84d commit 61f8c61
Showing 1 changed file with 108 additions and 3 deletions.
111 changes: 108 additions & 3 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,15 @@
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
+import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;


import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
Expand All @@ -78,11 +84,13 @@ public class GatewayAllocator implements ExistingShardsAllocator {

private final PrimaryShardAllocator primaryShardAllocator;
private final ReplicaShardAllocator replicaShardAllocator;
private final PrimaryShardBatchAllocator primaryBatchShardAllocator;
private final ReplicaShardBatchAllocator replicaBatchShardAllocator;

private final ConcurrentMap<
ShardId,
AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections
.newConcurrentMap();
.newConcurrentMap();
private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata>> asyncFetchStore =
ConcurrentCollections.newConcurrentMap();
private Set<String> lastSeenEphemeralIds = Collections.emptySet();
Expand All @@ -96,6 +104,8 @@ public GatewayAllocator(
this.rerouteService = rerouteService;
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
this.primaryBatchShardAllocator = new InternalPrimaryBatchShardAllocator();
this.replicaBatchShardAllocator = new InternalReplicaBatchShardAllocator();
}

@Override
Expand Down Expand Up @@ -303,6 +313,57 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
}
}

class InternalPrimaryBatchShardAllocator extends PrimaryShardBatchAllocator {
@Override
@SuppressWarnings("unchecked")
protected AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
RoutingAllocation allocation) {
ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null;
shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting;
if (shardRouting == null) {
return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap());
}

String batchId = startedShardBatchLookup.getOrDefault(shardRouting.shardId(), null);
if (batchId == null) {
logger.debug("Shard {} has no batch id", shardRouting);
throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching");
}


if (batchIdToStartedShardBatch.containsKey(batchId) == false) {
logger.debug("Batch {} has no started shard batch", batchId);
throw new IllegalStateException("Batch " + batchId + " has no started shard batch");
}

ShardsBatch shardsBatch = batchIdToStartedShardBatch.get(batchId);
// remove in eligible shards which allocator is not responsible for
inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch);

if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) {
logger.debug("Batch {} is empty", batchId);
return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap());
}

Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();

for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) {
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
}
AsyncBatchShardFetch<? extends BaseNodeResponse> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncBatchShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchState = asyncFetcher.fetchData(
allocation.nodes(),
shardToIgnoreNodes
);

if (shardBatchState.hasData()) {
shardBatchState.processAllocation(allocation);
}
return (AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>) shardBatchState;
}
}

class InternalReplicaShardAllocator extends ReplicaShardAllocator {

private final TransportNodesListShardStoreMetadata storeAction;
Expand Down Expand Up @@ -335,10 +396,54 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeS
}
return shardStores;
}
}

class InternalReplicaBatchShardAllocator extends ReplicaShardBatchAllocator {

@Override
protected boolean hasInitiatedFetching(ShardRouting shard) {
return asyncFetchStore.get(shard.shardId()) != null;
@SuppressWarnings("unchecked")
protected AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
RoutingAllocation allocation) {

ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null;
shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting;
if (shardRouting == null) {
return new AsyncBatchShardFetch<>.FetchResult<>(null, Collections.emptyMap());
}

String batchId = storeShardBatchLookup.getOrDefault(shardRouting.shardId(), null);
if (batchId == null) {
logger.debug("Shard {} has no batch id", shardRouting);
throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching");
}

if (batchIdToStoreShardBatch.containsKey(batchId) == false) {
logger.debug("Batch {} has no store shard batch", batchId);
throw new IllegalStateException("Batch " + batchId + " has no shard store batch");
}

ShardsBatch shardsBatch = batchIdToStoreShardBatch.get(batchId);
// remove in eligible shards which allocator is not responsible for
inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch);

if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) {
logger.debug("Batch {} is empty", batchId);
return new AsyncBatchShardFetch<>.FetchResult<>(null, Collections.emptyMap());
}
Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) {
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
}
AsyncBatchShardFetch<? extends BaseNodeResponse> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncBatchShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchState = asyncFetcher.fetchData(
allocation.nodes(),
shardToIgnoreNodes
);
if (shardBatchState.hasData()) {
shardBatchState.processAllocation(allocation);
}
return (AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch>) shardBatchState;
}
}
}

0 comments on commit 61f8c61

Please sign in to comment.