From bf1e82bf1f586b59051e71ca7dc8ac488c79326a Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Wed, 19 Jun 2024 12:42:41 -0700 Subject: [PATCH] Update BWC Test Version and Enhance Code Coverage This PR updates the BWC test version to 2.16 in the build.gradle file and includes additional integration and unit tests to improve code coverage. Testing Performed: * Executed gradle build to ensure successful build and integration. Signed-off-by: Kaituo Li --- .github/workflows/benchmark.yml | 4 +- .github/workflows/test_security.yml | 6 +- build-tools/coverage.gradle | 49 ++ build.gradle | 18 +- .../settings/ForecastEnabledSetting.java | 9 +- .../opensearch/timeseries/JobProcessor.java | 4 + .../opensearch/timeseries/util/TaskUtil.java | 41 -- ...ndexAnomalyDetectorActionHandlerTests.java | 2 - ....java => AbstractADSyntheticDataTest.java} | 268 ++++------- .../ad/AnomalyDetectorJobRunnerTests.java | 164 ++++++- .../ad/AnomalyDetectorRestTestCase.java | 74 +-- .../ad/HistoricalAnalysisRestTestCase.java | 25 +- .../ad/caching/PriorityCacheTests.java | 38 ++ .../ad/e2e/AbstractRuleTestCase.java | 215 +++++++++ .../ad/e2e/DetectionResultEvalutationIT.java | 3 +- .../java/org/opensearch/ad/e2e/RuleIT.java | 92 ++++ .../opensearch/ad/e2e/RuleModelPerfIT.java | 117 +---- .../ad/e2e/SingleStreamModelPerfIT.java | 3 +- .../ad/rest/HistoricalAnalysisRestApiIT.java | 25 +- .../AbstractForecastSyntheticDataTest.java | 57 +++ .../forecast/rest/ForecastRestApiIT.java | 455 ++++++++++++++++++ .../settings/ForecastEnabledSettingTests.java | 4 +- .../timeseries/AbstractSyntheticDataTest.java | 157 ++++++ .../{ad => timeseries}/ODFERestTestCase.java | 118 +++-- .../opensearch/timeseries/TestHelpers.java | 100 ++++ 25 files changed, 1568 insertions(+), 480 deletions(-) create mode 100644 build-tools/coverage.gradle delete mode 100644 src/main/java/org/opensearch/timeseries/util/TaskUtil.java rename src/test/java/org/opensearch/ad/{e2e/AbstractSyntheticDataTest.java => AbstractADSyntheticDataTest.java} (57%) create mode 100644 src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java create mode 100644 src/test/java/org/opensearch/ad/e2e/RuleIT.java create mode 100644 src/test/java/org/opensearch/forecast/AbstractForecastSyntheticDataTest.java create mode 100644 src/test/java/org/opensearch/forecast/rest/ForecastRestApiIT.java create mode 100644 src/test/java/org/opensearch/timeseries/AbstractSyntheticDataTest.java rename src/test/java/org/opensearch/{ad => timeseries}/ODFERestTestCase.java (77%) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 9c5797f1d..8a16b0790 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -58,12 +58,12 @@ jobs: su `id -un 1000` -c "./gradlew ':test' --tests 'org.opensearch.ad.ml.HCADModelPerfTests' \ -Dtests.seed=2AEBDBBAE75AC5E0 -Dtests.security.manager=false \ -Dtests.locale=es-CU -Dtests.timezone=Chile/EasterIsland -Dtest.logs=true \ - -Dmodel-benchmark=true" + -Dtests.timeoutSuite=3600000! -Dmodel-benchmark=true" ;; single_stream) su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.SingleStreamModelPerfIT' \ -Dtests.seed=60CDDB34427ACD0C -Dtests.security.manager=false \ -Dtests.locale=kab-DZ -Dtests.timezone=Asia/Hebron -Dtest.logs=true \ - -Dmodel-benchmark=true" + -Dtests.timeoutSuite=3600000! 
-Dmodel-benchmark=true" ;; esac diff --git a/.github/workflows/test_security.yml b/.github/workflows/test_security.yml index 935b6d166..1f9ce669b 100644 --- a/.github/workflows/test_security.yml +++ b/.github/workflows/test_security.yml @@ -37,9 +37,9 @@ jobs: - name: Pull and Run Docker run: | plugin=`basename $(ls build/distributions/*.zip)` - version=`echo $plugin|awk -F- '{print $5}'| cut -d. -f 1-3` - plugin_version=`echo $plugin|awk -F- '{print $5}'| cut -d. -f 1-4` - qualifier=`echo $plugin|awk -F- '{print $6}'| cut -d. -f 1-1` + version=`echo $plugin|awk -F- '{print $4}'| cut -d. -f 1-3` + plugin_version=`echo $plugin|awk -F- '{print $4}'| cut -d. -f 1-4` + qualifier=`echo $plugin|awk -F- '{print $5}'| cut -d. -f 1-1` if $qualifier!=SNAPSHOT then diff --git a/build-tools/coverage.gradle b/build-tools/coverage.gradle new file mode 100644 index 000000000..eb3582dab --- /dev/null +++ b/build-tools/coverage.gradle @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +apply plugin: 'jacoco' + +jacoco { + toolVersion = "0.8.10" +} + +/** + * This code sets up coverage reporting manually for the k-NN plugin tests. This is complicated because: + * 1. The OS integTest Task doesn't implement Gradle's JavaForkOptions so we have to manually start the jacoco agent with the test JVM + * 2. The cluster nodes are stopped using 'kill -9' which means jacoco can't dump it's execution output to a file on VM shutdown + * 3. The Java Security Manager prevents JMX from writing execution output to the file. + * + * To workaround these we start the cluster with jmx enabled and then use Jacoco's JMX MBean to get the execution data before the + * cluster is stopped and dump it to a file. Luckily our current security policy seems to allow this. This will also probably + * break if there are multiple nodes in the integTestCluster. But for now... it sorta works. 
+ */ +integTest { + jacoco { + jmx = true + } + + systemProperty 'jacoco.dir', project.layout.buildDirectory.get().file("jacoco").asFile.absolutePath + systemProperty 'jmx.serviceUrl', "service:jmx:rmi:///jndi/rmi://127.0.0.1:7777/jmxrmi" +} + +jacocoTestReport { + dependsOn integTest, test + executionData.from = [integTest.jacoco.destinationFile, test.jacoco.destinationFile] + reports { + html.getRequired().set(true) // human readable + csv.getRequired().set(true) + xml.getRequired().set(true) // for coverlay + } +} + +testClusters.integTest { + jvmArgs " ${integTest.jacoco.getAsJvmArg()}" + + systemProperty 'com.sun.management.jmxremote', "true" + systemProperty 'com.sun.management.jmxremote.authenticate', "false" + systemProperty 'com.sun.management.jmxremote.port', "7777" + systemProperty 'com.sun.management.jmxremote.ssl', "false" + systemProperty 'java.rmi.server.hostname', "127.0.0.1" +} diff --git a/build.gradle b/build.gradle index 052e0f536..b31561343 100644 --- a/build.gradle +++ b/build.gradle @@ -35,7 +35,7 @@ buildscript { js_resource_folder = "src/test/resources/job-scheduler" common_utils_version = System.getProperty("common_utils.version", opensearch_build) job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build) - bwcVersionShort = "2.15.0" + bwcVersionShort = "2.16.0" bwcVersion = bwcVersionShort + ".0" bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' + 'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip' @@ -662,6 +662,13 @@ task release(type: Copy, group: 'build') { eachFile { it.path = it.path - "opensearch/" } } +def usingRemoteCluster = System.properties.containsKey('tests.rest.cluster') || System.properties.containsKey('tests.cluster') +def usingMultiNode = project.properties.containsKey('numNodes') +// Only apply jacoco test coverage if we are running a local single node cluster +if (!usingRemoteCluster && !usingMultiNode) { + apply from: 'build-tools/coverage.gradle' +} + List jacocoExclusions = [ // code for configuration, settings, etc is excluded from coverage 'org.opensearch.timeseries.TimeSeriesAnalyticsPlugin', @@ -701,6 +708,8 @@ List jacocoExclusions = [ jacocoTestCoverageVerification { + dependsOn(jacocoTestReport) + executionData.from = [integTest.jacoco.destinationFile, test.jacoco.destinationFile] violationRules { rule { element = 'CLASS' @@ -722,13 +731,6 @@ jacocoTestCoverageVerification { } } -jacocoTestReport { - reports { - xml.required = true // for coverlay - html.required = true // human readable - } -} - check.dependsOn jacocoTestCoverageVerification jacocoTestCoverageVerification.dependsOn jacocoTestReport diff --git a/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java b/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java index b1635f9d3..4984b822d 100644 --- a/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java +++ b/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java @@ -27,7 +27,8 @@ public class ForecastEnabledSetting extends DynamicNumericSetting { */ public static final String FORECAST_ENABLED = "plugins.forecast.enabled"; - public static final boolean enabled = false; + // TODO: remove the field when releasing forecasting + public static boolean enabled = false; public static final Map> settings = unmodifiableMap(new HashMap>() { { @@ -55,8 +56,10 @@ public static synchronized ForecastEnabledSetting 
getInstance() { * @return whether forecasting is enabled. */ public static boolean isForecastEnabled() { - // return ForecastEnabledSetting.getInstance().getSettingValue(ForecastEnabledSetting.FORECAST_ENABLED); + // keep the dynamic setting in main branch to enable writing tests. + // will make it hardcoded false in the 2.x branch. + return ForecastEnabledSetting.getInstance().getSettingValue(ForecastEnabledSetting.FORECAST_ENABLED); // TODO: enable forecasting before released - return enabled; + // return enabled; } } diff --git a/src/main/java/org/opensearch/timeseries/JobProcessor.java b/src/main/java/org/opensearch/timeseries/JobProcessor.java index 03a555845..f9b4863e9 100644 --- a/src/main/java/org/opensearch/timeseries/JobProcessor.java +++ b/src/main/java/org/opensearch/timeseries/JobProcessor.java @@ -579,5 +579,9 @@ private void releaseLock(Job jobParameter, LockService lockService, LockModel lo ); } + public Integer getEndRunExceptionCount(String configId) { + return endRunExceptionCount.getOrDefault(configId, 0); + } + protected abstract ResultRequest createResultRequest(String configID, long start, long end); } diff --git a/src/main/java/org/opensearch/timeseries/util/TaskUtil.java b/src/main/java/org/opensearch/timeseries/util/TaskUtil.java deleted file mode 100644 index a92d81043..000000000 --- a/src/main/java/org/opensearch/timeseries/util/TaskUtil.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.timeseries.util; - -import static org.opensearch.ad.model.ADTaskType.ALL_HISTORICAL_TASK_TYPES; -import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES; -import static org.opensearch.ad.model.ADTaskType.REALTIME_TASK_TYPES; - -import java.util.List; - -import org.opensearch.forecast.model.ForecastTaskType; -import org.opensearch.timeseries.AnalysisType; -import org.opensearch.timeseries.model.DateRange; -import org.opensearch.timeseries.model.TaskType; - -public class TaskUtil { - public static List getTaskTypes(DateRange dateRange, boolean resetLatestTaskStateFlag, AnalysisType analysisType) { - if (analysisType == AnalysisType.FORECAST) { - if (dateRange == null) { - return ForecastTaskType.REALTIME_TASK_TYPES; - } else { - throw new UnsupportedOperationException("Forecasting does not support historical tasks"); - } - } else { - if (dateRange == null) { - return REALTIME_TASK_TYPES; - } else { - if (resetLatestTaskStateFlag) { - // return all task types include HC entity task to make sure we can reset all tasks latest flag - return ALL_HISTORICAL_TASK_TYPES; - } else { - return HISTORICAL_DETECTOR_TASK_TYPES; - } - } - } - - } -} diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java index 9679c1ce1..e1e86e1ec 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java @@ -122,8 +122,6 @@ public void setUp() throws Exception { clientUtil = new SecurityClientUtil(nodeStateManager, settings); transportService = mock(TransportService.class); - // channel = mock(ActionListener.class); - anomalyDetectionIndices = mock(ADIndexManagement.class); when(anomalyDetectionIndices.doesConfigIndexExist()).thenReturn(true); diff --git 
a/src/test/java/org/opensearch/ad/e2e/AbstractSyntheticDataTest.java b/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java similarity index 57% rename from src/test/java/org/opensearch/ad/e2e/AbstractSyntheticDataTest.java rename to src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java index 5cfadcd1c..0adfb7c0f 100644 --- a/src/test/java/org/opensearch/ad/e2e/AbstractSyntheticDataTest.java +++ b/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java @@ -9,14 +9,13 @@ * GitHub history for details. */ -package org.opensearch.ad.e2e; +package org.opensearch.ad; import static org.opensearch.timeseries.TestHelpers.toHttpEntity; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.io.InputStreamReader; import java.nio.charset.Charset; import java.time.Instant; import java.util.ArrayList; @@ -29,16 +28,10 @@ import org.apache.hc.core5.http.message.BasicHeader; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Logger; -import org.opensearch.ad.ODFERestTestCase; import org.opensearch.client.Request; -import org.opensearch.client.RequestOptions; -import org.opensearch.client.Response; import org.opensearch.client.RestClient; -import org.opensearch.client.WarningsHandler; -import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.timeseries.AbstractSyntheticDataTest; import org.opensearch.timeseries.TestHelpers; -import org.opensearch.timeseries.settings.TimeSeriesSettings; import com.google.common.collect.ImmutableList; import com.google.gson.JsonArray; @@ -46,62 +39,11 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; -public class AbstractSyntheticDataTest extends ODFERestTestCase { - protected static final Logger LOG = (Logger) LogManager.getLogger(AbstractSyntheticDataTest.class); +public class AbstractADSyntheticDataTest extends AbstractSyntheticDataTest { + public static final Logger LOG = (Logger) LogManager.getLogger(AbstractADSyntheticDataTest.class); private static int batchSize = 1000; - /** - * In real time AD, we mute a node for a detector if that node keeps returning - * ResourceNotFoundException (5 times in a row). This is a problem for batch mode - * testing as we issue a large amount of requests quickly. Due to the speed, we - * won't be able to finish cold start before the ResourceNotFoundException mutes - * a node. Since our test case has only one node, there is no other nodes to fall - * back on. Here we disable such fault tolerance by setting max retries before - * muting to a large number and the actual wait time during muting to 0. 
- * - * @throws IOException when failing to create http request body - */ - protected void disableResourceNotFoundFaultTolerence() throws IOException { - XContentBuilder settingCommand = JsonXContent.contentBuilder(); - - settingCommand.startObject(); - settingCommand.startObject("persistent"); - settingCommand.field(TimeSeriesSettings.MAX_RETRY_FOR_UNRESPONSIVE_NODE.getKey(), 100_000); - settingCommand.field(TimeSeriesSettings.BACKOFF_MINUTES.getKey(), 0); - settingCommand.endObject(); - settingCommand.endObject(); - Request request = new Request("PUT", "/_cluster/settings"); - request.setJsonEntity(settingCommand.toString()); - - adminClient().performRequest(request); - } - - protected List getData(String datasetFileName) throws Exception { - JsonArray jsonArray = JsonParser - .parseReader(new FileReader(new File(getClass().getResource(datasetFileName).toURI()), Charset.defaultCharset())) - .getAsJsonArray(); - List list = new ArrayList<>(jsonArray.size()); - jsonArray.iterator().forEachRemaining(i -> list.add(i.getAsJsonObject())); - return list; - } - - protected JsonArray getHits(RestClient client, Request request) throws IOException { - Response response = client.performRequest(request); - return parseHits(response); - } - - protected JsonArray parseHits(Response response) throws IOException { - JsonObject json = JsonParser - .parseReader(new InputStreamReader(response.getEntity().getContent(), Charset.defaultCharset())) - .getAsJsonObject(); - JsonObject hits = json.getAsJsonObject("hits"); - if (hits == null) { - return null; - } - return hits.getAsJsonArray("hits"); - } - protected void runDetectionResult(String detectorId, Instant begin, Instant end, RestClient client, int entitySize) throws IOException, InterruptedException { // trigger run in current interval @@ -117,47 +59,48 @@ protected void runDetectionResult(String detectorId, Instant begin, Instant end, Thread.sleep(50 * entitySize); } - protected List getAnomalyResult(String detectorId, Instant end, int entitySize, RestClient client) { - try { - Request request = new Request("POST", "/_plugins/_anomaly_detection/detectors/results/_search"); - - String jsonTemplate = "{\n" - + " \"query\": {\n" - + " \"bool\": {\n" - + " \"filter\": [\n" - + " {\n" - + " \"term\": {\n" - + " \"detector_id\": \"%s\"\n" - + " }\n" - + " },\n" - + " {\n" - + " \"range\": {\n" - + " \"anomaly_grade\": {\n" - + " \"gte\": 0\n" - + " }\n" - + " }\n" - + " },\n" - + " {\n" - + " \"range\": {\n" - + " \"data_end_time\": {\n" - + " \"gte\": %d,\n" - + " \"lte\": %d\n" - + " }\n" - + " }\n" - + " }\n" - + " ]\n" - + " }\n" - + " }\n" - + "}"; - - long dateEndTime = end.toEpochMilli(); - String formattedJson = String.format(Locale.ROOT, jsonTemplate, detectorId, dateEndTime, dateEndTime); - request.setJsonEntity(formattedJson); - - // wait until results are available - // max wait for 60_000 milliseconds - int maxWaitCycles = 30; - do { + protected List getAnomalyResult(String detectorId, Instant end, int entitySize, RestClient client) + throws InterruptedException { + Request request = new Request("POST", "/_plugins/_anomaly_detection/detectors/results/_search"); + + String jsonTemplate = "{\n" + + " \"query\": {\n" + + " \"bool\": {\n" + + " \"filter\": [\n" + + " {\n" + + " \"term\": {\n" + + " \"detector_id\": \"%s\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"range\": {\n" + + " \"anomaly_grade\": {\n" + + " \"gte\": 0\n" + + " }\n" + + " }\n" + + " },\n" + + " {\n" + + " \"range\": {\n" + + " \"data_end_time\": {\n" + + " \"gte\": %d,\n" + + 
" \"lte\": %d\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " }\n" + + "}"; + + long dateEndTime = end.toEpochMilli(); + String formattedJson = String.format(Locale.ROOT, jsonTemplate, detectorId, dateEndTime, dateEndTime); + request.setJsonEntity(formattedJson); + + // wait until results are available + // max wait for 60_000 milliseconds + int maxWaitCycles = 30; + do { + try { JsonArray hits = getHits(client, request); if (hits != null && hits.size() == entitySize) { assertTrue("empty response", hits != null); @@ -170,16 +113,34 @@ protected List getAnomalyResult(String detectorId, Instant end, int return res; } else { - LOG.info("wait for result, previous result: {}", hits); + LOG + .info( + "wait for result, previous result: {}, size: {}, eval result {}, expected {}", + hits, + hits.size(), + hits != null && hits.size() == entitySize, + entitySize + ); client.performRequest(new Request("POST", String.format(Locale.ROOT, "/%s/_refresh", ".opendistro-anomaly-results*"))); } Thread.sleep(2_000 * entitySize); - } while (maxWaitCycles-- >= 0); + } catch (Exception e) { + LOG.warn("Exception while waiting for result", e); + Thread.sleep(2_000 * entitySize); + } + } while (maxWaitCycles-- >= 0); - return new ArrayList<>(); + // leave some debug information before returning empty + try { + String matchAll = "{\n" + " \"size\": 1000,\n" + " \"query\": {\n" + " \"match_all\": {}\n" + " }\n" + "}"; + request.setJsonEntity(matchAll); + JsonArray hits = getHits(client, request); + LOG.info("match all result: {}", hits); } catch (Exception e) { - throw new RuntimeException(e); + LOG.warn("Exception while waiting for match all result", e); } + + return new ArrayList<>(); } protected double getAnomalyGrade(JsonObject source) { @@ -205,36 +166,6 @@ protected Instant getAnomalyTime(JsonObject source, Instant defaultVal) { return defaultVal; } - protected void createIndex(String datasetName, RestClient client, String mapping) throws IOException, InterruptedException { - Request request = new Request("PUT", datasetName); - request.setJsonEntity(mapping); - setWarningHandler(request, false); - client.performRequest(request); - Thread.sleep(1_000); - } - - protected void bulkIndexTrainData(String datasetName, List data, int trainTestSplit, RestClient client, String mapping) - throws Exception { - createIndex(datasetName, client, mapping); - - StringBuilder bulkRequestBuilder = new StringBuilder(); - for (int i = 0; i < trainTestSplit; i++) { - bulkRequestBuilder.append("{ \"index\" : { \"_index\" : \"" + datasetName + "\", \"_id\" : \"" + i + "\" } }\n"); - bulkRequestBuilder.append(data.get(i).toString()).append("\n"); - } - TestHelpers - .makeRequest( - client, - "POST", - "_bulk?refresh=true", - null, - toHttpEntity(bulkRequestBuilder.toString()), - ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana")) - ); - Thread.sleep(1_000); - waitAllSyncheticDataIngested(trainTestSplit, datasetName, client); - } - protected String createDetector(RestClient client, String detectorJson) throws Exception { Request request = new Request("POST", "/_plugins/_anomaly_detection/detectors/"); @@ -245,50 +176,6 @@ protected String createDetector(RestClient client, String detectorJson) throws E return detectorId; } - @Override - protected void waitAllSyncheticDataIngested(int expectedSize, String datasetName, RestClient client) throws Exception { - int maxWaitCycles = 3; - do { - Request request = new Request("POST", String.format(Locale.ROOT, "/%s/_search", datasetName)); - request - 
.setJsonEntity( - String - .format( - Locale.ROOT, - "{\"query\": {" - + " \"match_all\": {}" - + " }," - + " \"size\": 1," - + " \"sort\": [" - + " {" - + " \"timestamp\": {" - + " \"order\": \"desc\"" - + " }" - + " }" - + " ]}" - ) - ); - // Make sure all of the test data has been ingested - JsonArray hits = getHits(client, request); - LOG.info("Latest synthetic data:" + hits); - if (hits != null - && hits.size() == 1 - && expectedSize - 1 == hits.get(0).getAsJsonObject().getAsJsonPrimitive("_id").getAsLong()) { - break; - } else { - request = new Request("POST", String.format(Locale.ROOT, "/%s/_refresh", datasetName)); - client.performRequest(request); - } - Thread.sleep(1_000); - } while (maxWaitCycles-- >= 0); - } - - protected void setWarningHandler(Request request, boolean strictDeprecationMode) { - RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder(); - options.setWarningsHandler(strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE); - request.setOptions(options.build()); - } - protected void startDetector(String detectorId, RestClient client) throws Exception { Request request = new Request("POST", String.format(Locale.ROOT, "/_plugins/_anomaly_detection/detectors/%s/_start", detectorId)); @@ -359,7 +246,7 @@ protected void waitForInitDetector(String detectorId, RestClient client) throws * @param detectorId Detector Id * @param client OpenSearch Client * @param end date end time of the most recent detection period - * @param entitySize the number of entities + * @param entitySize the number of entity results to wait for * @throws Exception when failing to query/indexing from/to OpenSearch */ protected void simulateWaitForInitDetector(String detectorId, RestClient client, Instant end, int entitySize) throws Exception { @@ -381,16 +268,18 @@ protected void simulateWaitForInitDetector(String detectorId, RestClient client, assertTrue("time out while waiting for initing detector", duration <= 60_000); } - protected void bulkIndexData(List data, String datasetName, RestClient client, String mapping) throws Exception { + protected void bulkIndexData(List data, String datasetName, RestClient client, String mapping, int ingestDataSize) + throws Exception { createIndex(datasetName, client, mapping); StringBuilder bulkRequestBuilder = new StringBuilder(); LOG.info("data size {}", data.size()); int count = 0; - for (int i = 0; i < data.size(); i++) { + int pickedIngestSize = Math.min(ingestDataSize, data.size()); + for (int i = 0; i < pickedIngestSize; i++) { bulkRequestBuilder.append("{ \"index\" : { \"_index\" : \"" + datasetName + "\", \"_id\" : \"" + i + "\" } }\n"); bulkRequestBuilder.append(data.get(i).toString()).append("\n"); count++; - if (count >= batchSize || i == data.size() - 1) { + if (count >= batchSize || i == pickedIngestSize - 1) { count = 0; TestHelpers .makeRequest( @@ -433,4 +322,13 @@ protected int isAnomaly(Instant time, List> labels) { } return -1; } + + protected List getData(String datasetFileName) throws Exception { + JsonArray jsonArray = JsonParser + .parseReader(new FileReader(new File(getClass().getResource(datasetFileName).toURI()), Charset.defaultCharset())) + .getAsJsonArray(); + List list = new ArrayList<>(jsonArray.size()); + jsonArray.iterator().forEachRemaining(i -> list.add(i.getAsJsonObject())); + return list; + } } diff --git a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java index 77ed1226e..fe9776f11 100644 --- 
a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java +++ b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java @@ -48,7 +48,9 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexRequest; @@ -65,7 +67,12 @@ import org.opensearch.ad.task.ADTaskManager; import org.opensearch.ad.transport.AnomalyResultAction; import org.opensearch.ad.transport.AnomalyResultResponse; +import org.opensearch.client.AdminClient; import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -97,6 +104,7 @@ import org.opensearch.timeseries.common.exception.EndRunException; import org.opensearch.timeseries.common.exception.TimeSeriesException; import org.opensearch.timeseries.constant.CommonName; +import org.opensearch.timeseries.function.ExecutorFunction; import org.opensearch.timeseries.model.FeatureData; import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.model.Job; @@ -163,6 +171,8 @@ public class AnomalyDetectorJobRunnerTests extends AbstractTimeSeriesTest { private ADIndexManagement anomalyDetectionIndices; + private Settings settings; + @BeforeClass public static void setUpBeforeClass() { setUpThreadPool(AnomalyDetectorJobRunnerTests.class.getSimpleName()); @@ -198,7 +208,7 @@ public void setup() throws Exception { adJobProcessor.setTaskManager(adTaskManager); - Settings settings = Settings + settings = Settings .builder() .put("plugins.anomaly_detection.max_retry_for_backoff", 2) .put("plugins.anomaly_detection.backoff_initial_delay", TimeValue.timeValueMillis(1)) @@ -861,4 +871,156 @@ public void testMarkResultIndexQueried() throws IOException { assertEquals(TimeSeriesSettings.NUM_MIN_SAMPLES, totalUpdates.getValue().longValue()); assertEquals(true, adTaskCacheManager.hasQueriedResultIndex(detector.getId())); } + + public void testValidateCustomResult() throws IOException { + String resultIndex = ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "index"; + when(jobParameter.getCustomResultIndexOrAlias()).thenReturn(resultIndex); + doAnswer(invocation -> { + ExecutorFunction function = invocation.getArgument(4); + function.execute(); + return null; + }).when(anomalyDetectionIndices).validateCustomIndexForBackendJob(any(), any(), any(), any(), any(ExecutorFunction.class), any()); + LockModel lock = new LockModel(CommonName.JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + Instant executionStartTime = confirmInitializedSetup(); + + adJobProcessor.runJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + verify(client, times(1)).execute(eq(AnomalyResultAction.INSTANCE), any(), any()); + } + + enum CreateIndexMode { + NOT_ACKED, + RESOUCE_EXISTS, + RUNTIME_EXCEPTION + } + + private void setUpValidateCustomResultIndex(CreateIndexMode mode) throws IOException { + String resultIndexAlias = 
ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "index"; + when(jobParameter.getCustomResultIndexOrAlias()).thenReturn(resultIndexAlias); + + ClusterName clusterName = new ClusterName("test"); + ClusterState clusterState = ClusterState.builder(clusterName).metadata(Metadata.builder().build()).build(); + when(clusterService.state()).thenReturn(clusterState); + ClusterSettings clusterSettings = new ClusterSettings( + Settings.EMPTY, + Collections + .unmodifiableSet( + new HashSet<>( + Arrays + .asList( + AnomalyDetectorSettings.AD_RESULT_HISTORY_MAX_DOCS_PER_SHARD, + AnomalyDetectorSettings.AD_RESULT_HISTORY_ROLLOVER_PERIOD, + AnomalyDetectorSettings.AD_RESULT_HISTORY_RETENTION_PERIOD, + AnomalyDetectorSettings.AD_MAX_PRIMARY_SHARDS + ) + ) + ) + ); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + AdminClient adminClient = mock(AdminClient.class); + IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + if (CreateIndexMode.NOT_ACKED == mode) { + // not acked + listener.onResponse(new CreateIndexResponse(false, false, resultIndexAlias)); + } else if (CreateIndexMode.RESOUCE_EXISTS == mode) { + listener.onFailure(new ResourceAlreadyExistsException("index created")); + } else { + listener.onFailure(new RuntimeException()); + } + + return null; + }).when(indicesAdminClient).create(any(), any(ActionListener.class)); + + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesAdminClient); + + anomalyDetectionIndices = new ADIndexManagement( + client, + clusterService, + threadPool, + settings, + nodeFilter, + 1, + NamedXContentRegistry.EMPTY + ); + adJobProcessor.setIndexManagement(anomalyDetectionIndices); + } + + public void testNotAckedValidCustomResultCreation() throws IOException { + setUpValidateCustomResultIndex(CreateIndexMode.NOT_ACKED); + LockModel lock = new LockModel(CommonName.JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + Instant executionStartTime = confirmInitializedSetup(); + + assertEquals( + "Expect 0 EndRunException of " + + jobParameter.getName() + + ". Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 0, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + adJobProcessor.runJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + assertEquals( + "Expect 1 EndRunException of " + + jobParameter.getName() + + ". Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 1, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + } + + public void testCustomResultExistsWhileCreation() throws IOException { + setUpValidateCustomResultIndex(CreateIndexMode.RESOUCE_EXISTS); + LockModel lock = new LockModel(CommonName.JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + Instant executionStartTime = confirmInitializedSetup(); + + assertEquals( + "Expect 0 EndRunException of " + + jobParameter.getName() + + ". Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 0, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + adJobProcessor.runJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + assertEquals( + "Expect 0 EndRunException of " + + jobParameter.getName() + + ". 
Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 0, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + verify(client, times(1)).index(any(), any()); + verify(client, times(1)).delete(any(), any()); + } + + public void testUnexpectedWhileCreation() throws IOException { + setUpValidateCustomResultIndex(CreateIndexMode.RUNTIME_EXCEPTION); + LockModel lock = new LockModel(CommonName.JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + Instant executionStartTime = confirmInitializedSetup(); + + assertEquals( + "Expect 0 EndRunException of " + + jobParameter.getName() + + ". Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 0, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + adJobProcessor.runJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + assertEquals( + "Expect 1 EndRunException of " + + jobParameter.getName() + + ". Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 1, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + verify(client, times(0)).index(any(), any()); + verify(client, times(0)).delete(any(), any()); + } } diff --git a/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java b/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java index ef2047466..dc0ea5b4f 100644 --- a/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java +++ b/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java @@ -15,44 +15,32 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Path; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Locale; import java.util.Map; -import javax.management.MBeanServerInvocationHandler; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; - import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.message.BasicHeader; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Logger; -import org.junit.AfterClass; import org.opensearch.ad.model.ADTask; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyDetectorExecutionInput; -import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.LoggingDeprecationHandler; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContent; -import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.test.rest.OpenSearchRestTestCase; +import org.opensearch.timeseries.ODFERestTestCase; import org.opensearch.timeseries.TestHelpers; import org.opensearch.timeseries.model.DateRange; import org.opensearch.timeseries.model.Job; @@ -63,7 +51,7 @@ import 
com.google.gson.JsonArray; public abstract class AnomalyDetectorRestTestCase extends ODFERestTestCase { - private static final Logger LOG = (Logger) LogManager.getLogger(AnomalyDetectorRestTestCase.class); + public static final Logger LOG = (Logger) LogManager.getLogger(AnomalyDetectorRestTestCase.class); public static final int MAX_RETRY_TIMES = 10; @@ -337,21 +325,6 @@ protected final XContentParser createAdParser(XContent xContent, InputStream dat return xContent.createParser(TestHelpers.xContentRegistry(), LoggingDeprecationHandler.INSTANCE, data); } - public void updateClusterSettings(String settingKey, Object value) throws Exception { - XContentBuilder builder = XContentFactory - .jsonBuilder() - .startObject() - .startObject("persistent") - .field(settingKey, value) - .endObject() - .endObject(); - Request request = new Request("PUT", "_cluster/settings"); - request.setJsonEntity(builder.toString()); - Response response = client().performRequest(request); - assertEquals(RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); - Thread.sleep(2000); // sleep some time to resolve flaky test - } - public Response getDetectorProfile(String detectorId, boolean all, String customizedProfile, RestClient client) throws IOException { return TestHelpers .makeRequest( @@ -683,47 +656,4 @@ protected Response validateAnomalyDetector(AnomalyDetector detector, RestClient null ); } - - /** - * We need to be able to dump the jacoco coverage before cluster is shut down. - * The new internal testing framework removed some of the gradle tasks we were listening to - * to choose a good time to do it. This will dump the executionData to file after each test. - * TODO: This is also currently just overwriting integTest.exec with the updated execData without - * resetting after writing each time. 
This can be improved to either write an exec file per test - * or by letting jacoco append to the file - */ - public interface IProxy { - byte[] getExecutionData(boolean reset); - - void dump(boolean reset); - - void reset(); - } - - @AfterClass - public static void dumpCoverage() throws IOException, MalformedObjectNameException { - // jacoco.dir is set in esplugin-coverage.gradle, if it doesn't exist we don't - // want to collect coverage so we can return early - String jacocoBuildPath = System.getProperty("jacoco.dir"); - if (org.opensearch.core.common.Strings.isNullOrEmpty(jacocoBuildPath)) { - return; - } - - String serverUrl = System.getProperty("jmx.serviceUrl"); - if (serverUrl == null) { - LOG.error("Failed to dump coverage because JMX Service URL is null"); - throw new IllegalArgumentException("JMX Service URL is null"); - } - - try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(serverUrl))) { - IProxy proxy = MBeanServerInvocationHandler - .newProxyInstance(connector.getMBeanServerConnection(), new ObjectName("org.jacoco:type=Runtime"), IProxy.class, false); - - Path path = Path.of(Path.of(jacocoBuildPath, "integTest.exec").toFile().getCanonicalPath()); - Files.write(path, proxy.getExecutionData(false)); - } catch (Exception ex) { - LOG.error("Failed to dump coverage: ", ex); - throw new RuntimeException("Failed to dump coverage: " + ex); - } - } } diff --git a/src/test/java/org/opensearch/ad/HistoricalAnalysisRestTestCase.java b/src/test/java/org/opensearch/ad/HistoricalAnalysisRestTestCase.java index 588598400..d71b3370b 100644 --- a/src/test/java/org/opensearch/ad/HistoricalAnalysisRestTestCase.java +++ b/src/test/java/org/opensearch/ad/HistoricalAnalysisRestTestCase.java @@ -28,7 +28,6 @@ import org.apache.hc.core5.http.message.BasicHeader; import org.junit.Before; import org.opensearch.ad.mock.model.MockSimpleLog; -import org.opensearch.ad.model.ADTask; import org.opensearch.ad.model.ADTaskProfile; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.client.Response; @@ -37,7 +36,6 @@ import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.search.aggregations.AggregationBuilder; -import org.opensearch.timeseries.TaskProfile; import org.opensearch.timeseries.TestHelpers; import org.opensearch.timeseries.model.DateRange; import org.opensearch.timeseries.model.Feature; @@ -253,7 +251,7 @@ protected List waitUntilTaskDone(String detectorId) throws InterruptedEx protected List waitUntilTaskReachState(String detectorId, Set targetStates) throws InterruptedException { List results = new ArrayList<>(); int i = 0; - TaskProfile adTaskProfile = null; + ADTaskProfile adTaskProfile = null; // Increase retryTimes if some task can't reach done state while ((adTaskProfile == null || !targetStates.contains(adTaskProfile.getTask().getState())) && i < MAX_RETRY_TIMES) { try { @@ -270,4 +268,25 @@ protected List waitUntilTaskReachState(String detectorId, Set ta results.add(i); return results; } + + protected List waitUntilEntityCountAvailable(String detectorId) throws InterruptedException { + List results = new ArrayList<>(); + int i = 0; + ADTaskProfile adTaskProfile = null; + // Increase retryTimes if some task can't reach done state + while ((adTaskProfile == null || adTaskProfile.getTotalEntitiesCount() == null) && i < MAX_RETRY_TIMES) { + try { + adTaskProfile = getADTaskProfile(detectorId); + } catch (Exception e) { + logger.error("failed to get ADTaskProfile", e); + } 
finally { + Thread.sleep(1000); + } + i++; + } + assertNotNull(adTaskProfile); + results.add(adTaskProfile); + results.add(i); + return results; + } } diff --git a/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java b/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java index 1e26f4ac6..7c09709f0 100644 --- a/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java +++ b/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -45,6 +46,7 @@ import org.apache.logging.log4j.Logger; import org.junit.Before; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.opensearch.ad.ml.ADCheckpointDao; import org.opensearch.ad.ml.ADModelManager; import org.opensearch.ad.model.AnomalyDetector; @@ -750,4 +752,40 @@ public void testRemoveEntityModel() { verify(checkpoint, times(1)).deleteModelCheckpoint(eq(entity2.getModelId(detectorId).get()), any()); verify(checkpointWriteQueue, never()).write(any(), anyBoolean(), any()); } + + public void testGetTotalUpdates_orElseGetBranch() { + // Mock the ModelState and its methods + ModelState mockModelState = Mockito.mock(ModelState.class); + + // Mock the getModel method to return an empty Optional + when(mockModelState.getModel()).thenReturn(Optional.empty()); + + // Mock the getSamples method to return a list with a specific size + Deque mockSamples = Mockito.mock(Deque.class); + when(mockSamples.size()).thenReturn(5); + when(mockModelState.getSamples()).thenReturn(mockSamples); + + // Call the method under test + long result = entityCache.getTotalUpdates(mockModelState); + + // Assert that the result is the size of the samples + assertEquals(5L, result); + } + + public void testGetTotalUpdates_orElseGetBranchWithNullSamples() { + // Mock the ModelState and its methods + ModelState mockModelState = Mockito.mock(ModelState.class); + + // Mock the getModel method to return an empty Optional + when(mockModelState.getModel()).thenReturn(Optional.empty()); + + // Mock the getSamples method to return null + when(mockModelState.getSamples()).thenReturn(null); + + // Call the method under test + long result = entityCache.getTotalUpdates(mockModelState); + + // Assert that the result is 0L + assertEquals(0L, result); + } } diff --git a/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java b/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java new file mode 100644 index 000000000..a5d6c8686 --- /dev/null +++ b/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java @@ -0,0 +1,215 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ad.e2e; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Locale; +import java.util.TreeMap; + +import org.opensearch.ad.AbstractADSyntheticDataTest; +import org.opensearch.client.RestClient; + +import com.google.gson.JsonObject; + +public class AbstractRuleTestCase extends AbstractADSyntheticDataTest { + + protected static class TrainResult { + String detectorId; + List data; + // actual index of training data. As we have multiple entities, + // trainTestSplit means how many groups of entities are used for training. + // rawDataTrainTestSplit is the actual index of training data. 
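+        // e.g., with 2 entities and trainTestSplit = 100, rawDataTrainTestSplit = 200 and training covers raw data rows 0 to 199. 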
+ int rawDataTrainTestSplit; + Duration windowDelay; + + public TrainResult(String detectorId, List data, int rawDataTrainTestSplit, Duration windowDelay) { + this.detectorId = detectorId; + this.data = data; + this.rawDataTrainTestSplit = rawDataTrainTestSplit; + this.windowDelay = windowDelay; + } + } + + /** + * Ingest all of the data in file datasetName + * + * @param datasetName data set file name + * @param intervalMinutes detector interval + * @param numberOfEntities number of entities in the file + * @param trainTestSplit used to calculate train start time + * @param useDateNanos whether to use nano date type in detector timestamp + * @return TrainResult for the following method calls + * @throws Exception when failing to ingest data + */ + protected TrainResult ingestTrainData( + String datasetName, + int intervalMinutes, + int numberOfEntities, + int trainTestSplit, + boolean useDateNanos + ) throws Exception { + return ingestTrainData(datasetName, intervalMinutes, numberOfEntities, trainTestSplit, useDateNanos, -1); + } + + protected TrainResult ingestTrainData( + String datasetName, + int intervalMinutes, + int numberOfEntities, + int trainTestSplit, + boolean useDateNanos, + int ingestDataSize + ) throws Exception { + String dataFileName = String.format(Locale.ROOT, "data/%s.data", datasetName); + + List data = getData(dataFileName); + + RestClient client = client(); + String categoricalField = "componentName"; + String mapping = String + .format( + Locale.ROOT, + "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\":" + (useDateNanos ? "\"date_nanos\"" : "\"date\"") + "}," + " \"transform._doc_count\": { \"type\": \"integer\" }," + "\"%s\": { \"type\": \"keyword\"} } } }", + categoricalField + ); + + if (ingestDataSize <= 0) { + bulkIndexData(data, datasetName, client, mapping, data.size()); + } else { + bulkIndexData(data, datasetName, client, mapping, ingestDataSize); + } + + // we need to account for the fact that an interval can have multiple entity records + int rawDataTrainTestSplit = trainTestSplit * numberOfEntities; + String trainTimeStr = data.get(rawDataTrainTestSplit - 1).get("timestamp").getAsString(); + Instant trainTime = Instant.ofEpochMilli(Long.parseLong(trainTimeStr)); + /* + * The {@code CompositeRetriever.PageIterator.hasNext()} method checks if a request is expired + * relative to the current system time. This method is designed to ensure that the execution time + * is set to either the current time or a future time to prevent premature expirations in our tests. + * + * Also, AD accepts windowDelay in the unit of minutes. Thus, we need to convert the delay to whole minutes. This will + * make it easier to search for results based on data end time. Otherwise, the real data time and the data time + * converted back from the request time would not match. + * Assume x = real data time, y = real window delay, and y' = window delay in minutes. If y and y' are different, + * x + y - y' != x. 
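+ * For example, if x = 10:00:00 and the test runs at 10:07:45, then y = 7 minutes 45 seconds while y' = 7 minutes, + * so x + y - y' = 10:00:45 != x = 10:00:00. Using whole minutes end to end keeps data times and request times aligned. 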
+ */ + long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes(); + Duration windowDelay = Duration.ofMinutes(windowDelayMinutes); + + String detector = String + .format( + Locale.ROOT, + "{ \"name\": \"test\", \"description\": \"test\", \"time_field\": \"timestamp\"" + ", \"indices\": [\"%s\"], \"feature_attributes\": [{ \"feature_name\": \"feature 1\", \"feature_enabled\": " + "\"true\", \"aggregation_query\": { \"Feature1\": { \"sum\": { \"field\": \"transform._doc_count\" } } } }" + "], \"detection_interval\": { \"period\": { \"interval\": %d, \"unit\": \"Minutes\" } }, " + "\"category_field\": [\"%s\"], " + "\"window_delay\": { \"period\": {\"interval\": %d, \"unit\": \"MINUTES\"}}," + "\"history\": %d," + "\"schema_version\": 0," + "\"rules\": [{\"action\": \"ignore_anomaly\", \"conditions\": [{\"feature_name\": \"feature 1\", \"threshold_type\": \"actual_over_expected_ratio\", \"operator\": \"lte\", \"value\": 0.3}, " + "{\"feature_name\": \"feature 1\", \"threshold_type\": \"expected_over_actual_ratio\", \"operator\": \"lte\", \"value\": 0.3}" + "]}]" + "}", + datasetName, + intervalMinutes, + categoricalField, + windowDelayMinutes, + trainTestSplit - 1 + ); + String detectorId = createDetector(client, detector); + LOG.info("Created detector {}", detectorId); + + Instant executeBegin = dataToExecutionTime(trainTime, windowDelay); + Instant executeEnd = executeBegin.plus(intervalMinutes, ChronoUnit.MINUTES); + Instant dataEnd = trainTime.plus(intervalMinutes, ChronoUnit.MINUTES); + + LOG.info("start detector {}", detectorId); + simulateStartDetector(detectorId, executeBegin, executeEnd, client, numberOfEntities); + int resultsToWait = findTrainTimeEntities(rawDataTrainTestSplit - 1, data); + LOG.info("wait for detector {} to initialize. {} results are expected.", detectorId, resultsToWait); + simulateWaitForInitDetector(detectorId, client, dataEnd, resultsToWait); + + return new TrainResult(detectorId, data, rawDataTrainTestSplit, windowDelay); + } + + /** + * Assumes the data is sorted by time. The method looks above and below startIndex + * and returns how many timestamps equal the timestamp at startIndex. + * @param startIndex where to start looking for the timestamp + * @param data data sorted in ascending time order + * @return how many timestamps equal the timestamp at startIndex + */ + protected int findTrainTimeEntities(int startIndex, List data) { + String timestampStr = data.get(startIndex).get("timestamp").getAsString(); + int count = 1; + for (int i = startIndex - 1; i >= 0; i--) { + String trainTimeStr = data.get(i).get("timestamp").getAsString(); + if (trainTimeStr.equals(timestampStr)) { + count++; + } else { + break; + } + } + for (int i = startIndex + 1; i < data.size(); i++) { + String trainTimeStr = data.get(i).get("timestamp").getAsString(); + if (trainTimeStr.equals(timestampStr)) { + count++; + } else { + break; + } + } + return count; + } + + protected Instant dataToExecutionTime(Instant instant, Duration windowDelay) { + return instant.plus(windowDelay); + } + + /** + * + * @param testData current data to score + * @param entityMap a map to record the number of times we have seen a timestamp. Used to detect missing values. + * @param windowDelay ingestion delay + * @param intervalMinutes detector interval + * @param detectorId detector Id + * @param client RESTful client + * @param numberOfEntities the number of entities. + * @return whether we erred out. 
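+ * (true when runDetectionResult throws; false otherwise, including when the timestamp has already been scored) 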
+ */ + protected boolean scoreOneResult( + JsonObject testData, + TreeMap entityMap, + Duration windowDelay, + int intervalMinutes, + String detectorId, + RestClient client, + int numberOfEntities + ) { + String beginTimeStampAsString = testData.get("timestamp").getAsString(); + Integer newCount = entityMap.compute(beginTimeStampAsString, (key, oldValue) -> (oldValue == null) ? 1 : oldValue + 1); + if (newCount > 1) { + // we have seen this timestamp before. Without this line, we will get an RCF IllegalArgumentException about out-of-order tuples + return false; + } + Instant begin = dataToExecutionTime(Instant.ofEpochMilli(Long.parseLong(beginTimeStampAsString)), windowDelay); + Instant end = begin.plus(intervalMinutes, ChronoUnit.MINUTES); + try { + runDetectionResult(detectorId, begin, end, client, numberOfEntities); + } catch (Exception e) { + LOG.error("failed to run detection result", e); + return true; + } + return false; + } + +} diff --git a/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java b/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java index fd1aa5cc9..09aa9d418 100644 --- a/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java +++ b/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Logger; +import org.opensearch.ad.AbstractADSyntheticDataTest; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; @@ -39,7 +40,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; -public class DetectionResultEvalutationIT extends AbstractSyntheticDataTest { +public class DetectionResultEvalutationIT extends AbstractADSyntheticDataTest { protected static final Logger LOG = (Logger) LogManager.getLogger(DetectionResultEvalutationIT.class); public void testValidationIntervalRecommendation() throws Exception { diff --git a/src/test/java/org/opensearch/ad/e2e/RuleIT.java b/src/test/java/org/opensearch/ad/e2e/RuleIT.java new file mode 100644 index 000000000..92f733f01 --- /dev/null +++ b/src/test/java/org/opensearch/ad/e2e/RuleIT.java @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ad.e2e; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; + +import com.google.gson.JsonObject; + +public class RuleIT extends AbstractRuleTestCase { + public void testRuleWithDateNanos() throws Exception { + // TODO: this test case will run for a much longer time and timeout with security enabled + if (!isHttps()) { + disableResourceNotFoundFaultTolerence(); + + String datasetName = "rule"; + int intervalMinutes = 10; + int numberOfEntities = 2; + int trainTestSplit = 100; + + TrainResult trainResult = ingestTrainData( + datasetName, + intervalMinutes, + numberOfEntities, + trainTestSplit, + true, + // ingest just enough to finish the test + (trainTestSplit + 1) * numberOfEntities + ); + + List data = trainResult.data; + LOG.info("scoring data at {}", data.get(trainResult.rawDataTrainTestSplit).get("timestamp").getAsString()); + + // one run call will evaluate all entities within an interval + int numberEntitiesScored = findTrainTimeEntities(trainResult.rawDataTrainTestSplit, data); + // an entity might have missing values (e.g., at timestamp 1694713200000). 
+ // Use a map to record the number of times we have seen them. + // data start time -> the number of entities + TreeMap entityMap = new TreeMap<>(); + // rawDataTrainTestSplit is the actual index of next test data. + assertFalse( + scoreOneResult( + data.get(trainResult.rawDataTrainTestSplit), + entityMap, + trainResult.windowDelay, + intervalMinutes, + trainResult.detectorId, + client(), + numberEntitiesScored + ) + ); + + assertEquals("expected 1 timestamp but got " + entityMap.size(), 1, entityMap.size()); + + for (Map.Entry entry : entityMap.entrySet()) { + String beginTimeStampAsString = entry.getKey(); + Instant begin = Instant.ofEpochMilli(Long.parseLong(beginTimeStampAsString)); + Instant end = begin.plus(intervalMinutes, ChronoUnit.MINUTES); + try { + List sourceList = getAnomalyResult(trainResult.detectorId, end, numberEntitiesScored, client()); + + assertTrue( + String + .format( + Locale.ROOT, + "the number of results is %d at %s, expected %d ", + sourceList.size(), + beginTimeStampAsString, + numberEntitiesScored + ), + sourceList.size() == numberEntitiesScored + ); + for (int j = 0; j < numberEntitiesScored; j++) { + JsonObject source = sourceList.get(j); + double anomalyGrade = getAnomalyGrade(source); + assertTrue("anomalyGrade cannot be negative", anomalyGrade >= 0); + } + } catch (Exception e) { + LOG.error("failed to get detection results", e); + assertTrue(false); + } + } + } + } +} diff --git a/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java b/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java index b8e8cff90..208cfbe48 100644 --- a/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java +++ b/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java @@ -32,8 +32,8 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; -public class RuleModelPerfIT extends AbstractSyntheticDataTest { - private static final Logger LOG = (Logger) LogManager.getLogger(RuleModelPerfIT.class); +public class RuleModelPerfIT extends AbstractRuleTestCase { + static final Logger LOG = (Logger) LogManager.getLogger(RuleModelPerfIT.class); public void testRule() throws Exception { // TODO: this test case will run for a much longer time and timeout with security enabled @@ -50,21 +50,6 @@ public void testRule() throws Exception { } } - public void testRuleWithDateNanos() throws Exception { - // TODO: this test case will run for a much longer time and timeout with security enabled - if (!isHttps()) { - disableResourceNotFoundFaultTolerence(); - // there are 8 entities in the data set. Each one needs 1500 rows as training data. 
- Map minPrecision = new HashMap<>(); - minPrecision.put("Phoenix", 0.5); - minPrecision.put("Scottsdale", 0.5); - Map minRecall = new HashMap<>(); - minRecall.put("Phoenix", 0.9); - minRecall.put("Scottsdale", 0.6); - verifyRule("rule", 10, minPrecision.size(), 1500, minPrecision, minRecall, 20, true); - } - } - private void verifyTestResults( Triple, Integer, Map>> testResults, Map>> anomalies, @@ -144,96 +129,29 @@ public void verifyRule( int maxError, boolean useDateNanos ) throws Exception { - String dataFileName = String.format(Locale.ROOT, "data/%s.data", datasetName); - String labelFileName = String.format(Locale.ROOT, "data/%s.label", datasetName); - List data = getData(dataFileName); + String labelFileName = String.format(Locale.ROOT, "data/%s.label", datasetName); Map>> anomalies = getAnomalyWindowsMap(labelFileName); - RestClient client = client(); - String categoricalField = "componentName"; - String mapping = String - .format( - Locale.ROOT, - "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\":" - + (useDateNanos ? "\"date_nanos\"" : "\"date\"") - + "}," - + " \"transform._doc_count\": { \"type\": \"integer\" }," - + "\"%s\": { \"type\": \"keyword\"} } } }", - categoricalField - ); - - bulkIndexData(data, datasetName, client, mapping); - - // we need to account that interval can have multiple entity record - int rawDataTrainTestSplit = trainTestSplit * numberOfEntities; - Instant trainTime = Instant.ofEpochMilli(Long.parseLong(data.get(rawDataTrainTestSplit - 1).get("timestamp").getAsString())); - /* - * The {@code CompositeRetriever.PageIterator.hasNext()} method checks if a request is expired - * relative to the current system time. This method is designed to ensure that the execution time - * is set to either the current time or a future time to prevent premature expirations in our tests. - * - * Also, AD accepts windowDelay in the unit of minutes. Thus, we need to convert the delay in minutes. This will - * make it easier to search for results based on data end time. Otherwise, real data time and the converted - * data time from request time. - * Assume x = real data time. y= real window delay. y'= window delay in minutes. If y and y' are different, - * x + y - y' != x. 
- */ - long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes(); - Duration windowDelay = Duration.ofMinutes(windowDelayMinutes); - - String detector = String - .format( - Locale.ROOT, - "{ \"name\": \"test\", \"description\": \"test\", \"time_field\": \"timestamp\"" - + ", \"indices\": [\"%s\"], \"feature_attributes\": [{ \"feature_name\": \"feature 1\", \"feature_enabled\": " - + "\"true\", \"aggregation_query\": { \"Feature1\": { \"sum\": { \"field\": \"transform._doc_count\" } } } }" - + "], \"detection_interval\": { \"period\": { \"interval\": %d, \"unit\": \"Minutes\" } }, " - + "\"category_field\": [\"%s\"], " - + "\"window_delay\": { \"period\": {\"interval\": %d, \"unit\": \"MINUTES\"}}," - + "\"history\": %d," - + "\"schema_version\": 0," - + "\"rules\": [{\"action\": \"ignore_anomaly\", \"conditions\": [{\"feature_name\": \"feature 1\", \"threshold_type\": \"actual_over_expected_ratio\", \"operator\": \"lte\", \"value\": 0.3}, " - + "{\"feature_name\": \"feature 1\", \"threshold_type\": \"expected_over_actual_ratio\", \"operator\": \"lte\", \"value\": 0.3}" - + "]}]" - + "}", - datasetName, - intervalMinutes, - categoricalField, - windowDelayMinutes, - trainTestSplit - 1 - ); - String detectorId = createDetector(client, detector); - LOG.info("Created detector {}", detectorId); - - Instant executeBegin = dataToExecutionTime(trainTime, windowDelay); - Instant executeEnd = executeBegin.plus(intervalMinutes, ChronoUnit.MINUTES); - Instant dataEnd = trainTime.plus(intervalMinutes, ChronoUnit.MINUTES); - - simulateStartDetector(detectorId, executeBegin, executeEnd, client, numberOfEntities); - simulateWaitForInitDetector(detectorId, client, dataEnd, numberOfEntities); + TrainResult trainResult = ingestTrainData(datasetName, intervalMinutes, numberOfEntities, trainTestSplit, useDateNanos); Triple, Integer, Map>> results = getTestResults( - detectorId, - data, - rawDataTrainTestSplit, + trainResult.detectorId, + trainResult.data, + trainResult.rawDataTrainTestSplit, intervalMinutes, anomalies, - client, + client(), numberOfEntities, - windowDelay + trainResult.windowDelay ); verifyTestResults(results, anomalies, minPrecision, minRecall, maxError); } - private Instant dataToExecutionTime(Instant instant, Duration windowDelay) { - return instant.plus(windowDelay); - } - private Triple, Integer, Map>> getTestResults( String detectorId, List data, - int trainTestSplit, + int rawTrainTestSplit, int intervalMinutes, Map>> anomalies, RestClient client, @@ -247,20 +165,9 @@ private Triple, Integer, Map>> getTes // Use a map to record the number of times we have seen them. // data start time -> the number of entities TreeMap entityMap = new TreeMap<>(); - for (int i = trainTestSplit; i < data.size(); i++) { - String beginTimeStampAsString = data.get(i).get("timestamp").getAsString(); - Integer newCount = entityMap.compute(beginTimeStampAsString, (key, oldValue) -> (oldValue == null) ? 1 : oldValue + 1); - if (newCount > 1) { - // we have seen this timestamp before. 
Without this line, we will get rcf IllegalArgumentException about out of order tuples
-                continue;
-            }
-            Instant begin = dataToExecutionTime(Instant.ofEpochMilli(Long.parseLong(beginTimeStampAsString)), windowDelay);
-            Instant end = begin.plus(intervalMinutes, ChronoUnit.MINUTES);
-            try {
-                runDetectionResult(detectorId, begin, end, client, numberOfEntities);
-            } catch (Exception e) {
+        for (int i = rawTrainTestSplit; i < data.size(); i++) {
+            if (scoreOneResult(data.get(i), entityMap, windowDelay, intervalMinutes, detectorId, client, numberOfEntities)) {
                 errors++;
-                LOG.error("failed to run detection result", e);
             }
         }
diff --git a/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java b/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java
index 873e033f8..75855a1a8 100644
--- a/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java
+++ b/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java
@@ -31,6 +31,7 @@
 import org.apache.hc.core5.http.message.BasicHeader;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.core.Logger;
+import org.opensearch.ad.AbstractADSyntheticDataTest;
 import org.opensearch.client.RestClient;
 import org.opensearch.timeseries.TestHelpers;
 
@@ -39,7 +40,7 @@
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 
-public class SingleStreamModelPerfIT extends AbstractSyntheticDataTest {
+public class SingleStreamModelPerfIT extends AbstractADSyntheticDataTest {
     protected static final Logger LOG = (Logger) LogManager.getLogger(SingleStreamModelPerfIT.class);
 
     public void testDataset() throws Exception {
diff --git a/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java b/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java
index 2c6ebd6b3..0e1078968 100644
--- a/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java
+++ b/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java
@@ -11,6 +11,7 @@
 
 package org.opensearch.ad.rest;
 
+import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_MODEL_MAX_SIZE_PERCENTAGE;
 import static org.opensearch.ad.settings.AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS;
 import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE;
 import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS;
@@ -28,6 +29,7 @@
 
 import org.apache.hc.core5.http.ParseException;
 import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.opensearch.ad.HistoricalAnalysisRestTestCase;
@@ -59,6 +61,16 @@ public void setUp() throws Exception {
         updateClusterSettings(MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS.getKey(), 2);
         updateClusterSettings(BATCH_TASK_PIECE_INTERVAL_SECONDS.getKey(), 5);
         updateClusterSettings(MAX_BATCH_TASK_PER_NODE.getKey(), 10);
+        // Increase the AD model memory percentage: with jacoco coverage instrumentation enabled,
+        // there is otherwise not enough memory to finish HistoricalAnalysisRestApiIT.
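+        // (0.5 lets AD models use up to half of the JVM heap; tearDown below restores the 0.1 default)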
+        updateClusterSettings(AD_MODEL_MAX_SIZE_PERCENTAGE.getKey(), 0.5);
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        updateClusterSettings(AD_MODEL_MAX_SIZE_PERCENTAGE.getKey(), 0.1);
+        super.tearDown();
+    }
 
     public void testHistoricalAnalysisForSingleEntityDetector() throws Exception {
@@ -104,7 +116,6 @@ private void checkIfTaskCanFinishCorrectly(String detectorId, String taskId, Set
         }
     }
 
-    @SuppressWarnings("unchecked")
     private List startHistoricalAnalysis(int categoryFieldSize) throws Exception {
         return startHistoricalAnalysis(categoryFieldSize, null);
     }
@@ -123,6 +134,9 @@ private List startHistoricalAnalysis(int categoryFieldSize, String resul
         if (!TaskState.RUNNING.name().equals(adTaskProfile.getTask().getState())) {
             adTaskProfile = (ADTaskProfile) waitUntilTaskReachState(detectorId, ImmutableSet.of(TaskState.RUNNING.name())).get(0);
         }
+        // if (adTaskProfile.getTotalEntitiesCount() == null) {
+        // adTaskProfile = (ADTaskProfile) waitUntilEntityCountAvailable(detectorId).get(0);
+        // }
         assertEquals((int) Math.pow(categoryFieldDocCount, categoryFieldSize), adTaskProfile.getTotalEntitiesCount().intValue());
         assertTrue(adTaskProfile.getPendingEntitiesCount() > 0);
         assertTrue(adTaskProfile.getRunningEntitiesCount() > 0);
@@ -170,8 +184,13 @@ public void testStopHistoricalAnalysis() throws Exception {
         waitUntilGetTaskProfile(detectorId);
 
         // stop historical detector
-        Response stopDetectorResponse = stopAnomalyDetector(detectorId, client(), false);
-        assertEquals(RestStatus.OK, TestHelpers.restStatus(stopDetectorResponse));
+        try {
+            Response stopDetectorResponse = stopAnomalyDetector(detectorId, client(), false);
+            assertEquals(RestStatus.OK, TestHelpers.restStatus(stopDetectorResponse));
+        } catch (Exception e) {
+            // it is possible that the task has already stopped
+            assertTrue("expected No running task found but actual is " + e.getMessage(), e.getMessage().contains("No running task found"));
+        }
 
         // get task profile
         checkIfTaskCanFinishCorrectly(detectorId, taskId, ImmutableSet.of(TaskState.STOPPED.name()));
diff --git a/src/test/java/org/opensearch/forecast/AbstractForecastSyntheticDataTest.java b/src/test/java/org/opensearch/forecast/AbstractForecastSyntheticDataTest.java
new file mode 100644
index 000000000..a2575a457
--- /dev/null
+++ b/src/test/java/org/opensearch/forecast/AbstractForecastSyntheticDataTest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.forecast;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opensearch.timeseries.AbstractSyntheticDataTest;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.stream.JsonReader;
+
+public class AbstractForecastSyntheticDataTest extends AbstractSyntheticDataTest {
+    /**
+     * Read data from a json array file, up to a specified number of elements.
+     * @param datasetFileName data set file name
+     * @param limit the maximum number of json elements to read
+     * @return the JsonObject list read from the file
+     * @throws URISyntaxException when failing to find datasetFileName
+     *         (IO errors while reading are logged rather than rethrown)
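+     *
+     * A hypothetical call, assuming the AD e2e dataset is on the test classpath:
+     * {@code readJsonArrayWithLimit("org/opensearch/ad/e2e/data/rule.data", 200)} returns at most the first 200 elements.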
+     */
+    public static List<JsonObject> readJsonArrayWithLimit(String datasetFileName, int limit) throws URISyntaxException {
+        List<JsonObject> jsonObjects = new ArrayList<>();
+
+        try (
+            FileReader fileReader = new FileReader(
+                new File(AbstractForecastSyntheticDataTest.class.getClassLoader().getResource(datasetFileName).toURI()),
+                Charset.defaultCharset()
+            );
+            JsonReader jsonReader = new JsonReader(fileReader)
+        ) {
+
+            Gson gson = new Gson();
+            JsonArray jsonArray = gson.fromJson(jsonReader, JsonArray.class);
+
+            for (int i = 0; i < limit && i < jsonArray.size(); i++) {
+                JsonObject jsonObject = jsonArray.get(i).getAsJsonObject();
+                jsonObjects.add(jsonObject);
+            }
+
+        } catch (IOException e) {
+            LOG.error("failed to read json array", e);
+        }
+
+        return jsonObjects;
+    }
+}
diff --git a/src/test/java/org/opensearch/forecast/rest/ForecastRestApiIT.java b/src/test/java/org/opensearch/forecast/rest/ForecastRestApiIT.java
new file mode 100644
index 000000000..8776ec971
--- /dev/null
+++ b/src/test/java/org/opensearch/forecast/rest/ForecastRestApiIT.java
@@ -0,0 +1,455 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.forecast.rest;
+
+import static org.opensearch.timeseries.util.RestHandlerUtils.SUGGEST;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.junit.Before;
+import org.opensearch.client.Response;
+import org.opensearch.client.ResponseException;
+import org.opensearch.client.RestClient;
+import org.opensearch.core.rest.RestStatus;
+import org.opensearch.forecast.AbstractForecastSyntheticDataTest;
+import org.opensearch.forecast.model.Forecaster;
+import org.opensearch.forecast.settings.ForecastEnabledSetting;
+import org.opensearch.timeseries.TestHelpers;
+import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
+import org.opensearch.timeseries.constant.CommonMessages;
+import org.opensearch.timeseries.model.Config;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonObject;
+
+/**
+ * Test the following Restful API:
+ *  - Suggest
+ */
+public class ForecastRestApiIT extends AbstractForecastSyntheticDataTest {
+    private static final String SYNTHETIC_DATASET_NAME = "synthetic";
+    private static final String RULE_DATASET_NAME = "rule";
+    private static final String SUGGEST_INTERVAL_URI;
+    private static final String SUGGEST_INTERVAL_HORIZON_HISTORY_URI;
+
+    static {
+        SUGGEST_INTERVAL_URI = String
+            .format(
+                Locale.ROOT,
+                "%s/%s/%s",
+                TimeSeriesAnalyticsPlugin.FORECAST_FORECASTERS_URI,
+                SUGGEST,
+                Forecaster.FORECAST_INTERVAL_FIELD
+            );
+        SUGGEST_INTERVAL_HORIZON_HISTORY_URI = String
+            .format(
+                Locale.ROOT,
+                "%s/%s/%s,%s,%s",
+                TimeSeriesAnalyticsPlugin.FORECAST_FORECASTERS_URI,
+                SUGGEST,
+                Forecaster.FORECAST_INTERVAL_FIELD,
+                Forecaster.HORIZON_FIELD,
+                Config.HISTORY_INTERVAL_FIELD
+            );
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        updateClusterSettings(ForecastEnabledSetting.FORECAST_ENABLED, true);
+    }
+
+    private static void loadData(String datasetName, int trainTestSplit) throws Exception {
+        RestClient client = client();
+
+        String dataFileName = String.format(Locale.ROOT, "org/opensearch/ad/e2e/data/%s.data", datasetName);
+
+        List<JsonObject> data = readJsonArrayWithLimit(dataFileName, trainTestSplit);
+
+        String mapping = "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\": \"date\"},"
+            + " \"Feature1\": { \"type\": \"double\" }, \"Feature2\": { \"type\": \"double\" } } } }";
+        bulkIndexTrainData(datasetName, data, trainTestSplit, client, mapping);
+    }
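+    // Note: these forecast tests reuse the AD e2e datasets (e.g., rule.data) bundled under
+    // org/opensearch/ad/e2e/data rather than shipping separate forecast fixtures.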
+
+    /**
+     * Test the suggest API. I wrote multiple cases together to save time on loading data. We cannot load data
+     * once for all tests because the client is set up in an instance method of OpenSearchRestTestCase and we
+     * need the client to ingest data. Also, the JUnit 5 feature @BeforeAll is not supported in
+     * OpenSearchRestTestCase, and code inside @BeforeAll would not be executed even if I upgraded JUnit from 4
+     * to 5 in build.gradle.
+     * @throws Exception when loading data fails
+     */
+    public void testSuggestOneMinute() throws Exception {
+        loadData(SYNTHETIC_DATASET_NAME, 200);
+        // case 1: suggest a 1 minute interval for a time series with 1 minute cadence
+        String forecasterDef = "{\n"
+            + "    \"name\": \"Second-Test-Forecaster-4\",\n"
+            + "    \"description\": \"ok rate\",\n"
+            + "    \"time_field\": \"timestamp\",\n"
+            + "    \"indices\": [\n"
+            + "        \"%s\"\n"
+            + "    ],\n"
+            + "    \"feature_attributes\": [\n"
+            + "        {\n"
+            + "            \"feature_id\": \"sum1\",\n"
+            + "            \"feature_name\": \"sum1\",\n"
+            + "            \"feature_enabled\": true,\n"
+            + "            \"importance\": 1,\n"
+            + "            \"aggregation_query\": {\n"
+            + "                \"sum1\": {\n"
+            + "                    \"sum\": {\n"
+            + "                        \"field\": \"Feature1\"\n"
+            + "                    }\n"
+            + "                }\n"
+            + "            }\n"
+            + "        }\n"
+            + "    ],\n"
+            + "    \"window_delay\": {\n"
+            + "        \"period\": {\n"
+            + "            \"interval\": 20,\n"
+            + "            \"unit\": \"SECONDS\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"ui_metadata\": {\n"
+            + "        \"aabb\": {\n"
+            + "            \"ab\": \"bb\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"schema_version\": 2,\n"
+            + "    \"horizon\": 24\n"
+            + "}";
+
+        String formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME);
+
+        Response response = TestHelpers
+            .makeRequest(
+                client(),
+                "POST",
+                String.format(Locale.ROOT, SUGGEST_INTERVAL_URI),
+                ImmutableMap.of(),
+                TestHelpers.toHttpEntity(formattedForecaster),
+                null
+            );
+        assertEquals("Suggest forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response));
+        Map<String, Object> responseMap = entityAsMap(response);
+        Map<String, Object> suggestions = (Map<String, Object>) ((Map<String, Object>) responseMap.get("interval")).get("period");
+        assertEquals(1, (int) suggestions.get("interval"));
+        assertEquals("Minutes", suggestions.get("unit"));
+
+        // case 2: If you provide a forecaster interval in the request body, we will ignore it.
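+        // (the suggest API recomputes the interval from the data cadence instead of echoing the supplied value)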
+        forecasterDef = "{\n"
+            + "    \"name\": \"Second-Test-Detector-4\",\n"
+            + "    \"description\": \"ok rate\",\n"
+            + "    \"time_field\": \"timestamp\",\n"
+            + "    \"indices\": [\n"
+            + "        \"%s\"\n"
+            + "    ],\n"
+            + "    \"feature_attributes\": [\n"
+            + "        {\n"
+            + "            \"feature_id\": \"sum1\",\n"
+            + "            \"feature_name\": \"sum1\",\n"
+            + "            \"feature_enabled\": true,\n"
+            + "            \"importance\": 1,\n"
+            + "            \"aggregation_query\": {\n"
+            + "                \"sum1\": {\n"
+            + "                    \"sum\": {\n"
+            + "                        \"field\": \"Feature1\"\n"
+            + "                    }\n"
+            + "                }\n"
+            + "            }\n"
+            + "        }\n"
+            + "    ],\n"
+            + "    \"window_delay\": {\n"
+            + "        \"period\": {\n"
+            + "            \"interval\": 20,\n"
+            + "            \"unit\": \"SECONDS\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"ui_metadata\": {\n"
+            + "        \"aabb\": {\n"
+            + "            \"ab\": \"bb\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"schema_version\": 2,\n"
+            + "    \"horizon\": 24,\n"
+            + "    \"forecast_interval\": {\n"
+            + "        \"period\": {\n"
+            + "            \"interval\": 4,\n"
+            + "            \"unit\": \"MINUTES\"\n"
+            + "        }\n"
+            + "    }\n"
+            + "}";
+        formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME);
+        response = TestHelpers
+            .makeRequest(
+                client(),
+                "POST",
+                String.format(Locale.ROOT, SUGGEST_INTERVAL_URI),
+                ImmutableMap.of(),
+                TestHelpers.toHttpEntity(formattedForecaster),
+                null
+            );
+        responseMap = entityAsMap(response);
+        suggestions = (Map<String, Object>) ((Map<String, Object>) responseMap.get("interval")).get("period");
+        assertEquals(1, (int) suggestions.get("interval"));
+        assertEquals("Minutes", suggestions.get("unit"));
+
+        // case 3: We also support horizon and history.
+        forecasterDef = "{\n"
+            + "    \"name\": \"Second-Test-Detector-4\",\n"
+            + "    \"description\": \"ok rate\",\n"
+            + "    \"time_field\": \"timestamp\",\n"
+            + "    \"indices\": [\n"
+            + "        \"%s\"\n"
+            + "    ],\n"
+            + "    \"feature_attributes\": [\n"
+            + "        {\n"
+            + "            \"feature_id\": \"sum1\",\n"
+            + "            \"feature_name\": \"sum1\",\n"
+            + "            \"feature_enabled\": true,\n"
+            + "            \"importance\": 1,\n"
+            + "            \"aggregation_query\": {\n"
+            + "                \"sum1\": {\n"
+            + "                    \"sum\": {\n"
+            + "                        \"field\": \"Feature1\"\n"
+            + "                    }\n"
+            + "                }\n"
+            + "            }\n"
+            + "        }\n"
+            + "    ],\n"
+            + "    \"window_delay\": {\n"
+            + "        \"period\": {\n"
+            + "            \"interval\": 20,\n"
+            + "            \"unit\": \"SECONDS\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"ui_metadata\": {\n"
+            + "        \"aabb\": {\n"
+            + "            \"ab\": \"bb\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"schema_version\": 2,\n"
+            + "    \"shingle_size\": 5,\n"
+            + "    \"forecast_interval\": {\n"
+            + "        \"period\": {\n"
+            + "            \"interval\": 4,\n"
+            + "            \"unit\": \"MINUTES\"\n"
+            + "        }\n"
+            + "    }\n"
+            + "}";
+        formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME);
+        response = TestHelpers
+            .makeRequest(
+                client(),
+                "POST",
+                String.format(Locale.ROOT, SUGGEST_INTERVAL_HORIZON_HISTORY_URI),
+                ImmutableMap.of(),
+                TestHelpers.toHttpEntity(formattedForecaster),
+                null
+            );
+        responseMap = entityAsMap(response);
+        Map<String, Object> intervalSuggestions = (Map<String, Object>) ((Map<String, Object>) responseMap.get("interval")).get("period");
+        assertEquals(1, (int) intervalSuggestions.get("interval"));
+        assertEquals("Minutes", intervalSuggestions.get("unit"));
+
+        int horizonSuggestions = ((Integer) responseMap.get("horizon"));
+        assertEquals(15, horizonSuggestions);
+
+        int historySuggestions = ((Integer) responseMap.get("history"));
+        assertEquals(37, historySuggestions);
+    }
+
+    public void testSuggestTenMinute() throws Exception {
+        loadData(RULE_DATASET_NAME, 200);
+        // case 1: The following request validates against the source data to see if model training might succeed. In this example, the data
+        // is ingested at a rate of every ten minutes.
+        final String forecasterDef = "{\n"
+            + "    \"name\": \"Second-Test-Forecaster-4\",\n"
+            + "    \"description\": \"ok rate\",\n"
+            + "    \"time_field\": \"timestamp\",\n"
+            + "    \"indices\": [\n"
+            + "        \"%s\"\n"
+            + "    ],\n"
+            + "    \"feature_attributes\": [\n"
+            + "        {\n"
+            + "            \"feature_id\": \"sum1\",\n"
+            + "            \"feature_name\": \"sum1\",\n"
+            + "            \"feature_enabled\": true,\n"
+            + "            \"importance\": 1,\n"
+            + "            \"aggregation_query\": {\n"
+            + "                \"sum1\": {\n"
+            + "                    \"sum\": {\n"
+            + "                        \"field\": \"transform._doc_count\"\n"
+            + "                    }\n"
+            + "                }\n"
+            + "            }\n"
+            + "        }\n"
+            + "    ],\n"
+            + "    \"window_delay\": {\n"
+            + "        \"period\": {\n"
+            + "            \"interval\": 20,\n"
+            + "            \"unit\": \"SECONDS\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"ui_metadata\": {\n"
+            + "        \"aabb\": {\n"
+            + "            \"ab\": \"bb\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"schema_version\": 2,\n"
+            + "    \"horizon\": 24\n"
+            + "}";
+
+        String formattedForecaster = String.format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME);
+
+        Response response = TestHelpers
+            .makeRequest(
+                client(),
+                "POST",
+                String.format(Locale.ROOT, SUGGEST_INTERVAL_URI),
+                ImmutableMap.of(),
+                TestHelpers.toHttpEntity(formattedForecaster),
+                null
+            );
+        assertEquals("Suggest forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response));
+        Map<String, Object> responseMap = entityAsMap(response);
+        Map<String, Object> suggestions = (Map<String, Object>) ((Map<String, Object>) responseMap.get("interval")).get("period");
+        assertEquals(10, (int) suggestions.get("interval"));
+        assertEquals("Minutes", suggestions.get("unit"));
+
+        // case 2: If you provide unsupported parameters, we will throw a 400 error.
+        ResponseException exception = expectThrows(
+            ResponseException.class,
+            () -> TestHelpers
+                .makeRequest(
+                    client(),
+                    "POST",
+                    String.format(Locale.ROOT, SUGGEST_INTERVAL_URI + "2"),
+                    ImmutableMap.of(),
+                    TestHelpers.toHttpEntity(forecasterDef),
+                    null
+                )
+        );
+        assertEquals(RestStatus.BAD_REQUEST.getStatus(), exception.getResponse().getStatusLine().getStatusCode());
+        assertTrue(
+            "Expect contains " + CommonMessages.NOT_EXISTENT_SUGGEST_TYPE + "; but got " + exception.getMessage(),
+            exception.getMessage().contains(CommonMessages.NOT_EXISTENT_SUGGEST_TYPE)
+        );
+
+        // case 3: If the provided configuration lacks enough information (e.g., source index), we will throw a 400 error.
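+        // (the definition below omits the indices field, so validation cannot proceed and should report EMPTY_INDICES)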
+        final String errorForecasterDef = "{\n"
+            + "    \"name\": \"Second-Test-Forecaster-4\",\n"
+            + "    \"description\": \"ok rate\",\n"
+            + "    \"time_field\": \"timestamp\",\n"
+            + "    \"feature_attributes\": [\n"
+            + "        {\n"
+            + "            \"feature_id\": \"sum1\",\n"
+            + "            \"feature_name\": \"sum1\",\n"
+            + "            \"feature_enabled\": true,\n"
+            + "            \"importance\": 1,\n"
+            + "            \"aggregation_query\": {\n"
+            + "                \"sum1\": {\n"
+            + "                    \"sum\": {\n"
+            + "                        \"field\": \"transform._doc_count\"\n"
+            + "                    }\n"
+            + "                }\n"
+            + "            }\n"
+            + "        }\n"
+            + "    ],\n"
+            + "    \"window_delay\": {\n"
+            + "        \"period\": {\n"
+            + "            \"interval\": 20,\n"
+            + "            \"unit\": \"SECONDS\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"ui_metadata\": {\n"
+            + "        \"aabb\": {\n"
+            + "            \"ab\": \"bb\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"schema_version\": 2,\n"
+            + "    \"horizon\": 24\n"
+            + "}";
+
+        ResponseException noSourceIndexException = expectThrows(
+            ResponseException.class,
+            () -> TestHelpers
+                .makeRequest(
+                    client(),
+                    "POST",
+                    String.format(Locale.ROOT, SUGGEST_INTERVAL_URI),
+                    ImmutableMap.of(),
+                    TestHelpers.toHttpEntity(errorForecasterDef),
+                    null
+                )
+        );
+        assertEquals(RestStatus.BAD_REQUEST.getStatus(), noSourceIndexException.getResponse().getStatusLine().getStatusCode());
+        assertTrue(
+            "Expect contains " + CommonMessages.EMPTY_INDICES + "; but got " + noSourceIndexException.getMessage(),
+            noSourceIndexException.getMessage().contains(CommonMessages.EMPTY_INDICES)
+        );
+    }
+
+    public void testSuggestSparseData() throws Exception {
+        loadData(SYNTHETIC_DATASET_NAME, 10);
+        // with only 10 data points ingested, the data is too sparse to suggest an interval
+        String forecasterDef = "{\n"
+            + "    \"name\": \"Second-Test-Forecaster-4\",\n"
+            + "    \"description\": \"ok rate\",\n"
+            + "    \"time_field\": \"timestamp\",\n"
+            + "    \"indices\": [\n"
+            + "        \"%s\"\n"
+            + "    ],\n"
+            + "    \"feature_attributes\": [\n"
+            + "        {\n"
+            + "            \"feature_id\": \"sum1\",\n"
+            + "            \"feature_name\": \"sum1\",\n"
+            + "            \"feature_enabled\": true,\n"
+            + "            \"importance\": 1,\n"
+            + "            \"aggregation_query\": {\n"
+            + "                \"sum1\": {\n"
+            + "                    \"sum\": {\n"
+            + "                        \"field\": \"Feature1\"\n"
+            + "                    }\n"
+            + "                }\n"
+            + "            }\n"
+            + "        }\n"
+            + "    ],\n"
+            + "    \"window_delay\": {\n"
+            + "        \"period\": {\n"
+            + "            \"interval\": 20,\n"
+            + "            \"unit\": \"SECONDS\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"ui_metadata\": {\n"
+            + "        \"aabb\": {\n"
+            + "            \"ab\": \"bb\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    \"schema_version\": 2,\n"
+            + "    \"horizon\": 24\n"
+            + "}";
+
+        String formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME);
+
+        Response response = TestHelpers
+            .makeRequest(
+                client(),
+                "POST",
+                String.format(Locale.ROOT, SUGGEST_INTERVAL_URI),
+                ImmutableMap.of(),
+                TestHelpers.toHttpEntity(formattedForecaster),
+                null
+            );
+        assertEquals("Suggest forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response));
+        Map<String, Object> responseMap = entityAsMap(response);
+        // no suggestion
+        assertEquals(0, responseMap.size());
+    }
+}
diff --git a/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java b/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java
index 880ca5a02..09e561e28 100644
--- a/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java
+++ b/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java
@@ -14,8 +14,8 @@ public void testIsForecastEnabled() {
         try {
             ForecastEnabledSetting.getInstance().setSettingValue(ForecastEnabledSetting.FORECAST_ENABLED, true);
             // TODO: currently, we disable forecasting
-            // assertTrue(ForecastEnabledSetting.isForecastEnabled());
-            assertTrue(!ForecastEnabledSetting.isForecastEnabled());
+            assertTrue(ForecastEnabledSetting.isForecastEnabled());
+            // assertTrue(!ForecastEnabledSetting.isForecastEnabled());
             ForecastEnabledSetting.getInstance().setSettingValue(ForecastEnabledSetting.FORECAST_ENABLED, false);
             assertTrue(!ForecastEnabledSetting.isForecastEnabled());
         } finally {
diff --git a/src/test/java/org/opensearch/timeseries/AbstractSyntheticDataTest.java b/src/test/java/org/opensearch/timeseries/AbstractSyntheticDataTest.java
new file mode 100644
index 000000000..929670d2b
--- /dev/null
+++ b/src/test/java/org/opensearch/timeseries/AbstractSyntheticDataTest.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.timeseries;
+
+import static org.opensearch.timeseries.TestHelpers.toHttpEntity;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Logger;
+import org.opensearch.client.Request;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.Response;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.WarningsHandler;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.timeseries.settings.TimeSeriesSettings;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+public class AbstractSyntheticDataTest extends ODFERestTestCase {
+    public static final Logger LOG = (Logger) LogManager.getLogger(AbstractSyntheticDataTest.class);
+
+    /**
+     * In real time AD, we mute a node for a detector if that node keeps returning
+     * ResourceNotFoundException (5 times in a row). This is a problem for batch mode
+     * testing as we issue a large number of requests quickly. Due to the speed, we
+     * won't be able to finish cold start before the ResourceNotFoundException mutes
+     * a node. Since our test case has only one node, there are no other nodes to fall
+     * back on. Here we disable such fault tolerance by setting max retries before
+     * muting to a large number and the actual wait time during muting to 0.
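+     * (Concretely, MAX_RETRY_FOR_UNRESPONSIVE_NODE is raised to 100,000 and BACKOFF_MINUTES is set to 0 below.)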
+ * + * @throws IOException when failing to create http request body + */ + protected void disableResourceNotFoundFaultTolerence() throws IOException { + XContentBuilder settingCommand = JsonXContent.contentBuilder(); + + settingCommand.startObject(); + settingCommand.startObject("persistent"); + settingCommand.field(TimeSeriesSettings.MAX_RETRY_FOR_UNRESPONSIVE_NODE.getKey(), 100_000); + settingCommand.field(TimeSeriesSettings.BACKOFF_MINUTES.getKey(), 0); + settingCommand.endObject(); + settingCommand.endObject(); + Request request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity(settingCommand.toString()); + + adminClient().performRequest(request); + } + + public static void waitAllSyncheticDataIngested(int expectedSize, String datasetName, RestClient client) throws Exception { + int maxWaitCycles = 3; + do { + Request request = new Request("POST", String.format(Locale.ROOT, "/%s/_search", datasetName)); + request + .setJsonEntity( + String + .format( + Locale.ROOT, + "{\"query\": {" + + " \"match_all\": {}" + + " }," + + " \"size\": 1," + + " \"sort\": [" + + " {" + + " \"timestamp\": {" + + " \"order\": \"desc\"" + + " }" + + " }" + + " ]}" + ) + ); + // Make sure all of the test data has been ingested + JsonArray hits = getHits(client, request); + LOG.info("Latest synthetic data:" + hits); + if (hits != null + && hits.size() == 1 + && expectedSize - 1 == hits.get(0).getAsJsonObject().getAsJsonPrimitive("_id").getAsLong()) { + break; + } else { + request = new Request("POST", String.format(Locale.ROOT, "/%s/_refresh", datasetName)); + client.performRequest(request); + } + Thread.sleep(1_000); + } while (maxWaitCycles-- >= 0); + } + + public static JsonArray getHits(RestClient client, Request request) throws IOException { + Response response = client.performRequest(request); + return parseHits(response); + } + + public static JsonArray parseHits(Response response) throws IOException { + JsonObject json = JsonParser + .parseReader(new InputStreamReader(response.getEntity().getContent(), Charset.defaultCharset())) + .getAsJsonObject(); + JsonObject hits = json.getAsJsonObject("hits"); + if (hits == null) { + return null; + } + return hits.getAsJsonArray("hits"); + } + + protected static void bulkIndexTrainData( + String datasetName, + List data, + int trainTestSplit, + RestClient client, + String mapping + ) throws Exception { + createIndex(datasetName, client, mapping); + + StringBuilder bulkRequestBuilder = new StringBuilder(); + for (int i = 0; i < trainTestSplit; i++) { + bulkRequestBuilder.append("{ \"index\" : { \"_index\" : \"" + datasetName + "\", \"_id\" : \"" + i + "\" } }\n"); + bulkRequestBuilder.append(data.get(i).toString()).append("\n"); + } + TestHelpers + .makeRequest( + client, + "POST", + "_bulk?refresh=true", + null, + toHttpEntity(bulkRequestBuilder.toString()), + ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana")) + ); + Thread.sleep(1_000); + waitAllSyncheticDataIngested(trainTestSplit, datasetName, client); + } + + public static void createIndex(String datasetName, RestClient client, String mapping) throws IOException, InterruptedException { + Request request = new Request("PUT", datasetName); + request.setJsonEntity(mapping); + setWarningHandler(request, false); + client.performRequest(request); + Thread.sleep(1_000); + } + + public static void setWarningHandler(Request request, boolean strictDeprecationMode) { + RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder(); + 
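+        // STRICT fails the request when the server returns deprecation warnings; PERMISSIVE ignores them.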
options.setWarningsHandler(strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE); + request.setOptions(options.build()); + } + +} diff --git a/src/test/java/org/opensearch/ad/ODFERestTestCase.java b/src/test/java/org/opensearch/timeseries/ODFERestTestCase.java similarity index 77% rename from src/test/java/org/opensearch/ad/ODFERestTestCase.java rename to src/test/java/org/opensearch/timeseries/ODFERestTestCase.java index 5ee0fec98..eccc2ee53 100644 --- a/src/test/java/org/opensearch/ad/ODFERestTestCase.java +++ b/src/test/java/org/opensearch/timeseries/ODFERestTestCase.java @@ -9,7 +9,7 @@ * GitHub history for details. */ -package org.opensearch.ad; +package org.opensearch.timeseries; import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_PER_ROUTE; import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_TOTAL; @@ -20,19 +20,23 @@ import static org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_PEMCERT_FILEPATH; import java.io.IOException; -import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; -import java.nio.charset.Charset; +import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; import javax.net.ssl.SSLEngine; import org.apache.hc.client5.http.auth.AuthScope; @@ -50,7 +54,10 @@ import org.apache.hc.core5.reactor.ssl.TlsDetails; import org.apache.hc.core5.ssl.SSLContextBuilder; import org.apache.hc.core5.util.Timeout; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Logger; import org.junit.After; +import org.junit.AfterClass; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; @@ -59,22 +66,21 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.commons.rest.SecureRestClientBuilder; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.test.rest.OpenSearchRestTestCase; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; - /** * ODFE integration test base class to support both security disabled and enabled ODFE cluster. 
*/ public abstract class ODFERestTestCase extends OpenSearchRestTestCase { + private static final Logger LOG = (Logger) LogManager.getLogger(ODFERestTestCase.class); protected boolean isHttps() { boolean isHttps = Optional.ofNullable(System.getProperty("https")).map("true"::equalsIgnoreCase).orElse(false); @@ -240,6 +246,33 @@ public TlsDetails create(final SSLEngine sslEngine) { } } + @AfterClass + public static void dumpCoverage() throws IOException, MalformedObjectNameException { + // jacoco.dir is set in esplugin-coverage.gradle, if it doesn't exist we don't + // want to collect coverage so we can return early + String jacocoBuildPath = System.getProperty("jacoco.dir"); + if (org.opensearch.core.common.Strings.isNullOrEmpty(jacocoBuildPath)) { + return; + } + + String serverUrl = System.getProperty("jmx.serviceUrl"); + if (serverUrl == null) { + LOG.error("Failed to dump coverage because JMX Service URL is null"); + throw new IllegalArgumentException("JMX Service URL is null"); + } + + try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(serverUrl))) { + IProxy proxy = MBeanServerInvocationHandler + .newProxyInstance(connector.getMBeanServerConnection(), new ObjectName("org.jacoco:type=Runtime"), IProxy.class, false); + + Path path = Path.of(Path.of(jacocoBuildPath, "integTest.exec").toFile().getCanonicalPath()); + Files.write(path, proxy.getExecutionData(false)); + } catch (Exception ex) { + LOG.error("Failed to dump coverage: ", ex); + throw new RuntimeException("Failed to dump coverage: " + ex); + } + } + /** * wipeAllIndices won't work since it cannot delete security index. Use wipeAllODFEIndices instead. */ @@ -248,45 +281,34 @@ protected boolean preserveIndicesUponCompletion() { return true; } - protected void waitAllSyncheticDataIngested(int expectedSize, String datasetName, RestClient client) throws Exception { - int maxWaitCycles = 3; - do { - Request request = new Request("POST", String.format(Locale.ROOT, "/%s/_search", datasetName)); - request - .setJsonEntity( - String - .format( - Locale.ROOT, - "{\"query\": {" - + " \"match_all\": {}" - + " }," - + " \"size\": 1," - + " \"sort\": [" - + " {" - + " \"timestamp\": {" - + " \"order\": \"desc\"" - + " }" - + " }" - + " ]}" - ) - ); - // Make sure all of the test data has been ingested - // Expected response: - // "_index":"synthetic","_type":"_doc","_id":"10080","_score":null,"_source":{"timestamp":"2019-11-08T00:00:00Z","Feature1":156.30028000000001,"Feature2":100.211205,"host":"host1"},"sort":[1573171200000]} - Response response = client.performRequest(request); - JsonObject json = JsonParser - .parseReader(new InputStreamReader(response.getEntity().getContent(), Charset.defaultCharset())) - .getAsJsonObject(); - JsonArray hits = json.getAsJsonObject("hits").getAsJsonArray("hits"); - if (hits != null - && hits.size() == 1 - && expectedSize - 1 == hits.get(0).getAsJsonObject().getAsJsonPrimitive("_id").getAsLong()) { - break; - } else { - request = new Request("POST", String.format(Locale.ROOT, "/%s/_refresh", datasetName)); - client.performRequest(request); - } - Thread.sleep(1_000); - } while (maxWaitCycles-- >= 0); + public void updateClusterSettings(String settingKey, Object value) throws Exception { + XContentBuilder builder = XContentFactory + .jsonBuilder() + .startObject() + .startObject("persistent") + .field(settingKey, value) + .endObject() + .endObject(); + Request request = new Request("PUT", "_cluster/settings"); + request.setJsonEntity(builder.toString()); + Response response = 
client().performRequest(request);
+        assertEquals(RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));
+        Thread.sleep(2000); // sleep some time to reduce test flakiness
+    }
+
+    /**
+     * We need to be able to dump the jacoco coverage before the cluster is shut down.
+     * The new internal testing framework removed some of the gradle tasks we were listening to,
+     * to choose a good time to do it. This dumps the executionData to file after each test class.
+     * TODO: This is also currently just overwriting integTest.exec with the updated execData without
+     * resetting after writing each time. This can be improved to either write an exec file per test
+     * or by letting jacoco append to the file.
+     */
+    public interface IProxy {
+        byte[] getExecutionData(boolean reset);
+
+        void dump(boolean reset);
+
+        void reset();
+    }
 }
diff --git a/src/test/java/org/opensearch/timeseries/TestHelpers.java b/src/test/java/org/opensearch/timeseries/TestHelpers.java
index 224f6c68c..e633f2734 100644
--- a/src/test/java/org/opensearch/timeseries/TestHelpers.java
+++ b/src/test/java/org/opensearch/timeseries/TestHelpers.java
@@ -124,6 +124,7 @@
 import org.opensearch.index.query.MatchAllQueryBuilder;
 import org.opensearch.index.query.QueryBuilder;
 import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
+import org.opensearch.jobscheduler.spi.schedule.Schedule;
 import org.opensearch.search.SearchHit;
 import org.opensearch.search.SearchHits;
 import org.opensearch.search.SearchModule;
@@ -1855,6 +1856,11 @@ public ForecasterBuilder setNullImputationOption() {
             return this;
         }
 
+        public ForecasterBuilder setHorizon(Integer horizon) {
+            this.horizon = horizon;
+            return this;
+        }
+
         public Forecaster build() {
             return new Forecaster(
                 forecasterId,
@@ -2051,4 +2057,98 @@ public ForecastResult build() {
             );
         }
     }
+
+    public static class JobBuilder {
+        private String name = randomAlphaOfLength(10);
+        private Schedule schedule = randomIntervalSchedule();
+        private TimeConfiguration windowDelay = randomIntervalTimeConfiguration();
+        private Boolean isEnabled = true;
+        private Instant enabledTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
+        private Instant disabledTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
+        private Instant lastUpdateTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
+        private Long lockDurationSeconds = 60L;
+        private User user = randomUser();
+        private String resultIndex = null;
+        private AnalysisType analysisType = AnalysisType.AD;
+
+        public JobBuilder() {
+
+        }
+
+        public static JobBuilder newInstance() {
+            return new JobBuilder();
+        }
+
+        public JobBuilder name(String name) {
+            this.name = name;
+            return this;
+        }
+
+        public JobBuilder schedule(Schedule schedule) {
+            this.schedule = schedule;
+            return this;
+        }
+
+        public JobBuilder windowDelay(TimeConfiguration windowDelay) {
+            this.windowDelay = windowDelay;
+            return this;
+        }
+
+        public JobBuilder isEnabled(Boolean isEnabled) {
+            this.isEnabled = isEnabled;
+            return this;
+        }
+
+        public JobBuilder enabledTime(Instant enabledTime) {
+            this.enabledTime = enabledTime;
+            return this;
+        }
+
+        public JobBuilder disabledTime(Instant disabledTime) {
+            this.disabledTime = disabledTime;
+            return this;
+        }
+
+        public JobBuilder lastUpdateTime(Instant lastUpdateTime) {
+            this.lastUpdateTime = lastUpdateTime;
+            return this;
+        }
+
+        public JobBuilder lockDurationSeconds(Long lockDurationSeconds) {
+            this.lockDurationSeconds = lockDurationSeconds;
+            return this;
+        }
+
+        public JobBuilder user(User user) {
+            this.user = user;
+            return this;
+        }
+
+        public JobBuilder resultIndex(String resultIndex) {
+            this.resultIndex = resultIndex;
+            return this;
+        }
+
+        public JobBuilder analysisType(AnalysisType analysisType) {
+            this.analysisType = analysisType;
+            return this;
+        }
+
+        public Job build() {
+            return new Job(
+                name,
+                schedule,
+                windowDelay,
+                isEnabled,
+                enabledTime,
+                disabledTime,
+                lastUpdateTime,
+                lockDurationSeconds,
+                user,
+                resultIndex,
+                analysisType
+            );
+        }
+    }
 }
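A minimal sketch of how the new JobBuilder might be used in a test. The job name, the result index value, and the AnalysisType.FORECAST constant are illustrative assumptions, not part of this patch:

    // Build a disabled job that targets a custom result index (illustrative values).
    Job job = TestHelpers.JobBuilder
        .newInstance()
        .name("test-forecast-job") // hypothetical name
        .isEnabled(false)
        .resultIndex("opensearch-forecast-result-a") // hypothetical custom result index
        .analysisType(AnalysisType.FORECAST) // assumes FORECAST exists alongside AnalysisType.AD
        .build();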