diff --git a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java index 059947f91..8ffed43b7 100644 --- a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java +++ b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java @@ -37,6 +37,7 @@ public static String getTooManyCategoricalFieldErr(int limit) { "Result index name has invalid character. Valid characters are a-z, 0-9, -(hyphen) and _(underscore)"; public static String FAIL_TO_VALIDATE = "failed to validate"; public static String INVALID_TIMESTAMP = "Timestamp field: (%s) must be of type date"; + public static String NON_EXISTENT_TIMESTAMP_IN_INDEX = "Timestamp field: (%s) is not found in the (%s) index mapping"; public static String NON_EXISTENT_TIMESTAMP = "Timestamp field: (%s) is not found in index mapping"; public static String INVALID_NAME = "Valid characters for name are a-z, A-Z, 0-9, -(hyphen), _(underscore) and .(period)"; // change this error message to make it compatible with old version's integration(nexus) test @@ -74,6 +75,9 @@ public static String getTooManyCategoricalFieldErr(int limit) { + " characters."; public static final String INDEX_NOT_FOUND = "index does not exist"; public static final String FAIL_TO_GET_MAPPING_MSG = "Fail to get the index mapping of %s"; + public static final String FAIL_TO_GET_MAPPING = "Fail to get the index mapping"; + public static final String TIMESTAMP_VALIDATION_FAILED = "Validation failed for timefield of %s, "; + public static final String FAIL_TO_GET_CONFIG_MSG = "Fail to get config"; // ====================================== diff --git a/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java b/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java index fa546c3d9..e9da98c8d 100644 --- a/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java +++ b/src/main/java/org/opensearch/timeseries/rest/RestValidateAction.java @@ -84,7 +84,6 @@ private Boolean validationTypesAreAccepted(String validationType) { public ValidateConfigRequest prepareRequest(RestRequest request, NodeClient client, String typesStr) throws IOException { XContentParser parser = request.contentParser(); ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - // if type param isn't blank and isn't a part of possible validation types throws exception if (!StringUtils.isBlank(typesStr)) { if (!validationTypesAreAccepted(typesStr)) { diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java index bba0a4f09..44e29f6ae 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java @@ -7,21 +7,14 @@ import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.timeseries.constant.CommonMessages.CATEGORICAL_FIELD_TYPE_ERR_MSG; +import static org.opensearch.timeseries.constant.CommonMessages.TIMESTAMP_VALIDATION_FAILED; import static org.opensearch.timeseries.util.ParseUtils.parseAggregators; import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE; import static org.opensearch.timeseries.util.RestHandlerUtils.isExceptionCausedByInvalidQuery; import java.io.IOException; import java.time.Clock; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; @@ -76,10 +69,7 @@ import org.opensearch.timeseries.model.ValidationIssueType; import org.opensearch.timeseries.task.TaskCacheManager; import org.opensearch.timeseries.task.TaskManager; -import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener; -import org.opensearch.timeseries.util.ParseUtils; -import org.opensearch.timeseries.util.RestHandlerUtils; -import org.opensearch.timeseries.util.SecurityClientUtil; +import org.opensearch.timeseries.util.*; import org.opensearch.transport.TransportService; import com.google.common.collect.Sets; @@ -241,7 +231,6 @@ public void start(ActionListener listener) { createOrUpdateConfig(listener); return; } - if (this.isDryRun) { if (timeSeriesIndices.doesIndexExist(resultIndexOrAlias) || timeSeriesIndices.doesAliasExist(resultIndexOrAlias)) { timeSeriesIndices @@ -304,64 +293,106 @@ protected void validateName(boolean indexingDryRun, ActionListener listener) protected void validateTimeField(boolean indexingDryRun, ActionListener listener) { String givenTimeField = config.getTimeField(); - GetFieldMappingsRequest getMappingsRequest = new GetFieldMappingsRequest(); - getMappingsRequest.indices(config.getIndices().toArray(new String[0])).fields(givenTimeField); - getMappingsRequest.indicesOptions(IndicesOptions.strictExpand()); - - // comments explaining fieldMappingResponse parsing can be found inside validateCategoricalField(String, boolean) - ActionListener mappingsListener = ActionListener.wrap(getMappingsResponse -> { - boolean foundField = false; - Map> mappingsByIndex = getMappingsResponse.mappings(); + HashMap> clusterIndicesMap = CrossClusterConfigUtils + .separateClusterIndexes(config.getIndices(), clusterService); - for (Map mappingsByField : mappingsByIndex.values()) { - for (Map.Entry field2Metadata : mappingsByField.entrySet()) { + ActionListener>> validateGetMappingForTimeFieldListener = ActionListener.wrap(response -> { + prepareConfigIndexing(indexingDryRun, listener); + }, exception -> { listener.onFailure(createValidationException(exception.getMessage(), ValidationIssueType.TIMEFIELD_FIELD)); }); + MultiResponsesDelegateActionListener>> multiGetMappingResponseListener = + new MultiResponsesDelegateActionListener<>( + validateGetMappingForTimeFieldListener, + clusterIndicesMap.entrySet().size(), + String.format(Locale.ROOT, TIMESTAMP_VALIDATION_FAILED, config.getName()), + false + ); - GetFieldMappingsResponse.FieldMappingMetadata fieldMetadata = field2Metadata.getValue(); - if (fieldMetadata != null) { - // sourceAsMap returns sth like {host2={type=keyword}} with host2 being a nested field - Map fieldMap = fieldMetadata.sourceAsMap(); - if (fieldMap != null) { - for (Object type : fieldMap.values()) { - if (type instanceof Map) { - foundField = true; - Map metadataMap = (Map) type; - String typeName = (String) metadataMap.get(CommonName.TYPE); - if (!typeName.equals(CommonName.DATE_TYPE) && !typeName.equals(CommonName.DATE_NANOS_TYPE)) { - listener - .onFailure( - new ValidationException( - String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, givenTimeField), - ValidationIssueType.TIMEFIELD_FIELD, - configValidationAspect - ) - ); - return; + for (Map.Entry> clusterIndicesEntry : clusterIndicesMap.entrySet()) { + GetFieldMappingsRequest getMappingsRequestForIndex = new GetFieldMappingsRequest(); + getMappingsRequestForIndex.indices((clusterIndicesEntry.getValue().toArray(new String[0]))).fields(givenTimeField); + getMappingsRequestForIndex.indicesOptions(IndicesOptions.strictExpand()); + Client targetClusterClient = CrossClusterConfigUtils.getClientForCluster(clusterIndicesEntry.getKey(), client, clusterService); + ActionListener getMappingResponseListener = ActionListener.wrap(getMappingsResponse -> { + boolean foundField = false; + Map> mappingsByIndex = getMappingsResponse.mappings(); + for (Map.Entry> mappingsByField : mappingsByIndex + .entrySet()) { + if (mappingsByField.getValue().isEmpty()) { + multiGetMappingResponseListener + .onFailure( + new ValidationException( + String + .format( + Locale.ROOT, + CommonMessages.NON_EXISTENT_TIMESTAMP_IN_INDEX, + givenTimeField, + mappingsByField.getKey() + ), + ValidationIssueType.TIMEFIELD_FIELD, + configValidationAspect + ) + ); + return; + } + for (Map.Entry field2Metadata : mappingsByField + .getValue() + .entrySet()) { + GetFieldMappingsResponse.FieldMappingMetadata fieldMetadata = field2Metadata.getValue(); + if (fieldMetadata != null) { + // sourceAsMap returns sth like {host2={type=keyword}} with host2 being a nested field + Map fieldMap = fieldMetadata.sourceAsMap(); + if (fieldMap != null) { + for (Object type : fieldMap.values()) { + if (type instanceof Map) { + foundField = true; + Map metadataMap = (Map) type; + String typeName = (String) metadataMap.get(CommonName.TYPE); + if (!typeName.equals(CommonName.DATE_TYPE) && !typeName.equals(CommonName.DATE_NANOS_TYPE)) { + multiGetMappingResponseListener + .onFailure( + new ValidationException( + String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, givenTimeField), + ValidationIssueType.TIMEFIELD_FIELD, + configValidationAspect + ) + ); + return; + } } } } } } } - } - if (!foundField) { - listener - .onFailure( - new ValidationException( - String.format(Locale.ROOT, CommonMessages.NON_EXISTENT_TIMESTAMP, givenTimeField), - ValidationIssueType.TIMEFIELD_FIELD, - configValidationAspect - ) - ); - return; - } - prepareConfigIndexing(indexingDryRun, listener); - }, error -> { - String message = String.format(Locale.ROOT, "Fail to get the index mapping of %s", config.getIndices()); - logger.error(message, error); - listener.onFailure(new IllegalArgumentException(message)); - }); - clientUtil - .executeWithInjectedSecurity(GetFieldMappingsAction.INSTANCE, getMappingsRequest, user, client, context, mappingsListener); + if (!foundField) { + multiGetMappingResponseListener + .onFailure( + new ValidationException( + String.format(Locale.ROOT, CommonMessages.NON_EXISTENT_TIMESTAMP, givenTimeField), + ValidationIssueType.TIMEFIELD_FIELD, + configValidationAspect + ) + ); + return; + } + + multiGetMappingResponseListener + .onResponse(new MergeableList<>(new ArrayList<>(Collections.singletonList(Optional.empty())))); + }, e -> { + String errorMessage = String.format(Locale.ROOT, "Fail to get the index mapping of %s", clusterIndicesEntry.getValue()); + logger.error(errorMessage, e); + multiGetMappingResponseListener.onFailure(new IllegalArgumentException(errorMessage, e)); + }); + clientUtil + .executeWithInjectedSecurity( + GetFieldMappingsAction.INSTANCE, + getMappingsRequestForIndex, + user, + targetClusterClient, + context, + getMappingResponseListener + ); + } } /** @@ -448,7 +479,6 @@ protected void validateAgainstExistingHCConfig(String configId, boolean indexing QueryBuilder query = QueryBuilders.boolQuery().filter(QueryBuilders.existsQuery(Config.CATEGORY_FIELD)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeout); - SearchRequest searchRequest = new SearchRequest(CommonName.CONFIG_INDEX).source(searchSourceBuilder); client .search( @@ -460,7 +490,7 @@ protected void validateAgainstExistingHCConfig(String configId, boolean indexing ) ); } else { - validateCategoricalField(configId, indexingDryRun, listener); + validateCategoricalFieldsInAllIndices(configId, indexingDryRun, listener); } } @@ -522,12 +552,33 @@ protected void onSearchHCConfigResponse(SearchResponse response, String detector } listener.onFailure(new IllegalArgumentException(errorMsg)); } else { - validateCategoricalField(detectorId, indexingDryRun, listener); + validateCategoricalFieldsInAllIndices(detectorId, indexingDryRun, listener); } } - @SuppressWarnings("unchecked") - protected void validateCategoricalField(String configId, boolean indexingDryRun, ActionListener listener) { + protected void validateCategoricalFieldsInAllIndices(String configId, boolean indexingDryRun, ActionListener listener) { + HashMap> clusterIndicesMap = CrossClusterConfigUtils + .separateClusterIndexes(config.getIndices(), clusterService); + + Iterator>> iterator = clusterIndicesMap.entrySet().iterator(); + + validateCategoricalFieldRecursive(iterator, configId, indexingDryRun, listener); + + } + + protected void validateCategoricalFieldRecursive( + Iterator>> iterator, + String configId, + boolean indexingDryRun, + ActionListener listener + ) { + if (!iterator.hasNext()) { + searchConfigInputIndices(configId, indexingDryRun, listener); // Call after all indices are validated + return; + } + + // Get the next cluster indices entry + Map.Entry> clusterIndicesEntry = iterator.next(); List categoryField = config.getCategoryFields(); // categoryField should have at least 1 element. Otherwise, we won't reach here. @@ -537,12 +588,16 @@ protected void validateCategoricalField(String configId, boolean indexingDryRun, // throws validation exception before reaching here String categoryField0 = categoryField.get(0); - - GetFieldMappingsRequest getMappingsRequest = new GetFieldMappingsRequest(); - getMappingsRequest.indices(config.getIndices().toArray(new String[0])).fields(categoryField.toArray(new String[0])); - getMappingsRequest.indicesOptions(IndicesOptions.strictExpand()); - - ActionListener mappingsListener = ActionListener.wrap(getMappingsResponse -> { + Client targetClusterClient = CrossClusterConfigUtils.getClientForCluster(clusterIndicesEntry.getKey(), client, clusterService); + // Create the GetFieldMappingsRequest for each index + GetFieldMappingsRequest getMappingsRequestForIndex = new GetFieldMappingsRequest(); + getMappingsRequestForIndex + .indices(clusterIndicesEntry.getValue().toArray(new String[0])) + .fields(categoryField.toArray(new String[0])); + getMappingsRequestForIndex.indicesOptions(IndicesOptions.strictExpand()); + + // Define the listener for each getMapping request + ActionListener getMappingsListener = ActionListener.wrap(getMappingsResponse -> { // example getMappingsResponse: // GetFieldMappingsResponse{mappings={server-metrics={_doc={service=FieldMappingMetadata{fullName='service', // source=org.opensearch.core.common.bytes.BytesArray@7ba87dbd}}}}} @@ -596,18 +651,25 @@ protected void validateCategoricalField(String configId, boolean indexingDryRun, ); return; } + validateCategoricalFieldRecursive(iterator, configId, indexingDryRun, listener); - searchConfigInputIndices(configId, indexingDryRun, listener); }, error -> { String message = String.format(Locale.ROOT, CommonMessages.FAIL_TO_GET_MAPPING_MSG, config.getIndices()); logger.error(message, error); listener.onFailure(new IllegalArgumentException(message)); }); - clientUtil - .executeWithInjectedSecurity(GetFieldMappingsAction.INSTANCE, getMappingsRequest, user, client, context, mappingsListener); + .executeWithInjectedSecurity( + GetFieldMappingsAction.INSTANCE, + getMappingsRequestForIndex, + user, + targetClusterClient, + context, + getMappingsListener + ); } + @SuppressWarnings("unchecked") protected void searchConfigInputIndices(String configId, boolean indexingDryRun, ActionListener listener) { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .query(QueryBuilders.matchAllQuery()) diff --git a/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java index dca5d9ee1..d2fe30c02 100644 --- a/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java @@ -8,10 +8,7 @@ import static org.opensearch.timeseries.util.ParseUtils.checkFilterByBackendRoles; import java.time.Clock; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; +import java.util.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -205,7 +202,6 @@ public void validateExecute( storedContext.restore(); Config config = request.getConfig(); ActionListener validateListener = ActionListener.wrap(response -> { - logger.debug("Result of validation process " + response); // forcing response to be empty listener.onResponse(new ValidateConfigResponse((ConfigValidationIssue) null)); }, exception -> { diff --git a/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java b/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java new file mode 100644 index 000000000..61c97a814 --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/util/CrossClusterConfigUtils.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.timeseries.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.Client; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; + +public class CrossClusterConfigUtils { + private static final Logger logger = LogManager.getLogger(ParseUtils.class); + + /** + * Uses the clusterName to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param clusterName The name of the cluster to evaluate. + * @param client The local {@link NodeClient}. + * @param localClusterName The name of the local cluster. + * @return The local {@link NodeClient} for the local cluster, or a remote client for a remote cluster. + */ + public static Client getClientForCluster(String clusterName, Client client, String localClusterName) { + return clusterName.equals(localClusterName) ? client : client.getRemoteClusterClient(clusterName); + } + + /** + * Uses the clusterName to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param clusterName The name of the cluster to evaluate. + * @param client The local {@link NodeClient}. + * @param clusterService Used to retrieve the name of the local cluster. + * @return The local {@link NodeClient} for the local cluster, or a remote client for a remote cluster. + */ + public static Client getClientForCluster(String clusterName, Client client, ClusterService clusterService) { + return getClientForCluster(clusterName, client, clusterService.getClusterName().value()); + } + + /** + * Parses the list of indexes into a map of cluster_name to List of index names + * @param indexes A list of index names in cluster_name:index_name format. + * Local indexes can also be in index_name format. + * @param clusterService Used to retrieve the name of the local cluster. + * @return A map of cluster_name:index names + */ + public static HashMap> separateClusterIndexes(List indexes, ClusterService clusterService) { + return separateClusterIndexes(indexes, clusterService.getClusterName().value()); + } + + /** + * Parses the list of indexes into a map of cluster_name to list of index_name + * @param indexes A list of index names in cluster_name:index_name format. + * @param localClusterName The name of the local cluster. + * @return A map of cluster_name to List index_name + */ + public static HashMap> separateClusterIndexes(List indexes, String localClusterName) { + HashMap> output = new HashMap<>(); + for (String index : indexes) { + // Use the refactored method to get both cluster and index names in one call + Pair clusterAndIndex = parseClusterAndIndexName(index); + String clusterName = clusterAndIndex.getKey(); + String indexName = clusterAndIndex.getValue(); + logger.info("clusterName: " + clusterName); + logger.info("indexName: " + indexName); + + // If the index entry does not have a cluster_name, it indicates the index is on the local cluster. + if (clusterName.isEmpty()) { + clusterName = localClusterName; + } + output.computeIfAbsent(clusterName, k -> new ArrayList<>()).add(indexName); + } + return output; + } + + /** + * Parses the cluster and index names from the given input string. + * The input can be in either "cluster_name:index_name" format or just "index_name". + * @param index The name of the index to evaluate. + * @return A Pair where the left is the cluster name (or empty if not present), and the right is the index name. + */ + public static Pair parseClusterAndIndexName(String index) { + if (index.contains(":")) { + String[] parts = index.split(":", 2); + String clusterName = parts[0]; + String indexName = parts.length > 1 ? parts[1] : ""; + return Pair.of(clusterName, indexName); + } else { + return Pair.of("", index); + } + } +} diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java index 5bee84cba..d1228ae98 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/AbstractForecasterActionHandlerTestCase.java @@ -73,6 +73,8 @@ public class AbstractForecasterActionHandlerTestCase extends AbstractTimeSeriesT protected ThreadContext threadContext; protected SecurityClientUtil clientUtil; protected String categoricalField; + // @Mock + protected ClusterName clusterName; @SuppressWarnings("unchecked") @Override @@ -85,6 +87,9 @@ public void setUp() throws Exception { clusterService = mock(ClusterService.class); ClusterName clusterName = new ClusterName("test"); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); ClusterState clusterState = ClusterState.builder(clusterName).metadata(Metadata.builder().build()).build(); when(clusterService.state()).thenReturn(clusterState); diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java index c62e975cf..36d8157d7 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java @@ -100,6 +100,7 @@ public class IndexAnomalyDetectorActionHandlerTests extends AbstractTimeSeriesTe private RestRequest.Method method; private ADTaskManager adTaskManager; private SearchFeatureDao searchFeatureDao; + private ClusterName clusterName; @BeforeClass public static void beforeClass() { @@ -157,6 +158,10 @@ public void setUp() throws Exception { searchFeatureDao = mock(SearchFeatureDao.class); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); + handler = new IndexAnomalyDetectorActionHandler( clusterService, clientMock, diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java index e78b154ea..86702129e 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexForecasterActionHandlerTests.java @@ -40,6 +40,7 @@ import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.routing.AllocationId; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; @@ -240,7 +241,7 @@ public void doE verify(clientSpy, times(1)).execute(eq(GetAction.INSTANCE), any(), any()); } - public void testFaiToParse() throws InterruptedException { + public void testFailToParse() throws InterruptedException { NodeClient client = new NodeClient(Settings.EMPTY, threadPool) { @Override public void doExecute( @@ -273,6 +274,9 @@ public void doE } }; NodeClient clientSpy = spy(client); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); method = RestRequest.Method.PUT; @@ -508,6 +512,9 @@ public void doE } }; NodeClient clientSpy = spy(client); + clusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(clusterName); + when(clusterName.value()).thenReturn("test"); method = RestRequest.Method.POST; diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java index 2fea7b5db..a5d88ef53 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateAnomalyDetectorActionHandlerTests.java @@ -43,6 +43,7 @@ import org.opensearch.ad.task.ADTaskManager; import org.opensearch.client.Client; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -91,6 +92,7 @@ public class ValidateAnomalyDetectorActionHandlerTests extends AbstractTimeSerie @Mock protected ThreadPool threadPool; protected ThreadContext threadContext; + protected ClusterName mockClusterName; @SuppressWarnings("unchecked") @Override @@ -106,7 +108,9 @@ public void setUp() throws Exception { anomalyDetectionIndices = mock(ADIndexManagement.class); when(anomalyDetectionIndices.doesConfigIndexExist()).thenReturn(true); - + mockClusterName = mock(ClusterName.class); + when(clusterService.getClusterName()).thenReturn(mockClusterName); + when(mockClusterName.value()).thenReturn("test"); detectorId = "123"; seqNo = 0L; primaryTerm = 0L; diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateForecasterActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateForecasterActionHandlerTests.java index e179a326f..10077f462 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateForecasterActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/ValidateForecasterActionHandlerTests.java @@ -22,6 +22,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import org.opensearch.forecast.rest.handler.ValidateForecasterActionHandler; +import org.opensearch.timeseries.common.exception.ValidationException; import org.opensearch.timeseries.model.ValidationAspect; public class ValidateForecasterActionHandlerTests extends AbstractForecasterActionHandlerTestCase { @@ -100,7 +101,7 @@ public void doE assertTrue("should not reach here", false); inProgressLatch.countDown(); }, e -> { - assertTrue(e instanceof IllegalArgumentException); + assertTrue(e instanceof ValidationException); inProgressLatch.countDown(); })); assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); diff --git a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java index 95cd0917a..d53b68011 100644 --- a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java +++ b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java @@ -1377,6 +1377,73 @@ public void testValidateAnomalyDetectorWithNoTimeField() throws Exception { assertEquals("time field missing", CommonMessages.NULL_TIME_FIELD, messageMap.get("time_field").get("message")); } + public void testValidateAnomalyDetectorWithMultipleIndicesOneNotFound() throws Exception { + TestHelpers.createIndex(client(), "test-index", TestHelpers.toHttpEntity("{\"timestamp\": " + Instant.now().toEpochMilli() + "}")); + Response resp = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_DETECTORS_URI + "/_validate", + ImmutableMap.of(), + TestHelpers + .toHttpEntity( + "{\"name\":\"" + + "test-detector" + + "\",\"description\":\"Test detector\",\"time_field\":\"timestamp\"," + + "\"indices\":[\"test-index\", \"test-index-2\"],\"feature_attributes\":[{\"feature_name\":\"cpu-sum\",\"" + + "feature_enabled\":true,\"aggregation_query\":{\"total_cpu\":{\"sum\":{\"field\":\"cpu\"}}}}," + + "{\"feature_name\":\"error-sum\",\"feature_enabled\":true,\"aggregation_query\":" + + "{\"total_error\":" + + "{\"sum\":{\"field\":\"error\"}}}}],\"filter_query\":{\"bool\":{\"filter\":[{\"exists\":" + + "{\"field\":" + + "\"cpu\",\"boost\":1}}],\"adjust_pure_negative\":true,\"boost\":1}},\"detection_interval\":" + + "{\"period\":{\"interval\":1,\"unit\":\"Minutes\"}}," + + "\"window_delay\":{\"period\":{\"interval\":2,\"unit\":\"Minutes\"}}," + + "\"shingle_size\": 8}" + ), + null + ); + Map responseMap = entityAsMap(resp); + + @SuppressWarnings("unchecked") + Map> messageMap = (Map>) XContentMapValues + .extractValue("detector", responseMap); + String errorMessage = "index does not exist"; + assertEquals("index does not exist", errorMessage, messageMap.get("indices").get("message")); + } + + public void testValidateAnomalyDetectorWithMultipleIndices() throws Exception { + TestHelpers.createIndexWithTimeField(client(), "test-index", TIME_FIELD); + TestHelpers.createIndexWithTimeField(client(), "test-index-2", TIME_FIELD); + + Response resp = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_DETECTORS_URI + "/_validate", + ImmutableMap.of(), + TestHelpers + .toHttpEntity( + "{\"name\":\"" + + "test-detector" + + "\",\"description\":\"Test detector\",\"time_field\":\"timestamp\"," + + "\"indices\":[\"test-index\", \"test-index-2\"],\"feature_attributes\":[{\"feature_name\":\"cpu-sum\",\"" + + "feature_enabled\":true,\"aggregation_query\":{\"total_cpu\":{\"sum\":{\"field\":\"cpu\"}}}}," + + "{\"feature_name\":\"error-sum\",\"feature_enabled\":true,\"aggregation_query\":" + + "{\"total_error\":" + + "{\"sum\":{\"field\":\"error\"}}}}],\"filter_query\":{\"bool\":{\"filter\":[{\"exists\":" + + "{\"field\":" + + "\"cpu\",\"boost\":1}}],\"adjust_pure_negative\":true,\"boost\":1}},\"detection_interval\":" + + "{\"period\":{\"interval\":1,\"unit\":\"Minutes\"}}," + + "\"window_delay\":{\"period\":{\"interval\":2,\"unit\":\"Minutes\"}}," + + "\"shingle_size\": 8}" + ), + null + ); + Map responseMap = entityAsMap(resp); + assertEquals("no issue, empty response body", new HashMap(), responseMap); + } + public void testValidateAnomalyDetectorWithIncorrectShingleSize() throws Exception { TestHelpers.createIndex(client(), "test-index", TestHelpers.toHttpEntity("{\"timestamp\": " + Instant.now().toEpochMilli() + "}")); Response resp = TestHelpers diff --git a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java index 8550dce8a..53f6f0ab5 100644 --- a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java @@ -490,9 +490,8 @@ public void testValidateAnomalyDetectorWithNonExistentTimefield() throws IOExcep ValidateConfigResponse response = client().execute(ValidateAnomalyDetectorAction.INSTANCE, request).actionGet(5_000); assertEquals(ValidationIssueType.TIMEFIELD_FIELD, response.getIssue().getType()); assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect()); - assertEquals( - String.format(Locale.ROOT, CommonMessages.NON_EXISTENT_TIMESTAMP, anomalyDetector.getTimeField()), - response.getIssue().getMessage() + assertTrue( + response.getIssue().getMessage().contains("Timestamp field: (" + anomalyDetector.getTimeField() + ") is not found in the") ); } @@ -513,9 +512,11 @@ public void testValidateAnomalyDetectorWithNonDateTimeField() throws IOException ValidateConfigResponse response = client().execute(ValidateAnomalyDetectorAction.INSTANCE, request).actionGet(5_000); assertEquals(ValidationIssueType.TIMEFIELD_FIELD, response.getIssue().getType()); assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect()); - assertEquals( - String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, anomalyDetector.getTimeField()), - response.getIssue().getMessage() + assertTrue( + response + .getIssue() + .getMessage() + .contains(String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, anomalyDetector.getTimeField())) ); } diff --git a/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java b/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java new file mode 100644 index 000000000..dfc46c6d8 --- /dev/null +++ b/src/test/java/org/opensearch/timeseries/util/CrossClusterConfigUtilsTests.java @@ -0,0 +1,75 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.timeseries.util; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Before; +import org.opensearch.client.Client; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.test.OpenSearchTestCase; + +public class CrossClusterConfigUtilsTests extends OpenSearchTestCase { + + private Client mockClient; + private String remoteClusterName; + private String localClusterName; + + @Before + public void setup() { + // Initialize the mock clients + mockClient = mock(NodeClient.class); + localClusterName = "localCluster"; + remoteClusterName = "remoteCluster"; + } + + public void testGetClientForClusterLocalCluster() { + Client result = CrossClusterConfigUtils.getClientForCluster(localClusterName, mockClient, localClusterName); + assertEquals(mockClient, result); + verify(mockClient, never()).getRemoteClusterClient(anyString()); + } + + public void testGetClientForClusterRemoteCluster() { + Client mockClient = mock(NodeClient.class); + CrossClusterConfigUtils.getClientForCluster(remoteClusterName, mockClient, localClusterName); + + // Verify that getRemoteClusterClient was called once with the correct cluster name + verify(mockClient, times(1)).getRemoteClusterClient("remoteCluster"); + when(mockClient.getRemoteClusterClient(remoteClusterName)).thenReturn(mockClient); + } + + public void testSeparateClusterIndexesRemoteCluster() { + List indexes = Arrays.asList("remoteCluster:index1", "index2", "remoteCluster2:index2"); + ClusterService mockClusterService = mock(ClusterService.class); + when(mockClusterService.getClusterName()).thenReturn(new ClusterName("localCluster")); + + HashMap> result = CrossClusterConfigUtils.separateClusterIndexes(indexes, mockClusterService); + + assertEquals(3, result.size()); + assertEquals(Arrays.asList("index1"), result.get("remoteCluster")); + assertEquals(Arrays.asList("index2"), result.get("localCluster")); + assertEquals(Arrays.asList("index2"), result.get("remoteCluster2")); + } + + public void testParseClusterAndIndexName_WithClusterAndIndex() { + String input = "clusterA:index1"; + Pair result = CrossClusterConfigUtils.parseClusterAndIndexName(input); + assertEquals("clusterA", result.getKey()); + assertEquals("index1", result.getValue()); + } +}