Skip to content

Commit

Permalink
Fix SearchMonitorsTool bugs; add corresponding ITs (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#151) (opensearch-project#153)

* Fix search monitor bugs; add search monitor ITs

* Remove unused fn

* Clean up UT

* Change to beforeEach

* Fix detector_type bug

---------

(cherry picked from commit 722bfd2)

Signed-off-by: Tyler Ohlsen <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Signed-off-by: yuye-aws <[email protected]>
  • Loading branch information
2 people authored and yuye-aws committed Apr 26, 2024
1 parent 993d25c commit 4892de9
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private <T> void processHits(Map<String, SearchHit> hitsAsMap, ActionListener<T>
sb.append("{");
sb.append("id=").append(hit.getId()).append(",");
sb.append("name=").append(hit.getSourceAsMap().get("name")).append(",");
sb.append("type=").append(hit.getSourceAsMap().get("type")).append(",");
sb.append("type=").append(hit.getSourceAsMap().get("detector_type")).append(",");
sb.append("description=").append(hit.getSourceAsMap().get("description")).append(",");
sb.append("index=").append(hit.getSourceAsMap().get("indices")).append(",");
sb.append("lastUpdateTime=").append(hit.getSourceAsMap().get("last_update_time"));
Expand Down
172 changes: 67 additions & 105 deletions src/main/java/org/opensearch/agent/tools/SearchMonitorsTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.commons.alerting.AlertingPluginInterface;
import org.opensearch.commons.alerting.action.GetMonitorRequest;
import org.opensearch.commons.alerting.action.GetMonitorResponse;
import org.opensearch.commons.alerting.action.SearchMonitorRequest;
import org.opensearch.commons.alerting.model.Monitor;
import org.opensearch.commons.alerting.model.ScheduledJob;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.query.BoolQueryBuilder;
Expand All @@ -35,7 +32,6 @@
import org.opensearch.ml.common.spi.tools.Parser;
import org.opensearch.ml.common.spi.tools.Tool;
import org.opensearch.ml.common.spi.tools.ToolAnnotation;
import org.opensearch.rest.RestRequest;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;
Expand Down Expand Up @@ -102,89 +98,73 @@ public <T> void run(Map<String, String> parameters, ActionListener<T> listener)
? Integer.parseInt(parameters.get("startIndex"))
: 0;

// If a monitor ID is specified, all other params will be ignored. Simply return the monitor details based on that ID
// via the get monitor transport action
List<QueryBuilder> mustList = new ArrayList<QueryBuilder>();
if (monitorId != null) {
GetMonitorRequest getMonitorRequest = new GetMonitorRequest(monitorId, 1L, RestRequest.Method.GET, null);
ActionListener<GetMonitorResponse> getMonitorListener = ActionListener.<GetMonitorResponse>wrap(response -> {
Monitor monitor = response.getMonitor();
processGetMonitorHit(monitor, listener);
}, e -> {
// System index isn't initialized by default, so ignore such errors. Alerting plugin does not return the
// standard IndexNotFoundException so we parse the message instead
if (e.getMessage().contains("Configured indices are not found")) {
processGetMonitorHit(null, listener);
} else {
log.error("Failed to get monitor.", e);
listener.onFailure(e);
}
});
AlertingPluginInterface.INSTANCE.getMonitor((NodeClient) client, getMonitorRequest, getMonitorListener);
} else {
List<QueryBuilder> mustList = new ArrayList<QueryBuilder>();
if (monitorName != null) {
mustList.add(new TermQueryBuilder("monitor.name.keyword", monitorName));
}
if (monitorNamePattern != null) {
mustList.add(new WildcardQueryBuilder("monitor.name.keyword", monitorNamePattern));
}
if (enabled != null) {
mustList.add(new TermQueryBuilder("monitor.enabled", enabled));
mustList.add(new TermQueryBuilder("_id", monitorId));
}
if (monitorName != null) {
mustList.add(new TermQueryBuilder("monitor.name.keyword", monitorName));
}
if (monitorNamePattern != null) {
mustList.add(new WildcardQueryBuilder("monitor.name.keyword", monitorNamePattern));
}
if (enabled != null) {
mustList.add(new TermQueryBuilder("monitor.enabled", enabled));
}
if (hasTriggers != null) {
NestedQueryBuilder nestedTriggerQuery = new NestedQueryBuilder(
"monitor.triggers",
new ExistsQueryBuilder("monitor.triggers"),
ScoreMode.None
);

BoolQueryBuilder triggerQuery = new BoolQueryBuilder();
if (hasTriggers) {
triggerQuery.must(nestedTriggerQuery);
} else {
triggerQuery.mustNot(nestedTriggerQuery);
}
if (hasTriggers != null) {
NestedQueryBuilder nestedTriggerQuery = new NestedQueryBuilder(
"monitor.triggers",
new ExistsQueryBuilder("monitor.triggers"),
ScoreMode.None
mustList.add(triggerQuery);
}
if (indices != null) {
mustList
.add(
new NestedQueryBuilder(
"monitor.inputs",
new WildcardQueryBuilder("monitor.inputs.search.indices", indices),
ScoreMode.None
)
);
}

BoolQueryBuilder triggerQuery = new BoolQueryBuilder();
if (hasTriggers) {
triggerQuery.must(nestedTriggerQuery);
} else {
triggerQuery.mustNot(nestedTriggerQuery);
}
mustList.add(triggerQuery);
}
if (indices != null) {
mustList
.add(
new NestedQueryBuilder(
"monitor.inputs",
new WildcardQueryBuilder("monitor.inputs.search.indices", indices),
ScoreMode.None
)
);
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.must().addAll(mustList);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(boolQueryBuilder)
.size(size)
.from(startIndex)
.sort(sortString, sortOrder);

SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder).indices(ScheduledJob.SCHEDULED_JOBS_INDEX);
SearchMonitorRequest searchMonitorRequest = new SearchMonitorRequest(searchRequest);

ActionListener<SearchResponse> searchMonitorListener = ActionListener.<SearchResponse>wrap(response -> {
List<SearchHit> hits = Arrays.asList(response.getHits().getHits());
Map<String, SearchHit> hitsAsMap = hits.stream().collect(Collectors.toMap(SearchHit::getId, hit -> hit));
processHits(hitsAsMap, listener);

}, e -> {
// System index isn't initialized by default, so ignore such errors. Alerting plugin does not return the
// standard IndexNotFoundException so we parse the message instead
if (e.getMessage().contains("Configured indices are not found")) {
processHits(Collections.emptyMap(), listener);
} else {
log.error("Failed to search monitors.", e);
listener.onFailure(e);
}
});
AlertingPluginInterface.INSTANCE.searchMonitors((NodeClient) client, searchMonitorRequest, searchMonitorListener);

BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.must().addAll(mustList);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(boolQueryBuilder)
.size(size)
.from(startIndex)
.sort(sortString, sortOrder);

SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder).indices(ScheduledJob.SCHEDULED_JOBS_INDEX);
SearchMonitorRequest searchMonitorRequest = new SearchMonitorRequest(searchRequest);

ActionListener<SearchResponse> searchMonitorListener = ActionListener.<SearchResponse>wrap(response -> {
List<SearchHit> hits = Arrays.asList(response.getHits().getHits());
Map<String, SearchHit> hitsAsMap = hits.stream().collect(Collectors.toMap(SearchHit::getId, hit -> hit));
processHits(hitsAsMap, listener);

}, e -> {
// System index isn't initialized by default, so ignore such errors. Alerting plugin does not return the
// standard IndexNotFoundException so we parse the message instead
if (e.getMessage().contains("Configured indices are not found")) {
processHits(Collections.emptyMap(), listener);
} else {
log.error("Failed to search monitors.", e);
listener.onFailure(e);
}
});
AlertingPluginInterface.INSTANCE.searchMonitors((NodeClient) client, searchMonitorRequest, searchMonitorListener);
}
}

@Override
Expand All @@ -201,39 +181,21 @@ private <T> void processHits(Map<String, SearchHit> hitsAsMap, ActionListener<T>
StringBuilder sb = new StringBuilder();
sb.append("Monitors=[");
for (SearchHit hit : hitsAsMap.values()) {
Map<String, Object> monitorAsMap = (Map<String, Object>) hit.getSourceAsMap().get("monitor");
sb.append("{");
sb.append("id=").append(hit.getId()).append(",");
sb.append("name=").append(hit.getSourceAsMap().get("name")).append(",");
sb.append("type=").append(hit.getSourceAsMap().get("type")).append(",");
sb.append("enabled=").append(hit.getSourceAsMap().get("enabled")).append(",");
sb.append("enabledTime=").append(hit.getSourceAsMap().get("enabled_time")).append(",");
sb.append("lastUpdateTime=").append(hit.getSourceAsMap().get("last_update_time"));
sb.append("name=").append(monitorAsMap.get("name")).append(",");
sb.append("type=").append(monitorAsMap.get("type")).append(",");
sb.append("enabled=").append(monitorAsMap.get("enabled")).append(",");
sb.append("enabledTime=").append(monitorAsMap.get("enabled_time")).append(",");
sb.append("lastUpdateTime=").append(monitorAsMap.get("last_update_time"));
sb.append("}");
}
sb.append("]");
sb.append("TotalMonitors=").append(hitsAsMap.size());
listener.onResponse((T) sb.toString());
}

private <T> void processGetMonitorHit(Monitor monitor, ActionListener<T> listener) {
StringBuilder sb = new StringBuilder();
if (monitor != null) {
sb.append("Monitors=[");
sb.append("{");
sb.append("id=").append(monitor.getId()).append(",");
sb.append("name=").append(monitor.getName()).append(",");
sb.append("type=").append(monitor.getType()).append(",");
sb.append("enabled=").append(monitor.getEnabled()).append(",");
sb.append("enabledTime=").append(monitor.getEnabledTime().toEpochMilli()).append(",");
sb.append("lastUpdateTime=").append(monitor.getLastUpdateTime().toEpochMilli());
sb.append("}]");
sb.append("TotalMonitors=1");
} else {
sb.append("Monitors=[]TotalMonitors=0");
}
listener.onResponse((T) sb.toString());
}

/**
* Factory for the {@link SearchMonitorsTool}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void testRunWithSingleAnomalyDetector() throws Exception {
XContentBuilder content = XContentBuilder.builder(XContentType.JSON.xContent());
content.startObject();
content.field("name", testDetector.getName());
content.field("type", testDetector.getDetectorType());
content.field("detector_type", testDetector.getDetectorType());
content.field("description", testDetector.getDescription());
content.field("indices", testDetector.getIndices().get(0));
content.field("last_update_time", testDetector.getLastUpdateTime().toEpochMilli());
Expand Down
Loading

0 comments on commit 4892de9

Please sign in to comment.