Skip to content

Commit

Permalink
Refactoring task cache manager for forecasting (#982)
Browse files Browse the repository at this point in the history
* Refactoring Shared Functionality for Broader Task Compatibility

This PR extracts shared components from `ADTaskCacheManager` into `TaskCacheManager` to support the requirements of the forecasting feature.

**Renamings**:

- **Method-level in `ADTaskCacheManager`**:
  - `addDeletedDetector` to `addDeletedConfig`
  - `addDeletedDetectorTask` to `addDeletedTask`
  - `hasDeletedDetectorTask` to `hasDeletedTask`
  - `pollDeletedDetector` to `pollDeletedConfig`
  - `pollDeletedDetectorTask` to `pollDeletedTask`

- **Variable-level in `AnomalyDetectorSettings`**:
  - `CHECKPOINT_MAINTAIN_QUEUE_MAX_HEAP_PERCENT` to `AD_CHECKPOINT_MAINTAIN_QUEUE_MAX_HEAP_PERCENT`
  - `CHECKPOINT_READ_QUEUE_MAX_HEAP_PERCENT` to `AD_CHECKPOINT_READ_QUEUE_MAX_HEAP_PERCENT`
  - `CHECKPOINT_SAVING_FREQ` to `AD_CHECKPOINT_SAVING_FREQ`
  - `CHECKPOINT_TTL` to `AD_CHECKPOINT_TTL`
  - `CHECKPOINT_WRITE_QUEUE_MAX_HEAP_PERCENT` to `AD_CHECKPOINT_WRITE_QUEUE_MAX_HEAP_PERCENT`
  - `COLD_ENTITY_QUEUE_MAX_HEAP_PERCENT` to `AD_COLD_ENTITY_QUEUE_MAX_HEAP_PERCENT`
  - `DEDICATED_CACHE_SIZE` to `AD_DEDICATED_CACHE_SIZE`
  - `ENTITY_COLD_START_QUEUE_CONCURRENCY` to `AD_ENTITY_COLD_START_QUEUE_CONCURRENCY`
  - `ENTITY_COLD_START_QUEUE_MAX_HEAP_PERCENT` to `AD_ENTITY_COLD_START_QUEUE_MAX_HEAP_PERCENT`
  - `EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS` to `AD_EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS`
  - `FILTER_BY_BACKEND_ROLES` to `AD_FILTER_BY_BACKEND_ROLES`
  - `MAX_ENTITIES_PER_QUERY` to `AD_MAX_ENTITIES_PER_QUERY`
  - `MAX_MODEL_SIZE_PER_NODE` to `AD_MAX_MODEL_SIZE_PER_NODE`
  - `MAX_MULTI_ENTITY_ANOMALY_DETECTORS` to `AD_MAX_HC_ANOMALY_DETECTORS`
  - `MAX_RETRY_FOR_END_RUN_EXCEPTION` to `AD_MAX_RETRY_FOR_END_RUN_EXCEPTION`
  - `MAX_SINGLE_ENTITY_ANOMALY_DETECTORS` to `AD_MAX_SINGLE_ENTITY_ANOMALY_DETECTORS`
  - `MODEL_MAX_SIZE_PERCENTAGE` to `AD_MODEL_MAX_SIZE_PERCENTAGE`
  - `PAGE_SIZE` to `AD_PAGE_SIZE`
  - `RESULT_WRITE_QUEUE_MAX_HEAP_PERCENT` to `AD_RESULT_WRITE_QUEUE_MAX_HEAP_PERCENT`

- **Class-level**:
  - `ADRealtimeTaskCache` to `RealtimeTaskCache`

- **Package-level**:
  - Shifted from `org.opensearch.ad.breaker` to `org.opensearch.timeseries.breaker`

**Migrations**:

- Variables transferred from `AnomalyDetectorSettings` to `TimeSeriesSettings`:
  - `BATCH_BOUNDING_BOX_CACHE_RATIO`
  - `CHECKPOINT_MAINTAIN_REQUEST_SIZE_IN_BYTES`
  - `CHECKPOINT_WRITE_QUEUE_SIZE_IN_BYTES`
  - `DEFAULT_AD_JOB_LOC_DURATION_SECONDS` (renamed to `DEFAULT_JOB_LOC_DURATION_SECONDS`)
  - `ENTITY_REQUEST_SIZE_IN_BYTES`
  - `HOURLY_MAINTENANCE`
  - `INTERVAL_RATIO_FOR_REQUESTS`
  - `LOW_SEGMENT_PRUNE_RATIO`
  - `MAINTENANCE_FREQ_CONSTANT`
  - `MAX_COLD_START_ROUNDS`
  - `MAX_QUEUED_TASKS_RATIO`
  - `MAX_TOTAL_RCF_SERIALIZATION_BUFFERS`
  - `MAX_CHECKPOINT_BYTES`
  - `MEDIUM_SEGMENT_PRUNE_RATIO`
  - `NUM_MIN_SAMPLES`
  - `NUM_SAMPLES_PER_TREE`
  - `NUM_TREES`
  - `QUEUE_MAINTENANCE`
  - `RESULT_WRITE_QUEUE_SIZE_IN_BYTES`
  - `SERIALIZATION_BUFFER_BYTES`
  - `THRESHOLD_MIN_PVALUE`
  - `TIME_DECAY`

**Deletions**:
- Obsolete settings and methods:
  - `DESIRED_MODEL_SIZE_PERCENTAGE` in `AnomalyDetectorSettings`
  - `getDesiredModelSize` in `MemoryTracker`

**Modifications**:
- `MemoryTracker` enums renamed to clearly differentiate real-time from historical memory usage; added `REAL_TIME_FORECASTER` so single-stream and HC analyses are handled through a single, harmonized code path.

**Tests**:
- Changes validated with a successful Gradle build.

Signed-off-by: Kaituo Li <[email protected]>

* improve comments

Signed-off-by: Kaituo Li <[email protected]>

* improve comments

Signed-off-by: Kaituo Li <[email protected]>

---------

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo authored Aug 17, 2023
1 parent 0c5b4b9 commit 5ac6390
Show file tree
Hide file tree
Showing 95 changed files with 1,090 additions and 1,163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void setThreadPool(ThreadPool threadPool) {

public void setSettings(Settings settings) {
this.settings = settings;
this.maxRetryForEndRunException = AnomalyDetectorSettings.MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings);
this.maxRetryForEndRunException = AnomalyDetectorSettings.AD_MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings);
}

public void setAdTaskManager(ADTaskManager adTaskManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.ad.model.DetectorState;
import org.opensearch.ad.model.InitProgressProfile;
import org.opensearch.ad.settings.ADNumericSetting;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.ProfileAction;
import org.opensearch.ad.transport.ProfileRequest;
Expand Down Expand Up @@ -70,6 +69,7 @@
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.timeseries.util.ExceptionUtil;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
Expand Down Expand Up @@ -105,7 +105,7 @@ public AnomalyDetectorProfileRunner(
}
this.transportService = transportService;
this.adTaskManager = adTaskManager;
this.maxTotalEntitiesToTrack = AnomalyDetectorSettings.MAX_TOTAL_ENTITIES_TO_TRACK;
this.maxTotalEntitiesToTrack = TimeSeriesSettings.MAX_TOTAL_ENTITIES_TO_TRACK;
}

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profilesToCollect) {
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/org/opensearch/ad/caching/CacheBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.ad.ml.EntityModel;
import org.opensearch.ad.ml.ModelState;
import org.opensearch.ad.model.InitProgressProfile;
Expand All @@ -36,6 +34,8 @@
import org.opensearch.ad.ratelimit.RequestPriority;
import org.opensearch.ad.util.DateUtils;
import org.opensearch.timeseries.ExpiringState;
import org.opensearch.timeseries.MemoryTracker;
import org.opensearch.timeseries.MemoryTracker.Origin;

/**
* We use a layered cache to manage active entities’ states. We have a two-level
Expand Down Expand Up @@ -159,7 +159,7 @@ private void put(String entityModelId, ModelState<EntityModel> value, float prio
// Since we have already considered them while allocating CacheBuffer,
// skip bookkeeping.
if (!sharedCacheEmpty()) {
memoryTracker.consumeMemory(memoryConsumptionPerEntity, false, Origin.HC_DETECTOR);
memoryTracker.consumeMemory(memoryConsumptionPerEntity, false, Origin.REAL_TIME_DETECTOR);
}
} else {
update(entityModelId);
Expand Down Expand Up @@ -267,7 +267,7 @@ public ModelState<EntityModel> remove(String keyToRemove, boolean saveCheckpoint
if (valueRemoved != null) {
if (!reserved) {
// release in shared memory
memoryTracker.releaseMemory(memoryConsumptionPerEntity, false, Origin.HC_DETECTOR);
memoryTracker.releaseMemory(memoryConsumptionPerEntity, false, Origin.REAL_TIME_DETECTOR);
}

EntityModel modelRemoved = valueRemoved.getModel();
Expand Down Expand Up @@ -460,9 +460,9 @@ public void clear() {
// not a problem as we are releasing memory in MemoryTracker.
// The newly added one loses references and soon GC will collect it.
// We have memory tracking correction to fix incorrect memory usage record.
memoryTracker.releaseMemory(getReservedBytes(), true, Origin.HC_DETECTOR);
memoryTracker.releaseMemory(getReservedBytes(), true, Origin.REAL_TIME_DETECTOR);
if (!sharedCacheEmpty()) {
memoryTracker.releaseMemory(getBytesInSharedCache(), false, Origin.HC_DETECTOR);
memoryTracker.releaseMemory(getBytesInSharedCache(), false, Origin.REAL_TIME_DETECTOR);
}
items.clear();
priorityTracker.clearPriority();
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/opensearch/ad/caching/PriorityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

package org.opensearch.ad.caching;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.DEDICATED_CACHE_SIZE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MODEL_MAX_SIZE_PERCENTAGE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_DEDICATED_CACHE_SIZE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_MODEL_MAX_SIZE_PERCENTAGE;

import java.time.Clock;
import java.time.Duration;
Expand All @@ -38,8 +38,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.ad.ml.CheckpointDao;
import org.opensearch.ad.ml.EntityModel;
import org.opensearch.ad.ml.ModelManager.ModelType;
Expand All @@ -57,6 +55,8 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.MemoryTracker;
import org.opensearch.timeseries.MemoryTracker.Origin;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.common.exception.LimitExceededException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
Expand Down Expand Up @@ -116,12 +116,12 @@ public PriorityCache(

this.activeEnities = new ConcurrentHashMap<>();
this.dedicatedCacheSize = dedicatedCacheSize;
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEDICATED_CACHE_SIZE, (it) -> {
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_DEDICATED_CACHE_SIZE, (it) -> {
this.dedicatedCacheSize = it;
this.setDedicatedCacheSizeListener();
this.tryClearUpMemory();
}, this::validateDedicatedCacheSize);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MODEL_MAX_SIZE_PERCENTAGE, it -> this.tryClearUpMemory());
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_MODEL_MAX_SIZE_PERCENTAGE, it -> this.tryClearUpMemory());

this.memoryTracker = memoryTracker;
this.maintenanceLock = new ReentrantLock();
Expand Down Expand Up @@ -461,7 +461,7 @@ private CacheBuffer computeBufferIfAbsent(AnomalyDetector detector, String detec
if (buffer == null) {
long requiredBytes = getRequiredMemory(detector, dedicatedCacheSize);
if (memoryTracker.canAllocateReserved(requiredBytes)) {
memoryTracker.consumeMemory(requiredBytes, true, Origin.HC_DETECTOR);
memoryTracker.consumeMemory(requiredBytes, true, Origin.REAL_TIME_DETECTOR);
long intervalSecs = detector.getIntervalInSeconds();

buffer = new CacheBuffer(
Expand Down Expand Up @@ -621,7 +621,7 @@ private void recalculateUsedMemory() {
reserved += buffer.getReservedBytes();
shared += buffer.getBytesInSharedCache();
}
memoryTracker.syncMemoryState(Origin.HC_DETECTOR, reserved + shared, reserved);
memoryTracker.syncMemoryState(Origin.REAL_TIME_DETECTOR, reserved + shared, reserved);
}

/**
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/org/opensearch/ad/ml/ModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,16 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ad.DetectorModelSize;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.DateUtils;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.timeseries.MemoryTracker;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.ml.SingleStreamModelIdMapper;
import org.opensearch.timeseries.model.Entity;
Expand Down Expand Up @@ -624,7 +623,7 @@ public List<ThresholdingResult> getPreviewResults(double[][] dataPoints, int shi
.parallelExecutionEnabled(false)
.compact(true)
.precision(Precision.FLOAT_32)
.boundingBoxCacheFraction(AnomalyDetectorSettings.BATCH_BOUNDING_BOX_CACHE_RATIO)
.boundingBoxCacheFraction(TimeSeriesSettings.BATCH_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(shingleSize)
.anomalyRate(1 - this.thresholdMinPvalue)
.transformMethod(TransformMethod.NORMALIZE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

import java.util.concurrent.ConcurrentHashMap;

import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.timeseries.MemoryTracker;
import org.opensearch.timeseries.MemoryTracker.Origin;

import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

Expand All @@ -37,7 +37,7 @@ public ModelState<ThresholdedRandomCutForest> remove(Object key) {
ModelState<ThresholdedRandomCutForest> deletedModelState = super.remove(key);
if (deletedModelState != null && deletedModelState.getModel() != null) {
long memoryToRelease = memoryTracker.estimateTRCFModelSize(deletedModelState.getModel());
memoryTracker.releaseMemory(memoryToRelease, true, Origin.SINGLE_ENTITY_DETECTOR);
memoryTracker.releaseMemory(memoryToRelease, true, Origin.REAL_TIME_DETECTOR);
}
return deletedModelState;
}
Expand All @@ -47,7 +47,7 @@ public ModelState<ThresholdedRandomCutForest> put(K key, ModelState<ThresholdedR
ModelState<ThresholdedRandomCutForest> previousAssociatedState = super.put(key, value);
if (value != null && value.getModel() != null) {
long memoryToConsume = memoryTracker.estimateTRCFModelSize(value.getModel());
memoryTracker.consumeMemory(memoryToConsume, true, Origin.SINGLE_ENTITY_DETECTOR);
memoryTracker.consumeMemory(memoryToConsume, true, Origin.REAL_TIME_DETECTOR);
}
return previousAssociatedState;
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/ratelimit/BatchWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.breaker.CircuitBreakerService;

/**
*
Expand All @@ -46,7 +46,7 @@ public BatchWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;

public class CheckpointMaintainWorker extends ScheduledWorker<CheckpointMaintainRequest, CheckpointWriteRequest> {
private static final Logger LOG = LogManager.getLogger(CheckpointMaintainWorker.class);
Expand All @@ -43,7 +43,7 @@ public CheckpointMaintainWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.opensearch.action.get.MultiGetItemResponse;
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.caching.CacheProvider;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.indices.ADIndex;
Expand All @@ -53,6 +52,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.model.Config;
Expand Down Expand Up @@ -91,7 +91,7 @@ public CheckpointReadWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.ml.CheckpointDao;
import org.opensearch.ad.ml.EntityModel;
import org.opensearch.ad.ml.ModelState;
Expand All @@ -43,6 +42,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.util.ExceptionUtil;

Expand All @@ -60,7 +60,7 @@ public CheckpointWriteWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/opensearch/ad/ratelimit/ColdEntityWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@
package org.opensearch.ad.ratelimit;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_CHECKPOINT_READ_QUEUE_BATCH_SIZE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS;

import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;

/**
* A queue slowly releasing low-priority requests to CheckpointReadQueue
Expand All @@ -52,7 +52,7 @@ public ColdEntityWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down Expand Up @@ -87,12 +87,12 @@ public ColdEntityWorker(
this.batchSize = AD_CHECKPOINT_READ_QUEUE_BATCH_SIZE.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_CHECKPOINT_READ_QUEUE_BATCH_SIZE, it -> this.batchSize = it);

this.expectedExecutionTimeInMilliSecsPerRequest = AnomalyDetectorSettings.EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS
this.expectedExecutionTimeInMilliSecsPerRequest = AnomalyDetectorSettings.AD_EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS
.get(settings);
clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(
EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS,
AD_EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS,
it -> this.expectedExecutionTimeInMilliSecsPerRequest = it
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.breaker.CircuitBreakerService;

/**
* A queue to run concurrent requests (either batch or single request).
Expand Down Expand Up @@ -74,7 +74,7 @@ public ConcurrentWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down
Loading

0 comments on commit 5ac6390

Please sign in to comment.