From 7814c6d6353f7834e1e3487c9ade1498aaf300dc Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Fri, 16 Aug 2024 13:01:04 -0700 Subject: [PATCH] add comment and remove redundant code Signed-off-by: Kaituo Li --- .../org/opensearch/ad/ml/ADColdStart.java | 3 --- .../transport/ADHCImputeTransportAction.java | 27 +++++++++++++++++++ .../util/ActionListenerExecutor.java | 3 ++- 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/opensearch/ad/ml/ADColdStart.java b/src/main/java/org/opensearch/ad/ml/ADColdStart.java index b4f329efa..7e6cb91c6 100644 --- a/src/main/java/org/opensearch/ad/ml/ADColdStart.java +++ b/src/main/java/org/opensearch/ad/ml/ADColdStart.java @@ -13,7 +13,6 @@ import java.time.Clock; import java.time.Duration; -import java.util.ArrayList; import java.util.List; import org.apache.logging.log4j.LogManager; @@ -222,13 +221,11 @@ protected List trainModelFromDataSegments( // use build instead of new TRCF(Builder) because build method did extra validation and initialization ThresholdedRandomCutForest trcf = rcfBuilder.build(); - List imputed = new ArrayList<>(); for (int i = 0; i < pointSamples.size(); i++) { Sample dataSample = pointSamples.get(i); double[] dataValue = dataSample.getValueList(); // We don't keep missing values during cold start as the actual data may not be reconstructed during the early stage. trcf.process(dataValue, dataSample.getDataEndTime().getEpochSecond()); - imputed.add(new Sample(dataValue, dataSample.getDataStartTime(), dataSample.getDataEndTime())); } entityState.setModel(trcf); diff --git a/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java b/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java index 52fdc0c6b..32c44d03e 100644 --- a/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java @@ -35,6 +35,33 @@ import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; +/** + * This class manages the broadcasting mechanism and entity data processing for + * the HC detector. The system broadcasts a message after processing all records + * in each interval to ensure that each node examines its hot models in memory + * and determines which entity models have not received data during the current interval. + * + * "Hot" entities refer to those models actively loaded in memory, as opposed to + * "cold" models, which are not loaded and remain in storage due to limited memory resources. + * + * Upon receiving the broadcast message, each node checks whether each hot entity + * has received new data. If a hot entity has not received any data, the system + * assigns a NaN value to that entity. This NaN value signals to the model that no + * data was received, prompting it to impute the missing value based on previous data, + * rather than using current interval data. + * + * The system determines which node manages which entities based on memory availability. + * The coordinating node does not immediately know which entities are hot or cold; + * it learns this during the pagination process. Hot entities are those that have + * recently received data and are actively maintained in memory, while cold entities + * remain in storage and are processed only if time permits within the interval. + * + * For cold entities whose models are not loaded in memory, the system does not + * produce an anomaly result for that interval due to insufficient time or resources + * to process them. This is particularly relevant in scenarios with short intervals, + * such as one minute, where an underscaled cluster may cause processing delays + * that prevent timely anomaly detection for some entities. + */ public class ADHCImputeTransportAction extends TransportNodesAction { private static final Logger LOG = LogManager.getLogger(ADHCImputeTransportAction.class); diff --git a/src/main/java/org/opensearch/timeseries/util/ActionListenerExecutor.java b/src/main/java/org/opensearch/timeseries/util/ActionListenerExecutor.java index b4cea8ebb..b7f601444 100644 --- a/src/main/java/org/opensearch/timeseries/util/ActionListenerExecutor.java +++ b/src/main/java/org/opensearch/timeseries/util/ActionListenerExecutor.java @@ -20,7 +20,8 @@ private ActionListenerExecutor() {} /** * Wraps the provided response and failure handlers in an ActionListener that executes the - * response handler asynchronously using the provided ExecutorService. + * response handler asynchronously using the provided ExecutorService. This would allow + * us to execute listener code using AD thread pool for example. * * @param the type of the response * @param onResponse a CheckedConsumer that handles the response; it can throw an exception