Skip to content

Commit

Permalink
add custom result index lifecycle management (opensearch-project#1232)
Browse files Browse the repository at this point in the history
* add custom result index lifecycle management

Signed-off-by: Jackie Han <[email protected]>

* only parse number value for custom result index conditions fields

Signed-off-by: Jackie Han <[email protected]>

* make custom result index no hidden index

Signed-off-by: Jackie Han <[email protected]>

---------

Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Jun 11, 2024
1 parent 2843d1b commit 918c48a
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public ADIndexManagement(
ADIndex.RESULT.getMapping(),
xContentRegistry,
AnomalyDetector::parse,
ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "*"
ADCommonName.CUSTOM_RESULT_INDEX_PREFIX
);

this.indexStates = new EnumMap<ADIndex, IndexState>(ADIndex.class);
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/org/opensearch/ad/model/AnomalyDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -567,13 +567,13 @@ public static AnomalyDetector parse(
}
break;
case RESULT_INDEX_FIELD_MIN_SIZE:
customResultIndexMinSize = parser.intValue();
customResultIndexMinSize = onlyParseNumberValue(parser);
break;
case RESULT_INDEX_FIELD_MIN_AGE:
customResultIndexMinAge = parser.intValue();
customResultIndexMinAge = onlyParseNumberValue(parser);
break;
case RESULT_INDEX_FIELD_TTL:
customResultIndexTTL = parser.intValue();
customResultIndexTTL = onlyParseNumberValue(parser);
break;
default:
parser.skipChildren();
Expand Down Expand Up @@ -685,4 +685,11 @@ private List<Rule> getDefaultRule() {
}
return rules;
}

private static Integer onlyParseNumberValue(XContentParser parser) throws IOException {
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return parser.intValue();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public ForecastIndexManagement(
ForecastIndex.RESULT.getMapping(),
xContentRegistry,
Forecaster::parse,
ForecastCommonName.CUSTOM_RESULT_INDEX_PREFIX + "*"
ForecastCommonName.CUSTOM_RESULT_INDEX_PREFIX
);
this.indexStates = new EnumMap<ForecastIndex, IndexState>(ForecastIndex.class);

Expand Down
109 changes: 88 additions & 21 deletions src/main/java/org/opensearch/timeseries/indices/IndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParser.Token;
Expand Down Expand Up @@ -132,7 +134,7 @@ public abstract class IndexManagement<IndexType extends Enum<IndexType> & TimeSe
private String resultMapping;
private NamedXContentRegistry xContentRegistry;
protected BiCheckedFunction<XContentParser, String, ? extends Config, IOException> configParser;
protected String customResultIndexRegex;
protected String customResultIndexPrefix;

protected class IndexState {
// keep track of whether the mapping version is up-to-date
Expand Down Expand Up @@ -164,7 +166,7 @@ protected IndexManagement(
String resultMapping,
NamedXContentRegistry xContentRegistry,
BiCheckedFunction<XContentParser, String, ? extends Config, IOException> configParser,
String customResultIndexRegex
String customResultIndexPrefix
)
throws IOException {
this.client = client;
Expand All @@ -188,7 +190,7 @@ protected IndexManagement(
this.resultMapping = resultMapping;
this.xContentRegistry = xContentRegistry;
this.configParser = configParser;
this.customResultIndexRegex = customResultIndexRegex;
this.customResultIndexPrefix = customResultIndexPrefix;
}

/**
Expand Down Expand Up @@ -297,7 +299,8 @@ protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRet
long latest = Long.MIN_VALUE;
for (IndexMetadata indexMetaData : clusterStateResponse.getState().metadata().indices().values()) {
long creationTime = indexMetaData.getCreationDate();
if ((Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis()) {
long indexAgeMillis = Instant.now().toEpochMilli() - creationTime;
if (indexAgeMillis > historyRetentionPeriod.millis()) {
String indexName = indexMetaData.getIndex().getName();
candidates.add(indexName);
if (latest < creationTime) {
Expand Down Expand Up @@ -788,7 +791,7 @@ private void getConfigsWithCustomResultIndexAlias(ActionListener<List<Config>> l
}
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
BoolQueryBuilder shouldQueries = new BoolQueryBuilder();
shouldQueries.should(QueryBuilders.wildcardQuery(Config.RESULT_INDEX_FIELD, customResultIndexRegex));
shouldQueries.should(QueryBuilders.wildcardQuery(Config.RESULT_INDEX_FIELD, customResultIndexPrefix + "*"));
if (shouldQueries.should().isEmpty() == false) {
boolQuery.filter(shouldQueries);
}
Expand Down Expand Up @@ -1099,6 +1102,7 @@ protected void rescheduleRollover() {
if (scheduledRollover != null) {
scheduledRollover.cancel();
}

scheduledRollover = threadPool
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName());
}
Expand Down Expand Up @@ -1233,35 +1237,98 @@ protected void rolloverAndDeleteHistoryIndex(
String rolloverIndexPattern,
IndexType resultIndex
) {
if (!doesDefaultResultIndexExist()) {
return;
// perform rollover and delete on default result index
if (doesDefaultResultIndexExist()) {
RolloverRequest defaultResultIndexRolloverRequest = buildRolloverRequest(resultIndexAlias, rolloverIndexPattern);
defaultResultIndexRolloverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards());
proceedWithDefaultRolloverAndDelete(resultIndexAlias, defaultResultIndexRolloverRequest, allResultIndicesPattern, resultIndex);
}
// get config files that have custom result index alias to perform rollover on
getConfigsWithCustomResultIndexAlias(ActionListener.wrap(candidateResultAliases -> {
if (candidateResultAliases == null || candidateResultAliases.isEmpty()) {
logger.info("Candidate custom result indices are empty.");
return;
}

// We have to pass null for newIndexName in order to get Elastic to increment the index count.
RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null);
// perform rollover and delete on found custom result index alias
candidateResultAliases.forEach(config -> handleCustomResultIndex(config, resultIndex));
}, e -> { logger.error("Failed to get configs with custom result index alias.", e); }));
}

private void handleCustomResultIndex(Config config, IndexType resultIndex) {
RolloverRequest rolloverRequest = buildRolloverRequest(
config.getCustomResultIndexOrAlias(),
getCustomResultIndexPattern(config.getCustomResultIndexOrAlias())
);

// add rollover conditions if found in config
if (config.getCustomResultIndexMinAge() != null) {
rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueDays(config.getCustomResultIndexMinAge()));
}
if (config.getCustomResultIndexMinSize() != null) {
rolloverRequest.addMaxIndexSizeCondition(new ByteSizeValue(config.getCustomResultIndexMinSize(), ByteSizeUnit.MB));
}

// perform rollover and delete on custom result index alias
proceedWithRolloverAndDelete(
config.getCustomResultIndexOrAlias(),
rolloverRequest,
getAllCustomResultIndexPattern(config.getCustomResultIndexOrAlias()),
resultIndex,
config.getCustomResultIndexTTL()
);
}

private void proceedWithDefaultRolloverAndDelete(
String resultIndexAlias,
RolloverRequest rolloverRequest,
String allResultIndicesPattern,
IndexType resultIndex
) {
proceedWithRolloverAndDelete(resultIndexAlias, rolloverRequest, allResultIndicesPattern, resultIndex, null);
}

private RolloverRequest buildRolloverRequest(String resultIndexAlias, String rolloverIndexPattern) {
RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null);
CreateIndexRequest createRequest = rollOverRequest.getCreateIndexRequest();

createRequest.index(rolloverIndexPattern).mapping(resultMapping, XContentType.JSON);
if (resultIndexAlias.startsWith(customResultIndexPrefix)) {
choosePrimaryShards(createRequest, false);
} else {
choosePrimaryShards(createRequest, true);
}
return rollOverRequest;
}

choosePrimaryShards(createRequest, true);

rollOverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards());
private void proceedWithRolloverAndDelete(
String resultIndexAlias,
RolloverRequest rollOverRequest,
String allResultIndicesPattern,
IndexType resultIndex,
Integer customResultIndexTtl
) {
if (rollOverRequest.getConditions().size() == 0) {
return;
}
adminClient.indices().rolloverIndex(rollOverRequest, ActionListener.wrap(response -> {
if (!response.isRolledOver()) {
logger.warn("{} not rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus());
} else {
IndexState indexStatetate = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping()));
indexStatetate.mappingUpToDate = true;
IndexState indexState = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping()));
indexState.mappingUpToDate = true;
logger.info("{} rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus());
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod);
if (resultIndexAlias.startsWith(customResultIndexPrefix)) {
// handle custom result index deletion
if (customResultIndexTtl != null) {
deleteOldHistoryIndices(allResultIndicesPattern, TimeValue.timeValueHours(customResultIndexTtl * 24));
}
} else {
// handle default result index deletion
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod);
}
}
}, exception -> {
// e.g., we may roll over too often. Since the index pattern is opensearch-ad-plugin-result-d-history-{now/d}-000001,
// we cannot roll over twice in the same day as the index with the same name exists. We will get
// resource_already_exists_exception.
logger.error("Fail to roll over result index", exception);
}));
}, exception -> { logger.error("Fail to roll over result index", exception); }));
}

protected void initResultIndexDirectly(
Expand Down
Loading

0 comments on commit 918c48a

Please sign in to comment.