From 44173b8362e2d136fc884c4d2c5d53ca5fececfb Mon Sep 17 00:00:00 2001 From: Swetha Guptha Date: Wed, 29 May 2024 21:30:38 +0530 Subject: [PATCH] Code style fix --- .../decider/ThrottlingAllocationDecider.java | 13 ++- .../gateway/PrimaryShardBatchAllocator.java | 10 ++- .../gateway/ReplicaShardBatchAllocator.java | 16 ++-- .../PrimaryShardBatchAllocatorTests.java | 19 ++-- .../ReplicaShardBatchAllocatorTests.java | 89 +++++++++++++------ 5 files changed, 100 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 2cf1121e9ede9..bf7ca4aa1c159 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -177,10 +177,15 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing int primariesInRecovery = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node.nodeId()); - logger.debug( "ThrottlingAllocationDecider decision, throttle: [{}] primary recovery limit [{}]," - + " primaries in recovery [{}] invoked for [{}] on node [{}]", - primariesInRecovery >= primariesInitialRecoveries, primariesInitialRecoveries, - primariesInRecovery, shardRouting, node.node() ); + logger.debug( + "ThrottlingAllocationDecider decision, throttle: [{}] primary recovery limit [{}]," + + " primaries in recovery [{}] invoked for [{}] on node [{}]", + primariesInRecovery >= primariesInitialRecoveries, + primariesInitialRecoveries, + primariesInRecovery, + shardRouting, + node.node() + ); if (primariesInRecovery >= primariesInitialRecoveries) { // TODO: Should index creation not be throttled for primary shards? return allocation.decision( diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java index 1fa2f68511c40..7518d74ff17b8 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java @@ -68,7 +68,11 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned if (decision != null) { return decision; } - final FetchResult shardsState = fetchData(List.of(unassignedShard), Collections.emptyList(), allocation); + final FetchResult shardsState = fetchData( + List.of(unassignedShard), + Collections.emptyList(), + allocation + ); List nodeGatewayStartedShards = adaptToNodeShardStates(unassignedShard, shardsState); return getAllocationDecision(unassignedShard, allocation, nodeGatewayStartedShards, logger); } @@ -96,9 +100,7 @@ public void allocateUnassignedBatch(List shardRoutings, RoutingAll // only fetch data for eligible shards final FetchResult shardsState = fetchData(eligibleShards, inEligibleShards, allocation); - Set shardIdsFromBatch = shardRoutings.stream() - .map(shardRouting -> shardRouting.shardId()) - .collect(Collectors.toSet()); + Set shardIdsFromBatch = shardRoutings.stream().map(shardRouting -> shardRouting.shardId()).collect(Collectors.toSet()); RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { ShardRouting unassignedShard = iterator.next(); diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java index 953681dd0225b..f99dfa6fee14f 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -116,7 +116,11 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned ); } - final FetchResult shardsState = fetchData(List.of(unassignedShard), Collections.emptyList(), allocation); + final FetchResult shardsState = fetchData( + List.of(unassignedShard), + Collections.emptyList(), + allocation + ); Map nodeShardStores = convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState); return getAllocationDecision(unassignedShard, allocation, nodeShardStores, result, logger); } @@ -166,14 +170,14 @@ public void allocateUnassignedBatch(List shardRoutings, RoutingAll // only fetch data for eligible shards final FetchResult shardsState = fetchData(eligibleShards, ineligibleShards, allocation); - List shardIdsFromBatch = shardRoutings.stream() - .map(shardRouting -> shardRouting.shardId()) - .collect(Collectors.toList()); + List shardIdsFromBatch = shardRoutings.stream().map(shardRouting -> shardRouting.shardId()).collect(Collectors.toList()); RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { ShardRouting unassignedShard = iterator.next(); - - if (shardIdsFromBatch.contains(unassignedShard.shardId())) { + // There will be only one entry for the shard in the unassigned shards batch + // for a shard with multiple unassigned replicas, hence we are comparing the shard ids + // instead of ShardRouting in-order to evaluate shard assignment for all unassigned replicas of a shard. + if (!unassignedShard.primary() && shardIdsFromBatch.contains(unassignedShard.shardId())) { logger.info("Executing allocation decision for shard {}", shardIdsFromBatch); AllocateUnassignedDecision allocateUnassignedDecision; if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) { diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java index cf9e43c12617b..729e484575bce 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java @@ -83,15 +83,15 @@ private void allocateAllUnassignedBatch(final RoutingAllocation allocation) { public void testAllocateUnassignedBatch() { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), - clusterSettings, random()); + AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random()); setUpShards(2); final RoutingAllocation routingAllocation = routingAllocationWithMultiplePrimaries( allocationDeciders, CLUSTER_RECOVERED, 2, 0, - "allocId-0", "allocId-1" + "allocId-0", + "allocId-1" ); for (ShardId shardId : shardsInBatch) { @@ -117,16 +117,21 @@ public void testAllocateUnassignedBatch() { public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1) - .build(), clusterSettings, random()); + AllocationDeciders allocationDeciders = randomAllocationDeciders( + Settings.builder() + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1) + .build(), + clusterSettings, + random() + ); setUpShards(2); final RoutingAllocation routingAllocation = routingAllocationWithMultiplePrimaries( allocationDeciders, CLUSTER_RECOVERED, 2, 0, - "allocId-0", "allocId-1" + "allocId-0", + "allocId-1" ); for (ShardId shardId : shardsInBatch) { diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java index f07d21dd814ba..892aaaad16ee3 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java @@ -645,17 +645,38 @@ public void testDoNotCancelForBrokenNode() { public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() throws InterruptedException { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 1) - .build(), clusterSettings, random()); + AllocationDeciders allocationDeciders = randomAllocationDeciders( + Settings.builder() + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 1) + .build(), + clusterSettings, + random() + ); setUpShards(2); final RoutingAllocation routingAllocation = routingAllocationWithMultipleShards(allocationDeciders, 2, 1); - for (ShardId shardIdFromBatch: shardsInBatch) { - testBatchAllocator - .addShardData(node1, shardIdFromBatch, "MATCH", null, new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) - .addShardData(node2, shardIdFromBatch, "NO_MATCH", null, new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) - .addShardData(node3, shardIdFromBatch, "MATCH", null, new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + for (ShardId shardIdFromBatch : shardsInBatch) { + testBatchAllocator.addShardData( + node1, + shardIdFromBatch, + "MATCH", + null, + new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION) + ) + .addShardData( + node2, + shardIdFromBatch, + "NO_MATCH", + null, + new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION) + ) + .addShardData( + node3, + shardIdFromBatch, + "MATCH", + null, + new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION) + ); } allocateAllUnassignedBatch(routingAllocation); // Verify the throttling decider was throttled, incoming recoveries on a node should be @@ -724,7 +745,8 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide ); } - private RoutingAllocation routingAllocationWithMultipleShards(AllocationDeciders deciders, int primaries, int replicas) throws InterruptedException { + private RoutingAllocation routingAllocationWithMultipleShards(AllocationDeciders deciders, int primaries, int replicas) + throws InterruptedException { return routingAllocationWithMultipleShards(deciders, Settings.EMPTY, primaries, replicas, UnassignedInfo.Reason.CLUSTER_RECOVERED); } @@ -733,11 +755,15 @@ private RoutingAllocation routingAllocationWithMultipleShards( Settings settings, int primaries, int replicas, - UnassignedInfo.Reason reason) throws InterruptedException { + UnassignedInfo.Reason reason + ) throws InterruptedException { Map shardIdShardRoutingMap = new HashMap<>(); Index index = null; for (ShardId shardIdFromBatch : shardsInBatch) { - shardIdShardRoutingMap.put(shardIdFromBatch, TestShardRouting.newShardRouting(shardIdFromBatch, node1.getId(), true, ShardRoutingState.STARTED)); + shardIdShardRoutingMap.put( + shardIdFromBatch, + TestShardRouting.newShardRouting(shardIdFromBatch, node1.getId(), true, ShardRoutingState.STARTED) + ); if (index == null) { index = shardIdFromBatch.getIndex(); } @@ -747,7 +773,10 @@ private RoutingAllocation routingAllocationWithMultipleShards( .numberOfShards(primaries) .numberOfReplicas(replicas); for (ShardId shardIdFromBatch : shardsInBatch) { - indexMetadata.putInSyncAllocationIds(shardIdFromBatch.id(), Sets.newHashSet(shardIdShardRoutingMap.get(shardIdFromBatch).allocationId().getId())); + indexMetadata.putInSyncAllocationIds( + shardIdFromBatch.id(), + Sets.newHashSet(shardIdShardRoutingMap.get(shardIdFromBatch).allocationId().getId()) + ); } Metadata metadata = Metadata.builder().put(indexMetadata).build(); // mark shard as delayed if reason is NODE_LEFT @@ -755,10 +784,10 @@ private RoutingAllocation routingAllocationWithMultipleShards( && UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings).nanos() > 0; int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? 1 : 0; IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - for (ShardId shardIdFromBatch : shardsInBatch){ + for (ShardId shardIdFromBatch : shardsInBatch) { IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder(shardIdFromBatch); indexShardRoutingTableBuilder.addShard(shardIdShardRoutingMap.get(shardIdFromBatch)); - for (int i = 0 ; i < replicas; i++) { + for (int i = 0; i < replicas; i++) { indexShardRoutingTableBuilder.addShard( ShardRouting.newUnassigned( shardIdFromBatch, @@ -907,13 +936,17 @@ public TestBatchAllocator addData( node, new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch( node, - Map.of(shardId, new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata( - new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( - shardId, - new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()), - peerRecoveryRetentionLeases - ), storeFileFetchException - )) + Map.of( + shardId, + new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata( + new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( + shardId, + new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()), + peerRecoveryRetentionLeases + ), + storeFileFetchException + ) + ) ) ); return this; @@ -956,15 +989,19 @@ public TestBatchAllocator addShardData( new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()), peerRecoveryRetentionLeases ), - storeFileFetchException); - Map shardIdNodeStoreFilesMetadataHashMap = new HashMap<>(); + storeFileFetchException + ); + Map shardIdNodeStoreFilesMetadataHashMap = + new HashMap<>(); if (data.containsKey(node)) { NodeStoreFilesMetadataBatch nodeStoreFilesMetadataBatch = data.get(node); shardIdNodeStoreFilesMetadataHashMap.putAll(nodeStoreFilesMetadataBatch.getNodeStoreFilesMetadataBatch()); } shardIdNodeStoreFilesMetadataHashMap.put(shardId, nodeStoreFilesMetadata); - data.put(node, new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch( - node, shardIdNodeStoreFilesMetadataHashMap)); + data.put( + node, + new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch(node, shardIdNodeStoreFilesMetadataHashMap) + ); return this; }