Skip to content

Commit

Permalink
rollover custom result index with conditions
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Jun 9, 2024
1 parent 051acf0 commit 864de68
Show file tree
Hide file tree
Showing 2 changed files with 251 additions and 19 deletions.
114 changes: 97 additions & 17 deletions src/main/java/org/opensearch/timeseries/indices/IndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package org.opensearch.timeseries.indices;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.forecast.constant.ForecastCommonName.CUSTOM_RESULT_INDEX_PREFIX;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

import java.io.IOException;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.cluster.LocalNodeClusterManagerListener;
Expand All @@ -66,6 +68,8 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParser.Token;
Expand Down Expand Up @@ -298,7 +302,8 @@ protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRet
long latest = Long.MIN_VALUE;
for (IndexMetadata indexMetaData : clusterStateResponse.getState().metadata().indices().values()) {
long creationTime = indexMetaData.getCreationDate();
if ((Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis()) {
long indexAgeMillis = Instant.now().toEpochMilli() - creationTime;
if (indexAgeMillis > historyRetentionPeriod.millis()) {
String indexName = indexMetaData.getIndex().getName();
candidates.add(indexName);
if (latest < creationTime) {
Expand Down Expand Up @@ -1077,7 +1082,7 @@ public void onClusterManager() {

// schedule the next rollover for approx MAX_AGE later
scheduledRollover = threadPool
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName());
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), TimeValue.timeValueMinutes(1), executorName());
} catch (Exception e) {
// This should be run on cluster startup
logger.error("Error rollover result indices. " + "Can't rollover result until clusterManager node is restarted.", e);
Expand All @@ -1100,6 +1105,7 @@ protected void rescheduleRollover() {
if (scheduledRollover != null) {
scheduledRollover.cancel();
}

scheduledRollover = threadPool
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName());
}
Expand Down Expand Up @@ -1234,35 +1240,109 @@ protected void rolloverAndDeleteHistoryIndex(
String rolloverIndexPattern,
IndexType resultIndex
) {
if (!doesDefaultResultIndexExist()) {
return;
// build rollover request for default result index
RolloverRequest defaultResultIndexRolloverRequest = buildRolloverRequest(resultIndexAlias, rolloverIndexPattern);
defaultResultIndexRolloverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards());

// get config files that have custom result index alias to perform rollover on
getConfigsWithCustomResultIndexAlias(ActionListener.wrap(candidateResultAliases -> {
if (candidateResultAliases == null || candidateResultAliases.isEmpty()) {
// no custom result index alias found
if (!doesDefaultResultIndexExist()) {
// no default result index found either
return;
}
// perform rollover and delete on default result index
proceedWithDefaultRolloverAndDelete(
resultIndexAlias,
defaultResultIndexRolloverRequest,
allResultIndicesPattern,
resultIndex
);
logger.info("Candidate custom result indices are empty.");
return;
}

// perform rollover and delete on found custom result index alias
candidateResultAliases.forEach(config -> handleCustomResultIndex(config, resultIndex));

}, e -> {
logger.error("Failed to get configs with custom result index alias.", e);
// perform rollover and delete on default result index if getting error on getting custom result index alias
proceedWithDefaultRolloverAndDelete(resultIndexAlias, defaultResultIndexRolloverRequest, allResultIndicesPattern, resultIndex);
}));
}

private void handleCustomResultIndex(Config config, IndexType resultIndex) {
RolloverRequest rolloverRequest = buildRolloverRequest(
config.getCustomResultIndexOrAlias(),
getCustomResultIndexPattern(config.getCustomResultIndexOrAlias())
);

// add rollover conditions if found in config
if (config.getCustomResultIndexMinAge() != null) {
rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueDays(config.getCustomResultIndexMinAge()));
}
if (config.getCustomResultIndexMinSize() != null) {
rolloverRequest.addMaxIndexSizeCondition(new ByteSizeValue(config.getCustomResultIndexMinSize(), ByteSizeUnit.MB));
}

// We have to pass null for newIndexName in order to get Elastic to increment the index count.
RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null);
// perform rollover and delete on custom result index alias
proceedWithRolloverAndDelete(
config.getCustomResultIndexOrAlias(),
rolloverRequest,
getAllCustomResultIndexPattern(config.getCustomResultIndexOrAlias()),
resultIndex,
config.getCustomResultIndexTTL()
);
}

private void proceedWithDefaultRolloverAndDelete(
String resultIndexAlias,
RolloverRequest rolloverRequest,
String allResultIndicesPattern,
IndexType resultIndex
) {
proceedWithRolloverAndDelete(resultIndexAlias, rolloverRequest, allResultIndicesPattern, resultIndex, null);
}

private RolloverRequest buildRolloverRequest(String resultIndexAlias, String rolloverIndexPattern) {
RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null);
CreateIndexRequest createRequest = rollOverRequest.getCreateIndexRequest();

createRequest.index(rolloverIndexPattern).mapping(resultMapping, XContentType.JSON);

choosePrimaryShards(createRequest, true);

rollOverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards());
return rollOverRequest;
}

private void proceedWithRolloverAndDelete(
String resultIndexAlias,
RolloverRequest rollOverRequest,
String allResultIndicesPattern,
IndexType resultIndex,
Integer customResultIndexTtl
) {
adminClient.indices().rolloverIndex(rollOverRequest, ActionListener.wrap(response -> {
if (!response.isRolledOver()) {
logger.warn("{} not rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus());
} else {
IndexState indexStatetate = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping()));
indexStatetate.mappingUpToDate = true;
IndexState indexState = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping()));
indexState.mappingUpToDate = true;
logger.info("{} rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus());
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod);
if (resultIndexAlias.startsWith(ADCommonName.CUSTOM_RESULT_INDEX_PREFIX)
|| resultIndexAlias.startsWith(CUSTOM_RESULT_INDEX_PREFIX)) {
// handle custom result index deletion
if (customResultIndexTtl != null) {
deleteOldHistoryIndices(allResultIndicesPattern, TimeValue.timeValueHours(customResultIndexTtl * 24));

}
} else {
// handle default result index deletion
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod);
}
}
}, exception -> {
// e.g., we may roll over too often. Since the index pattern is opensearch-ad-plugin-result-d-history-{now/d}-000001,
// we cannot roll over twice in the same day as the index with the same name exists. We will get
// resource_already_exists_exception.
logger.error("Fail to roll over result index", exception);
}));
}, exception -> { logger.error("Fail to roll over result index", exception); }));
}

protected void initResultIndexDirectly(
Expand Down
Loading

0 comments on commit 864de68

Please sign in to comment.