Feature/composite workflow execution v1 #1
base: feature/composite-workflow-transport-crud-execution
…enario Signed-off-by: Stevan Buzejic <[email protected]>
Signed-off-by: Stevan Buzejic <[email protected]>
Can you look into adding this painless script module at plugin load test?
Let's look into how we can verify composite monitors containing bucket-level monitors.
…nitor index is not initialized yet. Added workflow crud test cases Signed-off-by: Stevan Buzejic <[email protected]>
Thanks for the changes, Stevan.
I have reviewed 50% of the PR and will review more while you address the comments.
@@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
let's discuss offline about cluster/node level settings for composite workflows
Ok sounds good.
@@ -59,7 +62,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
 monitorCtx: MonitorRunnerExecutionContext,
 periodStart: Instant,
 periodEnd: Instant,
-dryrun: Boolean
+dryrun: Boolean,
+workflowExecutionContext: WorkflowRunContext?
NIT: name the variable same as the type
@@ -389,10 +396,22 @@ object BucketLevelMonitorRunner : MonitorRunner() {
val queryBuilder = if (input.query.query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(source.query())
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))

if (workflowRunContext != null && !workflowRunContext.indexToDocIds.isNullOrEmpty()) {
Why are we applying this logic here and not in InputService, where the actual search query is executed?
Yeah, you are right. This logic can be removed from here. I forgot to remove it once I added it in InputService. Thanks, good catch!
@@ -125,6 +127,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}

// If monitor execution is triggered from a workflow
val indexToRelatedDocIdsMap = workflowRunContext?.indexToDocIds
Can we initialize this just before its usage instead of here?
@@ -105,6 +122,28 @@ class InputService(
}
}

private fun updateInputQueryWithFindingDocIds(
add comments/javadocs to explain what we intend to do wherever we are using chained findings filtering
// Rewrite query to consider the doc ids per given index
if (chainedFindingExist(indexToDocIds)) {
    val updatedSourceQuery = updateInputQueryWithFindingDocIds(input.query.query(), indexToDocIds!!)
null check for query required?
why are we changing at input query
add a filter after search query is constructed
> null check for query required?
You are right. Adding the null check.
> why are we changing at input query add a filter after search query is constructed
Since rewrittenQuery.query() returns QueryBuilder! (which can be null), we would have to cast it to a BoolQueryBuilder (I guess) and then later set it again on rewrittenQuery.query.
You can see here that the query is later transformed into a String, so it wouldn't be so straightforward to add a filter.
I don't have any other idea how to do this in an elegant way (maybe I lack domain knowledge of the OpenSearch classes I could use for this purpose). If you can give me a hint or a code snippet showing how to do it, that would be good. Thanks!
@@ -105,6 +122,28 @@ class InputService(
}
}

private fun updateInputQueryWithFindingDocIds(
This should be a common method used by all the monitor types.
I thought the same, but then I saw that the search query is executed in a different way depending on the monitor type.
I.e., here you can see how the doc-level monitor gets the matching docs. For example, the doc-level monitor iterates through the list of indices and gets the documents index by index. That's why I adjusted how the matched docIds are fetched in the doc-level monitor to align with the existing logic. Check it out here
val xContentRegistry: NamedXContentRegistry,
) {

suspend fun getFindingDocIdsPerMonitorExecution(chainedMonitor: Monitor, workflowExecutionId: String): Map<String, List<String>> {
getFindingDocIdsByExecutionId*
.seqNoAndPrimaryTerm(true)
)
.indices(chainedMonitor.dataSources.findingsIndex)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
handle indexNotFound
return buildMonitors(searchResponse)
}

private fun buildMonitors(response: SearchResponse): List<Monitor> {
Should this function be called parseMonitors?
return monitors
}

suspend fun getDocIdsPerFindingIndex(monitorId: String, workflowExecutionId: String): Map<String, List<String>> {
javadocs
This function will be removed since it's not used at all.
…e workflow Signed-off-by: Stevan Buzejic <[email protected]>
… consider the workflow execution id Added workflow service used for retrieving monitors and their findings. Added business logic for considering the chained monitors Signed-off-by: Stevan Buzejic <[email protected]>
Signed-off-by: Stevan Buzejic <[email protected]>
Signed-off-by: Stevan Buzejic <[email protected]>
…when loading the cluster Signed-off-by: Stevan Buzejic <[email protected]>
class ExecuteWorkflowRequest : ActionRequest {
    val dryrun: Boolean
    val requestEnd: TimeValue
what is requestEnd?
It is a copy-paste from ExecuteMonitorRequest. It is used in CompositeWorkflowRunner and passed to the concrete monitor runner; e.g., in bucket-level monitors it is used for defining the search params when creating findings. Check it out here
import org.opensearch.commons.alerting.model.Workflow
import java.io.IOException

class ExecuteWorkflowRequest : ActionRequest {
javadocs for field
)

override fun validate(): ActionRequestValidationException? {
    return null
validations?
Added check. Tnx and good point
class ExecuteWorkflowResponse : ActionResponse, ToXContentObject {

    val workflowRunResult: List<MonitorRunResult<*>>
We should store other fields as well, such as the workflow execution start and end time and a status (failed or successful).
Makes sense. Will add those fields and appropriate logic around them
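To make the request above concrete, here is a minimal sketch of what such a result wrapper could look like. Every name in it (`WorkflowRunResultSketch`, `MonitorRunResultSketch`, `WorkflowRunStatus`, and the field names) is an assumption made for illustration and is not the shape this PR ends up with; the real `MonitorRunResult<*>` is replaced by a small stand-in so the snippet is self-contained.

```kotlin
import java.time.Instant

// Hypothetical stand-in for the plugin's MonitorRunResult<*>; only the fields this sketch needs.
data class MonitorRunResultSketch(val monitorId: String, val error: String? = null)

// Hypothetical status values, following the "failed, successful" wording of the review comment.
enum class WorkflowRunStatus { SUCCESSFUL, FAILED }

// Hypothetical wrapper holding the per-monitor results plus execution metadata.
data class WorkflowRunResultSketch(
    val workflowId: String,
    val executionStartTime: Instant,
    val executionEndTime: Instant,
    val monitorRunResults: List<MonitorRunResultSketch>
) {
    // Assumption: the workflow counts as failed if any delegate monitor reported an error.
    val status: WorkflowRunStatus
        get() = if (monitorRunResults.any { it.error != null }) WorkflowRunStatus.FAILED
        else WorkflowRunStatus.SUCCESSFUL
}

fun main() {
    val start = Instant.parse("2023-01-01T00:00:00Z")
    val result = WorkflowRunResultSketch(
        workflowId = "workflow-1",
        executionStartTime = start,
        executionEndTime = start.plusSeconds(5),
        monitorRunResults = listOf(
            MonitorRunResultSketch("monitor-1"),
            MonitorRunResultSketch("monitor-2", error = "index not found")
        )
    )
    println(result.status)
}
```

Deriving the status from the delegate results is only one option; storing it explicitly would also work if a workflow can fail before any monitor runs.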
return listOf()
}

override fun replacedRoutes(): MutableList<RestHandler.ReplacedRoute> {
Why do we need replacedRoutes? We are not replacing routes; this would be a new API.
Removing class completely. Sorry
TODO("Not yet implemented")
}
): List<MonitorRunResult<*>> {
val workflowExecutionId = UUID.randomUUID().toString()
We should make this execution id more deterministic, e.g. workflowId+timestamp.
Something like:
val workflowExecutionId = UUID.randomUUID().toString() + LocalDateTime.now()
What do you think?
Changed to something like:
val executionId = workflow.id.plus(LocalDateTime.now()).plus(UUID.randomUUID().toString())
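For comparison, the reviewer's plain "workflowId+timestamp" suggestion could be sketched as below. The helper name `buildWorkflowExecutionId`, the `_` separator, and the use of epoch milliseconds are assumptions for illustration only. Unlike the variant above, this sketch has no random suffix, so two runs of the same workflow starting in the same millisecond would get the same id.

```kotlin
import java.time.Instant

// Hypothetical helper: derives the execution id from the workflow id and the run's start time.
// Passing the start time in (instead of calling now() inside) keeps the id reproducible in tests.
fun buildWorkflowExecutionId(workflowId: String, startTime: Instant): String =
    "${workflowId}_${startTime.toEpochMilli()}"

fun main() {
    val id = buildWorkflowExecutionId("workflow-1", Instant.ofEpochMilli(1_700_000_000_000))
    println(id) // prints workflow-1_1700000000000
}
```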
): MonitorRunResult<*> {
TODO("Not yet implemented")
}
): List<MonitorRunResult<*>> {
This should return workflowRunResult which should contain list of monitorRunResult
return indexToRelatedDocIdsMap
}

suspend fun searchMonitors(monitors: List<String>, size: Int, owner: String?): List<Monitor> {
rename to getMonitorsById
is owner field used?
No, I will remove it. Good catch.
val delegates = (workflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order }
// Fetch monitors by ids
val monitors = monitorCtx.workflowService!!.searchMonitors(delegates.map { it.monitorId }, delegates.size, workflow.owner)
why do we need owner field?
We don't. Removing
// Validate the monitors size
if (delegates.size != monitors.size) {
    val diffMonitorIds = delegates.map { it.monitorId }.minus(monitors.map { it.id }.toSet()).joinToString()
    throw IllegalStateException("Delegate monitors don't exist $diffMonitorIds")
Please also log the workflow id in the message.
Will do. I will also add logs at the beginning and end of the workflow execution.
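As an illustration of the check discussed above with the workflow id included in the message, here is a self-contained sketch. The helper names `findMissingDelegateIds` and `validateDelegates` and the exact message wording are assumptions; monitors and delegates are reduced to plain id strings so the snippet runs without the plugin's model classes.

```kotlin
// Hypothetical helper: ids of delegate monitors that were not returned by the monitor lookup.
fun findMissingDelegateIds(delegateMonitorIds: List<String>, foundMonitorIds: Collection<String>): List<String> =
    delegateMonitorIds - foundMonitorIds.toSet()

// Hypothetical validation: throws IllegalStateException naming both the missing monitors and the workflow.
fun validateDelegates(workflowId: String, delegateMonitorIds: List<String>, foundMonitorIds: Collection<String>) {
    val missing = findMissingDelegateIds(delegateMonitorIds, foundMonitorIds)
    check(missing.isEmpty()) {
        "Delegate monitors don't exist: ${missing.joinToString()} for the workflow $workflowId"
    }
}

fun main() {
    // All delegates found: no exception.
    validateDelegates("workflow-1", listOf("m1", "m2"), listOf("m1", "m2"))

    // One delegate missing: the message carries the monitor id and the workflow id.
    val error = runCatching { validateDelegates("workflow-1", listOf("m1", "m2"), listOf("m1")) }.exceptionOrNull()
    println(error?.message)
}
```

Comparing the id sets directly, rather than only the list sizes, also catches the case where the sizes happen to match but the ids differ.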
…esponse class Code adjusted according to comments Signed-off-by: Stevan Buzejic <[email protected]>
d94c257 to a1e0408
Signed-off-by: Stevan Buzejic <[email protected]>
var indexToDocIds = mapOf<String, List<String>>()
var delegateMonitor: Monitor
delegateMonitor = monitorsById[delegate.monitorId]
    ?: throw IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id")
wrap with alerting exception
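A side note on the quoted line, separate from the reviewer's request: in a Kotlin string template, `$workflow.id` interpolates only `workflow` (its `toString()`) and then appends the literal text `.id`; a property access needs braces, as in `${workflow.id}`. The sketch below demonstrates the difference with a hypothetical stand-in class, `WorkflowSketch`, which is not the plugin's `Workflow` model.

```kotlin
// Hypothetical minimal stand-in for the plugin's Workflow model.
data class WorkflowSketch(val id: String)

// Without braces: the template ends at `workflow`, and ".id" is plain text.
fun messageWithoutBraces(workflow: WorkflowSketch, monitorId: String): String =
    "Delegate monitor not found $monitorId for the workflow $workflow.id"

// With braces: the whole expression `workflow.id` is interpolated.
fun messageWithBraces(workflow: WorkflowSketch, monitorId: String): String =
    "Delegate monitor not found $monitorId for the workflow ${workflow.id}"

fun main() {
    val workflow = WorkflowSketch("w1")
    println(messageWithoutBraces(workflow, "m1")) // ... for the workflow WorkflowSketch(id=w1).id
    println(messageWithBraces(workflow, "m1"))    // ... for the workflow w1
}
```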
 * @param chainedMonitor Monitor that is previously executed
 * @param workflowExecutionId Execution id of the current workflow
 */
suspend fun getFindingDocIdsByExecutionId(chainedMonitor: Monitor, workflowExecutionId: String): Map<String, List<String>> {
handle indexNotFound and return empty
Let me elaborate a bit on my thinking and the proposed solution:
Let's catch all the exceptions that can be raised and wrap them in AlertingException (check it out here). The caller, the function in CompositeWorkflowRunner (here), will then do a check and return an empty workflow run result. What do you think?
    ?: throw IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id")
if (delegate.chainedFindings != null) {
    val chainedMonitor = monitorsById[delegate.chainedFindings!!.monitorId]
        ?: throw IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id")
wrap with alerting exception
dryRun,
workflowRunContext
)
} else {
NIT: use else if for query level and throw unsupported exception
Should I also wrap it in an alerting exception? I.e., something like this:
else if (delegateMonitor.isQueryLevelMonitor()) {
    QueryLevelMonitorRunner.runMonitor(
        delegateMonitor,
        monitorCtx,
        periodStart,
        periodEnd,
        dryRun,
        workflowRunContext
    )
} else {
    throw AlertingException.wrap(
        IllegalStateException("Unsupported monitor type")
    )
}
data class WorkflowRunContext(
    val chainedMonitorId: String?,
    val workflowExecutionId: String,
    val indexToDocIds: Map<String, List<String>>
indexToDocIds is not a good variable name. Someone reading the code would not understand that this is the input source.
What about "matchingDocIdsPerIndex"?
Signed-off-by: Stevan Buzejic <[email protected]>
Let's have latestRunTime and latestExecutionId in the workflow object or the workflow metadata object.
…dation if the query monitor is part of the workflow chain Signed-off-by: Stevan Buzejic <[email protected]>
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@JvmField val LEGACY_OPENDISTRO_WORKFLOW_BASE_URI = "/_opendistro/_alerting/workflows"
This is only for legacy APIs. This is a new API, so we should not have this.
Signed-off-by: Stevan Buzejic <[email protected]>
… checking workflow metadata. Changed flow of workflow execution Signed-off-by: Stevan Buzejic <[email protected]>
20de168 to 8e0d28d
…hat verify that workflow metadata is not created Signed-off-by: Stevan Buzejic <[email protected]>
…he monitors once the workflow is updated Signed-off-by: Stevan Buzejic <[email protected]>
Signed-off-by: Stevan Buzejic <[email protected]>
Signed-off-by: Stevan Buzejic <[email protected]>
Issue #, if available:
Description of changes:
CheckList:
[ ] Commits are signed per the DCO using --signoff
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.