Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/composite workflow rest apis #2

Open
wants to merge 5 commits into
base: feature/composite-workflow-execution-v1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSetting
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
import org.opensearch.alerting.resthandler.RestExecuteWorkflowAction
import org.opensearch.alerting.resthandler.RestGetAlertsAction
Expand All @@ -35,7 +36,9 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
import org.opensearch.alerting.resthandler.RestIndexWorkflowAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
Expand Down Expand Up @@ -128,6 +131,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@JvmField val LEGACY_OPENDISTRO_WORKFLOW_BASE_URI = "/_opendistro/_alerting/workflows"
@JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"
@JvmField val EMAIL_ACCOUNT_BASE_URI = "$DESTINATION_BASE_URI/email_accounts"
@JvmField val EMAIL_GROUP_BASE_URI = "$DESTINATION_BASE_URI/email_groups"
Expand Down Expand Up @@ -161,6 +165,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetMonitorAction(),
RestDeleteMonitorAction(),
RestIndexMonitorAction(),
RestIndexWorkflowAction(),
RestSearchMonitorAction(settings, clusterService),
RestExecuteMonitorAction(),
RestExecuteWorkflowAction(),
Expand All @@ -172,7 +177,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetEmailGroupAction(),
RestGetDestinationsAction(),
RestGetAlertsAction(),
RestGetFindingsAction()
RestGetFindingsAction(),
RestGetWorkflowAction(),
RestDeleteWorkflowAction()
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.util.REFRESH
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestRequest
import org.opensearch.rest.action.RestToXContentListener
import java.io.IOException

/**
* This class consists of the REST handler to delete workflows.
*/
class RestDeleteWorkflowAction : BaseRestHandler() {

private val log = LogManager.getLogger(javaClass)

override fun getName(): String {
return "delete_workflow_action"
}

override fun routes(): List<RestHandler.Route> {
return listOf(
RestHandler.Route(
RestRequest.Method.DELETE,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}")

val workflowId = request.param("workflowID")
val deleteDelegateMonitors = request.paramAsBoolean("deleteDelegateMonitors", false)
log.debug("${request.method()} ${request.uri()}")

val refreshPolicy =
WriteRequest.RefreshPolicy.parse(request.param(REFRESH, WriteRequest.RefreshPolicy.IMMEDIATE.value))
val deleteWorkflowRequest = DeleteWorkflowRequest(workflowId, deleteDelegateMonitors, refreshPolicy)

return RestChannelConsumer { channel ->
client.execute(
AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, deleteWorkflowRequest,
RestToXContentListener(channel)
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.util.context
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.GetWorkflowRequest
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestRequest
import org.opensearch.rest.action.RestActions
import org.opensearch.rest.action.RestToXContentListener
import org.opensearch.search.fetch.subphase.FetchSourceContext

/**
* This class consists of the REST handler to retrieve a workflow .
*/
class RestGetWorkflowAction : BaseRestHandler() {

private val log = LogManager.getLogger(javaClass)

override fun getName(): String {
return "get_workflow_action"
}

override fun routes(): List<RestHandler.Route> {
return listOf(
RestHandler.Route(
RestRequest.Method.GET,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
),
RestHandler.Route(
RestRequest.Method.HEAD,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}")

val workflowId = request.param("workflowID")
if (workflowId == null || workflowId.isEmpty()) {
throw IllegalArgumentException("missing id")
}

var srcContext = context(request)
if (request.method() == RestRequest.Method.HEAD) {
srcContext = FetchSourceContext.DO_NOT_FETCH_SOURCE
}
val getWorkflowRequest =
GetWorkflowRequest(workflowId, RestActions.parseVersion(request), request.method(), srcContext)
return RestChannelConsumer {
channel ->
client.execute(AlertingActions.GET_WORKFLOW_ACTION_TYPE, getWorkflowRequest, RestToXContentListener(channel))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.alerting.resthandler

import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IF_PRIMARY_TERM
import org.opensearch.alerting.util.IF_SEQ_NO
import org.opensearch.alerting.util.REFRESH
import org.opensearch.client.node.NodeClient
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.BytesRestResponse
import org.opensearch.rest.RestChannel
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestResponse
import org.opensearch.rest.RestStatus
import org.opensearch.rest.action.RestResponseListener
import java.io.IOException
import java.time.Instant

/**
* Rest handlers to create and update workflows.
*/
class RestIndexWorkflowAction : BaseRestHandler() {

override fun getName(): String {
return "index_workflow_action"
}

override fun routes(): List<RestHandler.Route> {
return listOf(
RestHandler.Route(RestRequest.Method.POST, AlertingPlugin.WORKFLOW_BASE_URI),
RestHandler.Route(
RestRequest.Method.PUT,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val id = request.param("workflowID", Workflow.NO_ID)
if (request.method() == RestRequest.Method.PUT && Workflow.NO_ID == id) {
throw AlertingException.wrap(IllegalArgumentException("Missing workflow ID"))
}

// Validate request by parsing JSON to Monitor
val xcp = request.contentParser()
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
val workflow = Workflow.parse(xcp, id).copy(lastUpdateTime = Instant.now())
val rbacRoles = request.contentParser().map()["rbac_roles"] as List<String>?

val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO)
val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
val refreshPolicy = if (request.hasParam(REFRESH)) {
WriteRequest.RefreshPolicy.parse(request.param(REFRESH))
} else {
WriteRequest.RefreshPolicy.IMMEDIATE
}
val workflowRequest =
IndexWorkflowRequest(id, seqNo, primaryTerm, refreshPolicy, request.method(), workflow, rbacRoles)

return RestChannelConsumer { channel ->
client.execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, workflowRequest, indexMonitorResponse(channel, request.method()))
}
}

private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method): RestResponseListener<IndexWorkflowResponse> {
return object : RestResponseListener<IndexWorkflowResponse>(channel) {
@Throws(Exception::class)
override fun buildResponse(response: IndexWorkflowResponse): RestResponse {
var returnStatus = RestStatus.CREATED
if (restMethod == RestRequest.Method.PUT)
returnStatus = RestStatus.OK

val restResponse =
BytesRestResponse(returnStatus, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS))
if (returnStatus == RestStatus.CREATED) {
val location = "${AlertingPlugin.WORKFLOW_BASE_URI}/${response.id}"
restResponse.addHeader("Location", location)
}
return restResponse
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchException
import org.opensearch.OpenSearchStatusException
import org.opensearch.ResourceAlreadyExistsException
Expand Down Expand Up @@ -206,7 +207,7 @@ class TransportIndexWorkflowAction @Inject constructor(

override fun onFailure(t: Exception) {
// https://github.com/opensearch-project/alerting/issues/646
if (t is ResourceAlreadyExistsException && t.message?.contains("already exists") == true) {
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
scope.launch {
// Wait for the yellow status
val request = ClusterHealthRequest()
Expand Down Expand Up @@ -321,7 +322,6 @@ class TransportIndexWorkflowAction @Inject constructor(
}

private suspend fun indexWorkflow() {

if (user != null) {
// Use the backend roles which is an intersection of the requested backend roles and the user's backend roles.
// Admins can pass in any backend role. Also if no backend role is passed in, all the user's backend roles are used.
Expand Down Expand Up @@ -555,6 +555,9 @@ class TransportIndexWorkflowAction @Inject constructor(
reqMonitorIds.remove(it.id)
}
if (reqMonitorIds.isNotEmpty()) {
log.error("monitorIds: " + monitorIds.joinToString())
log.error("delegateMonitors: " + delegateMonitors.joinToString { it.id })
log.error("reqMonitorIds: " + reqMonitorIds.joinToString())
throw AlertingException.wrap(IllegalArgumentException(("${reqMonitorIds.joinToString()} are not valid monitor ids")))
}
}
Expand All @@ -579,6 +582,14 @@ class TransportIndexWorkflowAction @Inject constructor(
monitors.add(monitor as Monitor)
}
}
if (monitors.isEmpty()) {
val searchSource1 = SearchSourceBuilder().query(QueryBuilders.matchAllQuery())
val searchRequest1 = SearchRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).source(searchSource1)
val response1: SearchResponse = client.suspendUntil { client.search(searchRequest1, it) }

print(response1)
}

return monitors
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ fun randomScript(source: String = "return " + OpenSearchRestTestCase.randomBoole

val ADMIN = "admin"
val ALERTING_BASE_URI = "/_plugins/_alerting/monitors"
val WORKFLOW_ALERTING_BASE_URI = "/_plugins/_alerting/workflows"
val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
val LEGACY_OPENDISTRO_ALERTING_BASE_URI = "/_opendistro/_alerting/monitors"
val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"
Expand Down
Loading