Skip to content

Commit

Permalink
[Backport 2.x] ImmutableOpenMap to j.u.Map (opensearch-project#838)
Browse files Browse the repository at this point in the history
* Metadata members from ImmutableOpenMap to j.u.Map

Signed-off-by: Hailong Cui <[email protected]>

* add field remoteStoreIndexShallowCopy in SM TestUtils (opensearch-project#825)

Signed-off-by: zhichao-aws <[email protected]>

* fix NPE for transform aggregations (opensearch-project#830)

Signed-off-by: Hailong Cui <[email protected]>

* Fix detekt

Signed-off-by: Hailong Cui <[email protected]>

---------

Signed-off-by: Hailong Cui <[email protected]>
Signed-off-by: zhichao-aws <[email protected]>
Co-authored-by: zhichao-aws <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
  • Loading branch information
2 people authored and ronnaksaxena committed Jul 19, 2023
1 parent 403577e commit 4fda78a
Show file tree
Hide file tree
Showing 19 changed files with 57 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class IndexStateManagementHistory(
clusterStateRequest,
object : ActionListener<ClusterStateResponse> {
override fun onResponse(clusterStateResponse: ClusterStateResponse) {
if (!clusterStateResponse.state.metadata.indices.isEmpty) {
if (!clusterStateResponse.state.metadata.indices.isEmpty()) {
val indicesToDelete = getIndicesToDelete(clusterStateResponse)
logger.info("Deleting old history indices viz $indicesToDelete")
deleteAllOldHistoryIndices(indicesToDelete)
Expand All @@ -199,7 +199,10 @@ class IndexStateManagementHistory(
val creationTime = indexMetaData.creationDate

if ((Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis) {
val alias = indexMetaData.aliases.firstOrNull { IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS == it.value.alias }
val alias = indexMetaData.aliases.firstNotNullOfOrNull {
alias ->
IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS == alias.value.alias
}
if (alias != null && historyEnabled) {
// If index has write alias and history is enable, don't delete the index.
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,12 @@ object ManagedIndexRunner :
}

// Check the cluster state for the index metadata
var clusterStateIndexMetadata = getIndexMetadata(managedIndexConfig.index)
val clusterStateIndexMetadata = getIndexMetadata(managedIndexConfig.index)
val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService
val clusterStateIndexUUID = clusterStateIndexMetadata?.let { defaultIndexMetadataService.getCustomIndexUUID(it) }
// If the index metadata is null, the index is not in the cluster state. If the index metadata is not null, but
// the cluster state index uuid differs from the one in the managed index config then the config is referring
// to a different index which does not exist in the cluster. We need to check all of the extensions to confirm an index exists
// to a different index which does not exist in the cluster. We need to check all the extensions to confirm an index exists
if (clusterStateIndexMetadata == null || clusterStateIndexUUID != managedIndexConfig.indexUuid) {
// If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types
val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE }
Expand Down Expand Up @@ -846,7 +846,7 @@ object ManagedIndexRunner :

val response: ClusterStateResponse = client.admin().cluster().suspendUntil { state(clusterStateRequest, it) }

indexMetaData = response.state.metadata.indices.firstOrNull()?.value
indexMetaData = response.state.metadata.indices[index]
} catch (e: Exception) {
logger.error("Failed to get IndexMetaData from cluster manager cluster state for index=$index", e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import java.lang.Exception
* MetadataService starts to move metadata from cluster state to config index
*/
@OpenForTesting
@Suppress("MagicNumber", "ReturnCount", "LongMethod", "ComplexMethod")
@Suppress("MagicNumber", "ReturnCount", "LongMethod", "ComplexMethod", "NestedBlockDepth")
class MetadataService(
private val client: Client,
private val clusterService: ClusterService,
Expand Down Expand Up @@ -111,11 +111,13 @@ class MetadataService(
val indexUuidMap = mutableMapOf<IndexUuid, IndexName>()
clusterStateManagedIndexMetadata.forEach { (indexName, metadata) ->
val indexMetadata = indicesMetadata[indexName]
val currentIndexUuid = indexMetadata.indexUUID
if (currentIndexUuid != metadata?.indexUuid) {
corruptManagedIndices.add(indexMetadata.index)
} else {
indexUuidMap[currentIndexUuid] = indexName
indexMetadata?.let {
val currentIndexUuid = it.indexUUID
if (currentIndexUuid != metadata?.indexUuid) {
corruptManagedIndices.add(it.index)
} else {
indexUuidMap[currentIndexUuid] = indexName
}
}
}
logger.info("Corrupt managed indices with outdated index uuid in metadata: $corruptManagedIndices")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ fun XContentBuilder.buildMetadata(name: String, metadata: ToXContentFragment, pa

// Get the oldest rollover time or null if index was never rolled over
fun IndexMetadata.getOldestRolloverTime(): Instant? {
return this.rolloverInfos.values()
.map { it.value.time }
.minOrNull() // oldest should be min as its epoch time
return this.rolloverInfos.values.minOfOrNull { it.time } // oldest should be min as its epoch time
?.let { Instant.ofEpochMilli(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
val (statsStore, statsDocs, shardStats) = getIndexStats(indexName, client) ?: return this
val indexSize = statsStore.sizeInBytes
// Get stats of current and target shards
val numOriginalShards = context.clusterService.state().metadata.indices[indexName].numberOfShards
val numOriginalShards = context.clusterService.state().metadata.indices[indexName]?.numberOfShards
?: error("numOriginalShards should not be null")
val numTargetShards = getNumTargetShards(numOriginalShards, indexSize)

if (shouldFailTooManyDocuments(statsDocs, numTargetShards)) return this
Expand Down Expand Up @@ -215,7 +216,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
private fun shouldFailUnsafe(clusterService: ClusterService, indexName: String): Boolean {
// If forceUnsafe is set and is true, then we don't even need to check the number of replicas
if (action.forceUnsafe == true) return false
val numReplicas = clusterService.state().metadata.indices[indexName].numberOfReplicas
val numReplicas = clusterService.state().metadata.indices[indexName]?.numberOfReplicas
val shouldFailForceUnsafeCheck = numReplicas == 0
if (shouldFailForceUnsafeCheck) {
logger.info(UNSAFE_FAILURE_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
val numShardsInSync = getNumShardsInSync(shardStats, context.clusterService.state(), indexName)
val nodeToMoveOnto = localShrinkActionProperties.nodeName
val numShardsOnNode = getNumShardsWithCopyOnNode(shardStats, context.clusterService.state(), nodeToMoveOnto)
val numPrimaryShards = context.clusterService.state().metadata.indices[indexName].numberOfShards
val numPrimaryShards = context.clusterService.state().metadata.indices[indexName]?.numberOfShards
?: error("numberOfShards should not be null")

// If a copy of each shard is on the node, and all shards are in sync, move on
if (numShardsOnNode >= numPrimaryShards && numShardsInSync >= numPrimaryShards) {
Expand All @@ -49,16 +50,16 @@ class WaitForMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,

// Returns the number of shard IDs where all primary and replicas are in sync
private fun getNumShardsInSync(shardStats: Array<ShardStats>, state: ClusterState, indexName: String): Int {
val numReplicas = state.metadata.indices[indexName].numberOfReplicas
val inSyncAllocations = state.metadata.indices[indexName].inSyncAllocationIds
val numReplicas = state.metadata.indices[indexName]?.numberOfReplicas ?: error("numberOfReplicas should not be null")
val inSyncAllocations = state.metadata.indices[indexName]?.inSyncAllocationIds
var numShardsInSync = 0
for (shard: ShardStats in shardStats) {
val routingInfo = shard.shardRouting
// Only check primaries so that we only check once for each shardID
if (routingInfo.primary()) {
// All shards must be in sync as it isn't known which shard (replica or primary) will be
// moved to the target node and used in the shrink.
if (inSyncAllocations[routingInfo.id].size == (numReplicas + 1)) {
if (inSyncAllocations?.get(routingInfo.id)?.size == (numReplicas + 1)) {
numShardsInSync++
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru

private suspend fun shrinkNotDone(targetIndex: String, targetNumShards: Int, client: Client, clusterService: ClusterService): Boolean {
val numPrimaryShardsStarted = getNumPrimaryShardsStarted(client, targetIndex)
val numPrimaryShards = clusterService.state().metadata.indices[targetIndex].numberOfShards
val numPrimaryShards = clusterService.state().metadata.indices[targetIndex]?.numberOfShards
return numPrimaryShards != targetNumShards || numPrimaryShardsStarted != targetNumShards
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class TransportExplainAction @Inject constructor(
clusterStateRequest,
object : ActionListener<ClusterStateResponse> {
override fun onResponse(response: ClusterStateResponse) {
val clusterStateIndexMetadatas = response.state.metadata.indices.associate { it.key to it.value }
val clusterStateIndexMetadatas = response.state.metadata.indices
getMetadataMap(clusterStateIndexMetadatas, threadContext)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,9 @@ class RollupMapperService(
return RollupJobValidationResult.Failure(getMappingsResult.message, getMappingsResult.cause)
}

val indexMapping: MappingMetadata = res.mappings[targetIndexResolvedName]
val indexMapping: MappingMetadata? = res.mappings[targetIndexResolvedName]

return if (((indexMapping.sourceAsMap?.get(_META) as Map<*, *>?)?.get(ROLLUPS) as Map<*, *>?)?.containsKey(rollup.id) == true) {
return if (((indexMapping?.sourceAsMap?.get(_META) as Map<*, *>?)?.get(ROLLUPS) as Map<*, *>?)?.containsKey(rollup.id) == true) {
RollupJobValidationResult.Valid
} else {
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [$targetIndexResolvedName]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ object RollupFieldValueExpressionResolver {
open class IndexAliasUtils(val clusterService: ClusterService) {

open fun hasAlias(index: String): Boolean {
val aliases = this.clusterService.state().metadata().indices.get(index)?.aliases
val aliases = this.clusterService.state().metadata().indices[index]?.aliases
if (aliases != null) {
return aliases.size() > 0
return aliases.isNotEmpty()
}
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,9 @@ data class Transform(
groups.add(Dimension.parse(xcp))
}
}
AGGREGATIONS_FIELD -> aggregations = AggregatorFactories.parseAggregators(xcp)
AGGREGATIONS_FIELD -> {
AggregatorFactories.parseAggregators(xcp)?.let { aggregations = it }
}
CONTINUOUS_FIELD -> continuous = xcp.booleanValue()
USER_FIELD -> {
user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ class IndexUtils {
return DEFAULT_SCHEMA_VERSION
}

fun shouldUpdateIndex(index: IndexMetadata, newVersion: Long): Boolean {
fun shouldUpdateIndex(index: IndexMetadata?, newVersion: Long): Boolean {
var oldVersion = DEFAULT_SCHEMA_VERSION

val indexMapping = index.mapping()?.sourceAsMap()
val indexMapping = index?.mapping()?.sourceAsMap()
if (indexMapping != null && indexMapping.containsKey(_META) && indexMapping[_META] is HashMap<*, *>) {
val metaData = indexMapping[_META] as HashMap<*, *>
if (metaData.containsKey(SCHEMA_VERSION)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() {
// return listOf(TestPlugin::class.java)
// }

protected fun getIndexMetadata(indexName: String): IndexMetadata {
protected fun getIndexMetadata(indexName: String): IndexMetadata? {
return client().admin().cluster().prepareState()
.setIndices(indexName)
.setMetadata(true).get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() {
addPolicyToIndex(indexName, policyID)

// put some metadata into cluster state
var indexMetadata = getIndexMetadata(indexName)
var indexMetadata = getIndexMetadata(indexName)!!
metadataToClusterState = metadataToClusterState.copy(
index = indexName,
indexUuid = indexMetadata.indexUUID,
Expand All @@ -85,7 +85,7 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() {
UpdateManagedIndexMetaDataAction.INSTANCE, request
).get()
logger.info(response.isAcknowledged)
indexMetadata = getIndexMetadata(indexName)
indexMetadata = getIndexMetadata(indexName)!!
logger.info("check if metadata is saved in cluster state: ${indexMetadata.getCustomData("managed_index_metadata")}")

// TODO increase wait time since flaky seeing here. After looking through the log
Expand All @@ -102,7 +102,7 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() {
"Happy moving",
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
assertEquals(null, getIndexMetadata(indexName).getCustomData("managed_index_metadata"))
assertEquals(null, getIndexMetadata(indexName)!!.getCustomData("managed_index_metadata"))
}

logger.info("metadata has moved")
Expand Down Expand Up @@ -156,7 +156,7 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() {
logger.info("managed-index: ${getExistingManagedIndexConfig(indexName)}")

// manually save metadata into cluster state
var indexMetadata = getIndexMetadata(indexName)
var indexMetadata = getIndexMetadata(indexName)!!
metadataToClusterState = metadataToClusterState.copy(
index = indexName,
indexUuid = indexMetadata.indexUUID,
Expand All @@ -172,7 +172,7 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() {
).get()

logger.info(response.isAcknowledged)
indexMetadata = getIndexMetadata(indexName)
indexMetadata = getIndexMetadata(indexName)!!
logger.info("check if metadata is saved in cluster state: ${indexMetadata.getCustomData("managed_index_metadata")}")

waitFor {
Expand All @@ -187,7 +187,7 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() {
"Happy moving",
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
assertEquals(null, getIndexMetadata(indexName).getCustomData("managed_index_metadata"))
assertEquals(null, getIndexMetadata(indexName)!!.getCustomData("managed_index_metadata"))
}

logger.info("metadata has moved")
Expand Down Expand Up @@ -225,7 +225,7 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() {
addPolicyToIndex(indexName, policyID)

// put some metadata into cluster state
val indexMetadata = getIndexMetadata(indexName)
val indexMetadata = getIndexMetadata(indexName)!!
metadataToClusterState = metadataToClusterState.copy(
index = indexName,
indexUuid = "randomindexuuid",
Expand All @@ -237,7 +237,7 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() {
)
)
client().execute(UpdateManagedIndexMetaDataAction.INSTANCE, request).get()
logger.info("check if metadata is saved in cluster state: ${getIndexMetadata(indexName).getCustomData("managed_index_metadata")}")
logger.info("check if metadata is saved in cluster state: ${getIndexMetadata(indexName)!!.getCustomData("managed_index_metadata")}")

waitFor {
assertEquals(
Expand All @@ -248,7 +248,7 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() {

waitFor(Instant.ofEpochSecond(120)) {
assertEquals(null, getExplainManagedIndexMetaData(indexName).info?.get("message"))
assertEquals(null, getIndexMetadata(indexName).getCustomData("managed_index_metadata"))
assertEquals(null, getIndexMetadata(indexName)!!.getCustomData("managed_index_metadata"))
}

logger.info("corrupt metadata has been cleaned")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.opensearch.client.ClusterAdminClient
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.collect.ImmutableOpenMap
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.test.OpenSearchTestCase
import kotlin.test.assertFailsWith
Expand All @@ -40,7 +39,7 @@ class MetadataServiceTests : OpenSearchTestCase() {
fun setup() {
whenever(clusterService.state()).doReturn(clusterState)
whenever(clusterState.metadata).doReturn(metadata)
whenever(metadata.indices).doReturn(ImmutableOpenMap.of())
whenever(metadata.indices).doReturn(mapOf())
}

fun `test config index not exists`() = runBlocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.collect.ImmutableOpenMap
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Settings
import org.opensearch.index.shard.DocsStats
Expand All @@ -50,7 +49,7 @@ class AttemptTransitionStepTests : OpenSearchTestCase() {
private val indexUUID: String = "indexUuid"
@Suppress("UNCHECKED_CAST")
private val indexMetadata: IndexMetadata = mock {
on { rolloverInfos } doReturn ImmutableOpenMap.builder<String, RolloverInfo>().build()
on { rolloverInfos } doReturn mapOf<String, RolloverInfo>()
on { indexUUID } doReturn indexUUID
}
private val metadata: Metadata = mock {
Expand Down
Loading

0 comments on commit 4fda78a

Please sign in to comment.