diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt index dc2b0936..6fe9c47b 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt @@ -105,19 +105,41 @@ class IndexWorkflowRequest : ActionRequest { val monitorIdOrderMap: Map = delegates.associate { it.monitorId to it.order } delegates.forEach { if (it.chainedMonitorFindings != null) { - if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings.monitorId) == false) { - validationException = ValidateActions.addValidationError( - "Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} doesn't exist in sequence", - validationException - ) - // Break the flow because next check will generate the NPE - return validationException - } - if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings.monitorId]!!) { - validationException = ValidateActions.addValidationError( - "Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}", - validationException - ) + + if (it.chainedMonitorFindings.monitorId != null) { + if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings.monitorId) == false) { + validationException = ValidateActions.addValidationError( + "Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} doesn't exist in sequence", + validationException + ) + // Break the flow because next check will generate the NPE + return validationException + } + if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings.monitorId]!!) { + validationException = ValidateActions.addValidationError( + "Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}", + validationException + ) + } + } else { + for (monitorId in it.chainedMonitorFindings.monitorIds) { + if (!monitorIdOrderMap.containsKey(monitorId)) { + validationException = ValidateActions.addValidationError( + "Chained Findings Monitor $monitorId doesn't exist in sequence", + validationException + ) + return validationException + } else { + val order = monitorIdOrderMap.get(monitorId)!! + if (order >= it.order) { + return ValidateActions.addValidationError( + "Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}. " + + "Order of monitor being chained [$order] should be smaller than order of monitor using findings as source data [${it.order}] in sequence", + validationException + ) + } + } + } } } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt index dbc15e34..92192eec 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt @@ -9,50 +9,65 @@ import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import java.io.IOException +import java.util.Collections /** - * Context passed in delegate monitor to filter data queried by a monitor based on the findings of the given monitor id. + * Context passed in delegate monitor to filter data matched by a list of monitors based on the findings of the given monitor ids. */ // TODO - Remove the class and move the monitorId to Delegate (as a chainedMonitorId property) if this class won't be updated by adding new properties data class ChainedMonitorFindings( - val monitorId: String + val monitorId: String? = null, + val monitorIds: List = emptyList(), // if monitorId field is non-null it would be given precendence for BWC ) : BaseModel { init { - validateId(monitorId) + require(!(monitorId.isNullOrBlank() && monitorIds.isEmpty())) { + "at least one of fields, 'monitorIds' and 'monitorId' should be provided" + } + if (monitorId != null && monitorId.isBlank()) { + validateId(monitorId) + } else { + monitorIds.forEach { validateId(it) } + } } @Throws(IOException::class) constructor(sin: StreamInput) : this( - sin.readString(), // monitorId + sin.readOptionalString(), // monitorId + Collections.unmodifiableList(sin.readStringList()) ) + @Suppress("UNCHECKED_CAST") fun asTemplateArg(): Map { return mapOf( MONITOR_ID_FIELD to monitorId, - ) + MONITOR_IDS_FIELD to monitorIds + ) as Map } @Throws(IOException::class) override fun writeTo(out: StreamOutput) { - out.writeString(monitorId) + out.writeOptionalString(monitorId) + out.writeStringCollection(monitorIds) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder.startObject() .field(MONITOR_ID_FIELD, monitorId) + .field(MONITOR_IDS_FIELD, monitorIds) .endObject() return builder } companion object { const val MONITOR_ID_FIELD = "monitor_id" + const val MONITOR_IDS_FIELD = "monitor_ids" @JvmStatic @Throws(IOException::class) fun parse(xcp: XContentParser): ChainedMonitorFindings { - lateinit var monitorId: String - + var monitorId: String? = null + val monitorIds = mutableListOf() XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { val fieldName = xcp.currentName() @@ -60,12 +75,23 @@ data class ChainedMonitorFindings( when (fieldName) { MONITOR_ID_FIELD -> { - monitorId = xcp.text() - validateId(monitorId) + if (!xcp.currentToken().equals(XContentParser.Token.VALUE_NULL)) + monitorId = xcp.text() + } + + MONITOR_IDS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + monitorIds.add(xcp.text()) + } } } } - return ChainedMonitorFindings(monitorId) + return ChainedMonitorFindings(monitorId, monitorIds) } @JvmStatic diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index c4a68d44..a4f880a9 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -405,6 +405,16 @@ fun randomClusterMetricsInput( return ClusterMetricsInput(path, pathParams, url) } +fun ChainedMonitorFindings.toJsonString(): String { + val builder = XContentFactory.jsonBuilder() + return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() +} + +fun Workflow.toJsonString(): String { + val builder = XContentFactory.jsonBuilder() + return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string() +} + fun Monitor.toJsonString(): String { val builder = XContentFactory.jsonBuilder() return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt index 8ff08738..58feffb2 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt @@ -21,6 +21,9 @@ import org.opensearch.search.SearchModule import java.lang.Exception import java.lang.IllegalArgumentException import java.util.UUID +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue class IndexWorkflowRequestTests { @@ -196,6 +199,21 @@ class IndexWorkflowRequestTests { delegates = listOf( Delegate(1, "monitor-1") ) + + // Chained finding list of monitors valid + delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2"), + Delegate(3, "monitor-3", ChainedMonitorFindings(null, listOf("monitor-1", "monitor-2"))), + + ) + val req7 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates))) + ) + ) + assertNull(req7.validate()) try { IndexWorkflowRequest( "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, @@ -207,5 +225,21 @@ class IndexWorkflowRequestTests { Assert.assertTrue(ex is IllegalArgumentException) Assert.assertTrue(ex.message!!.contains("Workflows can only have 1 search input.")) } + + // Chained finding list of monitors invalid order and old field null + delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2"), + Delegate(2, "monitor-3", ChainedMonitorFindings(null, listOf("monitor-1", "monitor-2"))), + + ) + val req8 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates))) + ) + ) + assertNotNull(req8.validate()) + assertTrue(req8.validate()!!.message!!.contains("should be executed before monitor")) } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt index 9680bdbe..ad0d2b24 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt @@ -70,10 +70,25 @@ class CompositeInputTests { } @Test - fun `test create Chained Findings with illegal monitorId value`() { + fun `test create Chained Findings with illegal monitorId value and empty monitorIds list`() { try { ChainedMonitorFindings("") Assertions.fail("Expecting an illegal argument exception") + } catch (e: IllegalArgumentException) { + e.message?.let { + Assertions.assertTrue( + it.contains("at least one of fields, 'monitorIds' and 'monitorId' should be provided") + + ) + } + } + } + + @Test + fun `test create Chained Findings with null monitorId value and monitorIds list with blank monitorIds`() { + try { + ChainedMonitorFindings("", listOf("", "")) + Assertions.fail("Expecting an illegal argument exception") } catch (e: IllegalArgumentException) { e.message?.let { Assertions.assertTrue( diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index 67e16908..a284c187 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -264,6 +264,30 @@ class XContentTests { Assertions.assertNull(parsedMonitor.user) } + @Test + fun `test workflow parsing`() { + val workflow = randomWorkflow(monitorIds = listOf("1", "2", "3")) + val monitorString = workflow.toJsonString() + val parsedWorkflow = Workflow.parse(parser(monitorString)) + Assertions.assertEquals(workflow, parsedWorkflow, "Round tripping workflow failed") + } + + @Test + fun `test chainedMonitorFindings parsing`() { + val cmf1 = ChainedMonitorFindings(monitorId = "m1") + val cmf1String = cmf1.toJsonString() + Assertions.assertEquals( + ChainedMonitorFindings.parse(parser(cmf1String)), cmf1, + "Round tripping chained monitor findings failed" + ) + val cmf2 = ChainedMonitorFindings(monitorIds = listOf("m1", "m2")) + val cmf2String = cmf2.toJsonString() + Assertions.assertEquals( + ChainedMonitorFindings.parse(parser(cmf2String)), cmf2, + "Round tripping chained monitor findings failed" + ) + } + @Test fun `test old monitor format parsing`() { val monitorString = """