Skip to content

Commit

Permalink
[Backport 2.x] Implemented cross-cluster monitor support opensearch-p…
Browse files Browse the repository at this point in the history
…roject#584 (opensearch-project#586)

* Added clusters field to support cross cluster cluster metrics monitors.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed writeTo.

Signed-off-by: AWSHurneyt <[email protected]>

* Updated tests.

Signed-off-by: AWSHurneyt <[email protected]>

* Updated tests.

Signed-off-by: AWSHurneyt <[email protected]>

---------

Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Apr 12, 2024
1 parent 56c94d0 commit 0fd0e2e
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 57 deletions.
46 changes: 37 additions & 9 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ data class Alert(
val aggregationResultBucket: AggregationResultBucket? = null,
val executionId: String? = null,
val associatedAlertIds: List<String>,
val clusters: List<String>? = null,
) : Writeable, ToXContent {

init {
Expand All @@ -61,6 +62,7 @@ data class Alert(
chainedAlertTrigger: ChainedAlertTrigger,
workflow: Workflow,
associatedAlertIds: List<String>,
clusters: List<String>? = null
) : this(
monitorId = NO_ID,
monitorName = "",
Expand All @@ -82,7 +84,8 @@ data class Alert(
executionId = executionId,
workflowId = workflow.id,
workflowName = workflow.name,
associatedAlertIds = associatedAlertIds
associatedAlertIds = associatedAlertIds,
clusters = clusters
)

constructor(
Expand All @@ -97,6 +100,7 @@ data class Alert(
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
monitorId = monitor.id,
monitorName = monitor.name,
Expand All @@ -118,7 +122,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -134,6 +139,7 @@ data class Alert(
findingIds: List<String> = emptyList(),
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
monitorId = monitor.id,
monitorName = monitor.name,
Expand All @@ -155,7 +161,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -172,6 +179,7 @@ data class Alert(
findingIds: List<String> = emptyList(),
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
monitorId = monitor.id,
monitorName = monitor.name,
Expand All @@ -193,7 +201,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -211,6 +220,7 @@ data class Alert(
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
id = id,
monitorId = monitor.id,
Expand All @@ -233,7 +243,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -248,6 +259,7 @@ data class Alert(
schemaVersion: Int = NO_SCHEMA_VERSION,
workflowId: String? = null,
executionId: String?,
clusters: List<String>? = null
) : this(
id = id,
monitorId = monitor.id,
Expand All @@ -270,7 +282,8 @@ data class Alert(
relatedDocIds = listOf(),
workflowId = workflowId ?: "",
executionId = executionId,
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

enum class State {
Expand Down Expand Up @@ -311,7 +324,8 @@ data class Alert(
actionExecutionResults = sin.readList(::ActionExecutionResult),
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null,
executionId = sin.readOptionalString(),
associatedAlertIds = sin.readStringList()
associatedAlertIds = sin.readStringList(),
clusters = sin.readOptionalStringList()
)

fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED)
Expand Down Expand Up @@ -349,6 +363,7 @@ data class Alert(
}
out.writeOptionalString(executionId)
out.writeStringCollection(associatedAlertIds)
out.writeOptionalStringArray(clusters?.toTypedArray())
}

companion object {
Expand Down Expand Up @@ -379,6 +394,7 @@ data class Alert(
const val ASSOCIATED_ALERT_IDS_FIELD = "associated_alert_ids"
const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS
const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH
const val CLUSTERS_FIELD = "clusters"
const val NO_ID = ""
const val NO_VERSION = Versions.NOT_FOUND

Expand Down Expand Up @@ -409,6 +425,7 @@ data class Alert(
val actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()
var aggAlertBucket: AggregationResultBucket? = null
val associatedAlertIds = mutableListOf<String>()
val clusters = mutableListOf<String>()
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
Expand Down Expand Up @@ -475,6 +492,12 @@ data class Alert(
AggregationResultBucket.parse(xcp)
}
}
CLUSTERS_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
clusters.add(xcp.text())
}
}
}
}

Expand Down Expand Up @@ -503,7 +526,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId,
workflowName = workflowName,
associatedAlertIds = associatedAlertIds
associatedAlertIds = associatedAlertIds,
clusters = if (clusters.size > 0) clusters else null
)
}

Expand Down Expand Up @@ -553,6 +577,9 @@ data class Alert(
.optionalTimeField(END_TIME_FIELD, endTime)
.optionalTimeField(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime)
aggregationResultBucket?.innerXContent(builder)

if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray())

builder.endObject()
return builder
}
Expand All @@ -576,7 +603,8 @@ data class Alert(
BUCKET_KEYS to aggregationResultBucket?.bucketKeys?.joinToString(","),
PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath,
FINDING_IDS to findingIds.joinToString(","),
RELATED_DOC_IDS to relatedDocIds.joinToString(",")
RELATED_DOC_IDS to relatedDocIds.joinToString(","),
CLUSTERS_FIELD to clusters?.joinToString(",")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.net.URI
import java.net.URISyntaxException

val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', '#', '>', '<', ' ')

Expand All @@ -22,7 +23,8 @@ val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', '
data class ClusterMetricsInput(
var path: String,
var pathParams: String = "",
var url: String
var url: String,
var clusters: List<String> = listOf()
) : Input {
val clusterMetricType: ClusterMetricType
val constructedUri: URI
Expand All @@ -43,11 +45,10 @@ data class ClusterMetricsInput(
"Invalid URI constructed from the path and path_params inputs, or the url input."
}

if (url.isNotEmpty() && validateFieldsNotEmpty()) {
if (url.isNotEmpty() && validateFieldsNotEmpty())
require(constructedUri == constructUrlFromInputs()) {
"The provided URL and URI fields form different URLs."
}
}

require(constructedUri.host.lowercase() == SUPPORTED_HOST) {
"Only host '$SUPPORTED_HOST' is supported."
Expand All @@ -74,6 +75,7 @@ data class ClusterMetricsInput(
.field(PATH_FIELD, path)
.field(PATH_PARAMS_FIELD, pathParams)
.field(URL_FIELD, url)
.field(CLUSTERS_FIELD, clusters)
.endObject()
.endObject()
}
Expand All @@ -87,6 +89,7 @@ data class ClusterMetricsInput(
out.writeString(path)
out.writeString(pathParams)
out.writeString(url)
out.writeStringArray(clusters.toTypedArray())
}

companion object {
Expand All @@ -99,18 +102,19 @@ data class ClusterMetricsInput(
const val PATH_PARAMS_FIELD = "path_params"
const val URL_FIELD = "url"
const val URI_FIELD = "uri"
const val CLUSTERS_FIELD = "clusters"

val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField(URI_FIELD), CheckedFunction { parseInner(it) })

/**
* This parse function uses [XContentParser] to parse JSON input and store corresponding fields to create a [ClusterMetricsInput] object
*/
@JvmStatic
@Throws(IOException::class)
@JvmStatic @Throws(IOException::class)
fun parseInner(xcp: XContentParser): ClusterMetricsInput {
var path = ""
var pathParams = ""
var url = ""
val clusters = mutableListOf<String>()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)

Expand All @@ -121,9 +125,17 @@ data class ClusterMetricsInput(
PATH_FIELD -> path = xcp.text()
PATH_PARAMS_FIELD -> pathParams = xcp.text()
URL_FIELD -> url = xcp.text()
CLUSTERS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) clusters.add(xcp.text())
}
}
}
return ClusterMetricsInput(path, pathParams, url)
return ClusterMetricsInput(path, pathParams, url, clusters)
}
}

Expand Down Expand Up @@ -163,20 +175,17 @@ data class ClusterMetricsInput(
if (pathParams.isNotEmpty()) {
pathParams = pathParams.trim('/')
ILLEGAL_PATH_PARAMETER_CHARACTERS.forEach { character ->
if (pathParams.contains(character)) {
if (pathParams.contains(character))
throw IllegalArgumentException(
"The provided path parameters contain invalid characters or spaces. Please omit: " + "${ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")}"
"The provided path parameters contain invalid characters or spaces. Please omit: " + ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")
)
}
}
}

if (apiType.requiresPathParams && pathParams.isEmpty()) {
if (apiType.requiresPathParams && pathParams.isEmpty())
throw IllegalArgumentException("The API requires path parameters.")
}
if (!apiType.supportsPathParams && pathParams.isNotEmpty()) {
if (!apiType.supportsPathParams && pathParams.isNotEmpty())
throw IllegalArgumentException("The API does not use path parameters.")
}

return pathParams
}
Expand All @@ -192,13 +201,11 @@ data class ClusterMetricsInput(
ClusterMetricType.values()
.filter { option -> option != ClusterMetricType.BLANK }
.forEach { option ->
if (uriPath.startsWith(option.prependPath) || uriPath.startsWith(option.defaultPath)) {
if (uriPath.startsWith(option.prependPath) || uriPath.startsWith(option.defaultPath))
apiType = option
}
}
if (apiType.isBlank()) {
if (apiType.isBlank())
throw IllegalArgumentException("The API could not be determined from the provided URI.")
}
return apiType
}

Expand All @@ -207,28 +214,36 @@ data class ClusterMetricsInput(
* @return The constructed [URI].
*/
private fun constructUrlFromInputs(): URI {
val uriBuilder = URIBuilder()
.setScheme(SUPPORTED_SCHEME)
.setHost(SUPPORTED_HOST)
.setPort(SUPPORTED_PORT)
.setPath(path + pathParams)
return uriBuilder.build()
/**
* this try-catch block is required due to a httpcomponents 5.1.x library issue
* it auto encodes path params in the url.
*/
return try {
val formattedPath = if (path.startsWith("/") || path.isBlank()) path else "/$path"
val formattedPathParams = if (pathParams.startsWith("/") || pathParams.isBlank()) pathParams else "/$pathParams"
val uriBuilder = URIBuilder("$SUPPORTED_SCHEME://$SUPPORTED_HOST:$SUPPORTED_PORT$formattedPath$formattedPathParams")
uriBuilder.build()
} catch (ex: URISyntaxException) {
val uriBuilder = URIBuilder()
.setScheme(SUPPORTED_SCHEME)
.setHost(SUPPORTED_HOST)
.setPort(SUPPORTED_PORT)
.setPath(path + pathParams)
uriBuilder.build()
}
}

/**
* If [url] field is empty, populates it with [constructedUri].
* If [path] and [pathParams] are empty, populates them with values from [url].
*/
private fun parseEmptyFields() {
if (pathParams.isEmpty()) {
if (pathParams.isEmpty())
pathParams = this.parsePathParams()
}
if (path.isEmpty()) {
if (path.isEmpty())
path = if (pathParams.isEmpty()) clusterMetricType.defaultPath else clusterMetricType.prependPath
}
if (url.isEmpty()) {
if (url.isEmpty())
url = constructedUri.toString()
}
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/test/kotlin/org/opensearch/commons/alerting/AlertTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class AlertTests {
assertEquals(templateArgs[Alert.START_TIME_FIELD], alert.startTime.toEpochMilli(), "Template args start time does not")
assertEquals(templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null, "Template args last notification time does not match")
assertEquals(templateArgs[Alert.SEVERITY_FIELD], alert.severity, "Template args severity does not match")
assertEquals(templateArgs[Alert.CLUSTERS_FIELD], alert.clusters?.joinToString(","), "Template args clusters does not match")
}

@Test
Expand All @@ -40,6 +41,7 @@ class AlertTests {
assertEquals(templateArgs[Alert.START_TIME_FIELD], alert.startTime.toEpochMilli(), "Template args start time does not")
assertEquals(templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null, "Template args last notification time does not match")
assertEquals(templateArgs[Alert.SEVERITY_FIELD], alert.severity, "Template args severity does not match")
assertEquals(templateArgs[Alert.CLUSTERS_FIELD], alert.clusters?.joinToString(","), "Template args clusters does not match")
assertEquals(
templateArgs[Alert.BUCKET_KEYS],
alert.aggregationResultBucket?.bucketKeys?.joinToString(","),
Expand Down
Loading

0 comments on commit 0fd0e2e

Please sign in to comment.