Skip to content

Commit

Permalink
Added processExistingRecoveries method to ReplicaShardBatchAllocator
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jul 26, 2023
1 parent 16a5057 commit 1f2899f
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 12 deletions.
18 changes: 14 additions & 4 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,20 @@ public void beforeAllocation(final RoutingAllocation allocation) {

@Override
public void afterPrimariesBeforeReplicas(RoutingAllocation allocation) {
assert replicaShardAllocator != null;
if (allocation.routingNodes().hasInactiveShards()) {
// cancel existing recoveries if we have a better match
replicaShardAllocator.processExistingRecoveries(allocation);
// ToDo: fetch from settings
boolean batchMode = true;
if (batchMode) {
assert replicaBatchShardAllocator != null;
List<Set<ShardRouting>> storedShardBatches = batchIdToStoreShardBatch.values().stream()
.map(ShardsBatch::getBatchedShardRoutings)
.collect(Collectors.toList());
replicaBatchShardAllocator.processExistingRecoveries(allocation, storedShardBatches);
} else {
assert replicaShardAllocator != null;
if (allocation.routingNodes().hasInactiveShards()) {
// cancel existing recoveries if we have a better match
replicaShardAllocator.processExistingRecoveries(allocation);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,14 @@
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

Expand All @@ -49,9 +46,100 @@ public abstract class ReplicaShardBatchAllocator extends BaseGatewayShardAllocat
* match. Today, a better match is one that can perform a no-op recovery while the previous recovery
* has to copy segment files.
*/
public void processExistingRecoveries(RoutingAllocation allocation) {
// ToDo: complete this method
throw new UnsupportedOperationException("Not yet implemented");
/**
 * Processes existing recoveries of replicas, batch by batch, and schedules a cancellation
 * for any recovery where another node holds a better store match. Today, a better match is
 * one that can perform a no-op recovery while the current recovery has to copy segment files.
 *
 * @param allocation   the current routing allocation
 * @param shardBatches batches of shard routings to inspect; primaries, non-initializing
 *                     shards, relocating shards, and freshly created indices are skipped
 */
public void processExistingRecoveries(RoutingAllocation allocation, List<Set<ShardRouting>> shardBatches) {
    Metadata metadata = allocation.metadata();
    RoutingNodes routingNodes = allocation.routingNodes();
    List<Runnable> shardCancellationActions = new ArrayList<>();
    for (Set<ShardRouting> shardBatch : shardBatches) {
        Set<ShardRouting> eligibleFetchShards = new HashSet<>();
        Set<ShardRouting> ineligibleShards = new HashSet<>();
        for (ShardRouting shard : shardBatch) {
            // Only initializing, non-relocating replicas can have an existing
            // recovery worth cancelling.
            if (shard.primary()) {
                ineligibleShards.add(shard);
                continue;
            }
            if (shard.initializing() == false) {
                ineligibleShards.add(shard);
                continue;
            }
            if (shard.relocatingNodeId() != null) {
                ineligibleShards.add(shard);
                continue;
            }

            // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
            if (shard.unassignedInfo() != null && shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) {
                ineligibleShards.add(shard);
                continue;
            }
            eligibleFetchShards.add(shard);
        }
        AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch> shardState = fetchData(eligibleFetchShards, ineligibleShards, allocation);
        // BUGFIX: the condition was inverted (`hasData()`), which skipped every
        // batch once its data had actually arrived. Skip only while still fetching.
        if (shardState.hasData() == false) {
            logger.trace("{}: fetching new stores for initializing shard batch", eligibleFetchShards);
            continue; // still fetching
        }
        for (ShardRouting shard : eligibleFetchShards) {
            ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId());
            assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
            assert primaryShard.currentNodeId() != null;
            final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
            final TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore = findStore(primaryNode, shardState, shard);
            if (primaryStore == null) {
                // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
                // just let the recovery find it out, no need to do anything about it for the initializing shard
                logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
                continue;
            }
            ReplicaShardAllocator.MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, shardState, false);
            if (matchingNodes.getNodeWithHighestMatch() != null) {
                DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
                DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
                // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
                if (currentNode.equals(nodeWithHighestMatch) == false
                    && matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch)
                    && canPerformOperationBasedRecovery(primaryStore, shardState, currentNode, shard) == false) {
                    // we found a better match that can perform noop recovery, cancel the existing allocation.
                    logger.debug(
                        "cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]",
                        currentNode,
                        nodeWithHighestMatch
                    );
                    final Set<String> failedNodeIds = shard.unassignedInfo() == null
                        ? Collections.emptySet()
                        : shard.unassignedInfo().getFailedNodeIds();
                    UnassignedInfo unassignedInfo = new UnassignedInfo(
                        UnassignedInfo.Reason.REALLOCATED_REPLICA,
                        "existing allocation of replica to ["
                            + currentNode
                            + "] cancelled, can perform a noop recovery on ["
                            + nodeWithHighestMatch
                            + "]",
                        null,
                        0,
                        allocation.getCurrentNanoTime(),
                        System.currentTimeMillis(),
                        false,
                        UnassignedInfo.AllocationStatus.NO_ATTEMPT,
                        failedNodeIds
                    );
                    // don't cancel shard in the loop as it will cause a ConcurrentModificationException
                    shardCancellationActions.add(
                        () -> routingNodes.failShard(
                            logger,
                            shard,
                            unassignedInfo,
                            metadata.getIndexSafe(shard.index()),
                            allocation.changes()
                        )
                    );
                }
            }
        }
    }
    // Run all cancellations after iteration so the routing table is not
    // mutated while we are still walking it.
    for (Runnable action : shardCancellationActions) {
        action.run();
    }
}

private static boolean isResponsibleFor(final ShardRouting shard) {
Expand Down Expand Up @@ -397,6 +485,35 @@ private static TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata find
if (nodeFilesStore == null) {
return null;
}
return nodeFilesStore.getNodeStoreFilesMetadataBatch().get(shard.shardId()).storeFilesMetadata();
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeFileStoreMetadata = nodeFilesStore.getNodeStoreFilesMetadataBatch().get(shard.shardId());
if (nodeFileStoreMetadata.getStoreFileFetchException() != null) {
// Do we need to throw an exception here, to handle this case differently?
return null;
}
return nodeFileStoreMetadata.storeFilesMetadata();
}

/**
 * Returns {@code true} when the target node already holds a usable copy of the shard
 * store, so the existing recovery can proceed operation-based instead of copying files.
 */
private static boolean canPerformOperationBasedRecovery(
    TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore,
    FetchResult<NodeStoreFilesMetadataBatch> data,
    DiscoveryNode targetNode,
    ShardRouting shard
) {
    final NodeStoreFilesMetadataBatch targetNodeBatch = data.getData().get(targetNode);
    if (targetNodeBatch == null) {
        // no store listing was returned for this node at all
        return false;
    }
    final TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata shardMetadata = targetNodeBatch
        .getNodeStoreFilesMetadataBatch()
        .get(shard.shardId());
    if (shardMetadata.getStoreFileFetchException() != null) {
        // listing the store files failed on the target node
        return false;
    }
    final TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata targetStore = shardMetadata.storeFilesMetadata();
    if (targetStore == null || targetStore.isEmpty()) {
        return false;
    }
    // Either a matching sync id or a retained peer-recovery retention lease
    // allows an operation-based recovery.
    return hasMatchingSyncId(primaryStore, targetStore)
        || primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(targetNode) >= 0;
}
}

0 comments on commit 1f2899f

Please sign in to comment.