Code style fix
Swetha Guptha committed May 29, 2024
1 parent ce7373d commit 44173b8
Showing 5 changed files with 100 additions and 47 deletions.

@@ -177,10 +177,15 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

int primariesInRecovery = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node.nodeId());

logger.debug( "ThrottlingAllocationDecider decision, throttle: [{}] primary recovery limit [{}],"
+ " primaries in recovery [{}] invoked for [{}] on node [{}]",
primariesInRecovery >= primariesInitialRecoveries, primariesInitialRecoveries,
primariesInRecovery, shardRouting, node.node() );
logger.debug(
"ThrottlingAllocationDecider decision, throttle: [{}] primary recovery limit [{}],"
+ " primaries in recovery [{}] invoked for [{}] on node [{}]",
primariesInRecovery >= primariesInitialRecoveries,
primariesInitialRecoveries,
primariesInRecovery,
shardRouting,
node.node()
);
if (primariesInRecovery >= primariesInitialRecoveries) {
// TODO: Should index creation not be throttled for primary shards?
return allocation.decision(

@@ -68,7 +68,11 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned
if (decision != null) {
return decision;
}
- final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(List.of(unassignedShard), Collections.emptyList(), allocation);
+ final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(
+ List.of(unassignedShard),
+ Collections.emptyList(),
+ allocation
+ );
List<NodeGatewayStartedShard> nodeGatewayStartedShards = adaptToNodeShardStates(unassignedShard, shardsState);
return getAllocationDecision(unassignedShard, allocation, nodeGatewayStartedShards, logger);
}

@@ -96,9 +100,7 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
// only fetch data for eligible shards
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);

- Set<ShardId> shardIdsFromBatch = shardRoutings.stream()
- .map(shardRouting -> shardRouting.shardId())
- .collect(Collectors.toSet());
+ Set<ShardId> shardIdsFromBatch = shardRoutings.stream().map(shardRouting -> shardRouting.shardId()).collect(Collectors.toSet());
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();

@@ -116,7 +116,11 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned
);
}

- final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(List.of(unassignedShard), Collections.emptyList(), allocation);
+ final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(
+ List.of(unassignedShard),
+ Collections.emptyList(),
+ allocation
+ );
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState);
return getAllocationDecision(unassignedShard, allocation, nodeShardStores, result, logger);
}

@@ -166,14 +170,14 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
// only fetch data for eligible shards
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);

- List<ShardId> shardIdsFromBatch = shardRoutings.stream()
- .map(shardRouting -> shardRouting.shardId())
- .collect(Collectors.toList());
+ List<ShardId> shardIdsFromBatch = shardRoutings.stream().map(shardRouting -> shardRouting.shardId()).collect(Collectors.toList());
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();

- if (shardIdsFromBatch.contains(unassignedShard.shardId())) {
+ // There will be only one entry for the shard in the unassigned shards batch
+ // for a shard with multiple unassigned replicas, hence we are comparing the shard ids
+ // instead of ShardRouting in-order to evaluate shard assignment for all unassigned replicas of a shard.
+ if (!unassignedShard.primary() && shardIdsFromBatch.contains(unassignedShard.shardId())) {
logger.info("Executing allocation decision for shard {}", shardIdsFromBatch);
AllocateUnassignedDecision allocateUnassignedDecision;
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
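The new comment in the hunk above explains why the batch allocator compares shard ids instead of ShardRouting objects: a shard with several unassigned replicas has only one entry in the batch but appears multiple times in the unassigned iterator, and only a ShardId-based check matches every copy. A minimal stand-alone sketch of that idea (not part of the commit, using hypothetical stand-in record types rather than the real ShardId/ShardRouting classes):

class ShardIdComparisonSketch {
    // Sketch only: hypothetical stand-ins for ShardId / ShardRouting, just to show the comparison.
    record FakeShardId(String index, int id) {}
    record FakeShardRouting(FakeShardId shardId, boolean primary, String allocationId) {}

    public static void main(String[] args) {
        FakeShardId shard0 = new FakeShardId("test", 0);
        // One batch entry for the shard, but two distinct unassigned replica copies in the routing table.
        java.util.List<FakeShardRouting> batch = java.util.List.of(new FakeShardRouting(shard0, false, "batch-entry"));
        java.util.List<FakeShardRouting> unassigned = java.util.List.of(
            new FakeShardRouting(shard0, false, "replica-copy-1"),
            new FakeShardRouting(shard0, false, "replica-copy-2")
        );
        java.util.Set<FakeShardId> shardIdsFromBatch = batch.stream()
            .map(FakeShardRouting::shardId)
            .collect(java.util.stream.Collectors.toSet());
        // Comparing by shard id matches both unassigned copies; comparing the routing entries directly matches neither.
        long byShardId = unassigned.stream().filter(r -> shardIdsFromBatch.contains(r.shardId())).count(); // 2
        long byRouting = unassigned.stream().filter(batch::contains).count(); // 0
        System.out.println("matched by shard id: " + byShardId + ", matched by routing entry: " + byRouting);
    }
}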

@@ -83,15 +83,15 @@ private void allocateAllUnassignedBatch(final RoutingAllocation allocation) {

public void testAllocateUnassignedBatch() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
- AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(),
- clusterSettings, random());
+ AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random());
setUpShards(2);
final RoutingAllocation routingAllocation = routingAllocationWithMultiplePrimaries(
allocationDeciders,
CLUSTER_RECOVERED,
2,
0,
"allocId-0", "allocId-1"
"allocId-0",
"allocId-1"
);

for (ShardId shardId : shardsInBatch) {

@@ -117,16 +117,21 @@ public void testAllocateUnassignedBatch() {

public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
- AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder()
- .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1)
- .build(), clusterSettings, random());
+ AllocationDeciders allocationDeciders = randomAllocationDeciders(
+ Settings.builder()
+ .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1)
+ .build(),
+ clusterSettings,
+ random()
+ );
setUpShards(2);
final RoutingAllocation routingAllocation = routingAllocationWithMultiplePrimaries(
allocationDeciders,
CLUSTER_RECOVERED,
2,
0,
"allocId-0", "allocId-1"
"allocId-0",
"allocId-1"
);

for (ShardId shardId : shardsInBatch) {

@@ -645,17 +645,38 @@ public void testDoNotCancelForBrokenNode() {

public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() throws InterruptedException {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
- AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder()
- .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1)
- .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 1)
- .build(), clusterSettings, random());
+ AllocationDeciders allocationDeciders = randomAllocationDeciders(
+ Settings.builder()
+ .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1)
+ .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 1)
+ .build(),
+ clusterSettings,
+ random()
+ );
setUpShards(2);
final RoutingAllocation routingAllocation = routingAllocationWithMultipleShards(allocationDeciders, 2, 1);
- for (ShardId shardIdFromBatch: shardsInBatch) {
- testBatchAllocator
- .addShardData(node1, shardIdFromBatch, "MATCH", null, new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION))
- .addShardData(node2, shardIdFromBatch, "NO_MATCH", null, new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION))
- .addShardData(node3, shardIdFromBatch, "MATCH", null, new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
+ for (ShardId shardIdFromBatch : shardsInBatch) {
+ testBatchAllocator.addShardData(
+ node1,
+ shardIdFromBatch,
+ "MATCH",
+ null,
+ new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)
+ )
+ .addShardData(
+ node2,
+ shardIdFromBatch,
+ "NO_MATCH",
+ null,
+ new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)
+ )
+ .addShardData(
+ node3,
+ shardIdFromBatch,
+ "MATCH",
+ null,
+ new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)
+ );
}
allocateAllUnassignedBatch(routingAllocation);
// Verify the throttling decider was throttled, incoming recoveries on a node should be

@@ -724,7 +745,8 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide
);
}

- private RoutingAllocation routingAllocationWithMultipleShards(AllocationDeciders deciders, int primaries, int replicas) throws InterruptedException {
+ private RoutingAllocation routingAllocationWithMultipleShards(AllocationDeciders deciders, int primaries, int replicas)
+ throws InterruptedException {
return routingAllocationWithMultipleShards(deciders, Settings.EMPTY, primaries, replicas, UnassignedInfo.Reason.CLUSTER_RECOVERED);
}

@@ -733,11 +755,15 @@ private RoutingAllocation routingAllocationWithMultipleShards(
Settings settings,
int primaries,
int replicas,
- UnassignedInfo.Reason reason) throws InterruptedException {
+ UnassignedInfo.Reason reason
+ ) throws InterruptedException {
Map<ShardId, ShardRouting> shardIdShardRoutingMap = new HashMap<>();
Index index = null;
for (ShardId shardIdFromBatch : shardsInBatch) {
- shardIdShardRoutingMap.put(shardIdFromBatch, TestShardRouting.newShardRouting(shardIdFromBatch, node1.getId(), true, ShardRoutingState.STARTED));
+ shardIdShardRoutingMap.put(
+ shardIdFromBatch,
+ TestShardRouting.newShardRouting(shardIdFromBatch, node1.getId(), true, ShardRoutingState.STARTED)
+ );
if (index == null) {
index = shardIdFromBatch.getIndex();
}

@@ -747,18 +773,21 @@ private RoutingAllocation routingAllocationWithMultipleShards(
.numberOfShards(primaries)
.numberOfReplicas(replicas);
for (ShardId shardIdFromBatch : shardsInBatch) {
- indexMetadata.putInSyncAllocationIds(shardIdFromBatch.id(), Sets.newHashSet(shardIdShardRoutingMap.get(shardIdFromBatch).allocationId().getId()));
+ indexMetadata.putInSyncAllocationIds(
+ shardIdFromBatch.id(),
+ Sets.newHashSet(shardIdShardRoutingMap.get(shardIdFromBatch).allocationId().getId())
+ );
}
Metadata metadata = Metadata.builder().put(indexMetadata).build();
// mark shard as delayed if reason is NODE_LEFT
boolean delayed = reason == UnassignedInfo.Reason.NODE_LEFT
&& UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings).nanos() > 0;
int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? 1 : 0;
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
- for (ShardId shardIdFromBatch : shardsInBatch){
+ for (ShardId shardIdFromBatch : shardsInBatch) {
IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder(shardIdFromBatch);
indexShardRoutingTableBuilder.addShard(shardIdShardRoutingMap.get(shardIdFromBatch));
- for (int i = 0 ; i < replicas; i++) {
+ for (int i = 0; i < replicas; i++) {
indexShardRoutingTableBuilder.addShard(
ShardRouting.newUnassigned(
shardIdFromBatch,

@@ -907,13 +936,17 @@ public TestBatchAllocator addData(
node,
new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch(
node,
- Map.of(shardId, new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata(
- new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(
- shardId,
- new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()),
- peerRecoveryRetentionLeases
- ), storeFileFetchException
- ))
+ Map.of(
+ shardId,
+ new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata(
+ new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(
+ shardId,
+ new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()),
+ peerRecoveryRetentionLeases
+ ),
+ storeFileFetchException
+ )
+ )
)
);
return this;
Expand Down Expand Up @@ -956,15 +989,19 @@ public TestBatchAllocator addShardData(
new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()),
peerRecoveryRetentionLeases
),
- storeFileFetchException);
- Map<ShardId, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata> shardIdNodeStoreFilesMetadataHashMap = new HashMap<>();
+ storeFileFetchException
+ );
+ Map<ShardId, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata> shardIdNodeStoreFilesMetadataHashMap =
+ new HashMap<>();
if (data.containsKey(node)) {
NodeStoreFilesMetadataBatch nodeStoreFilesMetadataBatch = data.get(node);
shardIdNodeStoreFilesMetadataHashMap.putAll(nodeStoreFilesMetadataBatch.getNodeStoreFilesMetadataBatch());
}
shardIdNodeStoreFilesMetadataHashMap.put(shardId, nodeStoreFilesMetadata);
- data.put(node, new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch(
- node, shardIdNodeStoreFilesMetadataHashMap));
+ data.put(
+ node,
+ new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch(node, shardIdNodeStoreFilesMetadataHashMap)
+ );

return this;
}
