Skip to content

Commit

Permalink
Fix unassigned batch allocation
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
Swetha Guptha committed Jun 6, 2024
1 parent b9ca5a8 commit 532def4
Show file tree
Hide file tree
Showing 5 changed files with 370 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
Expand All @@ -46,9 +45,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

/**
* An abstract class that implements basic functionality for allocating
Expand Down Expand Up @@ -81,38 +78,7 @@ public void allocateUnassigned(
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

/**
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
// make Allocation Decisions for all shards
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shardRoutings, allocation, logger);
assert shardRoutings.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for "
+ "some shards";
// get all unassigned shards iterator
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();

while (iterator.hasNext()) {
ShardRouting shard = iterator.next();
try {
if (decisionMap.isEmpty() == false) {
if (decisionMap.containsKey(shard)) {
executeDecision(shard, decisionMap.remove(shard), allocation, iterator);
}
} else {
// no need to keep iterating the unassigned shards, if we don't have anything in decision map
break;
}
} catch (Exception e) {
logger.error("Failed to execute decision for shard {} while initializing {}", shard, e);
throw e;
}
}
}

private void executeDecision(
protected void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
Expand All @@ -135,8 +101,6 @@ private void executeDecision(
}
}

public void allocateUnassignedBatch(String batchId, RoutingAllocation allocation) {}

protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
Expand Down Expand Up @@ -165,21 +129,6 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
Logger logger
);

public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> unassignedShardBatch,
RoutingAllocation allocation,
Logger logger
) {

return (HashMap<ShardRouting, AllocateUnassignedDecision>) unassignedShardBatch.stream()
.collect(
Collectors.toMap(
unassignedShard -> unassignedShard,
unassignedShard -> makeAllocationDecision(unassignedShard, allocation, logger)
)
);
}

/**
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
Expand Down Expand Up @@ -61,50 +62,58 @@ protected FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedS

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard);
AllocateUnassignedDecision decision = getInEligibleShardDecision(unassignedShard, allocation);
if (decision != null) {
return decision;
}
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(
List.of(unassignedShard),
Collections.emptyList(),
allocation
);
List<NodeGatewayStartedShard> nodeGatewayStartedShards = adaptToNodeShardStates(unassignedShard, shardsState);
return getAllocationDecision(unassignedShard, allocation, nodeGatewayStartedShards, logger);
}

/**
* Build allocation decisions for all the shards present in the batch identified by batchId.
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
*
* @param shards set of shards given for allocation
* @param allocation current allocation of all the shards
* @param logger logger used for logging
* @return shard to allocation decision map
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
@Override
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> inEligibleShards = new ArrayList<>();
// identify ineligible shards
for (ShardRouting shard : shards) {
for (ShardRouting shard : shardRoutings) {
AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation);
if (decision != null) {
ineligibleShardAllocationDecisions.put(shard.shardId(), decision);
inEligibleShards.add(shard);
shardAllocationDecisions.put(shard, decision);
} else {
eligibleShards.add(shard);
}
}
// Do not call fetchData if there are no eligible shards
if (eligibleShards.isEmpty()) {
return shardAllocationDecisions;
}

// only fetch data for eligible shards
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);

// process the received data
for (ShardRouting unassignedShard : eligibleShards) {
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
// get allocation decision for this shard
shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger));
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;

if (shardRoutings.contains(unassignedShard)) {
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
allocationDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
} else {
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
allocationDecision = getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
}
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
return shardAllocationDecisions;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
Expand All @@ -29,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Allocates replica shards in a batch mode
Expand All @@ -42,7 +44,7 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator {
* match. Today, a better match is one that can perform a no-op recovery while the previous recovery
* has to copy segment files.
*
* @param allocation the overall routing allocation
* @param allocation the overall routing allocation
* @param shardBatches a list of shard batches to check for existing recoveries
*/
public void processExistingRecoveries(RoutingAllocation allocation, List<List<ShardRouting>> shardBatches) {
Expand Down Expand Up @@ -98,35 +100,56 @@ protected FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadat

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard);
if (isResponsibleFor(unassignedShard) == false) {
return AllocateUnassignedDecision.NOT_TAKEN;
}
Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(unassignedShard, allocation);
Decision allocateDecision = result.v1();
if (allocateDecision.type() != Decision.Type.YES
&& (allocation.debugDecision() == false || hasInitiatedFetching(unassignedShard) == false)) {
// only return early if we are not in explain mode, or we are in explain mode but we have not
// yet attempted to fetch any shard data
logger.trace("{}: ignoring allocation, can't be allocated on any node", unassignedShard);
return AllocateUnassignedDecision.no(
UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()),
result.v2() != null ? new ArrayList<>(result.v2().values()) : null
);
}
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(
List.of(unassignedShard),
Collections.emptyList(),
allocation
);
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState);
return getAllocationDecision(unassignedShard, allocation, nodeShardStores, result, logger);
}

@Override
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
/**
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
*
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
final boolean explain = allocation.debugDecision();
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> ineligibleShards = new ArrayList<>();
HashMap<ShardRouting, Tuple<Decision, Map<String, NodeAllocationResult>>> nodeAllocationDecisions = new HashMap<>();
for (ShardRouting shard : shards) {
HashMap<ShardRouting, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();

for (ShardRouting shard : shardRoutings) {
if (!isResponsibleFor(shard)) {
// this allocator n is not responsible for allocating this shard
ineligibleShards.add(shard);
shardAllocationDecisions.put(shard, AllocateUnassignedDecision.NOT_TAKEN);
ineligibleShardAllocationDecisions.put(shard, AllocateUnassignedDecision.NOT_TAKEN);
continue;
}

Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shard, allocation);
Decision allocationDecision = result.v1();
if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shard))) {
// only return early if we are not in explain mode, or we are in explain mode but we have not
// yet attempted to fetch any shard data
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
shardAllocationDecisions.put(
ineligibleShards.add(shard);
ineligibleShardAllocationDecisions.put(
shard,
AllocateUnassignedDecision.no(
UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
Expand All @@ -135,34 +158,54 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
);
continue;
}
// storing the nodeDecisions in nodeAllocationDecisions if the decision is not YES
// so that we don't have to compute the decisions again
nodeAllocationDecisions.put(shard, result);

eligibleShards.add(shard);
}

// Do not call fetchData if there are no eligible shards
if (eligibleShards.isEmpty()) {
return shardAllocationDecisions;
}
// only fetch data for eligible shards
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);

for (ShardRouting unassignedShard : eligibleShards) {
Tuple<Decision, Map<String, NodeAllocationResult>> result = nodeAllocationDecisions.get(unassignedShard);
shardAllocationDecisions.put(
unassignedShard,
getAllocationDecision(
unassignedShard,
allocation,
convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState),
result,
logger
)
);
List<ShardId> shardIdsFromBatch = shardRoutings.stream().map(shardRouting -> shardRouting.shardId()).collect(Collectors.toList());
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
// There will be only one entry for the shard in the unassigned shards batch
// for a shard with multiple unassigned replicas, hence we are comparing the shard ids
// instead of ShardRouting in-order to evaluate shard assignment for all unassigned replicas of a shard.
if (!unassignedShard.primary() && shardIdsFromBatch.contains(unassignedShard.shardId())) {
AllocateUnassignedDecision allocateUnassignedDecision;
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard)) {
allocateUnassignedDecision = ineligibleShardAllocationDecisions.get(unassignedShard);
} else {
if (!isResponsibleFor(unassignedShard)) {
allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
} else {
Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(
unassignedShard,
allocation
);
Decision allocationDecision = result.v1();
if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(unassignedShard))) {
// only return early if we are not in explain mode, or we are in explain mode but we have not
// yet attempted to fetch any shard data
logger.trace("{}: ignoring allocation, can't be allocated on any node", unassignedShard);
allocateUnassignedDecision = AllocateUnassignedDecision.no(
UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
result.v2() != null ? new ArrayList<>(result.v2().values()) : null
);
} else {
allocateUnassignedDecision = getAllocationDecision(
unassignedShard,
allocation,
convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState),
result,
logger
);
}
}
}
executeDecision(unassignedShard, allocateUnassignedDecision, allocation, iterator);
}
}
return shardAllocationDecisions;
}

private Map<DiscoveryNode, StoreFilesMetadata> convertToNodeStoreFilesMetadataMap(
Expand Down
Loading

0 comments on commit 532def4

Please sign in to comment.