Skip to content

Commit

Permalink
Add Support for Handling Missing Data in Anomaly Detection
Browse files Browse the repository at this point in the history
This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (the default behavior), filling with customer-specified fixed values, zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, as discussed in this forum thread (https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535), customers can now opt to fill missing values with zeros to maintain detection accuracy.

Key Changes:
1. Enhanced Missing Data Handling:

Changed to use ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency and continues to use the existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner).

2. Refactoring Imputation & Processing:

Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer.

3. Improved Imputed Value Reconstruction:

Reconstructed imputed values using the existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult).

4. Broadcast Support for HC Detectors:

Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor).

5. Introduction of ActionListenerExecutor:

Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows responses to be handled in the AD thread pool.

Testing:
Comprehensive testing was conducted, including both integration and unit tests. Of the 7135 lines added and 1683 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Aug 12, 2024
1 parent 34b350e commit 9cb78cc
Show file tree
Hide file tree
Showing 149 changed files with 7,160 additions and 1,670 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/add-untriaged.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ on:
issues:
types: [opened, reopened, transferred]

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
apply-label:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/auto-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ on:
tags:
- '*'

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
build:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/backport.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ on:
- closed
- labeled

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
backport:
if: github.event.pull_request.merged == true
Expand Down
18 changes: 16 additions & 2 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ on:
branches:
- "*"

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
Get-CI-Image-Tag:
uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main
Expand All @@ -19,7 +22,7 @@ jobs:
strategy:
matrix:
# each test scenario (rule, hc, single_stream) is treated as a separate job.
test: [rule, hc, single_stream]
test: [rule, hc, single_stream,missing]
fail-fast: false
concurrency:
# The concurrency setting is used to limit the concurrency of each test scenario group to ensure they do not run concurrently on the same machine.
Expand Down Expand Up @@ -48,11 +51,16 @@ jobs:
chown -R 1000:1000 `pwd`
case ${{ matrix.test }} in
rule)
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.RuleModelPerfIT' \
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.RealTimeRuleModelPerfIT' \
-Dtests.seed=B4BA12CCF1D9E825 -Dtests.security.manager=false \
-Dtests.jvm.argline='-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m' \
-Dtests.locale=ar-JO -Dtests.timezone=Asia/Samarkand -Dmodel-benchmark=true \
-Dtests.timeoutSuite=3600000! -Dtest.logs=true"
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.HistoricalRuleModelPerfIT' \
-Dtests.seed=B4BA12CCF1D9E825 -Dtests.security.manager=false \
-Dtests.jvm.argline='-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m' \
-Dtests.locale=ar-JO -Dtests.timezone=Asia/Samarkand -Dmodel-benchmark=true \
-Dtests.timeoutSuite=3600000! -Dtest.logs=true"
;;
hc)
su `id -un 1000` -c "./gradlew ':test' --tests 'org.opensearch.ad.ml.HCADModelPerfTests' \
Expand All @@ -66,4 +74,10 @@ jobs:
-Dtests.locale=kab-DZ -Dtests.timezone=Asia/Hebron -Dtest.logs=true \
-Dtests.timeoutSuite=3600000! -Dmodel-benchmark=true"
;;
missing)
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.RealTimeMissingSingleFeatureModelPerfIT' \
-Dtests.seed=60CDDB34427ACD0C -Dtests.security.manager=false \
-Dtests.locale=kab-DZ -Dtests.timezone=Asia/Hebron -Dtest.logs=true \
-Dtests.timeoutSuite=3600000! -Dmodel-benchmark=true"
;;
esac
5 changes: 4 additions & 1 deletion .github/workflows/delete_backport_branch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ on:
pull_request:
types:
- closed


env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
delete-branch:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/draft-release-notes-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ on:
branches:
- main

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
update_release_draft:
name: Update draft release notes
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ on:
types:
- opened

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
label:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/link-check-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ on:
pull_request:
branches: [main]

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
linkchecker:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/maven-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ on:
- '1.*'
- '2.*'

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
build-and-publish-snapshots:
strategy:
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/test_build_multi_platform.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ on:
branches:
- "*"

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
Get-CI-Image-Tag:
uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/test_bwc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ on:
branches:
- "*"

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
Get-CI-Image-Tag:
uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/test_security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ on:
branches:
- "*"

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
Build-ad:
strategy:
Expand Down
63 changes: 42 additions & 21 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ buildscript {
js_resource_folder = "src/test/resources/job-scheduler"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
bwcVersionShort = "2.16.0"
bwcVersionShort = "2.17.0"
bwcVersion = bwcVersionShort + ".0"
bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip'
Expand Down Expand Up @@ -126,9 +126,9 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.12.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.1.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.1.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:4.1.0'

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.16.1"
Expand Down Expand Up @@ -356,8 +356,7 @@ integTest {

if (System.getProperty("model-benchmark") == null || System.getProperty("model-benchmark") == "false") {
filter {
excludeTestsMatching "org.opensearch.ad.e2e.SingleStreamModelPerfIT"
excludeTestsMatching "org.opensearch.ad.e2e.RuleModelPerfIT"
excludeTestsMatching "org.opensearch.ad.e2e.*ModelPerfIT"
}
}

Expand Down Expand Up @@ -676,34 +675,56 @@ List<String> jacocoExclusions = [
// rest layer is tested in integration testing mostly, difficult to mock all of it
'org.opensearch.ad.rest.*',

'org.opensearch.ad.model.ModelProfileOnNode',
'org.opensearch.ad.model.InitProgressProfile',
'org.opensearch.ad.rest.*',
'org.opensearch.ad.AnomalyDetectorJobRunner',

// Class containing just constants. Don't need to test
'org.opensearch.ad.constant.*',
'org.opensearch.forecast.constant.*',
'org.opensearch.timeseries.constant.*',
'org.opensearch.timeseries.settings.TimeSeriesSettings',
'org.opensearch.forecast.settings.ForecastSettings',

'org.opensearch.ad.transport.CronRequest',
'org.opensearch.ad.AnomalyDetectorRunner',

// related to transport actions added for security
'org.opensearch.ad.transport.DeleteAnomalyDetectorTransportAction.1',

// TODO: unified flow caused coverage drop
'org.opensearch.ad.transport.DeleteAnomalyResultsTransportAction',
// TODO: fix unstable code coverage caused by null NodeClient issue
// https://github.com/opensearch-project/anomaly-detection/issues/241
'org.opensearch.ad.task.ADBatchTaskRunner',
'org.opensearch.ad.task.ADTaskManager',
// TODO: add forecast test coverage before release

// TODO: add test coverage (kaituo)
'org.opensearch.forecast.*',
'org.opensearch.timeseries.*',
'org.opensearch.ad.*',
'org.opensearch.ad.transport.GetAnomalyDetectorTransportAction',
'org.opensearch.ad.ml.ADColdStart',
'org.opensearch.ad.transport.ADHCImputeNodesResponse',
'org.opensearch.timeseries.transport.BooleanNodeResponse',
'org.opensearch.timeseries.ml.TimeSeriesSingleStreamCheckpointDao',
'org.opensearch.timeseries.transport.JobRequest',
'org.opensearch.timeseries.transport.handler.ResultBulkIndexingHandler',
'org.opensearch.timeseries.ml.Inferencer',
'org.opensearch.timeseries.transport.SingleStreamResultRequest',
'org.opensearch.timeseries.transport.BooleanResponse',
'org.opensearch.timeseries.rest.handler.IndexJobActionHandler.1',
'org.opensearch.timeseries.transport.SuggestConfigParamResponse',
'org.opensearch.timeseries.transport.SuggestConfigParamRequest',
'org.opensearch.timeseries.ml.MemoryAwareConcurrentHashmap',
'org.opensearch.timeseries.transport.ResultBulkTransportAction',
'org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler',
'org.opensearch.timeseries.transport.handler.ResultIndexingHandler',
'org.opensearch.ad.transport.ADHCImputeNodeResponse',
'org.opensearch.timeseries.ml.Sample',
'org.opensearch.timeseries.ratelimit.FeatureRequest',
'org.opensearch.ad.transport.ADHCImputeNodeRequest',
'org.opensearch.timeseries.model.ModelProfileOnNode',
'org.opensearch.timeseries.transport.ValidateConfigRequest',
'org.opensearch.timeseries.transport.ResultProcessor.PageListener.1',
'org.opensearch.ad.transport.ADHCImputeRequest',
'org.opensearch.timeseries.transport.BaseDeleteConfigTransportAction.1',
'org.opensearch.timeseries.transport.BaseSuggestConfigParamTransportAction',
'org.opensearch.timeseries.rest.AbstractSearchAction.1',
'org.opensearch.ad.transport.ADSingleStreamResultTransportAction',
'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker.RequestQueue',
'org.opensearch.timeseries.rest.RestStatsAction',
'org.opensearch.ad.ml.ADCheckpointDao',
'org.opensearch.timeseries.transport.CronRequest',
'org.opensearch.ad.task.ADBatchTaskCache',
'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker',
]


Expand Down
6 changes: 2 additions & 4 deletions src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ public void executeDetector(
startTime.toEpochMilli(),
endTime.toEpochMilli(),
ActionListener.wrap(features -> {
List<ThresholdingResult> entityResults = modelManager
.getPreviewResults(features, detector.getShingleSize(), detector.getTimeDecay());
List<ThresholdingResult> entityResults = modelManager.getPreviewResults(features, detector);
List<AnomalyResult> sampledEntityResults = sample(
parsePreviewResult(detector, features, entityResults, entity),
maxPreviewResults
Expand All @@ -116,8 +115,7 @@ public void executeDetector(
} else {
featureManager.getPreviewFeatures(detector, startTime.toEpochMilli(), endTime.toEpochMilli(), ActionListener.wrap(features -> {
try {
List<ThresholdingResult> results = modelManager
.getPreviewResults(features, detector.getShingleSize(), detector.getTimeDecay());
List<ThresholdingResult> results = modelManager.getPreviewResults(features, detector);
listener.onResponse(sample(parsePreviewResult(detector, features, results, null), maxPreviewResults));
} catch (Exception e) {
onFailure(e, listener, detector.getId());
Expand Down
46 changes: 27 additions & 19 deletions src/main/java/org/opensearch/ad/ml/ADColdStart.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -171,7 +172,7 @@ protected List<Sample> trainModelFromDataSegments(

double[] firstPoint = pointSamples.get(0).getValueList();
if (firstPoint == null || firstPoint.length == 0) {
logger.info("Return early since data points must not be empty.");
logger.info("Return early since the first data point must not be empty.");
return null;
}

Expand Down Expand Up @@ -216,6 +217,31 @@ protected List<Sample> trainModelFromDataSegments(
}

AnomalyDetector detector = (AnomalyDetector) config;
applyRule(rcfBuilder, detector);

// use build instead of new TRCF(Builder) because build method did extra validation and initialization
ThresholdedRandomCutForest trcf = rcfBuilder.build();

List<Sample> imputed = new ArrayList<>();
for (int i = 0; i < pointSamples.size(); i++) {
Sample dataSample = pointSamples.get(i);
double[] dataValue = dataSample.getValueList();
// We don't keep missing values during cold start as the actual data may not be reconstructed during the early stage.
trcf.process(dataValue, dataSample.getDataEndTime().getEpochSecond());
imputed.add(new Sample(dataValue, dataSample.getDataStartTime(), dataSample.getDataEndTime()));
}

entityState.setModel(trcf);

entityState.setLastUsedTime(clock.instant());

// save to checkpoint
checkpointWriteWorker.write(entityState, true, RequestPriority.MEDIUM);

return pointSamples;
}

public static void applyRule(ThresholdedRandomCutForest.Builder rcfBuilder, AnomalyDetector detector) {
ThresholdArrays thresholdArrays = IgnoreSimilarExtractor.processDetectorRules(detector);

if (thresholdArrays != null) {
Expand All @@ -235,23 +261,5 @@ protected List<Sample> trainModelFromDataSegments(
rcfBuilder.ignoreNearExpectedFromBelowByRatio(thresholdArrays.ignoreSimilarFromBelowByRatio);
}
}

// use build instead of new TRCF(Builder) because build method did extra validation and initialization
ThresholdedRandomCutForest trcf = rcfBuilder.build();

for (int i = 0; i < pointSamples.size(); i++) {
Sample dataSample = pointSamples.get(i);
double[] dataValue = dataSample.getValueList();
trcf.process(dataValue, dataSample.getDataEndTime().getEpochSecond());
}

entityState.setModel(trcf);

entityState.setLastUsedTime(clock.instant());

// save to checkpoint
checkpointWriteWorker.write(entityState, true, RequestPriority.MEDIUM);

return pointSamples;
}
}
Loading

0 comments on commit 9cb78cc

Please sign in to comment.