Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SearchMonitorsTool bugs; add corresponding ITs #151

Merged
merged 5 commits into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 67 additions & 105 deletions src/main/java/org/opensearch/agent/tools/SearchMonitorsTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.commons.alerting.AlertingPluginInterface;
import org.opensearch.commons.alerting.action.GetMonitorRequest;
import org.opensearch.commons.alerting.action.GetMonitorResponse;
import org.opensearch.commons.alerting.action.SearchMonitorRequest;
import org.opensearch.commons.alerting.model.Monitor;
import org.opensearch.commons.alerting.model.ScheduledJob;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.query.BoolQueryBuilder;
Expand All @@ -35,7 +32,6 @@
import org.opensearch.ml.common.spi.tools.Parser;
import org.opensearch.ml.common.spi.tools.Tool;
import org.opensearch.ml.common.spi.tools.ToolAnnotation;
import org.opensearch.rest.RestRequest;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;
Expand Down Expand Up @@ -102,89 +98,73 @@
? Integer.parseInt(parameters.get("startIndex"))
: 0;

// If a monitor ID is specified, all other params will be ignored. Simply return the monitor details based on that ID
// via the get monitor transport action
List<QueryBuilder> mustList = new ArrayList<QueryBuilder>();
if (monitorId != null) {
GetMonitorRequest getMonitorRequest = new GetMonitorRequest(monitorId, 1L, RestRequest.Method.GET, null);
ActionListener<GetMonitorResponse> getMonitorListener = ActionListener.<GetMonitorResponse>wrap(response -> {
Monitor monitor = response.getMonitor();
processGetMonitorHit(monitor, listener);
}, e -> {
// System index isn't initialized by default, so ignore such errors. Alerting plugin does not return the
// standard IndexNotFoundException so we parse the message instead
if (e.getMessage().contains("Configured indices are not found")) {
processGetMonitorHit(null, listener);
} else {
log.error("Failed to get monitor.", e);
listener.onFailure(e);
}
});
AlertingPluginInterface.INSTANCE.getMonitor((NodeClient) client, getMonitorRequest, getMonitorListener);
} else {
List<QueryBuilder> mustList = new ArrayList<QueryBuilder>();
if (monitorName != null) {
mustList.add(new TermQueryBuilder("monitor.name.keyword", monitorName));
}
if (monitorNamePattern != null) {
mustList.add(new WildcardQueryBuilder("monitor.name.keyword", monitorNamePattern));
}
if (enabled != null) {
mustList.add(new TermQueryBuilder("monitor.enabled", enabled));
mustList.add(new TermQueryBuilder("_id", monitorId));
}
if (monitorName != null) {
mustList.add(new TermQueryBuilder("monitor.name.keyword", monitorName));
}
if (monitorNamePattern != null) {
mustList.add(new WildcardQueryBuilder("monitor.name.keyword", monitorNamePattern));
}
if (enabled != null) {
mustList.add(new TermQueryBuilder("monitor.enabled", enabled));
}
if (hasTriggers != null) {
NestedQueryBuilder nestedTriggerQuery = new NestedQueryBuilder(
"monitor.triggers",
new ExistsQueryBuilder("monitor.triggers"),
ScoreMode.None
);

BoolQueryBuilder triggerQuery = new BoolQueryBuilder();
if (hasTriggers) {
triggerQuery.must(nestedTriggerQuery);
} else {
triggerQuery.mustNot(nestedTriggerQuery);
Comment on lines +103 to +125
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are adding these filters to find the monitors, I think it would be beneficial to also search by monitor type as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR isn't adding any filters, it's just shown as updated due to tabbing since the overall if/else was removed. Prefer to leave the tool with the current set of params and add/remove/tune later on as more testing is done and we capture the breadth of questions and functional responses.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Especially for initial release, we need to be weary about how many knobs we allow the LLM to tune. Ideally we cover as many questions as possible in a confident/consistent/accurate way, and slowly adjust or expand over time after we have a good gauge of performance.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, we can set this as a further enhancement, down the road.

}
if (hasTriggers != null) {
NestedQueryBuilder nestedTriggerQuery = new NestedQueryBuilder(
"monitor.triggers",
new ExistsQueryBuilder("monitor.triggers"),
ScoreMode.None
mustList.add(triggerQuery);
}
if (indices != null) {
mustList
.add(
new NestedQueryBuilder(
"monitor.inputs",
new WildcardQueryBuilder("monitor.inputs.search.indices", indices),
ScoreMode.None
)
);
}

BoolQueryBuilder triggerQuery = new BoolQueryBuilder();
if (hasTriggers) {
triggerQuery.must(nestedTriggerQuery);
} else {
triggerQuery.mustNot(nestedTriggerQuery);
}
mustList.add(triggerQuery);
}
if (indices != null) {
mustList
.add(
new NestedQueryBuilder(
"monitor.inputs",
new WildcardQueryBuilder("monitor.inputs.search.indices", indices),
ScoreMode.None
)
);
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.must().addAll(mustList);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(boolQueryBuilder)
.size(size)
.from(startIndex)
.sort(sortString, sortOrder);

SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder).indices(ScheduledJob.SCHEDULED_JOBS_INDEX);
SearchMonitorRequest searchMonitorRequest = new SearchMonitorRequest(searchRequest);

ActionListener<SearchResponse> searchMonitorListener = ActionListener.<SearchResponse>wrap(response -> {
List<SearchHit> hits = Arrays.asList(response.getHits().getHits());
Map<String, SearchHit> hitsAsMap = hits.stream().collect(Collectors.toMap(SearchHit::getId, hit -> hit));
processHits(hitsAsMap, listener);

}, e -> {
// System index isn't initialized by default, so ignore such errors. Alerting plugin does not return the
// standard IndexNotFoundException so we parse the message instead
if (e.getMessage().contains("Configured indices are not found")) {
processHits(Collections.emptyMap(), listener);

Check warning on line 160 in src/main/java/org/opensearch/agent/tools/SearchMonitorsTool.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/agent/tools/SearchMonitorsTool.java#L160

Added line #L160 was not covered by tests
} else {
log.error("Failed to search monitors.", e);
listener.onFailure(e);

Check warning on line 163 in src/main/java/org/opensearch/agent/tools/SearchMonitorsTool.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/agent/tools/SearchMonitorsTool.java#L162-L163

Added lines #L162 - L163 were not covered by tests
}
});

Check warning on line 165 in src/main/java/org/opensearch/agent/tools/SearchMonitorsTool.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/agent/tools/SearchMonitorsTool.java#L165

Added line #L165 was not covered by tests
AlertingPluginInterface.INSTANCE.searchMonitors((NodeClient) client, searchMonitorRequest, searchMonitorListener);

BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.must().addAll(mustList);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(boolQueryBuilder)
.size(size)
.from(startIndex)
.sort(sortString, sortOrder);

SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder).indices(ScheduledJob.SCHEDULED_JOBS_INDEX);
SearchMonitorRequest searchMonitorRequest = new SearchMonitorRequest(searchRequest);

ActionListener<SearchResponse> searchMonitorListener = ActionListener.<SearchResponse>wrap(response -> {
List<SearchHit> hits = Arrays.asList(response.getHits().getHits());
Map<String, SearchHit> hitsAsMap = hits.stream().collect(Collectors.toMap(SearchHit::getId, hit -> hit));
processHits(hitsAsMap, listener);

}, e -> {
// System index isn't initialized by default, so ignore such errors. Alerting plugin does not return the
// standard IndexNotFoundException so we parse the message instead
if (e.getMessage().contains("Configured indices are not found")) {
processHits(Collections.emptyMap(), listener);
} else {
log.error("Failed to search monitors.", e);
listener.onFailure(e);
}
});
AlertingPluginInterface.INSTANCE.searchMonitors((NodeClient) client, searchMonitorRequest, searchMonitorListener);
}
}

@Override
Expand All @@ -201,39 +181,21 @@
StringBuilder sb = new StringBuilder();
sb.append("Monitors=[");
for (SearchHit hit : hitsAsMap.values()) {
Map<String, Object> monitorAsMap = (Map<String, Object>) hit.getSourceAsMap().get("monitor");
sb.append("{");
sb.append("id=").append(hit.getId()).append(",");
sb.append("name=").append(hit.getSourceAsMap().get("name")).append(",");
sb.append("type=").append(hit.getSourceAsMap().get("type")).append(",");
sb.append("enabled=").append(hit.getSourceAsMap().get("enabled")).append(",");
sb.append("enabledTime=").append(hit.getSourceAsMap().get("enabled_time")).append(",");
sb.append("lastUpdateTime=").append(hit.getSourceAsMap().get("last_update_time"));
sb.append("name=").append(monitorAsMap.get("name")).append(",");
sb.append("type=").append(monitorAsMap.get("type")).append(",");
sb.append("enabled=").append(monitorAsMap.get("enabled")).append(",");
sb.append("enabledTime=").append(monitorAsMap.get("enabled_time")).append(",");
sb.append("lastUpdateTime=").append(monitorAsMap.get("last_update_time"));
sb.append("}");
}
sb.append("]");
sb.append("TotalMonitors=").append(hitsAsMap.size());
listener.onResponse((T) sb.toString());
}

private <T> void processGetMonitorHit(Monitor monitor, ActionListener<T> listener) {
StringBuilder sb = new StringBuilder();
if (monitor != null) {
sb.append("Monitors=[");
sb.append("{");
sb.append("id=").append(monitor.getId()).append(",");
sb.append("name=").append(monitor.getName()).append(",");
sb.append("type=").append(monitor.getType()).append(",");
sb.append("enabled=").append(monitor.getEnabled()).append(",");
sb.append("enabledTime=").append(monitor.getEnabledTime().toEpochMilli()).append(",");
sb.append("lastUpdateTime=").append(monitor.getLastUpdateTime().toEpochMilli());
sb.append("}]");
sb.append("TotalMonitors=1");
} else {
sb.append("Monitors=[]TotalMonitors=0");
}
listener.onResponse((T) sb.toString());
}

/**
* Factory for the {@link SearchMonitorsTool}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.alerting.action.GetMonitorResponse;
import org.opensearch.commons.alerting.model.CronSchedule;
import org.opensearch.commons.alerting.model.DataSources;
import org.opensearch.commons.alerting.model.Monitor;
Expand Down Expand Up @@ -96,29 +95,15 @@ public void setup() {
@Test
public void testRunWithNoMonitors() throws Exception {
Tool tool = SearchMonitorsTool.Factory.getInstance().create(Collections.emptyMap());

SearchHit[] hits = new SearchHit[0];

TotalHits totalHits = new TotalHits(hits.length, TotalHits.Relation.EQUAL_TO);

SearchResponse getMonitorsResponse = new SearchResponse(
new SearchResponseSections(new SearchHits(hits, totalHits, 0), new Aggregations(new ArrayList<>()), null, false, null, null, 0),
null,
0,
0,
0,
0,
null,
null
);
String expectedResponseStr = String.format("Monitors=[]TotalMonitors=%d", hits.length);
SearchResponse searchMonitorsResponse = getEmptySearchMonitorsResponse();
String expectedResponseStr = "Monitors=[]TotalMonitors=0";

@SuppressWarnings("unchecked")
ActionListener<String> listener = Mockito.mock(ActionListener.class);

doAnswer((invocation) -> {
ActionListener<SearchResponse> responseListener = invocation.getArgument(2);
responseListener.onResponse(getMonitorsResponse);
responseListener.onResponse(searchMonitorsResponse);
return null;
}).when(nodeClient).execute(any(ActionType.class), any(), any());

Expand All @@ -132,21 +117,15 @@ public void testRunWithNoMonitors() throws Exception {
public void testRunWithMonitorId() throws Exception {
Tool tool = SearchMonitorsTool.Factory.getInstance().create(Collections.emptyMap());

GetMonitorResponse getMonitorResponse = new GetMonitorResponse(
testMonitor.getId(),
1L,
2L,
0L,
testMonitor,
Collections.emptyList()
);
SearchResponse searchMonitorsResponse = getSearchMonitorsResponse(testMonitor);
String expectedResponseStr = getExpectedResponseString(testMonitor);

@SuppressWarnings("unchecked")
ActionListener<String> listener = Mockito.mock(ActionListener.class);

doAnswer((invocation) -> {
ActionListener<GetMonitorResponse> responseListener = invocation.getArgument(2);
responseListener.onResponse(getMonitorResponse);
ActionListener<SearchResponse> responseListener = invocation.getArgument(2);
responseListener.onResponse(searchMonitorsResponse);
return null;
}).when(nodeClient).execute(any(ActionType.class), any(), any());

Expand All @@ -160,15 +139,15 @@ public void testRunWithMonitorId() throws Exception {
public void testRunWithMonitorIdNotFound() throws Exception {
Tool tool = SearchMonitorsTool.Factory.getInstance().create(Collections.emptyMap());

GetMonitorResponse responseWithNullMonitor = new GetMonitorResponse(testMonitor.getId(), 1L, 2L, 0L, null, Collections.emptyList());
String expectedResponseStr = String.format("Monitors=[]TotalMonitors=0");
SearchResponse searchMonitorsResponse = getEmptySearchMonitorsResponse();
String expectedResponseStr = "Monitors=[]TotalMonitors=0";

@SuppressWarnings("unchecked")
ActionListener<String> listener = Mockito.mock(ActionListener.class);

doAnswer((invocation) -> {
ActionListener<GetMonitorResponse> responseListener = invocation.getArgument(2);
responseListener.onResponse(responseWithNullMonitor);
ActionListener<SearchResponse> responseListener = invocation.getArgument(2);
responseListener.onResponse(searchMonitorsResponse);
return null;
}).when(nodeClient).execute(any(ActionType.class), any(), any());

Expand All @@ -182,37 +161,15 @@ public void testRunWithMonitorIdNotFound() throws Exception {
public void testRunWithSingleMonitor() throws Exception {
Tool tool = SearchMonitorsTool.Factory.getInstance().create(Collections.emptyMap());

XContentBuilder content = XContentBuilder.builder(XContentType.JSON.xContent());
content.startObject();
content.field("name", testMonitor.getName());
content.field("type", testMonitor.getType());
content.field("enabled", Boolean.toString(testMonitor.getEnabled()));
content.field("enabled_time", Long.toString(testMonitor.getEnabledTime().toEpochMilli()));
content.field("last_update_time", Long.toString(testMonitor.getLastUpdateTime().toEpochMilli()));
content.endObject();
SearchHit[] hits = new SearchHit[1];
hits[0] = new SearchHit(0, testMonitor.getId(), null, null).sourceRef(BytesReference.bytes(content));

TotalHits totalHits = new TotalHits(hits.length, TotalHits.Relation.EQUAL_TO);

SearchResponse getMonitorsResponse = new SearchResponse(
new SearchResponseSections(new SearchHits(hits, totalHits, 0), new Aggregations(new ArrayList<>()), null, false, null, null, 0),
null,
0,
0,
0,
0,
null,
null
);
SearchResponse searchMonitorsResponse = getSearchMonitorsResponse(testMonitor);
String expectedResponseStr = getExpectedResponseString(testMonitor);

@SuppressWarnings("unchecked")
ActionListener<String> listener = Mockito.mock(ActionListener.class);

doAnswer((invocation) -> {
ActionListener<SearchResponse> responseListener = invocation.getArgument(2);
responseListener.onResponse(getMonitorsResponse);
responseListener.onResponse(searchMonitorsResponse);
return null;
}).when(nodeClient).execute(any(ActionType.class), any(), any());

Expand Down Expand Up @@ -254,6 +211,50 @@ public void testValidate() {
assertTrue(tool.validate(nullParams));
}

private SearchResponse getSearchMonitorsResponse(Monitor monitor) throws Exception {
XContentBuilder content = XContentBuilder.builder(XContentType.JSON.xContent());
content
.startObject()
.startObject("monitor")
.field("name", monitor.getName())
.field("type", monitor.getType())
.field("enabled", Boolean.toString(monitor.getEnabled()))
.field("enabled_time", Long.toString(monitor.getEnabledTime().toEpochMilli()))
.field("last_update_time", Long.toString(monitor.getLastUpdateTime().toEpochMilli()))
.endObject()
.endObject();
SearchHit[] hits = new SearchHit[1];
hits[0] = new SearchHit(0, monitor.getId(), null, null).sourceRef(BytesReference.bytes(content));

TotalHits totalHits = new TotalHits(hits.length, TotalHits.Relation.EQUAL_TO);

return new SearchResponse(
new SearchResponseSections(new SearchHits(hits, totalHits, 0), new Aggregations(new ArrayList<>()), null, false, null, null, 0),
null,
0,
0,
0,
0,
null,
null
);
}

private SearchResponse getEmptySearchMonitorsResponse() throws Exception {
SearchHit[] hits = new SearchHit[0];
TotalHits totalHits = new TotalHits(hits.length, TotalHits.Relation.EQUAL_TO);
return new SearchResponse(
new SearchResponseSections(new SearchHits(hits, totalHits, 0), new Aggregations(new ArrayList<>()), null, false, null, null, 0),
null,
0,
0,
0,
0,
null,
null
);
}

private String getExpectedResponseString(Monitor testMonitor) {
return String
.format(
Expand Down
Loading
Loading