Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.15] [Backport 2.x] Add support for remote monitors #695

Merged
merged 1 commit into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionType

class DocLevelMonitorFanOutAction private constructor() : ActionType<DocLevelMonitorFanOutResponse>(NAME, ::DocLevelMonitorFanOutResponse) {
companion object {
val INSTANCE = DocLevelMonitorFanOutAction()
const val NAME = "cluster:admin/opensearch/alerting/monitor/doclevel/fanout"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.model.IndexExecutionContext
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
val monitor: Monitor
val dryRun: Boolean
val monitorMetadata: MonitorMetadata
val executionId: String
val indexExecutionContext: IndexExecutionContext?
val shardIds: List<ShardId>
val concreteIndicesSeenSoFar: List<String>
val workflowRunContext: WorkflowRunContext?

constructor(
monitor: Monitor,
dryRun: Boolean,
monitorMetadata: MonitorMetadata,
executionId: String,
indexExecutionContext: IndexExecutionContext?,
shardIds: List<ShardId>,
concreteIndicesSeenSoFar: List<String>,
workflowRunContext: WorkflowRunContext?
) : super() {
this.monitor = monitor
this.dryRun = dryRun
this.monitorMetadata = monitorMetadata
this.executionId = executionId
this.indexExecutionContext = indexExecutionContext
this.shardIds = shardIds
this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar
this.workflowRunContext = workflowRunContext
require(false == shardIds.isEmpty()) { }
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
monitor = Monitor.readFrom(sin)!!,
dryRun = sin.readBoolean(),
monitorMetadata = MonitorMetadata.readFrom(sin),
executionId = sin.readString(),
shardIds = sin.readList(::ShardId),
concreteIndicesSeenSoFar = sin.readStringList(),
workflowRunContext = if (sin.readBoolean()) {
WorkflowRunContext(sin)
} else { null },
indexExecutionContext = IndexExecutionContext(sin)
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
monitor.writeTo(out)
out.writeBoolean(dryRun)
monitorMetadata.writeTo(out)
out.writeString(executionId)
out.writeCollection(shardIds)
out.writeStringCollection(concreteIndicesSeenSoFar)
out.writeBoolean(workflowRunContext != null)
workflowRunContext?.writeTo(out)
indexExecutionContext?.writeTo(out)
}

override fun validate(): ActionRequestValidationException? {
var actionValidationException: ActionRequestValidationException? = null
if (shardIds.isEmpty()) {
actionValidationException = ActionRequestValidationException()
actionValidationException.addValidationError("shard_ids is null or empty")
}
return actionValidationException
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("monitor", monitor)
.field("dry_run", dryRun)
.field("execution_id", executionId)
.field("index_execution_context", indexExecutionContext)
.field("shard_ids", shardIds)
.field("concrete_indices", concreteIndicesSeenSoFar)
.field("workflow_run_context", workflowRunContext)
return builder.endObject()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
val nodeId: String
val executionId: String
val monitorId: String
val lastRunContexts: MutableMap<String, Any>
val inputResults: InputRunResults
val triggerResults: Map<String, DocumentLevelTriggerRunResult>
val exception: AlertingException?

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
nodeId = sin.readString(),
executionId = sin.readString(),
monitorId = sin.readString(),
lastRunContexts = sin.readMap()!! as MutableMap<String, Any>,
inputResults = InputRunResults.readFrom(sin),
triggerResults = suppressWarning(sin.readMap(StreamInput::readString, DocumentLevelTriggerRunResult::readFrom)),
exception = sin.readException()
)

constructor(
nodeId: String,
executionId: String,
monitorId: String,
lastRunContexts: MutableMap<String, Any>,
inputResults: InputRunResults = InputRunResults(), // partial,
triggerResults: Map<String, DocumentLevelTriggerRunResult> = mapOf(),
exception: AlertingException? = null
) : super() {
this.nodeId = nodeId
this.executionId = executionId
this.monitorId = monitorId
this.lastRunContexts = lastRunContexts
this.inputResults = inputResults
this.triggerResults = triggerResults
this.exception = exception
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(nodeId)
out.writeString(executionId)
out.writeString(monitorId)
out.writeMap(lastRunContexts)
inputResults.writeTo(out)
out.writeMap(
triggerResults,
StreamOutput::writeString,
{ stream, stats -> stats.writeTo(stream) }
)
out.writeException(exception)
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("node_id", nodeId)
.field("execution_id", executionId)
.field("monitor_id", monitorId)
.field("last_run_contexts", lastRunContexts)
.field("input_results", inputResults)
.field("trigger_results", triggerResults)
.field("exception", exception)
.endObject()
return builder
}

companion object {
@Suppress("UNCHECKED_CAST")
fun suppressWarning(map: MutableMap<String?, Any?>?): Map<String, DocumentLevelTriggerRunResult> {
return map as Map<String, DocumentLevelTriggerRunResult>
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.model

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

data class BucketLevelTriggerRunResult(
override var triggerName: String,
override var error: Exception? = null,
var aggregationResultBuckets: Map<String, AggregationResultBucket>,
var actionResultsMap: MutableMap<String, MutableMap<String, ActionRunResult>> = mutableMapOf()
) : TriggerRunResult(triggerName, error) {

@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
constructor(sin: StreamInput) : this(
sin.readString(),
sin.readException() as Exception?, // error
sin.readMap(StreamInput::readString, ::AggregationResultBucket),
sin.readMap() as MutableMap<String, MutableMap<String, ActionRunResult>>
)

override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder
.field(AGG_RESULT_BUCKETS, aggregationResultBuckets)
.field(ACTIONS_RESULTS, actionResultsMap as Map<String, Any>)
}

@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeMap(aggregationResultBuckets, StreamOutput::writeString) {
valueOut: StreamOutput, aggResultBucket: AggregationResultBucket ->
aggResultBucket.writeTo(valueOut)
}
out.writeMap(actionResultsMap as Map<String, Any>)
}

companion object {
const val AGG_RESULT_BUCKETS = "agg_result_buckets"
const val ACTIONS_RESULTS = "action_results"

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): TriggerRunResult {
return BucketLevelTriggerRunResult(sin)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.model

import org.opensearch.commons.alerting.alerts.AlertError
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.script.ScriptException
import java.io.IOException
import java.time.Instant

data class ChainedAlertTriggerRunResult(
override var triggerName: String,
var triggered: Boolean,
override var error: Exception?,
var actionResults: MutableMap<String, ActionRunResult> = mutableMapOf(),
val associatedAlertIds: Set<String>
) : TriggerRunResult(triggerName, error) {

@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
constructor(sin: StreamInput) : this(
triggerName = sin.readString(),
error = sin.readException(),
triggered = sin.readBoolean(),
actionResults = sin.readMap() as MutableMap<String, ActionRunResult>,
associatedAlertIds = sin.readStringList().toSet()
)

override fun alertError(): AlertError? {
if (error != null) {
return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}")
}
for (actionResult in actionResults.values) {
if (actionResult.error != null) {
return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}")
}
}
return null
}

override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error)
return builder
.field("triggered", triggered)
.field("action_results", actionResults as Map<String, ActionRunResult>)
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeBoolean(triggered)
out.writeMap(actionResults as Map<String, ActionRunResult>)
out.writeStringCollection(associatedAlertIds)
}

companion object {
@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): TriggerRunResult {
return ChainedAlertTriggerRunResult(sin)
}
}
}
Loading
Loading