Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/composite workflow execution v1 #1

Open
wants to merge 18 commits into
base: feature/composite-workflow-transport-crud-execution
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
56bba0d
Added integrations tests for checking workflow creation and update sc…
stevanbz Jan 27, 2023
e0af305
Added transport layer for getting and deleting the workflow
stevanbz Jan 31, 2023
feebf0e
Updated getting and deleting the workflow in order to check if the mo…
stevanbz Feb 23, 2023
22eb900
When deleting the monitor, added a check if the monitor is part of th…
stevanbz Feb 27, 2023
d6269ab
Added transport workflow execution layer. Adjusted monitor runners to…
stevanbz Feb 13, 2023
c2588a0
Removed unused classes
stevanbz Feb 13, 2023
ad01b70
Added rest action for executing the workflow
stevanbz Feb 13, 2023
5190726
Added integration tests for workflow execution. Added script modules …
stevanbz Feb 23, 2023
a1e0408
Added workflow execution run result and refactored ExecutionWorkflowR…
stevanbz Feb 27, 2023
a77d9bb
Added integration tests for workflow execution. PR comments addressed
stevanbz Mar 1, 2023
b6f17a8
Code adjusted to comments. Wrapped exceptions when executing workflow
stevanbz Mar 2, 2023
8ded8b8
Added logic for deleting the workflow underlying monitors. Added vali…
stevanbz Mar 8, 2023
a593d38
Added workflow metadata
stevanbz Mar 9, 2023
8e0d28d
Added mappings for the workflow-metadata. Added integration tests for…
stevanbz Mar 9, 2023
af86c69
Renamed properties. Added workflow metadata dryrun integration test t…
stevanbz Mar 13, 2023
4dd13ed
Added workflow integration test for verifying changing the order of t…
stevanbz Mar 13, 2023
a14dfea
Renamed methods for generating the workflows
stevanbz Mar 13, 2023
486c5ab
Added test when updating the non-existing workflow
stevanbz Mar 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ dependencies {
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "org.mockito:mockito-core:4.7.0"
testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
testImplementation "org.opensearch.plugin:lang-painless:${opensearch_version}"
testImplementation "org.opensearch.plugin:lang-mustache-client:${opensearch_version}"
}

javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
Expand Down Expand Up @@ -259,7 +261,7 @@ String bwcRemoteFile = 'https://ci.opensearch.org/ci/dbc/bundle-build/1.1.0/2021
testClusters {
"${baseName}$i" {
testDistribution = "ARCHIVE"
versions = ["1.1.0", "2.4.0-SNAPSHOT"]
versions = ["1.1.0", "2.5.0-SNAPSHOT"]
numberOfNodes = 3
plugin(provider(new Callable<RegularFile>(){
@Override
Expand Down
37 changes: 34 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's discuss offline about cluster/node level settings for composite workflows

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok sounds good.

import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetEmailAccountAction
import org.opensearch.alerting.action.GetEmailGroupAction
Expand All @@ -27,6 +28,7 @@ import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
import org.opensearch.alerting.resthandler.RestExecuteWorkflowAction
import org.opensearch.alerting.resthandler.RestGetAlertsAction
import org.opensearch.alerting.resthandler.RestGetDestinationsAction
import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
Expand All @@ -44,19 +46,24 @@ import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
import org.opensearch.alerting.transport.TransportExecuteWorkflowAction
import org.opensearch.alerting.transport.TransportGetAlertsAction
import org.opensearch.alerting.transport.TransportGetDestinationsAction
import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportIndexWorkflowAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator
import org.opensearch.alerting.workflow.WorkflowRunnerService
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
Expand All @@ -80,6 +87,7 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
Expand Down Expand Up @@ -117,8 +125,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val OPEN_SEARCH_DASHBOARDS_USER_AGENT = "OpenSearch-Dashboards"
@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}")
@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@JvmField val LEGACY_OPENDISTRO_WORKFLOW_BASE_URI = "/_opendistro/_alerting/workflows"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only for legacy APIs. This is a new API, so we should not have this

@JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"
@JvmField val EMAIL_ACCOUNT_BASE_URI = "$DESTINATION_BASE_URI/email_accounts"
@JvmField val EMAIL_GROUP_BASE_URI = "$DESTINATION_BASE_URI/email_groups"
Expand All @@ -129,6 +139,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
}

lateinit var runner: MonitorRunnerService
lateinit var workflowRunner: WorkflowRunnerService
lateinit var scheduler: JobScheduler
lateinit var sweeper: JobSweeper
lateinit var scheduledJobIndices: ScheduledJobIndices
Expand All @@ -153,6 +164,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestIndexMonitorAction(),
RestSearchMonitorAction(settings, clusterService),
RestExecuteMonitorAction(),
RestExecuteWorkflowAction(),
RestAcknowledgeAlertAction(),
RestScheduledJobStatsHandler("_alerting"),
RestSearchEmailAccountAction(),
Expand Down Expand Up @@ -180,8 +192,11 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java),
ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_ALERTS_ACTION_TYPE, TransportGetAlertsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java)

ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
)
}

Expand All @@ -193,7 +208,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY
DocumentLevelTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}

Expand Down Expand Up @@ -227,6 +243,21 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerConsumers()
.registerDestinationSettings()
workflowRunner = WorkflowRunnerService
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry))
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
scheduler = JobScheduler(threadPool, runner)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
Expand Down Expand Up @@ -59,7 +60,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?
): MonitorRunResult<BucketLevelTriggerRunResult> {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")
Expand Down Expand Up @@ -118,7 +120,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitor,
periodStart,
periodEnd,
monitorResult.inputResults
monitorResult.inputResults,
workflowRunContext
)
if (firstIteration) {
firstPageOfInputResults = inputResults
Expand Down Expand Up @@ -154,7 +157,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorCtx,
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID
!dryrun && monitor.id != Monitor.NO_ID,
workflowRunContext
)
} else {
emptyList()
Expand Down Expand Up @@ -335,7 +339,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
shouldCreateFinding: Boolean
shouldCreateFinding: Boolean,
workflowRunContext: WorkflowRunContext? = null
): List<String> {
monitor.inputs.forEach { input ->
if (input is SearchInput) {
Expand All @@ -346,14 +351,14 @@ object BucketLevelMonitorRunner : MonitorRunner() {
for (aggFactory in (query.aggregations() as AggregatorFactories.Builder).aggregatorFactories) {
when (aggFactory) {
is CompositeAggregationBuilder -> {
var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
var groupByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
val sources = aggFactory.sources()
for (source in sources) {
if (grouByFields > 0) {
if (groupByFields > 0) {
logger.error("grouByFields > 0. not generating findings for bucket level monitor ${monitor.id}")
return listOf()
}
grouByFields++
groupByFields++
fieldName = source.field()
}
}
Expand Down Expand Up @@ -392,7 +397,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
sr.source().query(queryBuilder)
}
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding)
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, workflowRunContext?.workflowExecutionId)
} else {
logger.error("Couldn't resolve groupBy field. Not generating bucket level monitor findings for monitor %${monitor.id}")
}
Expand All @@ -405,7 +410,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
searchResponse: SearchResponse,
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
shouldCreateFinding: Boolean
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null
): List<String> {
val docIdsByIndexName: MutableMap<String, MutableList<String>> = mutableMapOf()
for (hit in searchResponse.hits.hits) {
Expand All @@ -424,7 +430,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorName = monitor.name,
index = it.key,
timestamp = Instant.now(),
docLevelQueries = listOf()
docLevelQueries = listOf(),
executionId = workflowExecutionId
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.updateMonitorMetadata
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
Expand Down Expand Up @@ -63,7 +64,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?
): MonitorRunResult<DocumentLevelTriggerRunResult> {
logger.debug("Document-level-monitor is running ...")
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
Expand Down Expand Up @@ -154,7 +156,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName)
// If monitor execution is triggered from a workflow
val indexToRelatedDocIdsMap = workflowRunContext?.matchingDocIdsPerIndex

val matchingDocs = getMatchingDocs(
monitor,
monitorCtx,
docExecutionContext,
indexName,
indexToRelatedDocIdsMap?.get(index)
)

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor, indexName)
Expand Down Expand Up @@ -202,7 +213,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun
dryrun,
workflowRunContext?.workflowExecutionId
)
}

Expand All @@ -223,7 +235,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
idQueryMap: Map<String, DocLevelQuery>,
docsToQueries: Map<String, List<String>>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean
dryrun: Boolean,
workflowExecutionId: String? = null
): DocumentLevelTriggerRunResult {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)
Expand All @@ -234,7 +247,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// TODO: Implement throttling for findings
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
val findingId = createFindings(monitor, monitorCtx, triggeredQueries, it.key, !dryrun && monitor.id != Monitor.NO_ID)
val findingId = createFindings(
monitor,
monitorCtx,
triggeredQueries,
it.key,
!dryrun && monitor.id != Monitor.NO_ID,
workflowExecutionId
)
findings.add(findingId)

if (triggerResult.triggeredDocs.contains(it.key)) {
Expand Down Expand Up @@ -304,7 +324,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
docLevelQueries: List<DocLevelQuery>,
matchingDocId: String,
shouldCreateFinding: Boolean
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null,
): String {
// Before the "|" is the doc id and after the "|" is the index
val docIndex = matchingDocId.split("|")
Expand All @@ -316,7 +337,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorName = monitor.name,
index = docIndex[1],
docLevelQueries = docLevelQueries,
timestamp = Instant.now()
timestamp = Instant.now(),
executionId = workflowExecutionId
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
Expand Down Expand Up @@ -433,7 +455,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
index: String
index: String,
docIds: List<String>? = null
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val matchingDocs = mutableListOf<Pair<String, BytesReference>>()
Expand All @@ -449,7 +472,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
shard,
prevSeqNo,
maxSeqNo,
null
null,
docIds
)

if (hits.hits.isNotEmpty()) {
Expand All @@ -468,7 +492,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
shard: String,
prevSeqNo: Long?,
maxSeqNo: Long,
query: String?
query: String?,
docIds: List<String>? = null
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
Expand All @@ -480,6 +505,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))
}

if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}

val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
Expand Down
Loading