From 55c6945134ad72d1ed2b1ba63567c62e614e712d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 23 Aug 2024 18:59:13 +0000 Subject: [PATCH] Fix inference logic and standardize config index mapping (#1284) This commit addresses several issues and improvements in the inference logic and config index mapping: 1. Fixes in RealTimeInferencer: * Previously, we checked if the last update time of the model state was within the current interval and skipped inference if it was. However, this led to excessive skipping of inference because the last update time was updated when retrieving the model state from the cache. * Introduced lastSeenExecutionEndTime in the model state, which specifically tracks the last time a sample was processed during inference (not training). This ensures more accurate control over when inference should be skipped. 2. Consistent Naming in Config Index Mapping: * To maintain consistency across the codebase, changed defaultFill to default_fill in the Config index mapping, following the underscore naming convention used elsewhere. 3. Additional Null Checks: * Added more null checks for the defaultFill field in the Config constructor to improve robustness. Testing: * Added a smoke test to allow the job scheduler to trigger anomaly detection inferencing, successfully reproducing and verifying the fix for item #1.* added unit tests for item #3. Signed-off-by: Kaituo Li (cherry picked from commit 2922bbd47dd291d2ce3357cd22d8e946c6469281) Signed-off-by: github-actions[bot] --- .github/workflows/long_running.yml | 60 ++++++++++ build.gradle | 6 + .../dataprocessor/ImputationOption.java | 4 +- .../timeseries/ml/ModelManager.java | 1 + .../opensearch/timeseries/ml/ModelState.java | 10 ++ .../timeseries/ml/RealTimeInferencer.java | 12 +- .../opensearch/timeseries/model/Config.java | 25 +++-- src/main/resources/mappings/config.json | 2 +- .../ad/AbstractADSyntheticDataTest.java | 104 ++++++++++++++---- .../AbstractMissingSingleFeatureTestCase.java | 4 +- .../ad/e2e/HistoricalRuleModelPerfIT.java | 2 +- .../ad/e2e/MissingMultiFeatureIT.java | 2 +- .../ad/e2e/SingleStreamModelPerfIT.java | 19 ---- .../ad/e2e/SingleStreamSmokeIT.java | 78 +++++++++++++ .../opensearch/ad/indices/RolloverTests.java | 2 +- .../ad/model/AnomalyDetectorTests.java | 48 +++++++- .../dataprocessor/ImputationOptionTests.java | 2 +- 17 files changed, 314 insertions(+), 67 deletions(-) create mode 100644 .github/workflows/long_running.yml create mode 100644 src/test/java/org/opensearch/ad/e2e/SingleStreamSmokeIT.java diff --git a/.github/workflows/long_running.yml b/.github/workflows/long_running.yml new file mode 100644 index 000000000..e808b76d1 --- /dev/null +++ b/.github/workflows/long_running.yml @@ -0,0 +1,60 @@ +name: Run long running tests +on: + push: + branches: + - "*" + pull_request: + branches: + - "*" + +env: + ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true + +jobs: + Get-CI-Image-Tag: + uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main + with: + product: opensearch + + Run-Tests: + needs: Get-CI-Image-Tag + runs-on: ubuntu-latest + strategy: + matrix: + # each test scenario (rule, hc, single_stream) is treated as a separate job. + test: [smoke] + fail-fast: false + concurrency: + # The concurrency setting is used to limit the concurrency of each test scenario group to ensure they do not run concurrently on the same machine. + group: ${{ github.workflow }}-${{ matrix.test }} + name: Run long running tests + + container: + # using the same image which is used by opensearch-build team to build the OpenSearch Distribution + # this image tag is subject to change as more dependencies and updates will arrive over time + image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }} + # need to switch to root so that github actions can install runner binary on container without permission issues. + options: --user root + + steps: + - name: Setup Java + uses: actions/setup-java@v3 + with: + distribution: 'temurin' + java-version: 21 + + - name: Checkout AD + uses: actions/checkout@v3 + + - name: Build and Run Tests + run: | + chown -R 1000:1000 `pwd` + case ${{ matrix.test }} in + smoke) + su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.SingleStreamSmokeIT' \ + -Dtests.seed=B4BA12CCF1D9E825 -Dtests.security.manager=false \ + -Dtests.jvm.argline='-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m' \ + -Dtests.locale=ar-JO -Dtests.timezone=Asia/Samarkand -Dlong-running=true \ + -Dtests.timeoutSuite=3600000! -Dtest.logs=true" + ;; + esac diff --git a/build.gradle b/build.gradle index 1bc6092f3..efcb649f2 100644 --- a/build.gradle +++ b/build.gradle @@ -360,6 +360,12 @@ integTest { } } + if (System.getProperty("long-running") == null || System.getProperty("long-running") == "false") { + filter { + excludeTestsMatching "org.opensearch.ad.e2e.SingleStreamSmokeIT" + } + } + // The 'doFirst' delays till execution time. doFirst { // Tell the test JVM if the cluster JVM is running under a debugger so that tests can diff --git a/src/main/java/org/opensearch/timeseries/dataprocessor/ImputationOption.java b/src/main/java/org/opensearch/timeseries/dataprocessor/ImputationOption.java index 7147c753c..35d952170 100644 --- a/src/main/java/org/opensearch/timeseries/dataprocessor/ImputationOption.java +++ b/src/main/java/org/opensearch/timeseries/dataprocessor/ImputationOption.java @@ -26,7 +26,7 @@ public class ImputationOption implements Writeable, ToXContent { // field name in toXContent public static final String METHOD_FIELD = "method"; - public static final String DEFAULT_FILL_FIELD = "defaultFill"; + public static final String DEFAULT_FILL_FIELD = "default_fill"; private final ImputationMethod method; private final Map defaultFill; @@ -152,7 +152,7 @@ public int hashCode() { @Override public String toString() { - return new ToStringBuilder(this).append("method", method).append("defaultFill", defaultFill).toString(); + return new ToStringBuilder(this).append("method", method).append("default_fill", defaultFill).toString(); } public ImputationMethod getMethod() { diff --git a/src/main/java/org/opensearch/timeseries/ml/ModelManager.java b/src/main/java/org/opensearch/timeseries/ml/ModelManager.java index d2e557be3..efc774e02 100644 --- a/src/main/java/org/opensearch/timeseries/ml/ModelManager.java +++ b/src/main/java/org/opensearch/timeseries/ml/ModelManager.java @@ -169,6 +169,7 @@ public IntermediateResultType score( throw e; } finally { modelState.setLastUsedTime(clock.instant()); + modelState.setLastSeenExecutionEndTime(clock.instant()); } return createEmptyResult(); } diff --git a/src/main/java/org/opensearch/timeseries/ml/ModelState.java b/src/main/java/org/opensearch/timeseries/ml/ModelState.java index e2f914e8f..cf337cf3b 100644 --- a/src/main/java/org/opensearch/timeseries/ml/ModelState.java +++ b/src/main/java/org/opensearch/timeseries/ml/ModelState.java @@ -36,6 +36,7 @@ public class ModelState implements org.opensearch.timeseries.ExpiringState { // time when the ML model was used last time protected Instant lastUsedTime; protected Instant lastCheckpointTime; + protected Instant lastSeenExecutionEndTime; protected Clock clock; protected float priority; protected Deque samples; @@ -74,6 +75,7 @@ public ModelState( this.priority = priority; this.entity = entity; this.samples = samples; + this.lastSeenExecutionEndTime = Instant.MIN; } /** @@ -249,4 +251,12 @@ public Map getModelStateAsMap() { } }; } + + public Instant getLastSeenExecutionEndTime() { + return lastSeenExecutionEndTime; + } + + public void setLastSeenExecutionEndTime(Instant lastSeenExecutionEndTime) { + this.lastSeenExecutionEndTime = lastSeenExecutionEndTime; + } } diff --git a/src/main/java/org/opensearch/timeseries/ml/RealTimeInferencer.java b/src/main/java/org/opensearch/timeseries/ml/RealTimeInferencer.java index 9ca57309d..30b1a79a7 100644 --- a/src/main/java/org/opensearch/timeseries/ml/RealTimeInferencer.java +++ b/src/main/java/org/opensearch/timeseries/ml/RealTimeInferencer.java @@ -5,6 +5,7 @@ package org.opensearch.timeseries.ml; +import java.time.Instant; import java.util.Collections; import java.util.Locale; import java.util.Map; @@ -134,11 +135,14 @@ private boolean processWithTimeout( } private boolean tryProcess(Sample sample, ModelState modelState, Config config, String taskId, long curExecutionEnd) { - // execution end time (when job starts execution in this interval) >= last used time => the model state is updated in + // execution end time (when job starts execution in this interval) >= last seen execution end time => the model state is updated in // previous intervals - // This can happen while scheduled to waiting some other threads have already scored the same interval (e.g., during tests - // when everything happens fast) - if (curExecutionEnd < modelState.getLastUsedTime().toEpochMilli()) { + // This branch being true can happen while scheduled to waiting some other threads have already scored the same interval + // (e.g., during tests when everything happens fast) + // We cannot use last used time as it will be updated whenever we update its priority in CacheBuffer.update when there is a + // PriorityCache.get. + if (modelState.getLastSeenExecutionEndTime() != Instant.MIN + && curExecutionEnd < modelState.getLastSeenExecutionEndTime().toEpochMilli()) { return false; } String modelId = modelState.getModelId(); diff --git a/src/main/java/org/opensearch/timeseries/model/Config.java b/src/main/java/org/opensearch/timeseries/model/Config.java index f814a8832..ace9525f3 100644 --- a/src/main/java/org/opensearch/timeseries/model/Config.java +++ b/src/main/java/org/opensearch/timeseries/model/Config.java @@ -225,29 +225,36 @@ protected Config( : features.stream().filter(Feature::getEnabled).collect(Collectors.toList()); Map defaultFill = imputationOption.getDefaultFill(); - if (defaultFill.isEmpty() && enabledFeatures.size() > 0) { + + // Case 1: enabledFeatures == null && defaultFill != null + if (enabledFeatures == null && defaultFill != null && !defaultFill.isEmpty()) { + issueType = ValidationIssueType.IMPUTATION; + errorMessage = "Enabled features list is null, but default fill values are provided."; + return; + } + + // Case 2: enabledFeatures != null && defaultFill == null + if (enabledFeatures != null && (defaultFill == null || defaultFill.isEmpty())) { issueType = ValidationIssueType.IMPUTATION; - errorMessage = "No given values for fixed value imputation"; + errorMessage = "Enabled features are present, but no default fill values are provided."; return; } - // Check if the length of the defaultFill array matches the number of expected features - if (enabledFeatures == null || defaultFill.size() != enabledFeatures.size()) { + // Case 3: enabledFeatures.size() != defaultFill.size() + if (enabledFeatures != null && defaultFill != null && defaultFill.size() != enabledFeatures.size()) { issueType = ValidationIssueType.IMPUTATION; errorMessage = String .format( Locale.ROOT, - "Incorrect number of values to fill. Got: %d. Expected: %d.", + "Mismatch between the number of enabled features and default fill values. Number of default fill values: %d. Number of enabled features: %d.", defaultFill.size(), - enabledFeatures == null ? 0 : enabledFeatures.size() + enabledFeatures.size() ); return; } - Map defaultFills = imputationOption.getDefaultFill(); - for (int i = 0; i < enabledFeatures.size(); i++) { - if (!defaultFills.containsKey(enabledFeatures.get(i).getName())) { + if (!defaultFill.containsKey(enabledFeatures.get(i).getName())) { issueType = ValidationIssueType.IMPUTATION; errorMessage = String.format(Locale.ROOT, "Missing feature name: %s.", enabledFeatures.get(i).getName()); return; diff --git a/src/main/resources/mappings/config.json b/src/main/resources/mappings/config.json index 36663ad37..89b334f90 100644 --- a/src/main/resources/mappings/config.json +++ b/src/main/resources/mappings/config.json @@ -174,7 +174,7 @@ "method": { "type": "keyword" }, - "defaultFill": { + "default_fill": { "type": "nested", "properties": { "feature_name": { diff --git a/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java b/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java index ef5afa1c5..f840bce73 100644 --- a/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java +++ b/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java @@ -19,6 +19,7 @@ import java.nio.charset.Charset; import java.time.Duration; import java.time.Instant; +import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; @@ -163,13 +164,59 @@ protected Map previewWithFailure(String detector, Instant begin, return entityAsMap(response); } + protected List getAnomalyResultByDataTime( + String detectorId, + Instant end, + int entitySize, + RestClient client, + boolean approximateEndTime, + long rangeDurationMillis + ) throws InterruptedException { + return getAnomalyResult( + detectorId, + end, + entitySize, + client, + approximateEndTime, + rangeDurationMillis, + "data_end_time", + (h, eSize) -> h.size() == eSize, + entitySize + ); + } + + protected List getAnomalyResultByExecutionTime( + String detectorId, + Instant end, + int entitySize, + RestClient client, + boolean approximateEndTime, + long rangeDurationMillis, + int expectedResultSize + ) throws InterruptedException { + return getAnomalyResult( + detectorId, + end, + entitySize, + client, + approximateEndTime, + rangeDurationMillis, + "execution_end_time", + (h, eSize) -> h.size() >= eSize, + expectedResultSize + ); + } + protected List getAnomalyResult( String detectorId, Instant end, int entitySize, RestClient client, - boolean approximateDataEndTime, - long intervalMillis + boolean approximateEndTime, + long rangeDurationMillis, + String endTimeField, + ConditionChecker checker, + int expectedResultSize ) throws InterruptedException { Request request = new Request("POST", "/_plugins/_anomaly_detection/detectors/results/_search"); @@ -191,12 +238,12 @@ protected List getAnomalyResult( + " },\n" + " {\n" + " \"range\": {\n" - + " \"data_end_time\": {\n"; + + " \"%s\": {\n"; StringBuilder jsonTemplate = new StringBuilder(); jsonTemplate.append(jsonTemplatePrefix); - if (approximateDataEndTime) { + if (approximateEndTime) { // we may get two interval results if using gte jsonTemplate.append(" \"gt\": %d,\n \"lte\": %d\n"); } else { @@ -217,10 +264,11 @@ protected List getAnomalyResult( long dateEndTime = end.toEpochMilli(); String formattedJson = null; - if (approximateDataEndTime) { - formattedJson = String.format(Locale.ROOT, jsonTemplate.toString(), detectorId, dateEndTime - intervalMillis, dateEndTime); + if (approximateEndTime) { + formattedJson = String + .format(Locale.ROOT, jsonTemplate.toString(), detectorId, endTimeField, dateEndTime - rangeDurationMillis, dateEndTime); } else { - formattedJson = String.format(Locale.ROOT, jsonTemplate.toString(), detectorId, dateEndTime, dateEndTime); + formattedJson = String.format(Locale.ROOT, jsonTemplate.toString(), detectorId, endTimeField, dateEndTime, dateEndTime); } request.setJsonEntity(formattedJson); @@ -231,25 +279,16 @@ protected List getAnomalyResult( do { try { JsonArray hits = getHits(client, request); - if (hits != null && hits.size() == entitySize) { - assertTrue("empty response", hits != null); - assertTrue("returned more than " + hits.size() + " results.", hits.size() == entitySize); + if (hits != null && checker.checkCondition(hits, entitySize)) { List res = new ArrayList<>(); - for (int i = 0; i < entitySize; i++) { + for (int i = 0; i < hits.size(); i++) { JsonObject source = hits.get(i).getAsJsonObject().get("_source").getAsJsonObject(); res.add(source); } return res; } else { - LOG - .info( - "wait for result, previous result: {}, size: {}, eval result {}, expected {}", - hits, - hits.size(), - hits != null && hits.size() == entitySize, - entitySize - ); + LOG.info("wait for result, previous result: {}, size: {}", hits, hits.size()); client.performRequest(new Request("POST", String.format(Locale.ROOT, "/%s/_refresh", ".opendistro-anomaly-results*"))); } Thread.sleep(2_000 * entitySize); @@ -275,7 +314,7 @@ protected List getAnomalyResult( protected List getRealTimeAnomalyResult(String detectorId, Instant end, int entitySize, RestClient client) throws InterruptedException { - return getAnomalyResult(detectorId, end, entitySize, client, false, 0); + return getAnomalyResultByDataTime(detectorId, end, entitySize, client, false, 0); } public double getAnomalyGrade(JsonObject source) { @@ -462,7 +501,7 @@ protected List waitForHistoricalDetector( Thread.sleep(1_000); - List sourceList = getAnomalyResult(detectorId, end, entitySize, client, true, intervalMillis); + List sourceList = getAnomalyResultByDataTime(detectorId, end, entitySize, client, true, intervalMillis); if (sourceList.size() > 0 && getAnomalyGrade(sourceList.get(0)) >= 0) { return sourceList; } @@ -624,7 +663,30 @@ protected List startHistoricalDetector( ); } + protected long getWindowDelayMinutes(List data, int trainTestSplit, String timestamp) { + // e.g., "2019-11-02T00:59:00Z" + String trainTimeStr = data.get(trainTestSplit - 1).get("timestamp").getAsString(); + Instant trainTime = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(trainTimeStr)); + /* + * The {@code CompositeRetriever.PageIterator.hasNext()} method checks if a request is expired + * relative to the current system time. This method is designed to ensure that the execution time + * is set to either the current time or a future time to prevent premature expirations in our tests. + * + * Also, AD accepts windowDelay in the unit of minutes. Thus, we need to convert the delay in minutes. This will + * make it easier to search for results based on data end time. Otherwise, real data time and the converted + * data time from request time. + * Assume x = real data time. y= real window delay. y'= window delay in minutes. If y and y' are different, + * x + y - y' != x. + */ + return Duration.between(trainTime, Instant.now()).toMinutes(); + } + public static boolean areDoublesEqual(double d1, double d2) { return Math.abs(d1 - d2) < EPSILON; } + + @FunctionalInterface + public interface ConditionChecker { + boolean checkCondition(JsonArray hits, int expectedSize); + } } diff --git a/src/test/java/org/opensearch/ad/e2e/AbstractMissingSingleFeatureTestCase.java b/src/test/java/org/opensearch/ad/e2e/AbstractMissingSingleFeatureTestCase.java index 3eec05657..a14da76cb 100644 --- a/src/test/java/org/opensearch/ad/e2e/AbstractMissingSingleFeatureTestCase.java +++ b/src/test/java/org/opensearch/ad/e2e/AbstractMissingSingleFeatureTestCase.java @@ -66,7 +66,7 @@ protected String genDetector( case FIXED_VALUES: sb .append( - "\"imputation_option\": { \"method\": \"fixed_values\", \"defaultFill\": [{ \"feature_name\" : \"feature 1\", \"data\": 1 }] }," + "\"imputation_option\": { \"method\": \"fixed_values\", \"default_fill\": [{ \"feature_name\" : \"feature 1\", \"data\": 1 }] }," ); break; } @@ -212,7 +212,7 @@ && scoreOneResult( if (realTime) { sourceList = getRealTimeAnomalyResult(detectorId, end, numberOfEntities, client()); } else { - sourceList = getAnomalyResult(detectorId, end, numberOfEntities, client(), true, intervalMillis); + sourceList = getAnomalyResultByDataTime(detectorId, end, numberOfEntities, client(), true, intervalMillis); } assertTrue( diff --git a/src/test/java/org/opensearch/ad/e2e/HistoricalRuleModelPerfIT.java b/src/test/java/org/opensearch/ad/e2e/HistoricalRuleModelPerfIT.java index a5c19d4c7..17ca5cb6d 100644 --- a/src/test/java/org/opensearch/ad/e2e/HistoricalRuleModelPerfIT.java +++ b/src/test/java/org/opensearch/ad/e2e/HistoricalRuleModelPerfIT.java @@ -106,7 +106,7 @@ private Triple, Integer, Map>> getTes Instant begin = Instant.ofEpochMilli(Long.parseLong(beginTimeStampAsString)); Instant end = begin.plus(intervalMinutes, ChronoUnit.MINUTES); try { - List sourceList = getAnomalyResult(detectorId, end, entitySize, client, true, intervalMillis); + List sourceList = getAnomalyResultByDataTime(detectorId, end, entitySize, client, true, intervalMillis); analyzeResults(anomalies, res, foundWindow, beginTimeStampAsString, entitySize, begin, sourceList); } catch (Exception e) { errors++; diff --git a/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java b/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java index 2f715041f..0b5708c5a 100644 --- a/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java +++ b/src/test/java/org/opensearch/ad/e2e/MissingMultiFeatureIT.java @@ -218,7 +218,7 @@ protected String genDetector( case FIXED_VALUES: sb .append( - "\"imputation_option\": { \"method\": \"fixed_values\", \"defaultFill\": [{ \"feature_name\" : \"feature 1\", \"data\": 1 }, { \"feature_name\" : \"feature 2\", \"data\": 2 }] }," + "\"imputation_option\": { \"method\": \"fixed_values\", \"default_fill\": [{ \"feature_name\" : \"feature 1\", \"data\": 1 }, { \"feature_name\" : \"feature 2\", \"data\": 2 }] }," ); break; } diff --git a/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java b/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java index ea311efa0..63d1fdcb6 100644 --- a/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java +++ b/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java @@ -97,7 +97,6 @@ private void verifyAnomaly( .from(DateTimeFormatter.ISO_INSTANT.parse(data.get(trainTestSplit - 1).get("timestamp").getAsString())); Instant dataEndTime = dataStartTime.plus(intervalMinutes, ChronoUnit.MINUTES); Instant trainTime = dataToExecutionTime(dataStartTime, windowDelay); - ; Instant executionStartTime = trainTime; Instant executionEndTime = executionStartTime.plus(intervalMinutes, ChronoUnit.MINUTES); @@ -222,22 +221,4 @@ private void bulkIndexTestData(List data, String datasetName, int tr Thread.sleep(1_000); waitAllSyncheticDataIngested(data.size(), datasetName, client); } - - protected long getWindowDelayMinutes(List data, int trainTestSplit, String timestamp) { - // e.g., "2019-11-02T00:59:00Z" - String trainTimeStr = data.get(trainTestSplit - 1).get("timestamp").getAsString(); - Instant trainTime = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(trainTimeStr)); - /* - * The {@code CompositeRetriever.PageIterator.hasNext()} method checks if a request is expired - * relative to the current system time. This method is designed to ensure that the execution time - * is set to either the current time or a future time to prevent premature expirations in our tests. - * - * Also, AD accepts windowDelay in the unit of minutes. Thus, we need to convert the delay in minutes. This will - * make it easier to search for results based on data end time. Otherwise, real data time and the converted - * data time from request time. - * Assume x = real data time. y= real window delay. y'= window delay in minutes. If y and y' are different, - * x + y - y' != x. - */ - return Duration.between(trainTime, Instant.now()).toMinutes(); - } } diff --git a/src/test/java/org/opensearch/ad/e2e/SingleStreamSmokeIT.java b/src/test/java/org/opensearch/ad/e2e/SingleStreamSmokeIT.java new file mode 100644 index 000000000..f8127d672 --- /dev/null +++ b/src/test/java/org/opensearch/ad/e2e/SingleStreamSmokeIT.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ad.e2e; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Locale; + +import org.opensearch.ad.AbstractADSyntheticDataTest; + +import com.google.gson.JsonObject; + +/** + * Test that is meant to run with job scheduler to test if we have at least consecutive results generated. + * + */ +public class SingleStreamSmokeIT extends AbstractADSyntheticDataTest { + + public void testGenerateResult() throws Exception { + String datasetName = "synthetic"; + String dataFileName = String.format(Locale.ROOT, "data/%s.data", datasetName); + int intervalsToWait = 3; + + List data = getData(dataFileName); + + String mapping = "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\": \"date\"}," + + " \"Feature1\": { \"type\": \"double\" }, \"Feature2\": { \"type\": \"double\" } } } }"; + int trainTestSplit = 1500; + // train data plus a few data points for real time inference + bulkIndexTrainData(datasetName, data, trainTestSplit + intervalsToWait + 3, client(), mapping); + + long windowDelayMinutes = getWindowDelayMinutes(data, trainTestSplit - 1, "timestamp"); + int intervalMinutes = 1; + + // single-stream detector can use window delay 0 here because we give the run api the actual data time + String detector = String + .format( + Locale.ROOT, + "{ \"name\": \"test\", \"description\": \"test\", \"time_field\": \"timestamp\"" + + ", \"indices\": [\"%s\"], \"feature_attributes\": [{ \"feature_name\": \"feature 1\", \"feature_enabled\": " + + "\"true\", \"aggregation_query\": { \"Feature1\": { \"sum\": { \"field\": \"Feature1\" } } } }, { \"feature_name\"" + + ": \"feature 2\", \"feature_enabled\": \"true\", \"aggregation_query\": { \"Feature2\": { \"sum\": { \"field\": " + + "\"Feature2\" } } } }], \"detection_interval\": { \"period\": { \"interval\": %d, \"unit\": \"Minutes\" } }, " + + "\"window_delay\": { \"period\": {\"interval\": %d, \"unit\": \"MINUTES\"}}," + + "\"schema_version\": 0 }", + datasetName, + intervalMinutes, + windowDelayMinutes + ); + String detectorId = createDetector(client(), detector); + + startDetector(detectorId, client()); + + long waitMinutes = intervalMinutes * (intervalsToWait + 1); + // wait for scheduler to trigger AD + Thread.sleep(Duration.ofMinutes(waitMinutes)); + + List results = getAnomalyResultByExecutionTime( + detectorId, + Instant.now(), + 1, + client(), + true, + waitMinutes * 60000, + intervalsToWait + ); + + assertTrue( + String.format(Locale.ROOT, "Expect at least %d but got %d", intervalsToWait, results.size()), + results.size() >= intervalsToWait + ); + } + +} diff --git a/src/test/java/org/opensearch/ad/indices/RolloverTests.java b/src/test/java/org/opensearch/ad/indices/RolloverTests.java index 2d847d266..865858dcd 100644 --- a/src/test/java/org/opensearch/ad/indices/RolloverTests.java +++ b/src/test/java/org/opensearch/ad/indices/RolloverTests.java @@ -304,7 +304,7 @@ private void setUpGetConfigs_withNoCustomResultIndexAlias() { + "\"feature_attributes\":[{\"feature_id\":\"OTYJs\",\"feature_name\":\"eYYCM\",\"feature_enabled\":false," + "\"aggregation_query\":{\"XzewX\":{\"value_count\":{\"field\":\"ok\"}}}}],\"recency_emphasis\":3342," + "\"history\":62,\"last_update_time\":1717192049845,\"category_field\":[\"Tcqcb\"],\"customResultIndexOrAlias\":" - + "\"\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"defaultFill\"" + + "\"\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"default_fill\"" + ":[],\"integerSensitive\":false},\"suggested_seasonality\":64,\"detection_interval\":{\"period\":" + "{\"interval\":5,\"unit\":\"Minutes\"}},\"detector_type\":\"MULTI_ENTITY\",\"rules\":[]}"; diff --git a/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java b/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java index c5d236b29..d88558ab3 100644 --- a/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java +++ b/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java @@ -43,9 +43,12 @@ import org.opensearch.timeseries.TestHelpers; import org.opensearch.timeseries.common.exception.ValidationException; import org.opensearch.timeseries.constant.CommonMessages; +import org.opensearch.timeseries.dataprocessor.ImputationMethod; +import org.opensearch.timeseries.dataprocessor.ImputationOption; import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.Feature; import org.opensearch.timeseries.model.IntervalTimeConfiguration; +import org.opensearch.timeseries.model.ValidationIssueType; import org.opensearch.timeseries.settings.TimeSeriesSettings; import com.google.common.collect.ImmutableList; @@ -894,7 +897,7 @@ public void testParseAnomalyDetector_withCustomIndex_withCustomResultIndexMinSiz + "\"feature_attributes\":[{\"feature_id\":\"OTYJs\",\"feature_name\":\"eYYCM\",\"feature_enabled\":true," + "\"aggregation_query\":{\"XzewX\":{\"value_count\":{\"field\":\"ok\"}}}}],\"recency_emphasis\":3342," + "\"history\":62,\"last_update_time\":1717192049845,\"category_field\":[\"Tcqcb\"],\"result_index\":" - + "\"opensearch-ad-plugin-result-test\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"defaultFill\"" + + "\"opensearch-ad-plugin-result-test\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"default_fill\"" + ":[{\"feature_name\":\"eYYCM\", \"data\": 3}]},\"suggested_seasonality\":64,\"detection_interval\":{\"period\":" + "{\"interval\":5,\"unit\":\"Minutes\"}},\"detector_type\":\"MULTI_ENTITY\",\"rules\":[],\"result_index_min_size\":1500}"; AnomalyDetector parsedDetector = AnomalyDetector.parse(TestHelpers.parser(detectorString), "id", 1L, null, null); @@ -921,7 +924,7 @@ public void testParseAnomalyDetector_withCustomIndex_withCustomResultIndexMinAge + "\"feature_attributes\":[{\"feature_id\":\"OTYJs\",\"feature_name\":\"eYYCM\",\"feature_enabled\":true," + "\"aggregation_query\":{\"XzewX\":{\"value_count\":{\"field\":\"ok\"}}}}],\"recency_emphasis\":3342," + "\"history\":62,\"last_update_time\":1717192049845,\"category_field\":[\"Tcqcb\"],\"result_index\":" - + "\"opensearch-ad-plugin-result-test\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"defaultFill\"" + + "\"opensearch-ad-plugin-result-test\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"default_fill\"" + ":[{\"feature_name\":\"eYYCM\", \"data\": 3}]},\"suggested_seasonality\":64,\"detection_interval\":{\"period\":" + "{\"interval\":5,\"unit\":\"Minutes\"}},\"detector_type\":\"MULTI_ENTITY\",\"rules\":[],\"result_index_min_age\":7}"; AnomalyDetector parsedDetector = AnomalyDetector.parse(TestHelpers.parser(detectorString), "id", 1L, null, null); @@ -936,7 +939,7 @@ public void testParseAnomalyDetector_withCustomIndex_withCustomResultIndexTTL() + "\"feature_attributes\":[{\"feature_id\":\"OTYJs\",\"feature_name\":\"eYYCM\",\"feature_enabled\":true," + "\"aggregation_query\":{\"XzewX\":{\"value_count\":{\"field\":\"ok\"}}}}],\"recency_emphasis\":3342," + "\"history\":62,\"last_update_time\":1717192049845,\"category_field\":[\"Tcqcb\"],\"result_index\":" - + "\"opensearch-ad-plugin-result-test\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"defaultFill\"" + + "\"opensearch-ad-plugin-result-test\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"default_fill\"" + ":[{\"feature_name\":\"eYYCM\", \"data\": 3}]},\"suggested_seasonality\":64,\"detection_interval\":{\"period\":" + "{\"interval\":5,\"unit\":\"Minutes\"}},\"detector_type\":\"MULTI_ENTITY\",\"rules\":[],\"result_index_ttl\":30}"; AnomalyDetector parsedDetector = AnomalyDetector.parse(TestHelpers.parser(detectorString), "id", 1L, null, null); @@ -951,8 +954,7 @@ public void testParseAnomalyDetector_withCustomIndex_withFlattenResultIndexMappi + "\"feature_attributes\":[{\"feature_id\":\"OTYJs\",\"feature_name\":\"eYYCM\",\"feature_enabled\":false," + "\"aggregation_query\":{\"XzewX\":{\"value_count\":{\"field\":\"ok\"}}}}],\"recency_emphasis\":3342," + "\"history\":62,\"last_update_time\":1717192049845,\"category_field\":[\"Tcqcb\"],\"result_index\":" - + "\"opensearch-ad-plugin-result-test\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"defaultFill\"" - + ":[],\"integerSensitive\":false},\"suggested_seasonality\":64,\"detection_interval\":{\"period\":" + + "\"opensearch-ad-plugin-result-test\",\"imputation_option\":{\"method\":\"ZERO\"},\"suggested_seasonality\":64,\"detection_interval\":{\"period\":" + "{\"interval\":5,\"unit\":\"Minutes\"}},\"detector_type\":\"MULTI_ENTITY\",\"rules\":[],\"flatten_result_index_mapping\":true}"; AnomalyDetector parsedDetector = AnomalyDetector.parse(TestHelpers.parser(detectorString), "id", 1L, null, null); assertEquals(true, (boolean) parsedDetector.getFlattenResultIndexMapping()); @@ -992,4 +994,40 @@ public void testSerializeAndDeserializeAnomalyDetector() throws IOException { Assert.assertEquals(deserializedDetector, detector); Assert.assertEquals(deserializedDetector.getSeasonIntervals(), detector.getSeasonIntervals()); } + + public void testNullFixedValue() throws IOException { + org.opensearch.timeseries.common.exception.ValidationException e = assertThrows( + org.opensearch.timeseries.common.exception.ValidationException.class, + () -> new AnomalyDetector( + randomAlphaOfLength(5), + randomLong(), + randomAlphaOfLength(5), + randomAlphaOfLength(5), + randomAlphaOfLength(5), + ImmutableList.of(randomAlphaOfLength(5)), + ImmutableList.of(TestHelpers.randomFeature()), + TestHelpers.randomQuery(), + TestHelpers.randomIntervalTimeConfiguration(), + TestHelpers.randomIntervalTimeConfiguration(), + null, + null, + 1, + Instant.now(), + null, + TestHelpers.randomUser(), + null, + new ImputationOption(ImputationMethod.FIXED_VALUES, null), + randomIntBetween(1, 10000), + randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2), + randomIntBetween(1, 1000), + null, + null, + null, + null, + null + ) + ); + assertEquals("Got: " + e.getMessage(), "Enabled features are present, but no default fill values are provided.", e.getMessage()); + assertEquals("Got :" + e.getType(), ValidationIssueType.IMPUTATION, e.getType()); + } } diff --git a/src/test/java/org/opensearch/timeseries/dataprocessor/ImputationOptionTests.java b/src/test/java/org/opensearch/timeseries/dataprocessor/ImputationOptionTests.java index df1d4a2e9..cbefcfe0f 100644 --- a/src/test/java/org/opensearch/timeseries/dataprocessor/ImputationOptionTests.java +++ b/src/test/java/org/opensearch/timeseries/dataprocessor/ImputationOptionTests.java @@ -41,7 +41,7 @@ public static void setUpOnce() { xContent = "{" + "\"method\":\"FIXED_VALUES\"," - + "\"defaultFill\":[{\"feature_name\":\"a\", \"data\":1.0},{\"feature_name\":\"b\", \"data\":2.0},{\"feature_name\":\"c\", \"data\":3.0}]}"; + + "\"default_fill\":[{\"feature_name\":\"a\", \"data\":1.0},{\"feature_name\":\"b\", \"data\":2.0},{\"feature_name\":\"c\", \"data\":3.0}]}"; } private Map randomMap(double[] defaultFill) {