Skip to content

Commit

Permalink
make jvm heap usage a dynamic setting (#1212)
Browse files Browse the repository at this point in the history
* make jvm heap usage a dynamic setting

Signed-off-by: Jackie Han <[email protected]>

* move jvm_heap_usage_threshold setting to ADNumericSetting file

Signed-off-by: Jackie Han <[email protected]>

* update jvm threshold setting name

Signed-off-by: Jackie Han <[email protected]>

---------

Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed May 29, 2024
1 parent 8141c27 commit 2ce5a7c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 18 deletions.
15 changes: 15 additions & 0 deletions src/main/java/org/opensearch/ad/settings/ADNumericSetting.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class ADNumericSetting extends DynamicNumericSetting {
*/
public static final String CATEGORY_FIELD_LIMIT = "plugins.anomaly_detection.category_field_limit";

public static final String JVM_HEAP_USAGE_THRESHOLD = "plugins.anomaly_detection.jvm_heap_usage_threshold";

private static final Map<String, Setting<?>> settings = unmodifiableMap(new HashMap<String, Setting<?>>() {
{
// how many categorical fields we support
Expand All @@ -37,6 +39,11 @@ public class ADNumericSetting extends DynamicNumericSetting {
CATEGORY_FIELD_LIMIT,
Setting.intSetting(CATEGORY_FIELD_LIMIT, 2, 0, 5, Setting.Property.NodeScope, Setting.Property.Dynamic)
);
// JVM heap usage threshold setting
put(
JVM_HEAP_USAGE_THRESHOLD,
Setting.intSetting(JVM_HEAP_USAGE_THRESHOLD, 95, 0, 98, Setting.Property.NodeScope, Setting.Property.Dynamic)
);
}
});

Expand All @@ -57,4 +64,12 @@ public static synchronized ADNumericSetting getInstance() {
public static int maxCategoricalFields() {
return ADNumericSetting.getInstance().getSettingValue(ADNumericSetting.CATEGORY_FIELD_LIMIT);
}

/**
* Get the jvm_heap_usage threshold setting value
* @return jvm_heap_usage threshold setting value
*/
public static int getJVMHeapUsageThreshold() {
return ADNumericSetting.getInstance().getSettingValue(ADNumericSetting.JVM_HEAP_USAGE_THRESHOLD);
}
}
6 changes: 3 additions & 3 deletions src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_TOP_ENTITIES_FOR_HISTORICAL_ANALYSIS;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_TOP_ENTITIES_LIMIT_FOR_HISTORICAL_ANALYSIS;
import static org.opensearch.timeseries.TimeSeriesAnalyticsPlugin.AD_BATCH_TASK_THREAD_POOL_NAME;
import static org.opensearch.timeseries.breaker.MemoryCircuitBreaker.DEFAULT_JVM_HEAP_USAGE_THRESHOLD;
import static org.opensearch.timeseries.stats.InternalStatNames.JVM_HEAP_USAGE;
import static org.opensearch.timeseries.stats.StatNames.AD_EXECUTING_BATCH_TASK_COUNT;

Expand Down Expand Up @@ -49,6 +48,7 @@
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.settings.ADEnabledSetting;
import org.opensearch.ad.settings.ADNumericSetting;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.stats.ADStats;
import org.opensearch.ad.transport.ADBatchAnomalyResultRequest;
Expand Down Expand Up @@ -704,12 +704,12 @@ private void dispatchTask(ADTask adTask, ActionListener<DiscoveryNode> listener)
List<StatsNodeResponse> candidateNodeResponse = adStatsResponse
.getNodes()
.stream()
.filter(stat -> (long) stat.getStatsMap().get(JVM_HEAP_USAGE.getName()) < DEFAULT_JVM_HEAP_USAGE_THRESHOLD)
.filter(stat -> (long) stat.getStatsMap().get(JVM_HEAP_USAGE.getName()) < ADNumericSetting.getJVMHeapUsageThreshold())
.collect(Collectors.toList());

if (candidateNodeResponse.size() == 0) {
StringBuilder errorMessageBuilder = new StringBuilder("All nodes' memory usage exceeds limitation ")
.append(DEFAULT_JVM_HEAP_USAGE_THRESHOLD)
.append(ADNumericSetting.getJVMHeapUsageThreshold())
.append("%. ")
.append(NO_ELIGIBLE_NODE_TO_RUN_DETECTOR)
.append(adTask.getConfigId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@

package org.opensearch.timeseries.breaker;

import org.opensearch.ad.settings.ADNumericSetting;
import org.opensearch.monitor.jvm.JvmService;

/**
* A circuit breaker for memory usage.
*/
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Integer> {

public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 95;
private final JvmService jvmService;

public MemoryCircuitBreaker(JvmService jvmService) {
super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD);
super(ADNumericSetting.getJVMHeapUsageThreshold());
this.jvmService = jvmService;
}

public MemoryCircuitBreaker(short threshold, JvmService jvmService) {
public MemoryCircuitBreaker(int threshold, JvmService jvmService) {
super(threshold);
this.jvmService = jvmService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;

import org.junit.Before;
Expand Down Expand Up @@ -44,32 +45,33 @@ public void setup() {
}

@Test
public void testIsOpen() {
public void testIsOpen_whenUsageIsBelowDefaultValue_shouldReturnFalse() {
CircuitBreaker breaker = new MemoryCircuitBreaker(jvmService);

assertThat(breaker.isOpen(), equalTo(false));
}

@Test
public void testIsOpen1() {
CircuitBreaker breaker = new MemoryCircuitBreaker((short) 90, jvmService);
public void testIsOpen_whenUsageIsAboveDefaultValue_shouldReturnTrue() {
CircuitBreaker breaker = new MemoryCircuitBreaker(jvmService);

assertThat(breaker.isOpen(), equalTo(false));
doReturn((short) 96).when(mem).getHeapUsedPercent();
assertThat(breaker.isOpen(), equalTo(true));
}

@Test
public void testIsOpen2() {
CircuitBreaker breaker = new MemoryCircuitBreaker(jvmService);
public void testIsOpen_whenUsageIsAboveSettingValue_shouldReturnTrue() {
CircuitBreaker breaker = new MemoryCircuitBreaker(90, jvmService);

when(mem.getHeapUsedPercent()).thenReturn((short) 96);
doReturn((short) 96).when(mem).getHeapUsedPercent();
assertThat(breaker.isOpen(), equalTo(true));
}

@Test
public void testIsOpen3() {
CircuitBreaker breaker = new MemoryCircuitBreaker((short) 90, jvmService);
public void testIsOpen_whenUsageIsBelowSettingValue_butAboveDefaultValue_shouldReturnFalse() {
CircuitBreaker breaker = new MemoryCircuitBreaker(97, jvmService);

when(mem.getHeapUsedPercent()).thenReturn((short) 96);
assertThat(breaker.isOpen(), equalTo(true));
doReturn((short) 96).when(mem).getHeapUsedPercent();
assertThat(breaker.isOpen(), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public void testMaxCategoricalFields() {
assertEquals("Expected value is 3", 3, value);
}

public void testGetThresholdValue_shouldReturnThresholdValue() {
adSetting.setSettingValue(ADNumericSetting.JVM_HEAP_USAGE_THRESHOLD, 96);
int value = ADNumericSetting.getJVMHeapUsageThreshold();
assertEquals(96, value);
}

public void testGetSettingValue() {
Map<String, Setting<?>> settingsMap = new HashMap<>();
Setting<Integer> testSetting = Setting.intSetting("test.setting", 1, Setting.Property.NodeScope);
Expand Down

0 comments on commit 2ce5a7c

Please sign in to comment.