Skip to content

Commit

Permalink
Evaluate and execute decisions for shards in a batch together
Browse files Browse the repository at this point in the history
  • Loading branch information
Swetha Guptha committed May 22, 2024
1 parent c0aa1f6 commit 828b854
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,32 +87,16 @@ public void allocateUnassigned(
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
// make Allocation Decisions for all shards
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shardRoutings, allocation, logger);
assert shardRoutings.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for "
+ "some shards";
// get all unassigned shards iterator
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();

while (iterator.hasNext()) {
ShardRouting shard = iterator.next();
try {
if (decisionMap.isEmpty() == false) {
if (decisionMap.containsKey(shard)) {
executeDecision(shard, decisionMap.remove(shard), allocation, iterator);
}
} else {
// no need to keep iterating the unassigned shards, if we don't have anything in decision map
break;
}
} catch (Exception e) {
logger.error("Failed to execute decision for shard {} while initializing {}", shard, e);
throw e;
ShardRouting shardRouting = iterator.next();
if (shardRoutings.contains(shardRouting)) {
allocateUnassigned(shardRouting, allocation, iterator);
}
}
}

private void executeDecision(
protected void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
Expand All @@ -24,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is
Expand Down Expand Up @@ -60,51 +62,42 @@ protected FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedS
}

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard);
}

/**
* Build allocation decisions for all the shards present in the batch identified by batchId.
*
* @param shards set of shards given for allocation
* @param allocation current allocation of all the shards
* @param logger logger used for logging
* @return shard to allocation decision map
*/
@Override
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> inEligibleShards = new ArrayList<>();
// identify ineligible shards
for (ShardRouting shard : shards) {
for (ShardRouting shard : shardRoutings) {
AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation);
if (decision != null) {
ineligibleShardAllocationDecisions.put(shard.shardId(), decision);
inEligibleShards.add(shard);
shardAllocationDecisions.put(shard, decision);
} else {
eligibleShards.add(shard);
}
}
// Do not call fetchData if there are no eligible shards
if (eligibleShards.isEmpty()) {
return shardAllocationDecisions;
}

// only fetch data for eligible shards
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);

// process the received data
for (ShardRouting unassignedShard : eligibleShards) {
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
// get allocation decision for this shard
shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger));
List<ShardId> shardIdsFromBatch = shardRoutings.stream()
.map(shardRouting -> shardRouting.shardId())
.collect(Collectors.toList());
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocateUnassignedDecision;

if (shardIdsFromBatch.contains(unassignedShard.shardId())) {
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
allocateUnassignedDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
} else {
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
allocateUnassignedDecision = getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
}
executeDecision(unassignedShard, allocateUnassignedDecision, allocation, iterator);
}
}
return shardAllocationDecisions;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
Expand All @@ -29,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Allocates replica shards in a batch mode
Expand Down Expand Up @@ -97,26 +99,17 @@ protected FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadat
}

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard);
}

@Override
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
final boolean explain = allocation.debugDecision();
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> ineligibleShards = new ArrayList<>();
HashMap<ShardRouting, Tuple<Decision, Map<String, NodeAllocationResult>>> nodeAllocationDecisions = new HashMap<>();
for (ShardRouting shard : shards) {
for (ShardRouting shard : shardRoutings) {
if (!isResponsibleFor(shard)) {
// this allocator n is not responsible for allocating this shard
ineligibleShards.add(shard);
shardAllocationDecisions.put(shard, AllocateUnassignedDecision.NOT_TAKEN);
ineligibleShardAllocationDecisions.put(shard.shardId(), AllocateUnassignedDecision.NOT_TAKEN);
continue;
}

Expand All @@ -126,8 +119,9 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
// only return early if we are not in explain mode, or we are in explain mode but we have not
// yet attempted to fetch any shard data
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
shardAllocationDecisions.put(
shard,
ineligibleShards.add(shard);
ineligibleShardAllocationDecisions.put(
shard.shardId(),
AllocateUnassignedDecision.no(
UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
result.v2() != null ? new ArrayList<>(result.v2().values()) : null
Expand All @@ -142,27 +136,33 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
eligibleShards.add(shard);
}

// Do not call fetchData if there are no eligible shards
if (eligibleShards.isEmpty()) {
return shardAllocationDecisions;
}
// only fetch data for eligible shards
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);

for (ShardRouting unassignedShard : eligibleShards) {
Tuple<Decision, Map<String, NodeAllocationResult>> result = nodeAllocationDecisions.get(unassignedShard);
shardAllocationDecisions.put(
unassignedShard,
getAllocationDecision(
unassignedShard,
allocation,
convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState),
result,
logger
)
);
List<ShardId> shardIdsFromBatch = shardRoutings.stream()
.map(shardRouting -> shardRouting.shardId())
.collect(Collectors.toList());
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();

if (shardIdsFromBatch.contains(unassignedShard.shardId())) {
AllocateUnassignedDecision allocateUnassignedDecision;
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
allocateUnassignedDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
} else {
Tuple<Decision, Map<String, NodeAllocationResult>> result = nodeAllocationDecisions.get(unassignedShard);
allocateUnassignedDecision = getAllocationDecision(
unassignedShard,
allocation,
convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState),
result,
logger
);
}
executeDecision(unassignedShard, allocateUnassignedDecision, allocation, iterator);
}
}
return shardAllocationDecisions;
}

private Map<DiscoveryNode, StoreFilesMetadata> convertToNodeStoreFilesMetadataMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,6 @@ public void buildTestAllocator() {
this.batchAllocator = new TestBatchAllocator();
}

private void allocateAllUnassigned(final RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
batchAllocator.allocateUnassigned(iterator.next(), allocation, iterator);
}
}

private void allocateAllUnassignedBatch(final RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
List<ShardRouting> shardsToBatch = new ArrayList<>();
Expand All @@ -90,20 +83,6 @@ private void allocateAllUnassignedBatch(final RoutingAllocation allocation) {
batchAllocator.allocateUnassignedBatch(shardsToBatch, allocation);
}

public void testMakeAllocationDecisionDataFetching() {
final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");

List<ShardRouting> shards = new ArrayList<>();
allocateAllUnassignedBatch(allocation);
ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();
shards.add(shard);
HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
// verify we get decisions for all the shards
assertEquals(shards.size(), allDecisions.size());
assertEquals(shards, new ArrayList<>(allDecisions.keySet()));
assertEquals(AllocationDecision.AWAITING_INFO, allDecisions.get(shard).getAllocationDecision());
}

public void testMakeAllocationDecisionForReplicaShard() {
final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");

Expand All @@ -116,35 +95,20 @@ public void testMakeAllocationDecisionForReplicaShard() {
assertFalse(allDecisions.get(replicaShards.get(0)).isDecisionTaken());
}

public void testMakeAllocationDecisionDataFetched() {
final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");

List<ShardRouting> shards = new ArrayList<>();
ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();
shards.add(shard);
batchAllocator.addData(node1, "allocId1", true, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()));
HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
// verify we get decisions for all the shards
assertEquals(shards.size(), allDecisions.size());
assertEquals(shards, new ArrayList<>(allDecisions.keySet()));
assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision());
}

public void testMakeAllocationDecisionDataFetchedMultipleShards() {
public void testAllocateUnassignedBatch() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(),
clusterSettings, random());
setUpShards(2);
final RoutingAllocation allocation = routingAllocationWithMultiplePrimaries(
noAllocationDeciders(),
final RoutingAllocation routingAllocation = routingAllocationWithMultiplePrimaries(
allocationDeciders,
CLUSTER_RECOVERED,
2,
0,
"allocId-0",
"allocId-1"
"allocId-0", "allocId-1"
);
List<ShardRouting> shards = new ArrayList<>();

for (ShardId shardId : shardsInBatch) {
ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();
allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard().recoverySource();
shards.add(shard);
batchAllocator.addShardData(
node1,
"allocId-" + shardId.id(),
Expand All @@ -154,16 +118,18 @@ public void testMakeAllocationDecisionDataFetchedMultipleShards() {
null
);
}
HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
// verify we get decisions for all the shards
assertEquals(shards.size(), allDecisions.size());
assertEquals(new HashSet<>(shards), allDecisions.keySet());
for (ShardRouting shard : shards) {
assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision());
}

allocateAllUnassignedBatch(routingAllocation);

assertEquals(0, routingAllocation.routingNodes().unassigned().size());
List<ShardRouting> initializingShards = routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
assertEquals(2, initializingShards.size());
assertTrue(shardsInBatch.contains(initializingShards.get(0).shardId()));
assertTrue(shardsInBatch.contains(initializingShards.get(1).shardId()));
assertEquals(2, routingAllocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()));
}

public void testAllocateUnassignedBatchThrottlingAllocationDeciderNotHonoured() {
public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder()
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1)
Expand All @@ -177,10 +143,7 @@ public void testAllocateUnassignedBatchThrottlingAllocationDeciderNotHonoured()
"allocId-0", "allocId-1"
);

List<ShardRouting> shards = new ArrayList<>();
for (ShardId shardId : shardsInBatch) {
ShardRouting shard = routingAllocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();
shards.add(shard);
batchAllocator.addShardData(
node1,
"allocId-" + shardId.id(),
Expand All @@ -194,10 +157,9 @@ public void testAllocateUnassignedBatchThrottlingAllocationDeciderNotHonoured()
allocateAllUnassignedBatch(routingAllocation);

// Verify the throttling decider was not throttled, recovering shards on node greater than initial concurrent recovery setting
assertFalse(routingAllocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId())
< 1);
assertEquals(1, routingAllocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()));
List<ShardRouting> initializingShards = routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
assertEquals(2, initializingShards.size());
assertEquals(1, initializingShards.size());
Set<String> nodesWithInitialisingShards = initializingShards.stream().map(ShardRouting::currentNodeId).collect(Collectors.toSet());
assertEquals(1, nodesWithInitialisingShards.size());
assertEquals(Collections.singleton(node1.getId()), nodesWithInitialisingShards);
Expand Down
Loading

0 comments on commit 828b854

Please sign in to comment.