Skip to content

Commit

Permalink
Index Management Action Metrics (#1195)
Browse files Browse the repository at this point in the history
* Initial integration of TelemetryAwarePlugin to ISM

Signed-off-by: harycash <[email protected]>

* Initial integration of TelemetryAwarePlugin to ISM

Signed-off-by: harycash <[email protected]>

* Initial integration of TelemetryAwarePlugin to ISM with Rollover Step Integration

Signed-off-by: harycash <[email protected]>

* Initial integration of TelemetryAwarePlugin to ISM with Rollover Step Integration

Signed-off-by: harycash <[email protected]>

* Additional actions metrics with requested changes from previous commit

Signed-off-by: harycash <[email protected]>

* Fixed Build Issues

Signed-off-by: harycash <[email protected]>

* Fixed Build Issues

Signed-off-by: harycash <[email protected]>

* Fixed Build Issues, Added new metric : Cumulative Latency

Signed-off-by: harycash <[email protected]>

* Fixed Build Issues, Added new metric : Cumulative Latency

Signed-off-by: harycash <[email protected]>

* Requested Changes and Addition of Metrics to all the remaining Actions

Signed-off-by: harycash <[email protected]>

* Updates on Action Metrics

Signed-off-by: harycash <[email protected]>

* Updates on Action Metrics

Signed-off-by: harycash <[email protected]>

* Build issues fixed

Signed-off-by: harycash <[email protected]>

* Build issues fixed

Signed-off-by: harycash <[email protected]>

---------

Signed-off-by: harycash <[email protected]>
Co-authored-by: harycash <[email protected]>
  • Loading branch information
harshitakaushik-dev and harycash committed Jul 4, 2024
1 parent 52f331f commit c5425a5
Show file tree
Hide file tree
Showing 28 changed files with 1,339 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
Expand All @@ -27,12 +28,109 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {

abstract suspend fun execute(): Step

fun postExecute(logger: Logger): Step {
fun postExecute(
logger: Logger,
indexManagementActionMetrics: IndexManagementActionsMetrics,
step: Step,
startingManagedIndexMetaData: ManagedIndexMetaData,
): Step {
logger.info("Finished executing $name for ${context?.metadata?.index}")
val updatedStepMetaData = step.getUpdatedManagedIndexMetadata(startingManagedIndexMetaData)
emitTelemetry(indexManagementActionMetrics, updatedStepMetaData, logger)
this.context = null
return this
}

private fun emitTelemetry(
indexManagementActionMetrics: IndexManagementActionsMetrics,
updatedStepMetaData: ManagedIndexMetaData,
logger: Logger,
) {
when (context?.metadata?.actionMetaData?.name) {
IndexManagementActionsMetrics.ROLLOVER -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.ROLLOVER,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.FORCE_MERGE -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.FORCE_MERGE,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.DELETE -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.DELETE,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.REPLICA_COUNT -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.REPLICA_COUNT,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.TRANSITION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.TRANSITION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.NOTIFICATION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.NOTIFICATION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.CLOSE -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.CLOSE,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SET_INDEX_PRIORITY -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SET_INDEX_PRIORITY, // problem in test
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.OPEN -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.OPEN,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.MOVE_SHARD -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.MOVE_SHARD,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SET_READ_ONLY -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SET_READ_ONLY,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SHRINK -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SHRINK,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SNAPSHOT -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SNAPSHOT,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.ALIAS_ACTION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.ALIAS_ACTION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.ALLOCATION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.ALLOCATION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

else -> {
logger.info(
"Action Metrics is not supported for this action [%s]",
context?.metadata?.actionMetaData?.name,
)
}
}
}

abstract fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData

abstract fun isIdempotent(): Boolean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AliasActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AllocationActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.CloseActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.DeleteActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ForceMergeActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.MoveShardActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.NotificationActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.OpenActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ReplicaCountActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.RolloverActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetIndexPriorityActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetReadOnlyActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ShrinkActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SnapshotActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.TransitionActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.MetricsRegistry
import org.opensearch.telemetry.metrics.tags.Tags

abstract class ActionMetrics {
abstract val actionName: String

fun createTags(context: StepContext): Tags {
val tags = Tags.create()
.addTag("index_name", context.metadata.index)
.addTag("policy_id", context.metadata.policyID)
.addTag("node_id", context.clusterService.nodeName ?: "")
.addTag("index_uuid", context.metadata.indexUuid)
return tags
}

abstract fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
)
}

class IndexManagementActionsMetrics private constructor() {
private lateinit var metricsRegistry: MetricsRegistry
private lateinit var actionMetricsMap: Map<String, ActionMetrics>

companion object {
val instance: IndexManagementActionsMetrics by lazy { HOLDER.instance }

const val ROLLOVER = "rollover"
const val NOTIFICATION = "notification"
const val FORCE_MERGE = "force_merge"
const val DELETE = "delete"
const val REPLICA_COUNT = "replica_count"
const val TRANSITION = "transition"
const val CLOSE = "close"
const val SET_INDEX_PRIORITY = "set_index_priority"
const val OPEN = "open"
const val MOVE_SHARD = "move_shard"
const val SET_READ_ONLY = "set_read_only"
const val SHRINK = "shrink"
const val SNAPSHOT = "snapshot"
const val ALIAS_ACTION = "alias_action"
const val ALLOCATION = "allocation"

private object HOLDER {
val instance = IndexManagementActionsMetrics()
}
}

fun initialize(metricsRegistry: MetricsRegistry) {
this.metricsRegistry = metricsRegistry

RolloverActionMetrics.instance.initializeCounters(metricsRegistry)
NotificationActionMetrics.instance.initializeCounters(metricsRegistry)
ForceMergeActionMetrics.instance.initializeCounters(metricsRegistry)
DeleteActionMetrics.instance.initializeCounters(metricsRegistry)
ReplicaCountActionMetrics.instance.initializeCounters(metricsRegistry)
TransitionActionMetrics.instance.initializeCounters(metricsRegistry)
CloseActionMetrics.instance.initializeCounters(metricsRegistry)
SetIndexPriorityActionMetrics.instance.initializeCounters(metricsRegistry)
OpenActionMetrics.instance.initializeCounters(metricsRegistry)
MoveShardActionMetrics.instance.initializeCounters(metricsRegistry)
SetReadOnlyActionMetrics.instance.initializeCounters(metricsRegistry)
ShrinkActionMetrics.instance.initializeCounters(metricsRegistry)
SnapshotActionMetrics.instance.initializeCounters(metricsRegistry)
AliasActionMetrics.instance.initializeCounters(metricsRegistry)
AllocationActionMetrics.instance.initializeCounters(metricsRegistry)

actionMetricsMap = mapOf(
ROLLOVER to RolloverActionMetrics.instance,
NOTIFICATION to NotificationActionMetrics.instance,
FORCE_MERGE to ForceMergeActionMetrics.instance,
DELETE to DeleteActionMetrics.instance,
REPLICA_COUNT to ReplicaCountActionMetrics.instance,
TRANSITION to TransitionActionMetrics.instance,
CLOSE to CloseActionMetrics.instance,
SET_INDEX_PRIORITY to SetIndexPriorityActionMetrics.instance,
OPEN to OpenActionMetrics.instance,
MOVE_SHARD to MoveShardActionMetrics.instance,
SET_READ_ONLY to SetReadOnlyActionMetrics.instance,
SHRINK to ShrinkActionMetrics.instance,
SNAPSHOT to SnapshotActionMetrics.instance,
ALIAS_ACTION to AliasActionMetrics.instance,
ALLOCATION to AllocationActionMetrics.instance,
)
}

fun getActionMetrics(actionName: String): ActionMetrics? {
return actionMetricsMap[actionName]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class AliasActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = IndexManagementActionsMetrics.ALIAS_ACTION
lateinit var successes: Counter
lateinit var failures: Counter
lateinit var cumulativeLatency: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Alias Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Alias Action Failures", "count")
cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Alias Actions", "milliseconds")
}

companion object {
val instance: AliasActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = AliasActionMetrics()
}

override fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
) {
val aliasActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALIAS_ACTION) as AliasActionMetrics
val stepStatus = stepMetaData?.stepStatus
if (stepStatus == StepStatus.COMPLETED) {
aliasActionMetrics.successes.add(1.0, context.let { aliasActionMetrics.createTags(it) })
}
if (stepStatus == StepStatus.FAILED) {
aliasActionMetrics.failures.add(1.0, context.let { aliasActionMetrics.createTags(it) })
}
val endTime = System.currentTimeMillis()
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
aliasActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { aliasActionMetrics.createTags(it) })
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class AllocationActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = IndexManagementActionsMetrics.ALLOCATION
lateinit var successes: Counter
lateinit var failures: Counter
lateinit var cumulativeLatency: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Allocation Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Allocation Action Failures", "count")
cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Allocation Actions", "milliseconds")
}

companion object {
val instance: AllocationActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = AllocationActionMetrics()
}

override fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
) {
val allocationActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALLOCATION) as AllocationActionMetrics
val stepStatus = stepMetaData?.stepStatus
if (stepStatus == StepStatus.COMPLETED) {
allocationActionMetrics.successes.add(1.0, context.let { allocationActionMetrics.createTags(it) })
}
if (stepStatus == StepStatus.FAILED) {
allocationActionMetrics.failures.add(1.0, context.let { allocationActionMetrics.createTags(it) })
}
val endTime = System.currentTimeMillis()
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
allocationActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { allocationActionMetrics.createTags(it) })
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class CloseActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = IndexManagementActionsMetrics.CLOSE
lateinit var successes: Counter
lateinit var failures: Counter
lateinit var cumulativeLatency: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Close Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Close Action Failures", "count")
cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Close Actions", "milliseconds")
}

companion object {
val instance: CloseActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = CloseActionMetrics()
}

override fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
) {
val closeActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.CLOSE) as CloseActionMetrics
val stepStatus = stepMetaData?.stepStatus
if (stepStatus == StepStatus.COMPLETED) {
closeActionMetrics.successes.add(1.0, context.let { closeActionMetrics.createTags(it) })
}
if (stepStatus == StepStatus.FAILED) {
closeActionMetrics.failures.add(1.0, context.let { closeActionMetrics.createTags(it) })
}
val endTime = System.currentTimeMillis()
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
closeActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { closeActionMetrics.createTags(it) })
}
}
Loading

0 comments on commit c5425a5

Please sign in to comment.