
Fix gateway allocator error when storeData is still fetching
Signed-off-by: sudarshan baliga <[email protected]>
sudarshan-baliga committed Jul 20, 2023
1 parent 0919259 commit 7b84be5
Showing 9 changed files with 836 additions and 639 deletions.
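The substance of the fix is visible in the hunks below: while the batched shard-store metadata is still being fetched, collectShardsStorePerNode returns null instead of partial data, and the replica-side adapter skips per-shard entries whose store listing failed (getStoreFileFetchException() != null) rather than dereferencing them. A minimal, self-contained sketch of that guard pattern, where NodeStoreFilesMetadata is a hypothetical simplification of the TransportNodesListShardStoreMetadataBatch types:

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-shard store metadata entry; the real type
// lives in TransportNodesListShardStoreMetadataBatch.
class NodeStoreFilesMetadata {
    private final Exception storeFileFetchException;
    NodeStoreFilesMetadata(Exception storeFileFetchException) {
        this.storeFileFetchException = storeFileFetchException;
    }
    Exception getStoreFileFetchException() {
        return storeFileFetchException;
    }
}

public class StoreFetchGuardSketch {
    // Mirrors the fix: null input means the batched fetch has not completed on
    // every node yet, so the caller bails out and retries on a later reroute.
    static Map<String, NodeStoreFilesMetadata> adaptFetchedStores(Map<String, NodeStoreFilesMetadata> fetched) {
        if (fetched == null) {
            return null; // still fetching; do not touch partial data
        }
        Map<String, NodeStoreFilesMetadata> adapted = new HashMap<>();
        for (Map.Entry<String, NodeStoreFilesMetadata> entry : fetched.entrySet()) {
            // a node whose store listing threw is skipped, not dereferenced
            if (entry.getValue().getStoreFileFetchException() != null) {
                continue;
            }
            adapted.put(entry.getKey(), entry.getValue());
        }
        return adapted;
    }

    public static void main(String[] args) {
        Map<String, NodeStoreFilesMetadata> fetched = new HashMap<>();
        fetched.put("node-1", new NodeStoreFilesMetadata(null));
        fetched.put("node-2", new NodeStoreFilesMetadata(new RuntimeException("listing failed")));
        System.out.println(adaptFetchedStores(fetched).keySet()); // [node-1]
        System.out.println(adaptFetchedStores(null));             // null (still fetching)
    }
}

The sketch only illustrates the control flow; the real code keys the map by DiscoveryNode and is re-driven by the allocator on a later reroute when null is returned.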
server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java
@@ -26,6 +26,7 @@
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -201,6 +202,7 @@ void asyncFetchShardPerNode(final DiscoveryNode[] nodes, long fetchingRound) {
action.list(nodes, shardsToCustomDataPathMap, new ActionListener<BaseNodesResponse<T>>() {
@Override
public void onResponse(BaseNodesResponse<T> tBaseNodesResponse) {
logger.info("Got the response ");
processTestAsyncFetch(tBaseNodesResponse.getNodes(),tBaseNodesResponse.failures(), fetchingRound);
}

@@ -211,6 +213,7 @@ public void onFailure(Exception e) {
for (final DiscoveryNode node : nodes) {
failures.add(new FailedNodeException(node.getId(), "Total failure in fetching", e));
}
logger.error("sdarb5 we got filures for {}", Arrays.toString(nodes));
processTestAsyncFetch(null, failures, fetchingRound);
}
});
@@ -231,7 +234,7 @@ protected synchronized void processTestAsyncFetch(List<T> responses, List<Failed
return;
}

logger.trace("TEST-processing fetched results");
logger.info("sdarb6 TEST-processing fetched results {}", failures);

if (responses != null) {
for (T response : responses) {
46 changes: 27 additions & 19 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
@@ -55,11 +55,14 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.store.TransportNodesBatchListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;

import java.util.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -90,25 +93,25 @@ public class GatewayAllocator implements ExistingShardsAllocator {


private Map<DiscoveryNode, TransportNodesBulkListGatewayStartedShards.BulkOfNodeGatewayStartedShards> shardsPerNode= ConcurrentCollections.newConcurrentMap();
private Map<DiscoveryNode, TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> shardStoresPerNode= ConcurrentCollections.newConcurrentMap();
private Map<DiscoveryNode, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> shardStoresPerNode= ConcurrentCollections.newConcurrentMap();


private AsyncShardsFetchPerNode<TransportNodesBulkListGatewayStartedShards.BulkOfNodeGatewayStartedShards> fetchShardsFromNodes=null;
private AsyncShardsFetchPerNode<TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> fetchShardStoreFromNodes=null;
private AsyncShardsFetchPerNode<TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> fetchShardStoreFromNodes=null;



private Set<String> lastSeenEphemeralIds = Collections.emptySet();
TransportNodesBulkListGatewayStartedShards testAction;
TransportNodesBatchListShardStoreMetadata testStoreAction;
TransportNodesListShardStoreMetadataBatch testStoreAction;

@Inject
public GatewayAllocator(
RerouteService rerouteService,
TransportNodesListGatewayStartedShards startedAction,
TransportNodesListShardStoreMetadata storeAction,
TransportNodesBulkListGatewayStartedShards testAction,
TransportNodesBatchListShardStoreMetadata testStoreAction
TransportNodesListShardStoreMetadataBatch testStoreAction
) {
this.rerouteService = rerouteService;
this.primaryShardAllocator = new TestInternalPrimaryShardAllocator(testAction);
@@ -159,7 +162,7 @@ public void applyStartedShards(final List<ShardRouting> startedShards, final Rou
// clean async object and cache for per DiscoverNode if all shards are assigned and none are ignore list
if (allocation.routingNodes().unassigned().isEmpty() && allocation.routingNodes().unassigned().isIgnoredEmpty()){
Releasables.close(fetchShardsFromNodes);
Releasables.close(fetchShardsFromNodes);
Releasables.close(fetchShardStoreFromNodes);
shardsPerNode.clear();
shardStoresPerNode.clear();
fetchShardsFromNodes =null;
@@ -177,6 +180,7 @@ public void applyFailedShards(final List<FailedShard> failedShards, final Routin
// clean async object and cache for per DiscoverNode if all shards are assigned and none are ignore list
if (allocation.routingNodes().unassigned().isEmpty() && allocation.routingNodes().unassigned().isIgnoredEmpty()){
Releasables.close(fetchShardsFromNodes);
Releasables.close(fetchShardStoreFromNodes);
shardsPerNode.clear();
shardStoresPerNode.clear();
}
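The two hunks above are the cleanup half of the fix: applyStartedShards used to close fetchShardsFromNodes twice and never released fetchShardStoreFromNodes, and applyFailedShards did not release it at all. A tiny standalone illustration of the corrected shutdown, with AsyncFetcher as a hypothetical stand-in for AsyncShardsFetchPerNode (the real code goes through Releasables.close):

// AsyncFetcher is a hypothetical stand-in for AsyncShardsFetchPerNode.
class AsyncFetcher implements AutoCloseable {
    private final String name;
    AsyncFetcher(String name) { this.name = name; }
    @Override
    public void close() { System.out.println("closed " + name); }
}

public class CleanupSketch {
    public static void main(String[] args) {
        AsyncFetcher fetchShardsFromNodes = new AsyncFetcher("started-shards fetcher");
        AsyncFetcher fetchShardStoreFromNodes = new AsyncFetcher("shard-store fetcher");
        // the buggy version closed fetchShardsFromNodes twice, so the store
        // fetcher's resources were never released; the fix closes each
        // fetcher exactly once
        fetchShardsFromNodes.close();
        fetchShardStoreFromNodes.close();
    }
}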
Expand Down Expand Up @@ -251,7 +255,7 @@ private synchronized Map<DiscoveryNode, TransportNodesBulkListGatewayStartedShar
}
}

private synchronized Map<DiscoveryNode, TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> collectShardsStorePerNode(RoutingAllocation allocation) {
private synchronized Map<DiscoveryNode, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> collectShardsStorePerNode(RoutingAllocation allocation) {
logger.info("sdarbStore Collecting of total shards ={}, over transport");
Map<ShardId, String> batchOfUnassignedShardsWithCustomDataPath = getBatchOfUnassignedShardsWithCustomDataPath(allocation, false);
if (fetchShardStoreFromNodes == null) {
@@ -272,11 +276,11 @@ private synchronized Map<DiscoveryNode, TransportNodesBatchListShardStoreMetadat
}
}

AsyncShardsFetchPerNode.TestFetchResult<TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> shardStoreTestFetchResult = fetchShardStoreFromNodes.testFetchData(allocation.nodes());
AsyncShardsFetchPerNode.TestFetchResult<TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> shardStoreTestFetchResult = fetchShardStoreFromNodes.testFetchData(allocation.nodes());

if (shardStoreTestFetchResult.getNodesToShards()==null)
{
logger.info("sdarbStore Fetching probably still going on some nodes for number of shards={}, current fetch = {}",fetchShardsFromNodes.shardsToCustomDataPathMap.size(),fetchShardsFromNodes.cache.size());
logger.info("sdarbStore Fetching probably still going on some nodes for number of shards={}, current fetch = {}",fetchShardStoreFromNodes.shardsToCustomDataPathMap.size(),fetchShardStoreFromNodes.cache.size());
return null;
}
else {
@@ -500,27 +504,31 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
class TestInternalReplicaShardAllocator extends ReplicaShardAllocator {


private final TransportNodesBatchListShardStoreMetadata storeAction;
private final TransportNodesListShardStoreMetadataBatch storeAction;

TestInternalReplicaShardAllocator(TransportNodesBatchListShardStoreMetadata storeAction) {
TestInternalReplicaShardAllocator(TransportNodesListShardStoreMetadataBatch storeAction) {
this.storeAction = storeAction;
}
@Override
protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> fetchData(ShardRouting shard, RoutingAllocation allocation) {
ShardId shardId = shard.shardId();
Map<DiscoveryNode, TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> discoveryNodeListOfNodeGatewayStartedShardsMap = shardStoresPerNode;
Map<DiscoveryNode, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> discoveryNodeListOfNodeGatewayStartedShardsMap = shardStoresPerNode;

if (shardsPerNode.isEmpty()) {
return new AsyncShardFetch.FetchResult<>(shardId, null, Collections.emptySet());
}
HashMap<DiscoveryNode, TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> dataToAdapt = new HashMap<>();
for (DiscoveryNode node : discoveryNodeListOfNodeGatewayStartedShardsMap.keySet()) {

TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch shardsStoreOnThatNode = discoveryNodeListOfNodeGatewayStartedShardsMap.get(node);
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch shardsStoreOnThatNode = discoveryNodeListOfNodeGatewayStartedShardsMap.get(node);
if (shardsStoreOnThatNode.getNodeStoreFilesMetadataBatch().containsKey(shardId)) {
TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadata nodeGatewayStartedShardsFromAdapt = shardsStoreOnThatNode.getNodeStoreFilesMetadataBatch().get(shardId);
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeGatewayStartedShardsFromAdapt = shardsStoreOnThatNode.getNodeStoreFilesMetadataBatch().get(shardId);
logger.error("Sdarb exceptio nis {}", nodeGatewayStartedShardsFromAdapt.getStoreFileFetchException());
if(nodeGatewayStartedShardsFromAdapt.getStoreFileFetchException() != null) {
logger.error("sdarb There was an error while fetching replica store data for shard {} on node {} so skipping this store data", shard, node);
continue;
}
// construct an object to adapt
TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata nodeGatewayStartedShardsToAdapt = new TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata(node, new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(
TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata nodeGatewayStartedShardsToAdapt = new TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata(node, new TransportNodesListShardStoreMetadata.StoreFilesMetadata(
nodeGatewayStartedShardsFromAdapt.storeFilesMetadata().shardId(),
nodeGatewayStartedShardsFromAdapt.storeFilesMetadata().getMetadataSnapshot(),
nodeGatewayStartedShardsFromAdapt.storeFilesMetadata().peerRecoveryRetentionLeases()
@@ -575,7 +583,7 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeS
@Override
protected boolean hasInitiatedFetching(ShardRouting shard) {
boolean fetchingDone = false;
for(Map.Entry<String, AsyncShardsFetchPerNode.NodeEntry<TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch>> asyncFetchStore : fetchShardStoreFromNodes.cache.entrySet()) {
for(Map.Entry<String, AsyncShardsFetchPerNode.NodeEntry<TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch>> asyncFetchStore : fetchShardStoreFromNodes.cache.entrySet()) {
fetchingDone = fetchingDone | asyncFetchStore.getValue().isFetching();
}
logger.info("sdarbStore fetchingDone {}", fetchingDone);
server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java
@@ -52,10 +52,16 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;

import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

@@ -101,7 +107,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
@@ -218,7 +224,7 @@ public AllocateUnassignedDecision makeAllocationDecision(
}
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
@@ -352,7 +358,7 @@ private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
/**
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
*/
private static TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata findStore(
private static TransportNodesListShardStoreMetadata.StoreFilesMetadata findStore(
DiscoveryNode node,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data
) {
@@ -368,7 +374,7 @@ private MatchingNodes findMatchingNodes(
RoutingAllocation allocation,
boolean noMatchFailedNodes,
DiscoveryNode primaryNode,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data,
boolean explain
) {
@@ -381,7 +387,7 @@ private MatchingNodes findMatchingNodes(
&& shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
continue;
}
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata();
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata();
// we don't have any files at all, it is an empty index
if (storeFilesMetadata.isEmpty()) {
continue;
@@ -437,8 +443,8 @@ private MatchingNodes findMatchingNodes(
}

private static long computeMatchingBytes(
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata
) {
long sizeMatched = 0;
for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) {
@@ -451,18 +457,18 @@ private static long computeMatchingBytes(
}

private static boolean hasMatchingSyncId(
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
) {
String primarySyncId = primaryStore.syncId();
return primarySyncId != null && primarySyncId.equals(replicaStore.syncId());
}

private static MatchingNode computeMatchingNode(
DiscoveryNode primaryNode,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
DiscoveryNode replicaNode,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
) {
final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode);
final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode);
@@ -473,7 +479,7 @@ private static MatchingNode computeMatchingNode(
}

private static boolean canPerformOperationBasedRecovery(
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> shardStores,
DiscoveryNode targetNode
) {