Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring task cache manager for forecasting #982

Merged
merged 3 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void setThreadPool(ThreadPool threadPool) {

public void setSettings(Settings settings) {
this.settings = settings;
this.maxRetryForEndRunException = AnomalyDetectorSettings.MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings);
this.maxRetryForEndRunException = AnomalyDetectorSettings.AD_MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings);
}

public void setAdTaskManager(ADTaskManager adTaskManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.ad.model.DetectorState;
import org.opensearch.ad.model.InitProgressProfile;
import org.opensearch.ad.settings.ADNumericSetting;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.ProfileAction;
import org.opensearch.ad.transport.ProfileRequest;
Expand Down Expand Up @@ -70,6 +69,7 @@
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.timeseries.util.ExceptionUtil;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
Expand Down Expand Up @@ -105,7 +105,7 @@ public AnomalyDetectorProfileRunner(
}
this.transportService = transportService;
this.adTaskManager = adTaskManager;
this.maxTotalEntitiesToTrack = AnomalyDetectorSettings.MAX_TOTAL_ENTITIES_TO_TRACK;
this.maxTotalEntitiesToTrack = TimeSeriesSettings.MAX_TOTAL_ENTITIES_TO_TRACK;
}

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profilesToCollect) {
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/org/opensearch/ad/caching/CacheBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.ad.ml.EntityModel;
import org.opensearch.ad.ml.ModelState;
import org.opensearch.ad.model.InitProgressProfile;
Expand All @@ -36,6 +34,8 @@
import org.opensearch.ad.ratelimit.RequestPriority;
import org.opensearch.ad.util.DateUtils;
import org.opensearch.timeseries.ExpiringState;
import org.opensearch.timeseries.MemoryTracker;
import org.opensearch.timeseries.MemoryTracker.Origin;

/**
* We use a layered cache to manage active entities’ states. We have a two-level
Expand Down Expand Up @@ -159,7 +159,7 @@ private void put(String entityModelId, ModelState<EntityModel> value, float prio
// Since we have already considered them while allocating CacheBuffer,
// skip bookkeeping.
if (!sharedCacheEmpty()) {
memoryTracker.consumeMemory(memoryConsumptionPerEntity, false, Origin.HC_DETECTOR);
memoryTracker.consumeMemory(memoryConsumptionPerEntity, false, Origin.REAL_TIME_DETECTOR);
}
} else {
update(entityModelId);
Expand Down Expand Up @@ -267,7 +267,7 @@ public ModelState<EntityModel> remove(String keyToRemove, boolean saveCheckpoint
if (valueRemoved != null) {
if (!reserved) {
// release in shared memory
memoryTracker.releaseMemory(memoryConsumptionPerEntity, false, Origin.HC_DETECTOR);
memoryTracker.releaseMemory(memoryConsumptionPerEntity, false, Origin.REAL_TIME_DETECTOR);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good name change

}

EntityModel modelRemoved = valueRemoved.getModel();
Expand Down Expand Up @@ -460,9 +460,9 @@ public void clear() {
// not a problem as we are releasing memory in MemoryTracker.
// The newly added one loses references and soon GC will collect it.
// We have memory tracking correction to fix incorrect memory usage record.
memoryTracker.releaseMemory(getReservedBytes(), true, Origin.HC_DETECTOR);
memoryTracker.releaseMemory(getReservedBytes(), true, Origin.REAL_TIME_DETECTOR);
if (!sharedCacheEmpty()) {
memoryTracker.releaseMemory(getBytesInSharedCache(), false, Origin.HC_DETECTOR);
memoryTracker.releaseMemory(getBytesInSharedCache(), false, Origin.REAL_TIME_DETECTOR);
}
items.clear();
priorityTracker.clearPriority();
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/opensearch/ad/caching/PriorityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

package org.opensearch.ad.caching;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.DEDICATED_CACHE_SIZE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MODEL_MAX_SIZE_PERCENTAGE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_DEDICATED_CACHE_SIZE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_MODEL_MAX_SIZE_PERCENTAGE;

import java.time.Clock;
import java.time.Duration;
Expand All @@ -38,8 +38,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.ad.ml.CheckpointDao;
import org.opensearch.ad.ml.EntityModel;
import org.opensearch.ad.ml.ModelManager.ModelType;
Expand All @@ -57,6 +55,8 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.MemoryTracker;
import org.opensearch.timeseries.MemoryTracker.Origin;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.common.exception.LimitExceededException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
Expand Down Expand Up @@ -116,12 +116,12 @@ public PriorityCache(

this.activeEnities = new ConcurrentHashMap<>();
this.dedicatedCacheSize = dedicatedCacheSize;
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEDICATED_CACHE_SIZE, (it) -> {
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_DEDICATED_CACHE_SIZE, (it) -> {
this.dedicatedCacheSize = it;
this.setDedicatedCacheSizeListener();
this.tryClearUpMemory();
}, this::validateDedicatedCacheSize);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MODEL_MAX_SIZE_PERCENTAGE, it -> this.tryClearUpMemory());
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_MODEL_MAX_SIZE_PERCENTAGE, it -> this.tryClearUpMemory());

this.memoryTracker = memoryTracker;
this.maintenanceLock = new ReentrantLock();
Expand Down Expand Up @@ -461,7 +461,7 @@ private CacheBuffer computeBufferIfAbsent(AnomalyDetector detector, String detec
if (buffer == null) {
long requiredBytes = getRequiredMemory(detector, dedicatedCacheSize);
if (memoryTracker.canAllocateReserved(requiredBytes)) {
memoryTracker.consumeMemory(requiredBytes, true, Origin.HC_DETECTOR);
memoryTracker.consumeMemory(requiredBytes, true, Origin.REAL_TIME_DETECTOR);
long intervalSecs = detector.getIntervalInSeconds();

buffer = new CacheBuffer(
Expand Down Expand Up @@ -621,7 +621,7 @@ private void recalculateUsedMemory() {
reserved += buffer.getReservedBytes();
shared += buffer.getBytesInSharedCache();
}
memoryTracker.syncMemoryState(Origin.HC_DETECTOR, reserved + shared, reserved);
memoryTracker.syncMemoryState(Origin.REAL_TIME_DETECTOR, reserved + shared, reserved);
}

/**
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/org/opensearch/ad/ml/ModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,16 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ad.DetectorModelSize;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.DateUtils;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.timeseries.MemoryTracker;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.ml.SingleStreamModelIdMapper;
import org.opensearch.timeseries.model.Entity;
Expand Down Expand Up @@ -624,7 +623,7 @@ public List<ThresholdingResult> getPreviewResults(double[][] dataPoints, int shi
.parallelExecutionEnabled(false)
.compact(true)
.precision(Precision.FLOAT_32)
.boundingBoxCacheFraction(AnomalyDetectorSettings.BATCH_BOUNDING_BOX_CACHE_RATIO)
.boundingBoxCacheFraction(TimeSeriesSettings.BATCH_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(shingleSize)
.anomalyRate(1 - this.thresholdMinPvalue)
.transformMethod(TransformMethod.NORMALIZE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

import java.util.concurrent.ConcurrentHashMap;

import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.timeseries.MemoryTracker;
import org.opensearch.timeseries.MemoryTracker.Origin;

import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

Expand All @@ -37,7 +37,7 @@ public ModelState<ThresholdedRandomCutForest> remove(Object key) {
ModelState<ThresholdedRandomCutForest> deletedModelState = super.remove(key);
if (deletedModelState != null && deletedModelState.getModel() != null) {
long memoryToRelease = memoryTracker.estimateTRCFModelSize(deletedModelState.getModel());
memoryTracker.releaseMemory(memoryToRelease, true, Origin.SINGLE_ENTITY_DETECTOR);
memoryTracker.releaseMemory(memoryToRelease, true, Origin.REAL_TIME_DETECTOR);
}
return deletedModelState;
}
Expand All @@ -47,7 +47,7 @@ public ModelState<ThresholdedRandomCutForest> put(K key, ModelState<ThresholdedR
ModelState<ThresholdedRandomCutForest> previousAssociatedState = super.put(key, value);
if (value != null && value.getModel() != null) {
long memoryToConsume = memoryTracker.estimateTRCFModelSize(value.getModel());
memoryTracker.consumeMemory(memoryToConsume, true, Origin.SINGLE_ENTITY_DETECTOR);
memoryTracker.consumeMemory(memoryToConsume, true, Origin.REAL_TIME_DETECTOR);
}
return previousAssociatedState;
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/ratelimit/BatchWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.breaker.CircuitBreakerService;

/**
*
Expand All @@ -46,7 +46,7 @@ public BatchWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;

public class CheckpointMaintainWorker extends ScheduledWorker<CheckpointMaintainRequest, CheckpointWriteRequest> {
private static final Logger LOG = LogManager.getLogger(CheckpointMaintainWorker.class);
Expand All @@ -43,7 +43,7 @@ public CheckpointMaintainWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.opensearch.action.get.MultiGetItemResponse;
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.caching.CacheProvider;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.indices.ADIndex;
Expand All @@ -53,6 +52,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.model.Config;
Expand Down Expand Up @@ -91,7 +91,7 @@ public CheckpointReadWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.ml.CheckpointDao;
import org.opensearch.ad.ml.EntityModel;
import org.opensearch.ad.ml.ModelState;
Expand All @@ -43,6 +42,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.util.ExceptionUtil;

Expand All @@ -60,7 +60,7 @@ public CheckpointWriteWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@
package org.opensearch.ad.ratelimit;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_CHECKPOINT_READ_QUEUE_BATCH_SIZE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS;

import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;

/**
* A queue slowly releasing low-priority requests to CheckpointReadQueue
Expand All @@ -52,7 +52,7 @@ public ColdEntityWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down Expand Up @@ -87,12 +87,12 @@ public ColdEntityWorker(
this.batchSize = AD_CHECKPOINT_READ_QUEUE_BATCH_SIZE.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_CHECKPOINT_READ_QUEUE_BATCH_SIZE, it -> this.batchSize = it);

this.expectedExecutionTimeInMilliSecsPerRequest = AnomalyDetectorSettings.EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS
this.expectedExecutionTimeInMilliSecsPerRequest = AnomalyDetectorSettings.AD_EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS
.get(settings);
clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(
EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS,
AD_EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_MILLISECS,
it -> this.expectedExecutionTimeInMilliSecsPerRequest = it
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.breaker.CircuitBreakerService;

/**
* A queue to run concurrent requests (either batch or single request).
Expand Down Expand Up @@ -74,7 +74,7 @@ public ConcurrentWorker(
Setting<Float> maxHeapPercentForQueueSetting,
ClusterService clusterService,
Random random,
ADCircuitBreakerService adCircuitBreakerService,
CircuitBreakerService adCircuitBreakerService,
ThreadPool threadPool,
Settings settings,
float maxQueuedTaskRatio,
Expand Down
Loading
Loading