diff --git a/build.gradle b/build.gradle index 920972d6a..5d7dae4da 100644 --- a/build.gradle +++ b/build.gradle @@ -725,6 +725,7 @@ List jacocoExclusions = [ 'org.opensearch.timeseries.transport.CronRequest', 'org.opensearch.ad.task.ADBatchTaskCache', 'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker', + 'org.opensearch.timeseries.util.TimeUtil', ] diff --git a/src/main/java/org/opensearch/ad/ml/ADInferencer.java b/src/main/java/org/opensearch/ad/ml/ADRealTimeInferencer.java similarity index 78% rename from src/main/java/org/opensearch/ad/ml/ADInferencer.java rename to src/main/java/org/opensearch/ad/ml/ADRealTimeInferencer.java index 26e6c032f..94d2223a3 100644 --- a/src/main/java/org/opensearch/ad/ml/ADInferencer.java +++ b/src/main/java/org/opensearch/ad/ml/ADRealTimeInferencer.java @@ -16,16 +16,16 @@ import org.opensearch.ad.ratelimit.ADColdStartWorker; import org.opensearch.ad.ratelimit.ADSaveResultStrategy; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.timeseries.ml.Inferencer; +import org.opensearch.timeseries.ml.RealTimeInferencer; import org.opensearch.timeseries.stats.StatNames; import org.opensearch.timeseries.stats.Stats; import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; -public class ADInferencer extends - Inferencer { +public class ADRealTimeInferencer extends + RealTimeInferencer { - public ADInferencer( + public ADRealTimeInferencer( ADModelManager modelManager, Stats stats, ADCheckpointDao checkpointDao, diff --git a/src/main/java/org/opensearch/ad/ratelimit/ADCheckpointReadWorker.java b/src/main/java/org/opensearch/ad/ratelimit/ADCheckpointReadWorker.java index 7a0fe75c8..f94911163 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/ADCheckpointReadWorker.java +++ b/src/main/java/org/opensearch/ad/ratelimit/ADCheckpointReadWorker.java @@ -24,8 +24,8 @@ import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.ml.ADCheckpointDao; import org.opensearch.ad.ml.ADColdStart; -import org.opensearch.ad.ml.ADInferencer; import org.opensearch.ad.ml.ADModelManager; +import org.opensearch.ad.ml.ADRealTimeInferencer; import org.opensearch.ad.ml.ThresholdingResult; import org.opensearch.ad.model.AnomalyResult; import org.opensearch.cluster.service.ClusterService; @@ -54,7 +54,7 @@ * */ public class ADCheckpointReadWorker extends - CheckpointReadWorker { + CheckpointReadWorker { public static final String WORKER_NAME = "ad-checkpoint-read"; public ADCheckpointReadWorker( @@ -79,7 +79,7 @@ public ADCheckpointReadWorker( Provider cacheProvider, Duration stateTtl, ADCheckpointWriteWorker checkpointWriteQueue, - ADInferencer inferencer + ADRealTimeInferencer inferencer ) { super( WORKER_NAME, diff --git a/src/main/java/org/opensearch/ad/ratelimit/ADColdEntityWorker.java b/src/main/java/org/opensearch/ad/ratelimit/ADColdEntityWorker.java index aa92704fe..466a5fbee 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/ADColdEntityWorker.java +++ b/src/main/java/org/opensearch/ad/ratelimit/ADColdEntityWorker.java @@ -23,8 +23,8 @@ import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.ml.ADCheckpointDao; import org.opensearch.ad.ml.ADColdStart; -import org.opensearch.ad.ml.ADInferencer; import org.opensearch.ad.ml.ADModelManager; +import org.opensearch.ad.ml.ADRealTimeInferencer; import org.opensearch.ad.ml.ThresholdingResult; import org.opensearch.ad.model.AnomalyResult; import org.opensearch.cluster.service.ClusterService; @@ -55,7 +55,7 @@ * */ public class ADColdEntityWorker extends - ColdEntityWorker { + ColdEntityWorker { public static final String WORKER_NAME = "ad-cold-entity"; public ADColdEntityWorker( diff --git a/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java b/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java index 6f0e442bf..52fdc0c6b 100644 --- a/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java @@ -18,7 +18,7 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.nodes.TransportNodesAction; import org.opensearch.ad.caching.ADCacheProvider; -import org.opensearch.ad.ml.ADInferencer; +import org.opensearch.ad.ml.ADRealTimeInferencer; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.common.io.stream.StreamInput; @@ -41,7 +41,7 @@ public class ADHCImputeTransportAction extends private ADCacheProvider cache; private NodeStateManager nodeStateManager; - private ADInferencer adInferencer; + private ADRealTimeInferencer adInferencer; @Inject public ADHCImputeTransportAction( @@ -51,7 +51,7 @@ public ADHCImputeTransportAction( ActionFilters actionFilters, ADCacheProvider priorityCache, NodeStateManager nodeStateManager, - ADInferencer adInferencer + ADRealTimeInferencer adInferencer ) { super( ADHCImputeAction.NAME, @@ -104,9 +104,9 @@ protected ADHCImputeNodeResponse nodeOperation(ADHCImputeNodeRequest nodeRequest long executionEndTime = dataEndMillis + windowDelayMillis; String taskId = nodeRequest.getRequest().getTaskId(); for (ModelState modelState : cache.get().getAllModels(configId)) { - // execution end time (when job starts execution in this interval) > last used time => the model state is updated in + // execution end time (when job starts execution in this interval) >= last used time => the model state is updated in // previous intervals - if (executionEndTime > modelState.getLastUsedTime().toEpochMilli()) { + if (executionEndTime >= modelState.getLastUsedTime().toEpochMilli()) { double[] nanArray = new double[featureSize]; Arrays.fill(nanArray, Double.NaN); adInferencer diff --git a/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java index 6995c5b36..83283468e 100644 --- a/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java @@ -13,8 +13,8 @@ import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.ml.ADCheckpointDao; import org.opensearch.ad.ml.ADColdStart; -import org.opensearch.ad.ml.ADInferencer; import org.opensearch.ad.ml.ADModelManager; +import org.opensearch.ad.ml.ADRealTimeInferencer; import org.opensearch.ad.ml.ThresholdingResult; import org.opensearch.ad.model.AnomalyResult; import org.opensearch.ad.ratelimit.ADCheckpointMaintainWorker; @@ -37,7 +37,7 @@ import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; public class ADSingleStreamResultTransportAction extends - AbstractSingleStreamResultTransportAction { + AbstractSingleStreamResultTransportAction { @Inject public ADSingleStreamResultTransportAction( @@ -47,7 +47,7 @@ public ADSingleStreamResultTransportAction( ADCacheProvider cache, NodeStateManager stateManager, ADCheckpointReadWorker checkpointReadQueue, - ADInferencer inferencer, + ADRealTimeInferencer inferencer, ThreadPool threadPool ) { super( diff --git a/src/main/java/org/opensearch/ad/transport/EntityADResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/EntityADResultTransportAction.java index 3712b9f1f..5aba6acc1 100644 --- a/src/main/java/org/opensearch/ad/transport/EntityADResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/EntityADResultTransportAction.java @@ -24,8 +24,8 @@ import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.ml.ADCheckpointDao; import org.opensearch.ad.ml.ADColdStart; -import org.opensearch.ad.ml.ADInferencer; import org.opensearch.ad.ml.ADModelManager; +import org.opensearch.ad.ml.ADRealTimeInferencer; import org.opensearch.ad.ml.ThresholdingResult; import org.opensearch.ad.model.AnomalyResult; import org.opensearch.ad.ratelimit.ADCheckpointReadWorker; @@ -77,12 +77,12 @@ public class EntityADResultTransportAction extends HandledTransportAction cache; private final NodeStateManager stateManager; private ThreadPool threadPool; - private EntityResultProcessor intervalDataProcessor; + private EntityResultProcessor intervalDataProcessor; private final ADCacheProvider entityCache; private final ADCheckpointReadWorker checkpointReadQueue; private final ADColdEntityWorker coldEntityQueue; - private final ADInferencer inferencer; + private final ADRealTimeInferencer inferencer; @Inject public EntityADResultTransportAction( @@ -95,7 +95,7 @@ public EntityADResultTransportAction( ADCheckpointReadWorker checkpointReadQueue, ADColdEntityWorker coldEntityQueue, ThreadPool threadPool, - ADInferencer inferencer + ADRealTimeInferencer inferencer ) { super(EntityADResultAction.NAME, transportService, actionFilters, EntityResultRequest::new); this.adCircuitBreakerService = adCircuitBreakerService; diff --git a/src/main/java/org/opensearch/forecast/ml/ForecastInferencer.java b/src/main/java/org/opensearch/forecast/ml/ForecastRealTimeInferencer.java similarity index 78% rename from src/main/java/org/opensearch/forecast/ml/ForecastInferencer.java rename to src/main/java/org/opensearch/forecast/ml/ForecastRealTimeInferencer.java index 793b21995..d2373dfce 100644 --- a/src/main/java/org/opensearch/forecast/ml/ForecastInferencer.java +++ b/src/main/java/org/opensearch/forecast/ml/ForecastRealTimeInferencer.java @@ -16,16 +16,16 @@ import org.opensearch.forecast.ratelimit.ForecastColdStartWorker; import org.opensearch.forecast.ratelimit.ForecastSaveResultStrategy; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.timeseries.ml.Inferencer; +import org.opensearch.timeseries.ml.RealTimeInferencer; import org.opensearch.timeseries.stats.StatNames; import org.opensearch.timeseries.stats.Stats; import com.amazon.randomcutforest.parkservices.RCFCaster; -public class ForecastInferencer extends - Inferencer { +public class ForecastRealTimeInferencer extends + RealTimeInferencer { - public ForecastInferencer( + public ForecastRealTimeInferencer( ForecastModelManager modelManager, Stats stats, ForecastCheckpointDao checkpointDao, diff --git a/src/main/java/org/opensearch/forecast/ratelimit/ForecastCheckpointReadWorker.java b/src/main/java/org/opensearch/forecast/ratelimit/ForecastCheckpointReadWorker.java index 652b38ec2..efa2b7c1b 100644 --- a/src/main/java/org/opensearch/forecast/ratelimit/ForecastCheckpointReadWorker.java +++ b/src/main/java/org/opensearch/forecast/ratelimit/ForecastCheckpointReadWorker.java @@ -22,8 +22,8 @@ import org.opensearch.forecast.indices.ForecastIndexManagement; import org.opensearch.forecast.ml.ForecastCheckpointDao; import org.opensearch.forecast.ml.ForecastColdStart; -import org.opensearch.forecast.ml.ForecastInferencer; import org.opensearch.forecast.ml.ForecastModelManager; +import org.opensearch.forecast.ml.ForecastRealTimeInferencer; import org.opensearch.forecast.ml.RCFCasterResult; import org.opensearch.forecast.model.ForecastResult; import org.opensearch.threadpool.ThreadPool; @@ -36,7 +36,7 @@ import com.amazon.randomcutforest.parkservices.RCFCaster; public class ForecastCheckpointReadWorker extends - CheckpointReadWorker { + CheckpointReadWorker { public static final String WORKER_NAME = "forecast-checkpoint-read"; public ForecastCheckpointReadWorker( @@ -61,7 +61,7 @@ public ForecastCheckpointReadWorker( Provider cacheProvider, Duration stateTtl, ForecastCheckpointWriteWorker checkpointWriteQueue, - ForecastInferencer inferencer + ForecastRealTimeInferencer inferencer ) { super( WORKER_NAME, diff --git a/src/main/java/org/opensearch/forecast/ratelimit/ForecastColdEntityWorker.java b/src/main/java/org/opensearch/forecast/ratelimit/ForecastColdEntityWorker.java index dcc1dca6f..c79b31214 100644 --- a/src/main/java/org/opensearch/forecast/ratelimit/ForecastColdEntityWorker.java +++ b/src/main/java/org/opensearch/forecast/ratelimit/ForecastColdEntityWorker.java @@ -20,8 +20,8 @@ import org.opensearch.forecast.indices.ForecastIndexManagement; import org.opensearch.forecast.ml.ForecastCheckpointDao; import org.opensearch.forecast.ml.ForecastColdStart; -import org.opensearch.forecast.ml.ForecastInferencer; import org.opensearch.forecast.ml.ForecastModelManager; +import org.opensearch.forecast.ml.ForecastRealTimeInferencer; import org.opensearch.forecast.ml.RCFCasterResult; import org.opensearch.forecast.model.ForecastResult; import org.opensearch.threadpool.ThreadPool; @@ -49,7 +49,7 @@ * */ public class ForecastColdEntityWorker extends - ColdEntityWorker { + ColdEntityWorker { public static final String WORKER_NAME = "forecast-cold-entity"; public ForecastColdEntityWorker( diff --git a/src/main/java/org/opensearch/forecast/transport/EntityForecastResultTransportAction.java b/src/main/java/org/opensearch/forecast/transport/EntityForecastResultTransportAction.java index 9d58ec049..17e7e2be9 100644 --- a/src/main/java/org/opensearch/forecast/transport/EntityForecastResultTransportAction.java +++ b/src/main/java/org/opensearch/forecast/transport/EntityForecastResultTransportAction.java @@ -25,8 +25,8 @@ import org.opensearch.forecast.indices.ForecastIndexManagement; import org.opensearch.forecast.ml.ForecastCheckpointDao; import org.opensearch.forecast.ml.ForecastColdStart; -import org.opensearch.forecast.ml.ForecastInferencer; import org.opensearch.forecast.ml.ForecastModelManager; +import org.opensearch.forecast.ml.ForecastRealTimeInferencer; import org.opensearch.forecast.ml.RCFCasterResult; import org.opensearch.forecast.model.ForecastResult; import org.opensearch.forecast.ratelimit.ForecastCheckpointReadWorker; @@ -77,12 +77,12 @@ public class EntityForecastResultTransportAction extends HandledTransportAction< private CacheProvider cache; private final NodeStateManager stateManager; private ThreadPool threadPool; - private EntityResultProcessor intervalDataProcessor; + private EntityResultProcessor intervalDataProcessor; private final ForecastCacheProvider entityCache; private final ForecastCheckpointReadWorker checkpointReadQueue; private final ForecastColdEntityWorker coldEntityQueue; - private final ForecastInferencer inferencer; + private final ForecastRealTimeInferencer inferencer; @Inject public EntityForecastResultTransportAction( @@ -96,7 +96,7 @@ public EntityForecastResultTransportAction( ForecastCheckpointReadWorker checkpointReadQueue, ForecastColdEntityWorker coldEntityQueue, ThreadPool threadPool, - ForecastInferencer inferencer + ForecastRealTimeInferencer inferencer ) { super(EntityForecastResultAction.NAME, transportService, actionFilters, EntityResultRequest::new); this.circuitBreakerService = adCircuitBreakerService; diff --git a/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java b/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java index 5a0aee36b..6b3a09835 100644 --- a/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java +++ b/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java @@ -16,8 +16,8 @@ import org.opensearch.forecast.indices.ForecastIndexManagement; import org.opensearch.forecast.ml.ForecastCheckpointDao; import org.opensearch.forecast.ml.ForecastColdStart; -import org.opensearch.forecast.ml.ForecastInferencer; import org.opensearch.forecast.ml.ForecastModelManager; +import org.opensearch.forecast.ml.ForecastRealTimeInferencer; import org.opensearch.forecast.ml.RCFCasterResult; import org.opensearch.forecast.model.ForecastResult; import org.opensearch.forecast.ratelimit.ForecastCheckpointMaintainWorker; @@ -39,7 +39,7 @@ import com.amazon.randomcutforest.parkservices.RCFCaster; public class ForecastSingleStreamResultTransportAction extends - AbstractSingleStreamResultTransportAction { + AbstractSingleStreamResultTransportAction { private static final Logger LOG = LogManager.getLogger(ForecastSingleStreamResultTransportAction.class); @@ -51,7 +51,7 @@ public ForecastSingleStreamResultTransportAction( ForecastCacheProvider cache, NodeStateManager stateManager, ForecastCheckpointReadWorker checkpointReadQueue, - ForecastInferencer inferencer, + ForecastRealTimeInferencer inferencer, ThreadPool threadPool ) { super( diff --git a/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java b/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java index c1ce884a4..3898fa9f0 100644 --- a/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java +++ b/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java @@ -13,11 +13,19 @@ import static java.util.Collections.unmodifiableList; import static org.opensearch.ad.constant.ADCommonName.ANOMALY_RESULT_INDEX_ALIAS; +import static org.opensearch.ad.constant.ADCommonName.CHECKPOINT_INDEX_NAME; +import static org.opensearch.ad.constant.ADCommonName.DETECTION_STATE_INDEX; +import static org.opensearch.ad.indices.ADIndexManagement.ALL_AD_RESULTS_INDEX_PATTERN; import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_COOLDOWN_MINUTES; +import static org.opensearch.forecast.constant.ForecastCommonName.FORECAST_CHECKPOINT_INDEX_NAME; +import static org.opensearch.forecast.constant.ForecastCommonName.FORECAST_STATE_INDEX; +import static org.opensearch.timeseries.constant.CommonName.CONFIG_INDEX; +import static org.opensearch.timeseries.constant.CommonName.JOB_INDEX; import java.security.AccessController; import java.security.PrivilegedAction; import java.time.Clock; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -46,8 +54,8 @@ import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.ml.ADCheckpointDao; import org.opensearch.ad.ml.ADColdStart; -import org.opensearch.ad.ml.ADInferencer; import org.opensearch.ad.ml.ADModelManager; +import org.opensearch.ad.ml.ADRealTimeInferencer; import org.opensearch.ad.ml.HybridThresholdingModel; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyResult; @@ -176,8 +184,8 @@ import org.opensearch.forecast.indices.ForecastIndexManagement; import org.opensearch.forecast.ml.ForecastCheckpointDao; import org.opensearch.forecast.ml.ForecastColdStart; -import org.opensearch.forecast.ml.ForecastInferencer; import org.opensearch.forecast.ml.ForecastModelManager; +import org.opensearch.forecast.ml.ForecastRealTimeInferencer; import org.opensearch.forecast.model.ForecastResult; import org.opensearch.forecast.model.Forecaster; import org.opensearch.forecast.ratelimit.ForecastCheckpointMaintainWorker; @@ -253,6 +261,7 @@ import org.opensearch.forecast.transport.ValidateForecasterTransportAction; import org.opensearch.forecast.transport.handler.ForecastIndexMemoryPressureAwareResultHandler; import org.opensearch.forecast.transport.handler.ForecastSearchHandler; +import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.jobscheduler.spi.JobSchedulerExtension; import org.opensearch.jobscheduler.spi.ScheduledJobParser; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; @@ -261,6 +270,7 @@ import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.ScriptPlugin; +import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; @@ -317,7 +327,7 @@ /** * Entry point of time series analytics plugin. */ -public class TimeSeriesAnalyticsPlugin extends Plugin implements ActionPlugin, ScriptPlugin, JobSchedulerExtension { +public class TimeSeriesAnalyticsPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension { private static final Logger LOG = LogManager.getLogger(TimeSeriesAnalyticsPlugin.class); @@ -823,7 +833,7 @@ public PooledObject wrap(LinkedBuffer obj) { adStats = new ADStats(adStatsMap); - ADInferencer adInferencer = new ADInferencer( + ADRealTimeInferencer adInferencer = new ADRealTimeInferencer( adModelManager, adStats, adCheckpoint, @@ -1213,7 +1223,7 @@ public PooledObject wrap(LinkedBuffer obj) { forecastStats = new ForecastStats(forecastStatsMap); - ForecastInferencer forecastInferencer = new ForecastInferencer( + ForecastRealTimeInferencer forecastInferencer = new ForecastRealTimeInferencer( forecastModelManager, forecastStats, forecastCheckpoint, @@ -1695,6 +1705,19 @@ public List getNamedXContent() { ); } + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + List systemIndexDescriptors = new ArrayList<>(); + systemIndexDescriptors.add(new SystemIndexDescriptor(CONFIG_INDEX, "Time Series Analytics config index")); + systemIndexDescriptors.add(new SystemIndexDescriptor(ALL_AD_RESULTS_INDEX_PATTERN, "AD result index pattern")); + systemIndexDescriptors.add(new SystemIndexDescriptor(CHECKPOINT_INDEX_NAME, "AD Checkpoints index")); + systemIndexDescriptors.add(new SystemIndexDescriptor(DETECTION_STATE_INDEX, "AD State index")); + systemIndexDescriptors.add(new SystemIndexDescriptor(FORECAST_CHECKPOINT_INDEX_NAME, "Forecast Checkpoints index")); + systemIndexDescriptors.add(new SystemIndexDescriptor(FORECAST_STATE_INDEX, "Forecast state index")); + systemIndexDescriptors.add(new SystemIndexDescriptor(JOB_INDEX, "Time Series Analytics job index")); + return systemIndexDescriptors; + } + @Override public String getJobType() { return TIME_SERIES_JOB_TYPE; diff --git a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java index 8e0a7a537..059947f91 100644 --- a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java +++ b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java @@ -74,6 +74,7 @@ public static String getTooManyCategoricalFieldErr(int limit) { + " characters."; public static final String INDEX_NOT_FOUND = "index does not exist"; public static final String FAIL_TO_GET_MAPPING_MSG = "Fail to get the index mapping of %s"; + public static final String FAIL_TO_GET_CONFIG_MSG = "Fail to get config"; // ====================================== // Index message diff --git a/src/main/java/org/opensearch/timeseries/ml/Inferencer.java b/src/main/java/org/opensearch/timeseries/ml/RealTimeInferencer.java similarity index 71% rename from src/main/java/org/opensearch/timeseries/ml/Inferencer.java rename to src/main/java/org/opensearch/timeseries/ml/RealTimeInferencer.java index ff7cdca3a..9ca57309d 100644 --- a/src/main/java/org/opensearch/timeseries/ml/Inferencer.java +++ b/src/main/java/org/opensearch/timeseries/ml/RealTimeInferencer.java @@ -25,18 +25,22 @@ import org.opensearch.timeseries.indices.TimeSeriesIndex; import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.IndexableResult; +import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.ratelimit.CheckpointWriteWorker; import org.opensearch.timeseries.ratelimit.ColdStartWorker; import org.opensearch.timeseries.ratelimit.FeatureRequest; import org.opensearch.timeseries.ratelimit.RequestPriority; import org.opensearch.timeseries.ratelimit.SaveResultStrategy; import org.opensearch.timeseries.stats.Stats; -import org.opensearch.timeseries.util.TimeUtil; import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; -public abstract class Inferencer, IndexType extends Enum & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointDaoType extends CheckpointDao, CheckpointWriterType extends CheckpointWriteWorker, ColdStarterType extends ModelColdStart, ModelManagerType extends ModelManager, SaveResultStrategyType extends SaveResultStrategy, CacheType extends TimeSeriesCache, ColdStartWorkerType extends ColdStartWorker> { - private static final Logger LOG = LogManager.getLogger(Inferencer.class); +/** + * Since we assume model state's last access time is current time and compare it with incoming data's execution time, + * this class is only meant to be used by real time analysis. + */ +public abstract class RealTimeInferencer, IndexType extends Enum & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointDaoType extends CheckpointDao, CheckpointWriterType extends CheckpointWriteWorker, ColdStarterType extends ModelColdStart, ModelManagerType extends ModelManager, SaveResultStrategyType extends SaveResultStrategy, CacheType extends TimeSeriesCache, ColdStartWorkerType extends ColdStartWorker> { + private static final Logger LOG = LogManager.getLogger(RealTimeInferencer.class); protected ModelManagerType modelManager; protected Stats stats; private String modelCorruptionStat; @@ -48,7 +52,7 @@ public abstract class Inferencer modelState, Config config, String taskId) { - long expiryEpoch = TimeUtil.calculateTimeoutMillis(config, sample.getDataEndTime().toEpochMilli()); - return processWithTimeout(sample, modelState, config, taskId, expiryEpoch); + long windowDelayMillis = config.getWindowDelay() == null + ? 0 + : ((IntervalTimeConfiguration) config.getWindowDelay()).toDuration().toMillis(); + long curExecutionEnd = sample.getDataEndTime().toEpochMilli() + windowDelayMillis; + long nextExecutionEnd = curExecutionEnd + config.getIntervalInMilliseconds(); + + return processWithTimeout(sample, modelState, config, taskId, curExecutionEnd, nextExecutionEnd); } - private boolean processWithTimeout(Sample sample, ModelState modelState, Config config, String taskId, long expiryEpoch) { + private boolean processWithTimeout( + Sample sample, + ModelState modelState, + Config config, + String taskId, + long curExecutionEnd, + long nextExecutionEnd + ) { String modelId = modelState.getModelId(); ReentrantLock lock = (ReentrantLock) modelLocks.computeIfAbsent(modelId, k -> new ReentrantLock()); if (lock.tryLock()) { try { - tryProcess(sample, modelState, config, taskId); + tryProcess(sample, modelState, config, taskId, curExecutionEnd); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); @@ -101,13 +117,13 @@ private boolean processWithTimeout(Sample sample, ModelState model } return true; } else { - if (System.currentTimeMillis() >= expiryEpoch) { + if (System.currentTimeMillis() >= nextExecutionEnd) { LOG.warn("Timeout reached, not retrying."); } else { // Schedule a retry in one second threadPool .schedule( - () -> processWithTimeout(sample, modelState, config, taskId, expiryEpoch), + () -> processWithTimeout(sample, modelState, config, taskId, curExecutionEnd, nextExecutionEnd), new TimeValue(1, TimeUnit.SECONDS), threadPoolName ); @@ -117,7 +133,14 @@ private boolean processWithTimeout(Sample sample, ModelState model } } - private boolean tryProcess(Sample sample, ModelState modelState, Config config, String taskId) { + private boolean tryProcess(Sample sample, ModelState modelState, Config config, String taskId, long curExecutionEnd) { + // execution end time (when job starts execution in this interval) >= last used time => the model state is updated in + // previous intervals + // This can happen while scheduled to waiting some other threads have already scored the same interval (e.g., during tests + // when everything happens fast) + if (curExecutionEnd < modelState.getLastUsedTime().toEpochMilli()) { + return false; + } String modelId = modelState.getModelId(); try { RCFResultType result = modelManager.getResult(sample, modelState, modelId, config, taskId); diff --git a/src/main/java/org/opensearch/timeseries/model/Config.java b/src/main/java/org/opensearch/timeseries/model/Config.java index 2e78818ad..f814a8832 100644 --- a/src/main/java/org/opensearch/timeseries/model/Config.java +++ b/src/main/java/org/opensearch/timeseries/model/Config.java @@ -219,18 +219,18 @@ protected Config( } if (imputationOption != null && imputationOption.getMethod() == ImputationMethod.FIXED_VALUES) { - Map defaultFill = imputationOption.getDefaultFill(); - if (defaultFill.isEmpty()) { - issueType = ValidationIssueType.IMPUTATION; - errorMessage = "No given values for fixed value interpolation"; - return; - } - // Calculate the number of enabled features List enabledFeatures = features == null ? null : features.stream().filter(Feature::getEnabled).collect(Collectors.toList()); + Map defaultFill = imputationOption.getDefaultFill(); + if (defaultFill.isEmpty() && enabledFeatures.size() > 0) { + issueType = ValidationIssueType.IMPUTATION; + errorMessage = "No given values for fixed value imputation"; + return; + } + // Check if the length of the defaultFill array matches the number of expected features if (enabledFeatures == null || defaultFill.size() != enabledFeatures.size()) { issueType = ValidationIssueType.IMPUTATION; @@ -762,27 +762,27 @@ public static List findRedundantNames(List features) { @Override public String toString() { return new ToStringBuilder(this) - .append("name", name) - .append("description", description) - .append("timeField", timeField) - .append("indices", indices) - .append("featureAttributes", featureAttributes) - .append("filterQuery", filterQuery) - .append("interval", interval) - .append("windowDelay", windowDelay) - .append("shingleSize", shingleSize) - .append("categoryFields", categoryFields) - .append("schemaVersion", schemaVersion) - .append("user", user) - .append("customResultIndex", customResultIndexOrAlias) - .append("imputationOption", imputationOption) - .append("recencyEmphasis", recencyEmphasis) - .append("seasonIntervals", seasonIntervals) - .append("historyIntervals", historyIntervals) - .append("customResultIndexMinSize", customResultIndexMinSize) - .append("customResultIndexMinAge", customResultIndexMinAge) - .append("customResultIndexTTL", customResultIndexTTL) - .append("flattenResultIndexMapping", flattenResultIndexMapping) - .toString(); + .append("name", name) + .append("description", description) + .append("timeField", timeField) + .append("indices", indices) + .append("featureAttributes", featureAttributes) + .append("filterQuery", filterQuery) + .append("interval", interval) + .append("windowDelay", windowDelay) + .append("shingleSize", shingleSize) + .append("categoryFields", categoryFields) + .append("schemaVersion", schemaVersion) + .append("user", user) + .append("customResultIndex", customResultIndexOrAlias) + .append("imputationOption", imputationOption) + .append("recencyEmphasis", recencyEmphasis) + .append("seasonIntervals", seasonIntervals) + .append("historyIntervals", historyIntervals) + .append("customResultIndexMinSize", customResultIndexMinSize) + .append("customResultIndexMinAge", customResultIndexMinAge) + .append("customResultIndexTTL", customResultIndexTTL) + .append("flattenResultIndexMapping", flattenResultIndexMapping) + .toString(); } } diff --git a/src/main/java/org/opensearch/timeseries/ratelimit/CheckpointReadWorker.java b/src/main/java/org/opensearch/timeseries/ratelimit/CheckpointReadWorker.java index 41f145b72..e1c7c5ea0 100644 --- a/src/main/java/org/opensearch/timeseries/ratelimit/CheckpointReadWorker.java +++ b/src/main/java/org/opensearch/timeseries/ratelimit/CheckpointReadWorker.java @@ -44,11 +44,11 @@ import org.opensearch.timeseries.indices.IndexManagement; import org.opensearch.timeseries.indices.TimeSeriesIndex; import org.opensearch.timeseries.ml.CheckpointDao; -import org.opensearch.timeseries.ml.Inferencer; import org.opensearch.timeseries.ml.IntermediateResult; import org.opensearch.timeseries.ml.ModelColdStart; import org.opensearch.timeseries.ml.ModelManager; import org.opensearch.timeseries.ml.ModelState; +import org.opensearch.timeseries.ml.RealTimeInferencer; import org.opensearch.timeseries.ml.Sample; import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.IndexableResult; @@ -57,7 +57,7 @@ import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; -public abstract class CheckpointReadWorker, IndexType extends Enum & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointType extends CheckpointDao, CheckpointWriteWorkerType extends CheckpointWriteWorker, ColdStarterType extends ModelColdStart, ModelManagerType extends ModelManager, CacheType extends TimeSeriesCache, SaveResultStrategyType extends SaveResultStrategy, ColdStartWorkerType extends ColdStartWorker, InferencerType extends Inferencer> +public abstract class CheckpointReadWorker, IndexType extends Enum & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointType extends CheckpointDao, CheckpointWriteWorkerType extends CheckpointWriteWorker, ColdStarterType extends ModelColdStart, ModelManagerType extends ModelManager, CacheType extends TimeSeriesCache, SaveResultStrategyType extends SaveResultStrategy, ColdStartWorkerType extends ColdStartWorker, InferencerType extends RealTimeInferencer> extends BatchWorker { private static final Logger LOG = LogManager.getLogger(CheckpointReadWorker.class); diff --git a/src/main/java/org/opensearch/timeseries/ratelimit/ColdEntityWorker.java b/src/main/java/org/opensearch/timeseries/ratelimit/ColdEntityWorker.java index 5e3f20196..c5eb088ad 100644 --- a/src/main/java/org/opensearch/timeseries/ratelimit/ColdEntityWorker.java +++ b/src/main/java/org/opensearch/timeseries/ratelimit/ColdEntityWorker.java @@ -28,15 +28,15 @@ import org.opensearch.timeseries.indices.IndexManagement; import org.opensearch.timeseries.indices.TimeSeriesIndex; import org.opensearch.timeseries.ml.CheckpointDao; -import org.opensearch.timeseries.ml.Inferencer; import org.opensearch.timeseries.ml.IntermediateResult; import org.opensearch.timeseries.ml.ModelColdStart; import org.opensearch.timeseries.ml.ModelManager; +import org.opensearch.timeseries.ml.RealTimeInferencer; import org.opensearch.timeseries.model.IndexableResult; import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; -public class ColdEntityWorker & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointDaoType extends CheckpointDao, RCFResultType extends IntermediateResult, ModelManagerType extends ModelManager, CheckpointWriteWorkerType extends CheckpointWriteWorker, ColdStarterType extends ModelColdStart, CacheType extends TimeSeriesCache, SaveResultStrategyType extends SaveResultStrategy, ColdStartWorkerType extends ColdStartWorker, InferencerType extends Inferencer, CheckpointReadWorkerType extends CheckpointReadWorker> +public class ColdEntityWorker & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointDaoType extends CheckpointDao, RCFResultType extends IntermediateResult, ModelManagerType extends ModelManager, CheckpointWriteWorkerType extends CheckpointWriteWorker, ColdStarterType extends ModelColdStart, CacheType extends TimeSeriesCache, SaveResultStrategyType extends SaveResultStrategy, ColdStartWorkerType extends ColdStartWorker, InferencerType extends RealTimeInferencer, CheckpointReadWorkerType extends CheckpointReadWorker> extends ScheduledWorker { public ColdEntityWorker( diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java index 251512cff..bba0a4f09 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java @@ -482,7 +482,7 @@ protected void createConfig(boolean indexingDryRun, ActionListener listener) searchRequest, ActionListener .wrap( - response -> onSearchSingleStreamConfigResponse(response, indexingDryRun, listener), + response -> onSearchTotalConfigResponse(response, indexingDryRun, listener), exception -> listener.onFailure(exception) ) ); @@ -496,7 +496,7 @@ protected void createConfig(boolean indexingDryRun, ActionListener listener) } } - protected void onSearchSingleStreamConfigResponse(SearchResponse response, boolean indexingDryRun, ActionListener listener) + protected void onSearchTotalConfigResponse(SearchResponse response, boolean indexingDryRun, ActionListener listener) throws IOException { if (response.getHits().getTotalHits().value >= getMaxSingleStreamConfigs()) { String errorMsgSingleEntity = getExceedMaxSingleStreamConfigsErrorMsg(getMaxSingleStreamConfigs()); diff --git a/src/main/java/org/opensearch/timeseries/transport/AbstractSingleStreamResultTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/AbstractSingleStreamResultTransportAction.java index 3cf3aa8dc..98ae9ee24 100644 --- a/src/main/java/org/opensearch/timeseries/transport/AbstractSingleStreamResultTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/AbstractSingleStreamResultTransportAction.java @@ -30,11 +30,11 @@ import org.opensearch.timeseries.indices.IndexManagement; import org.opensearch.timeseries.indices.TimeSeriesIndex; import org.opensearch.timeseries.ml.CheckpointDao; -import org.opensearch.timeseries.ml.Inferencer; import org.opensearch.timeseries.ml.IntermediateResult; import org.opensearch.timeseries.ml.ModelColdStart; import org.opensearch.timeseries.ml.ModelManager; import org.opensearch.timeseries.ml.ModelState; +import org.opensearch.timeseries.ml.RealTimeInferencer; import org.opensearch.timeseries.ml.Sample; import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.IndexableResult; @@ -52,7 +52,7 @@ import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; -public abstract class AbstractSingleStreamResultTransportAction & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointDaoType extends CheckpointDao, CheckpointWriterType extends CheckpointWriteWorker, CheckpointMaintainerType extends CheckpointMaintainWorker, CacheBufferType extends CacheBuffer, PriorityCacheType extends PriorityCache, CacheProviderType extends CacheProvider, ResultType extends IndexableResult, RCFResultType extends IntermediateResult, ColdStarterType extends ModelColdStart, ModelManagerType extends ModelManager, CacheType extends TimeSeriesCache, SaveResultStrategyType extends SaveResultStrategy, ColdStartWorkerType extends ColdStartWorker, InferencerType extends Inferencer, CheckpointReadWorkerType extends CheckpointReadWorker, ResultWriteRequestType extends ResultWriteRequest> +public abstract class AbstractSingleStreamResultTransportAction & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointDaoType extends CheckpointDao, CheckpointWriterType extends CheckpointWriteWorker, CheckpointMaintainerType extends CheckpointMaintainWorker, CacheBufferType extends CacheBuffer, PriorityCacheType extends PriorityCache, CacheProviderType extends CacheProvider, ResultType extends IndexableResult, RCFResultType extends IntermediateResult, ColdStarterType extends ModelColdStart, ModelManagerType extends ModelManager, CacheType extends TimeSeriesCache, SaveResultStrategyType extends SaveResultStrategy, ColdStartWorkerType extends ColdStartWorker, InferencerType extends RealTimeInferencer, CheckpointReadWorkerType extends CheckpointReadWorker, ResultWriteRequestType extends ResultWriteRequest> extends HandledTransportAction { private static final Logger LOG = LogManager.getLogger(AbstractSingleStreamResultTransportAction.class); protected CircuitBreakerService circuitBreakerService; diff --git a/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java index f3fe74608..3b6ad29d9 100644 --- a/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java @@ -6,7 +6,7 @@ package org.opensearch.timeseries.transport; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; -import static org.opensearch.forecast.constant.ForecastCommonMessages.FAIL_TO_GET_FORECASTER; +import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_GET_CONFIG_MSG; import static org.opensearch.timeseries.util.ParseUtils.resolveUserAndExecute; import static org.opensearch.timeseries.util.RestHandlerUtils.PROFILE; import static org.opensearch.timeseries.util.RestHandlerUtils.wrapRestActionListener; @@ -161,7 +161,7 @@ public void doExecute(Task task, ActionRequest request, ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_GET_FORECASTER); + ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_GET_CONFIG_MSG); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { resolveUserAndExecute( user, diff --git a/src/main/java/org/opensearch/timeseries/transport/EntityResultProcessor.java b/src/main/java/org/opensearch/timeseries/transport/EntityResultProcessor.java index 484265b49..4d9722097 100644 --- a/src/main/java/org/opensearch/timeseries/transport/EntityResultProcessor.java +++ b/src/main/java/org/opensearch/timeseries/transport/EntityResultProcessor.java @@ -28,11 +28,11 @@ import org.opensearch.timeseries.indices.IndexManagement; import org.opensearch.timeseries.indices.TimeSeriesIndex; import org.opensearch.timeseries.ml.CheckpointDao; -import org.opensearch.timeseries.ml.Inferencer; import org.opensearch.timeseries.ml.IntermediateResult; import org.opensearch.timeseries.ml.ModelColdStart; import org.opensearch.timeseries.ml.ModelManager; import org.opensearch.timeseries.ml.ModelState; +import org.opensearch.timeseries.ml.RealTimeInferencer; import org.opensearch.timeseries.ml.Sample; import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.Entity; @@ -53,7 +53,7 @@ * (e.g., EntityForecastResultTransportAction) * */ -public class EntityResultProcessor, IndexType extends Enum & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointDaoType extends CheckpointDao, CheckpointWriteWorkerType extends CheckpointWriteWorker, ModelColdStartType extends ModelColdStart, ModelManagerType extends ModelManager, CacheType extends TimeSeriesCache, SaveResultStrategyType extends SaveResultStrategy, ColdStartWorkerType extends ColdStartWorker, InferencerType extends Inferencer, HCCheckpointReadWorkerType extends CheckpointReadWorker, ColdEntityWorkerType extends ColdEntityWorker> { +public class EntityResultProcessor, IndexType extends Enum & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointDaoType extends CheckpointDao, CheckpointWriteWorkerType extends CheckpointWriteWorker, ModelColdStartType extends ModelColdStart, ModelManagerType extends ModelManager, CacheType extends TimeSeriesCache, SaveResultStrategyType extends SaveResultStrategy, ColdStartWorkerType extends ColdStartWorker, InferencerType extends RealTimeInferencer, HCCheckpointReadWorkerType extends CheckpointReadWorker, ColdEntityWorkerType extends ColdEntityWorker> { private static final Logger LOG = LogManager.getLogger(EntityResultProcessor.class); diff --git a/src/main/resources/mappings/anomaly-detection-state.json b/src/main/resources/mappings/anomaly-detection-state.json index fcb360ba6..be37da1eb 100644 --- a/src/main/resources/mappings/anomaly-detection-state.json +++ b/src/main/resources/mappings/anomaly-detection-state.json @@ -1,7 +1,7 @@ { "dynamic": false, "_meta": { - "schema_version": 3 + "schema_version": 4 }, "properties": { "schema_version": { diff --git a/src/main/resources/mappings/config.json b/src/main/resources/mappings/config.json index 2dc4954c9..36663ad37 100644 --- a/src/main/resources/mappings/config.json +++ b/src/main/resources/mappings/config.json @@ -229,6 +229,9 @@ } } } + }, + "flatten_result_index_mapping": { + "type": "boolean" } } } \ No newline at end of file diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java index bd1159047..c62e975cf 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java @@ -853,7 +853,8 @@ public void doE null, detector.getCustomResultIndexMinSize(), detector.getCustomResultIndexMinAge(), - detector.getCustomResultIndexTTL() + detector.getCustomResultIndexTTL(), + false ); try { listener.onResponse((Response) TestHelpers.createGetResponse(clone, clone.getId(), CommonName.CONFIG_INDEX)); diff --git a/src/test/java/org/opensearch/ad/bwc/ADBackwardsCompatibilityIT.java b/src/test/java/org/opensearch/ad/bwc/ADBackwardsCompatibilityIT.java index d7949adc3..9d75a8e18 100644 --- a/src/test/java/org/opensearch/ad/bwc/ADBackwardsCompatibilityIT.java +++ b/src/test/java/org/opensearch/ad/bwc/ADBackwardsCompatibilityIT.java @@ -468,5 +468,4 @@ private void verifyAnomalyDetectorCount(String uri, long expectedCount) throws E Integer count = (Integer) responseMap.get("count"); assertEquals(expectedCount, (long) count); } - } diff --git a/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java b/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java index 941af35a5..bcc2468cd 100644 --- a/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java +++ b/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java @@ -146,6 +146,7 @@ protected TrainResult ingestTrainData( * x + y - y' != x. */ long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes(); + Duration windowDelay = Duration.ofMinutes(windowDelayMinutes); return new TrainResult(null, data, rawDataTrainTestSplit, windowDelay, trainTime); } diff --git a/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java b/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java index 1fe3bcb6f..2f715041f 100644 --- a/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java +++ b/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java @@ -98,6 +98,7 @@ public void testHCFixed() throws Exception { } public void testHCPrevious() throws Exception { + lastSeen.clear(); int numberOfEntities = 2; AbstractSyntheticDataTest.MISSING_MODE mode = AbstractSyntheticDataTest.MISSING_MODE.NO_MISSING_DATA; diff --git a/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java b/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java index 50f067e0d..1b4bd98bb 100644 --- a/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java +++ b/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java @@ -16,6 +16,7 @@ import java.io.File; import java.io.FileReader; import java.nio.charset.Charset; +import java.time.Duration; import java.time.Instant; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; @@ -71,6 +72,10 @@ private void verifyAnomaly( String mapping = "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\": \"date\"}," + " \"Feature1\": { \"type\": \"double\" }, \"Feature2\": { \"type\": \"double\" } } } }"; bulkIndexTrainData(datasetName, data, trainTestSplit, client, mapping); + + long windowDelayMinutes = getWindowDelayMinutes(data, trainTestSplit - 1, "timestamp"); + Duration windowDelay = Duration.ofMinutes(windowDelayMinutes); + // single-stream detector can use window delay 0 here because we give the run api the actual data time String detector = String .format( @@ -84,18 +89,22 @@ private void verifyAnomaly( + "\"schema_version\": 0 }", datasetName, intervalMinutes, - 0 + windowDelayMinutes ); String detectorId = createDetector(client, detector); - Instant trainTime = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(data.get(trainTestSplit - 1).get("timestamp").getAsString())); - Instant begin = trainTime; - Instant end = begin.plus(intervalMinutes, ChronoUnit.MINUTES); + Instant dataStartTime = Instant + .from(DateTimeFormatter.ISO_INSTANT.parse(data.get(trainTestSplit - 1).get("timestamp").getAsString())); + Instant dataEndTime = dataStartTime.plus(intervalMinutes, ChronoUnit.MINUTES); + Instant trainTime = dataToExecutionTime(dataStartTime, windowDelay); + ; + Instant executionStartTime = trainTime; + Instant executionEndTime = executionStartTime.plus(intervalMinutes, ChronoUnit.MINUTES); - simulateStartDetector(detectorId, begin, end, client, 1); - simulateWaitForInitDetector(detectorId, client, end, 1); + simulateStartDetector(detectorId, executionStartTime, executionEndTime, client, 1); + simulateWaitForInitDetector(detectorId, client, dataEndTime, 1); bulkIndexTestData(data, datasetName, trainTestSplit, client); - double[] testResults = getTestResults(detectorId, data, trainTestSplit, intervalMinutes, anomalies, client, 1); + double[] testResults = getTestResults(detectorId, data, trainTestSplit, intervalMinutes, anomalies, client, 1, windowDelay); verifyTestResults(testResults, anomalies, minPrecision, minRecall, maxError); } @@ -134,12 +143,16 @@ private double[] getTestResults( int intervalMinutes, List> anomalies, RestClient client, - int entitySize + int entitySize, + Duration windowDelay ) throws Exception { int errors = 0; for (int i = trainTestSplit; i < data.size(); i++) { - Instant begin = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(data.get(i).get("timestamp").getAsString())); + Instant begin = dataToExecutionTime( + Instant.from(DateTimeFormatter.ISO_INSTANT.parse(data.get(i).get("timestamp").getAsString())), + windowDelay + ); Instant end = begin.plus(intervalMinutes, ChronoUnit.MINUTES); try { runDetectionResult(detectorId, begin, end, client, entitySize); @@ -158,7 +171,7 @@ private double[] getTestResults( Instant end = begin.plus(intervalMinutes, ChronoUnit.MINUTES); try { List sourceList = getRealTimeAnomalyResult(detectorId, end, 1, client); - assertTrue("anomalyGrade cannot be negative", sourceList.size() == 1); + assertTrue("expect 1 result, but got " + sourceList.size(), sourceList.size() == 1); double anomalyGrade = getAnomalyGrade(sourceList.get(0)); assertTrue("anomalyGrade cannot be negative", anomalyGrade >= 0); if (anomalyGrade > 0) { @@ -209,4 +222,22 @@ private void bulkIndexTestData(List data, String datasetName, int tr Thread.sleep(1_000); waitAllSyncheticDataIngested(data.size(), datasetName, client); } + + protected long getWindowDelayMinutes(List data, int trainTestSplit, String timestamp) { + // e.g., "2019-11-02T00:59:00Z" + String trainTimeStr = data.get(trainTestSplit - 1).get("timestamp").getAsString(); + Instant trainTime = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(trainTimeStr)); + /* + * The {@code CompositeRetriever.PageIterator.hasNext()} method checks if a request is expired + * relative to the current system time. This method is designed to ensure that the execution time + * is set to either the current time or a future time to prevent premature expirations in our tests. + * + * Also, AD accepts windowDelay in the unit of minutes. Thus, we need to convert the delay in minutes. This will + * make it easier to search for results based on data end time. Otherwise, real data time and the converted + * data time from request time. + * Assume x = real data time. y= real window delay. y'= window delay in minutes. If y and y' are different, + * x + y - y' != x. + */ + return Duration.between(trainTime, Instant.now()).toMinutes(); + } } diff --git a/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java b/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java index d4dd1878c..1f40fa1f9 100644 --- a/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java +++ b/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java @@ -48,8 +48,8 @@ import org.opensearch.ad.constant.ADCommonName; import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.ml.ADCheckpointDao; -import org.opensearch.ad.ml.ADInferencer; import org.opensearch.ad.ml.ADModelManager; +import org.opensearch.ad.ml.ADRealTimeInferencer; import org.opensearch.ad.ml.ThresholdingResult; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.settings.AnomalyDetectorSettings; @@ -103,7 +103,7 @@ public class CheckpointReadWorkerTests extends AbstractRateLimitingTest { FeatureRequest request, request2, request3; ClusterSettings clusterSettings; ADStats adStats; - ADInferencer inferencer; + ADRealTimeInferencer inferencer; @Override public void setUp() throws Exception { @@ -152,7 +152,15 @@ public void setUp() throws Exception { }; adStats = new ADStats(statsMap); - inferencer = new ADInferencer(modelManager, adStats, checkpoint, coldstartQueue, resultWriteStrategy, cacheProvider, threadPool); + inferencer = new ADRealTimeInferencer( + modelManager, + adStats, + checkpoint, + coldstartQueue, + resultWriteStrategy, + cacheProvider, + threadPool + ); // Integer.MAX_VALUE makes a huge heap worker = new ADCheckpointReadWorker( @@ -180,9 +188,33 @@ public void setUp() throws Exception { inferencer ); - request = new FeatureRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, new double[] { 0 }, 0, entity, null); - request2 = new FeatureRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, new double[] { 0 }, 0, entity2, null); - request3 = new FeatureRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, new double[] { 0 }, 0, entity3, null); + request = new FeatureRequest( + Integer.MAX_VALUE, + detectorId, + RequestPriority.MEDIUM, + new double[] { 0 }, + System.currentTimeMillis(), + entity, + null + ); + request2 = new FeatureRequest( + Integer.MAX_VALUE, + detectorId, + RequestPriority.MEDIUM, + new double[] { 0 }, + System.currentTimeMillis(), + entity2, + null + ); + request3 = new FeatureRequest( + Integer.MAX_VALUE, + detectorId, + RequestPriority.MEDIUM, + new double[] { 0 }, + System.currentTimeMillis(), + entity3, + null + ); } static class RegularSetUpConfig { diff --git a/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java b/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java index 99762c5b1..c4556fe0a 100644 --- a/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java @@ -70,8 +70,8 @@ import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.ad.constant.ADCommonName; import org.opensearch.ad.ml.ADCheckpointDao; -import org.opensearch.ad.ml.ADInferencer; import org.opensearch.ad.ml.ADModelManager; +import org.opensearch.ad.ml.ADRealTimeInferencer; import org.opensearch.ad.ml.ThresholdingResult; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.DetectorInternalState; @@ -142,6 +142,8 @@ import com.google.gson.JsonElement; import test.org.opensearch.ad.util.JsonDeserializer; +import test.org.opensearch.ad.util.MLUtil; +import test.org.opensearch.ad.util.RandomModelStateConfig; public class AnomalyResultTests extends AbstractTimeSeriesTest { private Settings settings; @@ -166,7 +168,7 @@ public class AnomalyResultTests extends AbstractTimeSeriesTest { private ADTaskManager adTaskManager; private ADCheckpointReadWorker checkpointReadQueue; private ADCacheProvider cacheProvider; - private ADInferencer inferencer; + private ADRealTimeInferencer inferencer; private ADColdStartWorker coldStartWorker; @BeforeClass @@ -356,7 +358,7 @@ public void setUp() throws Exception { when(cacheProvider.get()).thenReturn(mock(ADPriorityCache.class)); coldStartWorker = mock(ADColdStartWorker.class); - inferencer = new ADInferencer( + inferencer = new ADRealTimeInferencer( normalModelManager, adStats, mock(ADCheckpointDao.class), @@ -612,12 +614,11 @@ public void testInsufficientCapacityExceptionDuringColdStart() { assertException(listener, LimitExceededException.class); } - @SuppressWarnings("unchecked") public void testInsufficientCapacityExceptionDuringRestoringModel() throws InterruptedException { ADModelManager badModelManager = mock(ADModelManager.class); doThrow(new NullPointerException()).when(badModelManager).getResult(any(), any(), any(), any(), any()); - inferencer = new ADInferencer( + inferencer = new ADRealTimeInferencer( badModelManager, adStats, mock(ADCheckpointDao.class), @@ -629,7 +630,8 @@ public void testInsufficientCapacityExceptionDuringRestoringModel() throws Inter ADPriorityCache adPriorityCache = mock(ADPriorityCache.class); when(cacheProvider.get()).thenReturn(adPriorityCache); - when(adPriorityCache.get(anyString(), any())).thenReturn(mock(ModelState.class)); + when(adPriorityCache.get(anyString(), any())) + .thenReturn(MLUtil.randomModelState(new RandomModelStateConfig.Builder().fullModel(true).build())); CountDownLatch inProgress = new CountDownLatch(1); doAnswer(invocation -> { @@ -668,7 +670,10 @@ public void testInsufficientCapacityExceptionDuringRestoringModel() throws Inter adTaskManager ); - AnomalyResultRequest request = new AnomalyResultRequest(adID, 100, 200); + // make sure request data end time is assigned after state initialization to pass Inferencer.tryProcess method time check. + long start = System.currentTimeMillis() - 100; + long end = System.currentTimeMillis(); + AnomalyResultRequest request = new AnomalyResultRequest(adID, start, end); PlainActionFuture listener = new PlainActionFuture<>(); action.doExecute(null, request, listener); diff --git a/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java index e35e85c87..5bd044ea8 100644 --- a/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java @@ -55,8 +55,8 @@ import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.ml.ADCheckpointDao; import org.opensearch.ad.ml.ADColdStart; -import org.opensearch.ad.ml.ADInferencer; import org.opensearch.ad.ml.ADModelManager; +import org.opensearch.ad.ml.ADRealTimeInferencer; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.ratelimit.ADCheckpointReadWorker; import org.opensearch.ad.ratelimit.ADColdEntityWorker; @@ -136,7 +136,7 @@ public class EntityResultTransportActionTests extends AbstractTimeSeriesTest { ClusterService clusterService; ADStats adStats; ADSaveResultStrategy resultSaver; - ADInferencer inferencer; + ADRealTimeInferencer inferencer; @BeforeClass public static void setUpBeforeClass() { @@ -164,10 +164,6 @@ public void setUp() throws Exception { detectorId = "123"; entities = new HashMap<>(); - start = 10L; - end = 20L; - request = new EntityResultRequest(detectorId, entities, start, end, AnalysisType.AD, null); - clock = mock(Clock.class); now = Instant.now(); when(clock.instant()).thenReturn(now); @@ -235,6 +231,11 @@ public void setUp() throws Exception { coldEntities.add(cacheMissEntityObj); when(entityCache.selectUpdateCandidate(any(), anyString(), any())).thenReturn(Pair.of(new ArrayList<>(), coldEntities)); + // make sure request data end time is assigned after state initialization to pass Inferencer.tryProcess method time check. + start = System.currentTimeMillis() - 10; + end = System.currentTimeMillis(); + request = new EntityResultRequest(detectorId, entities, start, end, AnalysisType.AD, null); + indexUtil = mock(ADIndexManagement.class); when(indexUtil.getSchemaVersion(any())).thenReturn(CommonValue.NO_SCHEMA_VERSION); @@ -263,7 +264,7 @@ public void setUp() throws Exception { adStats = new ADStats(statsMap); resultSaver = new ADSaveResultStrategy(1, resultWriteQueue); - inferencer = new ADInferencer(manager, adStats, checkpointDao, entityColdStartQueue, resultSaver, provider, threadPool); + inferencer = new ADRealTimeInferencer(manager, adStats, checkpointDao, entityColdStartQueue, resultSaver, provider, threadPool); entityResult = new EntityADResultTransportAction( actionFilters, @@ -389,7 +390,15 @@ public void testJsonResponse() throws IOException, JsonPathNotFoundException { public void testFailToScore() { ADModelManager spyModelManager = spy(manager); doThrow(new IllegalArgumentException()).when(spyModelManager).getResult(any(), any(), anyString(), any(), any()); - inferencer = new ADInferencer(spyModelManager, adStats, checkpointDao, entityColdStartQueue, resultSaver, provider, threadPool); + inferencer = new ADRealTimeInferencer( + spyModelManager, + adStats, + checkpointDao, + entityColdStartQueue, + resultSaver, + provider, + threadPool + ); entityResult = new EntityADResultTransportAction( actionFilters, transportService, diff --git a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java index 81903b375..fb93d8d1c 100644 --- a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java @@ -69,8 +69,8 @@ import org.opensearch.ad.caching.ADPriorityCache; import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.ml.ADCheckpointDao; -import org.opensearch.ad.ml.ADInferencer; import org.opensearch.ad.ml.ADModelManager; +import org.opensearch.ad.ml.ADRealTimeInferencer; import org.opensearch.ad.ml.ThresholdingResult; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.ratelimit.ADCheckpointReadWorker; @@ -177,7 +177,7 @@ public class MultiEntityResultTests extends AbstractTimeSeriesTest { private ADPriorityCache entityCache; private ADTaskManager adTaskManager; private ADSaveResultStrategy resultSaver; - private ADInferencer inferencer; + private ADRealTimeInferencer inferencer; @BeforeClass public static void setUpBeforeClass() { @@ -323,7 +323,7 @@ public void setUp() throws Exception { attrs3.put(serviceField, app0); attrs3.put(hostField, server3); - inferencer = new ADInferencer( + inferencer = new ADRealTimeInferencer( normalModelManager, adStats, mock(ADCheckpointDao.class), diff --git a/src/test/java/org/opensearch/timeseries/transport/AnomalyDetectorJobTransportActionTests.java b/src/test/java/org/opensearch/timeseries/transport/AnomalyDetectorJobTransportActionTests.java index 3ea8d0fec..aa8c74c11 100644 --- a/src/test/java/org/opensearch/timeseries/transport/AnomalyDetectorJobTransportActionTests.java +++ b/src/test/java/org/opensearch/timeseries/transport/AnomalyDetectorJobTransportActionTests.java @@ -11,6 +11,7 @@ package org.opensearch.timeseries.transport; +import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_MODEL_MAX_SIZE_PERCENTAGE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR; @@ -83,6 +84,9 @@ public void setUp() throws Exception { dateRange = new DateRange(startTime, endTime); ingestTestData(testIndex, startTime, detectionIntervalInMinutes, type, 2000); createDetectorIndex(); + // increase the AD memory percentage. Otherwise testStartHistoricalAnalysisForMultiCategoryHCWithUser + // may fail. + updateTransientSettings(ImmutableMap.of(AD_MODEL_MAX_SIZE_PERCENTAGE.getKey(), 0.5)); } @Override