Skip to content

Commit

Permalink
rebase from main
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Aug 14, 2024
1 parent 9cb78cc commit f3bdf1b
Show file tree
Hide file tree
Showing 34 changed files with 265 additions and 131 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ List<String> jacocoExclusions = [
'org.opensearch.timeseries.transport.CronRequest',
'org.opensearch.ad.task.ADBatchTaskCache',
'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker',
'org.opensearch.timeseries.util.TimeUtil',
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
import org.opensearch.ad.ratelimit.ADColdStartWorker;
import org.opensearch.ad.ratelimit.ADSaveResultStrategy;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.ml.Inferencer;
import org.opensearch.timeseries.ml.RealTimeInferencer;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.timeseries.stats.Stats;

import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

public class ADInferencer extends
Inferencer<ThresholdedRandomCutForest, AnomalyResult, ThresholdingResult, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADColdStart, ADModelManager, ADSaveResultStrategy, ADPriorityCache, ADColdStartWorker> {
public class ADRealTimeInferencer extends
RealTimeInferencer<ThresholdedRandomCutForest, AnomalyResult, ThresholdingResult, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADColdStart, ADModelManager, ADSaveResultStrategy, ADPriorityCache, ADColdStartWorker> {

public ADInferencer(
public ADRealTimeInferencer(
ADModelManager modelManager,
Stats stats,
ADCheckpointDao checkpointDao,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.ml.ADCheckpointDao;
import org.opensearch.ad.ml.ADColdStart;
import org.opensearch.ad.ml.ADInferencer;
import org.opensearch.ad.ml.ADModelManager;
import org.opensearch.ad.ml.ADRealTimeInferencer;
import org.opensearch.ad.ml.ThresholdingResult;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -54,7 +54,7 @@
*
*/
public class ADCheckpointReadWorker extends
CheckpointReadWorker<ThresholdedRandomCutForest, AnomalyResult, ThresholdingResult, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADColdStart, ADModelManager, ADPriorityCache, ADSaveResultStrategy, ADColdStartWorker, ADInferencer> {
CheckpointReadWorker<ThresholdedRandomCutForest, AnomalyResult, ThresholdingResult, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADColdStart, ADModelManager, ADPriorityCache, ADSaveResultStrategy, ADColdStartWorker, ADRealTimeInferencer> {
public static final String WORKER_NAME = "ad-checkpoint-read";

public ADCheckpointReadWorker(
Expand All @@ -79,7 +79,7 @@ public ADCheckpointReadWorker(
Provider<ADPriorityCache> cacheProvider,
Duration stateTtl,
ADCheckpointWriteWorker checkpointWriteQueue,
ADInferencer inferencer
ADRealTimeInferencer inferencer
) {
super(
WORKER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.ml.ADCheckpointDao;
import org.opensearch.ad.ml.ADColdStart;
import org.opensearch.ad.ml.ADInferencer;
import org.opensearch.ad.ml.ADModelManager;
import org.opensearch.ad.ml.ADRealTimeInferencer;
import org.opensearch.ad.ml.ThresholdingResult;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -55,7 +55,7 @@
*
*/
public class ADColdEntityWorker extends
ColdEntityWorker<ThresholdedRandomCutForest, AnomalyResult, ADIndex, ADIndexManagement, ADCheckpointDao, ThresholdingResult, ADModelManager, ADCheckpointWriteWorker, ADColdStart, ADPriorityCache, ADSaveResultStrategy, ADColdStartWorker, ADInferencer, ADCheckpointReadWorker> {
ColdEntityWorker<ThresholdedRandomCutForest, AnomalyResult, ADIndex, ADIndexManagement, ADCheckpointDao, ThresholdingResult, ADModelManager, ADCheckpointWriteWorker, ADColdStart, ADPriorityCache, ADSaveResultStrategy, ADColdStartWorker, ADRealTimeInferencer, ADCheckpointReadWorker> {
public static final String WORKER_NAME = "ad-cold-entity";

public ADColdEntityWorker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.ad.caching.ADCacheProvider;
import org.opensearch.ad.ml.ADInferencer;
import org.opensearch.ad.ml.ADRealTimeInferencer;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -41,7 +41,7 @@ public class ADHCImputeTransportAction extends

private ADCacheProvider cache;
private NodeStateManager nodeStateManager;
private ADInferencer adInferencer;
private ADRealTimeInferencer adInferencer;

@Inject
public ADHCImputeTransportAction(
Expand All @@ -51,7 +51,7 @@ public ADHCImputeTransportAction(
ActionFilters actionFilters,
ADCacheProvider priorityCache,
NodeStateManager nodeStateManager,
ADInferencer adInferencer
ADRealTimeInferencer adInferencer
) {
super(
ADHCImputeAction.NAME,
Expand Down Expand Up @@ -104,9 +104,9 @@ protected ADHCImputeNodeResponse nodeOperation(ADHCImputeNodeRequest nodeRequest
long executionEndTime = dataEndMillis + windowDelayMillis;
String taskId = nodeRequest.getRequest().getTaskId();
for (ModelState<ThresholdedRandomCutForest> modelState : cache.get().getAllModels(configId)) {
// execution end time (when job starts execution in this interval) > last used time => the model state is updated in
// execution end time (when job starts execution in this interval) >= last used time => the model state is updated in
// previous intervals
if (executionEndTime > modelState.getLastUsedTime().toEpochMilli()) {
if (executionEndTime >= modelState.getLastUsedTime().toEpochMilli()) {
double[] nanArray = new double[featureSize];
Arrays.fill(nanArray, Double.NaN);
adInferencer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.ml.ADCheckpointDao;
import org.opensearch.ad.ml.ADColdStart;
import org.opensearch.ad.ml.ADInferencer;
import org.opensearch.ad.ml.ADModelManager;
import org.opensearch.ad.ml.ADRealTimeInferencer;
import org.opensearch.ad.ml.ThresholdingResult;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.ratelimit.ADCheckpointMaintainWorker;
Expand All @@ -37,7 +37,7 @@
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

public class ADSingleStreamResultTransportAction extends
AbstractSingleStreamResultTransportAction<ThresholdedRandomCutForest, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADCheckpointMaintainWorker, ADCacheBuffer, ADPriorityCache, ADCacheProvider, AnomalyResult, ThresholdingResult, ADColdStart, ADModelManager, ADPriorityCache, ADSaveResultStrategy, ADColdStartWorker, ADInferencer, ADCheckpointReadWorker, ADResultWriteRequest> {
AbstractSingleStreamResultTransportAction<ThresholdedRandomCutForest, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADCheckpointMaintainWorker, ADCacheBuffer, ADPriorityCache, ADCacheProvider, AnomalyResult, ThresholdingResult, ADColdStart, ADModelManager, ADPriorityCache, ADSaveResultStrategy, ADColdStartWorker, ADRealTimeInferencer, ADCheckpointReadWorker, ADResultWriteRequest> {

@Inject
public ADSingleStreamResultTransportAction(
Expand All @@ -47,7 +47,7 @@ public ADSingleStreamResultTransportAction(
ADCacheProvider cache,
NodeStateManager stateManager,
ADCheckpointReadWorker checkpointReadQueue,
ADInferencer inferencer,
ADRealTimeInferencer inferencer,
ThreadPool threadPool
) {
super(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.ml.ADCheckpointDao;
import org.opensearch.ad.ml.ADColdStart;
import org.opensearch.ad.ml.ADInferencer;
import org.opensearch.ad.ml.ADModelManager;
import org.opensearch.ad.ml.ADRealTimeInferencer;
import org.opensearch.ad.ml.ThresholdingResult;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.ratelimit.ADCheckpointReadWorker;
Expand Down Expand Up @@ -77,12 +77,12 @@ public class EntityADResultTransportAction extends HandledTransportAction<Entity
private CacheProvider<ThresholdedRandomCutForest, ADPriorityCache> cache;
private final NodeStateManager stateManager;
private ThreadPool threadPool;
private EntityResultProcessor<ThresholdedRandomCutForest, AnomalyResult, ThresholdingResult, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADColdStart, ADModelManager, ADPriorityCache, ADSaveResultStrategy, ADColdStartWorker, ADInferencer, ADCheckpointReadWorker, ADColdEntityWorker> intervalDataProcessor;
private EntityResultProcessor<ThresholdedRandomCutForest, AnomalyResult, ThresholdingResult, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADColdStart, ADModelManager, ADPriorityCache, ADSaveResultStrategy, ADColdStartWorker, ADRealTimeInferencer, ADCheckpointReadWorker, ADColdEntityWorker> intervalDataProcessor;

private final ADCacheProvider entityCache;
private final ADCheckpointReadWorker checkpointReadQueue;
private final ADColdEntityWorker coldEntityQueue;
private final ADInferencer inferencer;
private final ADRealTimeInferencer inferencer;

@Inject
public EntityADResultTransportAction(
Expand All @@ -95,7 +95,7 @@ public EntityADResultTransportAction(
ADCheckpointReadWorker checkpointReadQueue,
ADColdEntityWorker coldEntityQueue,
ThreadPool threadPool,
ADInferencer inferencer
ADRealTimeInferencer inferencer
) {
super(EntityADResultAction.NAME, transportService, actionFilters, EntityResultRequest::new);
this.adCircuitBreakerService = adCircuitBreakerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
import org.opensearch.forecast.ratelimit.ForecastColdStartWorker;
import org.opensearch.forecast.ratelimit.ForecastSaveResultStrategy;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.ml.Inferencer;
import org.opensearch.timeseries.ml.RealTimeInferencer;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.timeseries.stats.Stats;

import com.amazon.randomcutforest.parkservices.RCFCaster;

public class ForecastInferencer extends
Inferencer<RCFCaster, ForecastResult, RCFCasterResult, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastColdStart, ForecastModelManager, ForecastSaveResultStrategy, ForecastPriorityCache, ForecastColdStartWorker> {
public class ForecastRealTimeInferencer extends
RealTimeInferencer<RCFCaster, ForecastResult, RCFCasterResult, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastColdStart, ForecastModelManager, ForecastSaveResultStrategy, ForecastPriorityCache, ForecastColdStartWorker> {

public ForecastInferencer(
public ForecastRealTimeInferencer(
ForecastModelManager modelManager,
Stats stats,
ForecastCheckpointDao checkpointDao,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.opensearch.forecast.indices.ForecastIndexManagement;
import org.opensearch.forecast.ml.ForecastCheckpointDao;
import org.opensearch.forecast.ml.ForecastColdStart;
import org.opensearch.forecast.ml.ForecastInferencer;
import org.opensearch.forecast.ml.ForecastModelManager;
import org.opensearch.forecast.ml.ForecastRealTimeInferencer;
import org.opensearch.forecast.ml.RCFCasterResult;
import org.opensearch.forecast.model.ForecastResult;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -36,7 +36,7 @@
import com.amazon.randomcutforest.parkservices.RCFCaster;

public class ForecastCheckpointReadWorker extends
CheckpointReadWorker<RCFCaster, ForecastResult, RCFCasterResult, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastColdStart, ForecastModelManager, ForecastPriorityCache, ForecastSaveResultStrategy, ForecastColdStartWorker, ForecastInferencer> {
CheckpointReadWorker<RCFCaster, ForecastResult, RCFCasterResult, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastColdStart, ForecastModelManager, ForecastPriorityCache, ForecastSaveResultStrategy, ForecastColdStartWorker, ForecastRealTimeInferencer> {
public static final String WORKER_NAME = "forecast-checkpoint-read";

public ForecastCheckpointReadWorker(
Expand All @@ -61,7 +61,7 @@ public ForecastCheckpointReadWorker(
Provider<ForecastPriorityCache> cacheProvider,
Duration stateTtl,
ForecastCheckpointWriteWorker checkpointWriteQueue,
ForecastInferencer inferencer
ForecastRealTimeInferencer inferencer
) {
super(
WORKER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.opensearch.forecast.indices.ForecastIndexManagement;
import org.opensearch.forecast.ml.ForecastCheckpointDao;
import org.opensearch.forecast.ml.ForecastColdStart;
import org.opensearch.forecast.ml.ForecastInferencer;
import org.opensearch.forecast.ml.ForecastModelManager;
import org.opensearch.forecast.ml.ForecastRealTimeInferencer;
import org.opensearch.forecast.ml.RCFCasterResult;
import org.opensearch.forecast.model.ForecastResult;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -49,7 +49,7 @@
*
*/
public class ForecastColdEntityWorker extends
ColdEntityWorker<RCFCaster, ForecastResult, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, RCFCasterResult, ForecastModelManager, ForecastCheckpointWriteWorker, ForecastColdStart, ForecastPriorityCache, ForecastSaveResultStrategy, ForecastColdStartWorker, ForecastInferencer, ForecastCheckpointReadWorker> {
ColdEntityWorker<RCFCaster, ForecastResult, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, RCFCasterResult, ForecastModelManager, ForecastCheckpointWriteWorker, ForecastColdStart, ForecastPriorityCache, ForecastSaveResultStrategy, ForecastColdStartWorker, ForecastRealTimeInferencer, ForecastCheckpointReadWorker> {
public static final String WORKER_NAME = "forecast-cold-entity";

public ForecastColdEntityWorker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.opensearch.forecast.indices.ForecastIndexManagement;
import org.opensearch.forecast.ml.ForecastCheckpointDao;
import org.opensearch.forecast.ml.ForecastColdStart;
import org.opensearch.forecast.ml.ForecastInferencer;
import org.opensearch.forecast.ml.ForecastModelManager;
import org.opensearch.forecast.ml.ForecastRealTimeInferencer;
import org.opensearch.forecast.ml.RCFCasterResult;
import org.opensearch.forecast.model.ForecastResult;
import org.opensearch.forecast.ratelimit.ForecastCheckpointReadWorker;
Expand Down Expand Up @@ -77,12 +77,12 @@ public class EntityForecastResultTransportAction extends HandledTransportAction<
private CacheProvider<RCFCaster, ForecastPriorityCache> cache;
private final NodeStateManager stateManager;
private ThreadPool threadPool;
private EntityResultProcessor<RCFCaster, ForecastResult, RCFCasterResult, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastColdStart, ForecastModelManager, ForecastPriorityCache, ForecastSaveResultStrategy, ForecastColdStartWorker, ForecastInferencer, ForecastCheckpointReadWorker, ForecastColdEntityWorker> intervalDataProcessor;
private EntityResultProcessor<RCFCaster, ForecastResult, RCFCasterResult, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastColdStart, ForecastModelManager, ForecastPriorityCache, ForecastSaveResultStrategy, ForecastColdStartWorker, ForecastRealTimeInferencer, ForecastCheckpointReadWorker, ForecastColdEntityWorker> intervalDataProcessor;

private final ForecastCacheProvider entityCache;
private final ForecastCheckpointReadWorker checkpointReadQueue;
private final ForecastColdEntityWorker coldEntityQueue;
private final ForecastInferencer inferencer;
private final ForecastRealTimeInferencer inferencer;

@Inject
public EntityForecastResultTransportAction(
Expand All @@ -96,7 +96,7 @@ public EntityForecastResultTransportAction(
ForecastCheckpointReadWorker checkpointReadQueue,
ForecastColdEntityWorker coldEntityQueue,
ThreadPool threadPool,
ForecastInferencer inferencer
ForecastRealTimeInferencer inferencer
) {
super(EntityForecastResultAction.NAME, transportService, actionFilters, EntityResultRequest::new);
this.circuitBreakerService = adCircuitBreakerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.opensearch.forecast.indices.ForecastIndexManagement;
import org.opensearch.forecast.ml.ForecastCheckpointDao;
import org.opensearch.forecast.ml.ForecastColdStart;
import org.opensearch.forecast.ml.ForecastInferencer;
import org.opensearch.forecast.ml.ForecastModelManager;
import org.opensearch.forecast.ml.ForecastRealTimeInferencer;
import org.opensearch.forecast.ml.RCFCasterResult;
import org.opensearch.forecast.model.ForecastResult;
import org.opensearch.forecast.ratelimit.ForecastCheckpointMaintainWorker;
Expand All @@ -39,7 +39,7 @@
import com.amazon.randomcutforest.parkservices.RCFCaster;

public class ForecastSingleStreamResultTransportAction extends
AbstractSingleStreamResultTransportAction<RCFCaster, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastCheckpointMaintainWorker, ForecastCacheBuffer, ForecastPriorityCache, ForecastCacheProvider, ForecastResult, RCFCasterResult, ForecastColdStart, ForecastModelManager, ForecastPriorityCache, ForecastSaveResultStrategy, ForecastColdStartWorker, ForecastInferencer, ForecastCheckpointReadWorker, ForecastResultWriteRequest> {
AbstractSingleStreamResultTransportAction<RCFCaster, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastCheckpointMaintainWorker, ForecastCacheBuffer, ForecastPriorityCache, ForecastCacheProvider, ForecastResult, RCFCasterResult, ForecastColdStart, ForecastModelManager, ForecastPriorityCache, ForecastSaveResultStrategy, ForecastColdStartWorker, ForecastRealTimeInferencer, ForecastCheckpointReadWorker, ForecastResultWriteRequest> {

private static final Logger LOG = LogManager.getLogger(ForecastSingleStreamResultTransportAction.class);

Expand All @@ -51,7 +51,7 @@ public ForecastSingleStreamResultTransportAction(
ForecastCacheProvider cache,
NodeStateManager stateManager,
ForecastCheckpointReadWorker checkpointReadQueue,
ForecastInferencer inferencer,
ForecastRealTimeInferencer inferencer,
ThreadPool threadPool
) {
super(
Expand Down
Loading

0 comments on commit f3bdf1b

Please sign in to comment.