
Commit

Moved ShardRouting to ShardId as key in ShardsBatch
ShardRouting's equals method includes nodeId and unassignedInfo, and that state changes once a shard
is assigned, which makes removing the shard from the batcher class difficult.
ShardId is therefore now the map key in the ShardsBatch class, with a new entry class holding the
values associated with each shard.
Gaurav614 committed Jul 25, 2023
1 parent 160b72c commit 269c4f2
Showing 6 changed files with 88 additions and 49 deletions.
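
The failure mode described above is easiest to see in isolation. Below is a minimal, self-contained Java sketch (RoutingKey and KeyDemo are hypothetical stand-ins, not OpenSearch code): a map keyed by an object whose equals() and hashCode() include assignment state, as ShardRouting's do via nodeId and unassignedInfo, can no longer find its entry once that state changes, while a map keyed by the stable identity (ShardId in this commit) still can.

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class RoutingKey { // stand-in for ShardRouting
    final int shardNum;   // stable identity, analogous to ShardId
    final String nodeId;  // assignment state; null while unassigned

    RoutingKey(int shardNum, String nodeId) {
        this.shardNum = shardNum;
        this.nodeId = nodeId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof RoutingKey)) return false;
        RoutingKey r = (RoutingKey) o;
        return shardNum == r.shardNum && Objects.equals(nodeId, r.nodeId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(shardNum, nodeId);
    }
}

public class KeyDemo {
    public static void main(String[] args) {
        Map<RoutingKey, String> batch = new HashMap<>();
        batch.put(new RoutingKey(0, null), "customDataPath"); // batched while unassigned

        RoutingKey afterAssignment = new RoutingKey(0, "node-1"); // state after allocation
        System.out.println(batch.remove(afterAssignment)); // null: the entry cannot be removed

        // Keying by the stable identity (as this commit does with ShardId) avoids this:
        Map<Integer, String> batchById = new HashMap<>();
        batchById.put(0, "customDataPath");
        System.out.println(batchById.remove(0)); // customDataPath
    }
}

With the stable key, removal works no matter how the routing's assignment state has evolved in the meantime.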
@@ -559,14 +559,14 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
             }
         }
 
-
-        final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
-        while (primaryIterator.hasNext()) {
-            final ShardRouting shardRouting = primaryIterator.next();
-            if (shardRouting.primary()) {
-                getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, primaryIterator);
-            }
-        }
+        //
+        //        final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
+        //        while (primaryIterator.hasNext()) {
+        //            final ShardRouting shardRouting = primaryIterator.next();
+        //            if (shardRouting.primary()) {
+        //                getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, primaryIterator);
+        //            }
+        //        }
 
         for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
             existingShardsAllocator.afterPrimariesBeforeReplicas(allocation);
@@ -106,7 +106,6 @@ public synchronized int getNumberOfInFlightFetches() {
     /**
      * Fetches the data for the relevant batch of shards. If there any ongoing async fetches going on, or new ones have
      * been initiated by this call, the result will have no data.
-     * <p>
      */
     public synchronized AsyncBatchShardFetch.FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId, Set<String>> ignoreNodes) {
         if (closed) {
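
A hedged caller-side sketch of the contract stated in the Javadoc above: fetchData() returns a result without data while node-level fetches are still in flight, so callers retry on a later allocation round instead of blocking. BatchFetcher and FetchResult here are simplified stand-ins, not the OpenSearch classes.

import java.util.Map;

class FetchResult<T> {
    private final T data; // null while fetches are still in flight

    FetchResult(T data) {
        this.data = data;
    }

    boolean hasData() {
        return data != null;
    }

    T getData() {
        return data;
    }
}

class BatchFetcher {
    private Map<String, String> cachedResponses; // filled in asynchronously by node responses

    FetchResult<Map<String, String>> fetchData() {
        // returns a result with no data while responses are still pending
        return new FetchResult<>(cachedResponses);
    }
}

public class FetchDemo {
    public static void main(String[] args) {
        BatchFetcher fetcher = new BatchFetcher();
        FetchResult<Map<String, String>> result = fetcher.fetchData();
        if (result.hasData() == false) {
            System.out.println("fetch in flight; retry on the next allocation round");
            return;
        }
        System.out.println(result.getData());
    }
}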
@@ -20,7 +20,6 @@
 /**
  * This class is responsible for fetching shard data from nodes. It is analogous to AsyncShardFetch class except
  * that we fetch batch of shards in this class from single transport request to a node.
- * @param <T>
  *
  * @opensearch.internal
  */
116 changes: 79 additions & 37 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
@@ -131,6 +131,10 @@ public void cleanCaches() {
         asyncFetchStarted.clear();
         Releasables.close(asyncFetchStore.values());
         asyncFetchStore.clear();
+        batchIdToStartedShardBatch.clear();
+        batchIdToStoreShardBatch.clear();
+        startedShardBatchLookup.clear();
+        storeShardBatchLookup.clear();
     }
 
     // for tests
@@ -161,16 +165,16 @@ public void applyStartedShards(final List<ShardRouting> startedShards, final Rou
         for (ShardRouting startedShard : startedShards) {
             Releasables.close(asyncFetchStarted.remove(startedShard.shardId()));
             Releasables.close(asyncFetchStore.remove(startedShard.shardId()));
+            safelyRemoveShardFromBatch(startedShard);
         }
-
-        // ToDo: add new map clearance logic here
     }
 
     @Override
     public void applyFailedShards(final List<FailedShard> failedShards, final RoutingAllocation allocation) {
         for (FailedShard failedShard : failedShards) {
             Releasables.close(asyncFetchStarted.remove(failedShard.getRoutingEntry().shardId()));
             Releasables.close(asyncFetchStore.remove(failedShard.getRoutingEntry().shardId()));
+            safelyRemoveShardFromBatch(failedShard.getRoutingEntry());
         }
     }

@@ -209,9 +213,9 @@ public void allocateUnassignedBatch(final RoutingAllocation allocation, boolean
         assert primaryBatchShardAllocator != null;
         assert replicaBatchShardAllocator != null;
         if (primary) {
-            batchIdToStartedShardBatch.values().forEach(shardsBatch -> primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShards(), allocation));
+            batchIdToStartedShardBatch.values().forEach(shardsBatch -> primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation));
         } else {
-            batchIdToStoreShardBatch.values().forEach(batch -> replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShards(), allocation));
+            batchIdToStoreShardBatch.values().forEach(batch -> replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation));
         }
     }

@@ -229,17 +233,20 @@ private void createBatches(RoutingAllocation allocation, boolean primary) {
         });
         Iterator<ShardRouting> iterator = shardsToBatch.iterator();
         long batchSize = MAX_BATCH_SIZE;
-        Map<ShardRouting, String> addToCurrentBatch = new HashMap<>();
+        Map<ShardId, ShardBatchEntry> addToCurrentBatch = new HashMap<>();
         while (iterator.hasNext()) {
             ShardRouting currentShard = iterator.next();
             if (batchSize > 0) {
-                addToCurrentBatch.put(currentShard, IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings()));
+                ShardBatchEntry shardBatchEntry = new ShardBatchEntry(IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings())
+                    , currentShard);
+                addToCurrentBatch.put(currentShard.shardId(), shardBatchEntry);
                 batchSize--;
                 iterator.remove();
             }
             // add to batch if batch size full or last shard in unassigned list
             if (batchSize == 0 || iterator.hasNext() == false) {
                 String batchUUId = UUIDs.base64UUID();
+
                 ShardsBatch shardsBatch = new ShardsBatch(batchUUId, addToCurrentBatch, primary);
                 // add the batch to list of current batches
                 addBatch(shardsBatch, primary);
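
The loop above implements straightforward chunking: drain the unassigned-shard iterator into batches of at most MAX_BATCH_SIZE, closing out a batch when it fills or when the input ends. A standalone sketch of the same pattern, with simplified types and an illustrative batch-size value:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class Chunker {
    static final long MAX_BATCH_SIZE = 2000; // illustrative; not the commit's actual value

    static List<List<Integer>> createBatches(Iterator<Integer> shards) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long remaining = MAX_BATCH_SIZE;
        while (shards.hasNext()) {
            current.add(shards.next());
            remaining--;
            // close out the batch when it is full or we just consumed the last shard
            if (remaining == 0 || shards.hasNext() == false) {
                batches.add(current);
                current = new ArrayList<>();
                remaining = MAX_BATCH_SIZE;
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> shards = new ArrayList<>();
        for (int i = 0; i < 4500; i++) shards.add(i);
        System.out.println(createBatches(shards.iterator()).size()); // 3 batches: 2000 + 2000 + 500
    }
}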
@@ -258,16 +265,31 @@ private void addBatch(ShardsBatch shardsBatch, boolean primary) {
         batches.put(shardsBatch.getBatchId(), shardsBatch);
     }
 
-    private void addShardsIdsToLookup(Set<ShardRouting> shards, String batchId, boolean primary) {
+    private void addShardsIdsToLookup(Set<ShardId> shards, String batchId, boolean primary) {
         ConcurrentMap<ShardId, String> lookupMap = primary ? startedShardBatchLookup : storeShardBatchLookup;
-        shards.forEach(shardRouting -> {
-            if(lookupMap.containsKey(shardRouting.shardId())){
-                throw new IllegalStateException("Shard is already Batched. ShardId = " + shardRouting.shardId() + "Batch Id="+ lookupMap.get(shardRouting.shardId()));
+        shards.forEach(shardId -> {
+            if(lookupMap.containsKey(shardId)){
+                throw new IllegalStateException("Shard is already Batched. ShardId = " + shardId + "Batch Id="+ lookupMap.get(shardId));
             }
-            lookupMap.put(shardRouting.shardId(), batchId);
+            lookupMap.put(shardId, batchId);
         });
     }
 
+    private void safelyRemoveShardFromBatch(ShardRouting shardRouting) {
+        String batchId = shardRouting.primary() ? startedShardBatchLookup.get(shardRouting.shardId()) : storeShardBatchLookup.get(shardRouting.shardId());
+        if (batchId == null) {
+            return;
+        }
+        ConcurrentMap<String, ShardsBatch> batches = shardRouting.primary() ? batchIdToStartedShardBatch : batchIdToStoreShardBatch;
+        ShardsBatch batch = batches.get(batchId);
+        batch.removeFromBatch(shardRouting);
+        // remove the batch if it is empty
+        if (batch.getBatchedShards().isEmpty()) {
+            Releasables.close(batch.getAsyncFetcher());
+            batches.remove(batchId);
+        }
+    }
+
     // allow for testing infra to change shard allocators implementation
     protected static void innerAllocatedUnassigned(
         RoutingAllocation allocation,
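
The new safelyRemoveShardFromBatch method follows a lookup-then-clean-up pattern: resolve the owning batch through the ShardId to batchId lookup, remove the shard, and drop the batch once it is empty (the real method also closes the batch's async fetcher first). A hedged, self-contained sketch of that bookkeeping; BatchBookkeeping and its fields are hypothetical stand-ins, not OpenSearch code:

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class BatchBookkeeping {
    private final ConcurrentMap<Integer, String> shardToBatch = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Set<Integer>> batches = new ConcurrentHashMap<>();

    void addBatch(String batchId, Set<Integer> shardIds) {
        batches.put(batchId, new HashSet<>(shardIds));
        shardIds.forEach(id -> shardToBatch.put(id, batchId));
    }

    void safelyRemove(int shardId) {
        String batchId = shardToBatch.get(shardId);
        if (batchId == null) {
            return; // shard was never batched, or has already been cleaned up
        }
        Set<Integer> batch = batches.get(batchId);
        batch.remove(shardId);
        shardToBatch.remove(shardId);
        if (batch.isEmpty()) {
            // analogous to closing the batch's async fetcher before dropping it
            batches.remove(batchId);
        }
    }

    public static void main(String[] args) {
        BatchBookkeeping b = new BatchBookkeeping();
        b.addBatch("batch-1", Set.of(1, 2));
        b.safelyRemove(1);
        b.safelyRemove(2);
        System.out.println(b.batches.containsKey("batch-1")); // false: empty batch dropped
    }
}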
@@ -316,6 +338,7 @@ private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) {
                 Sets.difference(newEphemeralIds, lastSeenEphemeralIds)
             )
         );
+
         asyncFetchStore.values().forEach(fetch -> clearCacheForPrimary(fetch, allocation));
         // recalc to also (lazily) clear out old nodes.
         this.lastSeenEphemeralIds = newEphemeralIds;
@@ -443,29 +466,29 @@ protected AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShard
                                                                    RoutingAllocation allocation) {
         // get batch id for anyone given shard. We are assuming all shards will have same batch Id
         ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null;
-        shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : null;
+        shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting;
         if (shardRouting == null) {
-            new AsyncBatchShardFetch<>.FetchResult<>(null, Collections.emptyMap());
+            return new AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>(null, Collections.emptyMap());
         }
 
         assert shardRouting != null;
         String batchId = startedShardBatchLookup.getOrDefault(shardRouting.shardId(), null);
         if (batchId == null) {
             logger.debug("Shard {} has no batch id", shardRouting);
             throw new IllegalStateException("Shard " + shardRouting + " has no batch id");
         }
 
         if (batchIdToStartedShardBatch.containsKey(batchId) == false) {
             logger.debug("Batch {} has no started shard batch", batchId);
             throw new IllegalStateException("Batch " + batchId + " has no started shard batch");
         }
 
         ShardsBatch shardsBatch = batchIdToStartedShardBatch.get(batchId);
         // remove in eligible shards which allocator is not responsible for
-        inEligibleShards.forEach(shardsBatch::removeFromBatch);
+        inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch);
 
         if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) {
             logger.debug("Batch {} is empty", batchId);
-            new AsyncBatchShardFetch<>.FetchResult<>(null, Collections.emptyMap());
+            return new AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>(null, Collections.emptyMap());
         }
 
         Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
@@ -597,19 +620,15 @@ private class ShardsBatch {

         private final AsyncBatchShardFetch<? extends BaseNodeResponse> asyncBatch;
 
-        public Map<ShardRouting, String> getShardsToCustomDataPathMap() {
-            return shardsToCustomDataPathMap;
-        }
-
-        private Map<ShardRouting, String> shardsToCustomDataPathMap;
+        private final Map<ShardId, ShardBatchEntry> batchInfo;
 
-        private ShardsBatch(String uuid, Map<ShardRouting, String> shardsToCustomDataPathMap, boolean primary) {
-            this.batchId = uuid;
-            this.shardsToCustomDataPathMap = shardsToCustomDataPathMap;
-
-            Map<ShardId, String> shardIdsMap = shardsToCustomDataPathMap.entrySet().stream().collect(Collectors.toMap(
-                entry -> entry.getKey().shardId(),
-                Map.Entry::getValue
+        public ShardsBatch(String batchId, Map<ShardId, ShardBatchEntry> shardsWithInfo, boolean primary) {
+            this.batchId = batchId;
+            this.batchInfo = new HashMap<>(shardsWithInfo);
+            // create a ShardId -> customDataPath map for async fetch
+            Map<ShardId, String> shardIdsMap = batchInfo.entrySet().stream().collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().getCustomDataPath()
             ));
             this.primary = primary;
             if (primary) {
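
The rewritten constructor keeps the full ShardId to ShardBatchEntry map and derives the ShardId to customDataPath view required by the async fetcher via Collectors.toMap. A standalone sketch of that projection, where Entry is a hypothetical stand-in for ShardBatchEntry:

import java.util.Map;
import java.util.stream.Collectors;

public class ToMapDemo {
    record Entry(String customDataPath, String routing) {}

    public static void main(String[] args) {
        Map<Integer, Entry> batchInfo = Map.of(
            0, new Entry("/data/custom", "routing-0"),
            1, new Entry("", "routing-1")
        );
        Map<Integer, String> shardIdsMap = batchInfo.entrySet().stream().collect(Collectors.toMap(
            Map.Entry::getKey,                          // keep the stable ShardId key
            entry -> entry.getValue().customDataPath()  // project out the data path
        ));
        System.out.println(shardIdsMap); // e.g. {0=/data/custom, 1=}
    }
}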
@@ -630,32 +649,35 @@ private ShardsBatch(String uuid, Map<ShardRouting, String> shardsToCustomDataPat
             }
         }
 
-        void removeFromBatch(ShardRouting shard) {
-            shardsToCustomDataPathMap.remove(shard);
+        private void removeFromBatch(ShardRouting shard) {
+
+            batchInfo.remove(shard.shardId());
+            asyncBatch.shardsToCustomDataPathMap.remove(shard.shardId());
+
             assert shard.primary() == primary : "Illegal call to delete shard from batch";
             // remove from lookup
             if (this.primary) {
                 startedShardBatchLookup.remove(shard.shardId());
             } else {
                 storeShardBatchLookup.remove(shard.shardId());
             }
             // assert that fetcher and shards are the same as batched shards
-            assert getBatchedShardIds().equals(asyncBatch.shardsToCustomDataPathMap.keySet());
+            assert batchInfo.size() == asyncBatch.shardsToCustomDataPathMap.size() : "Shards size is not equal to fetcher size";
         }
 
-        Set<ShardRouting> getBatchedShards() {
-            return shardsToCustomDataPathMap.keySet();
+        Set<ShardRouting> getBatchedShardRoutings() {
+            return batchInfo.values().stream().map(ShardBatchEntry::getShardRouting).collect(Collectors.toSet());
         }
 
-        Set<ShardId> getBatchedShardIds(){
-            return shardsToCustomDataPathMap.keySet().stream().map(ShardRouting::shardId).collect(Collectors.toSet());
+        Set<ShardId> getBatchedShards() {
+            return batchInfo.keySet();
         }
 
         public String getBatchId() {
             return batchId;
         }
 
-        AsyncBatchShardFetch<? extends BaseNodeResponse> getAsyncFetcher(){
+        AsyncBatchShardFetch<? extends BaseNodeResponse> getAsyncFetcher() {
             return asyncBatch;
         }
 
@@ -680,6 +702,26 @@ public int hashCode() {
         public String toString() {
             return "batchId: " + batchId;
         }
+
     }
 
+    private class ShardBatchEntry {
+
+        private final String customDataPath;
+        private final ShardRouting shardRouting;
+
+        public ShardBatchEntry(String customDataPath, ShardRouting shardRouting) {
+            this.customDataPath = customDataPath;
+            this.shardRouting = shardRouting;
+        }
+
+        public ShardRouting getShardRouting() {
+            return shardRouting;
+        }
+
+        public String getCustomDataPath() {
+            return customDataPath;
+        }
+    }
+
 }
@@ -110,7 +110,6 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned

     /**
      * Build allocation decisions for all the shards present in the batch identified by batchId.
-     * @param batchId unique id for this set of shards maintained by GatewayAllocator
      * @param shards set of shards given for allocation
      * @param allocation current allocation of all the shards
      * @param logger logger used for logging
@@ -159,7 +159,7 @@ protected NodesGatewayStartedShardsBatch newResponse(
      * This function is similar to nodeoperation method of {@link TransportNodesListGatewayStartedShards} we loop over
      * the shards here to fetch the shard result in bulk.
      *
-     * @param request
+     * @param request Request
      * @return NodeGatewayStartedShardsBatch
      */
     @Override
