From a3db266d85efc4a87fb1d98e8937f6aa0bf12085 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 12 Jul 2023 01:08:54 +0200 Subject: [PATCH] [BucketLevelMonitor] Multi-term agg support (#964) * added handling of multi-term agg in bucketlevel monitors Signed-off-by: Petar Dzepina * added handling of multi-term agg in bucketlevel monitors Signed-off-by: Petar Dzepina * added more asserts Signed-off-by: Petar Dzepina --------- Signed-off-by: Petar Dzepina --- .../org/opensearch/alerting/TriggerService.kt | 2 + .../alerting/MonitorRunnerServiceIT.kt | 86 +++++++++++++++++++ 2 files changed, 88 insertions(+) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt index eaf087a63..85e5108d8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt @@ -124,6 +124,8 @@ class TriggerService(val scriptService: ScriptService) { val keyField = Aggregation.CommonFields.KEY.preferredName val keyValuesList = mutableListOf() when { + bucket[keyField] is List<*> && bucket.containsKey(Aggregation.CommonFields.KEY_AS_STRING.preferredName) -> + keyValuesList.add(bucket[Aggregation.CommonFields.KEY_AS_STRING.preferredName] as String) bucket[keyField] is String -> keyValuesList.add(bucket[keyField] as String) // In the case where the key field is an Int bucket[keyField] is Int -> keyValuesList.add(bucket[keyField].toString()) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 984acdad3..6a17b8622 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -42,7 +42,10 @@ import org.opensearch.rest.RestStatus import org.opensearch.script.Script import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder +import org.opensearch.search.aggregations.bucket.terms.MultiTermsAggregationBuilder import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder +import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder +import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig import org.opensearch.search.builder.SearchSourceBuilder import java.net.URLEncoder import java.time.Instant @@ -1179,6 +1182,89 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Incorrect search result", 2, buckets.size) } + fun `test execute bucket-level monitor returns search result with multi term agg`() { + val index = "test_index_1234" + indexDoc( + index, + "1", + """{"user_id": "1", + "ip_addr": "12345678", + "user_agent": "chrome" + } + """.trimIndent() + ) + indexDoc( + index, + "2", + """{"user_id": "2", + "ip_addr": "12345678", + "user_agent": "chrome" + } + """.trimIndent() + ) + indexDoc( + index, + "3", + """{"user_id": "2", + "ip_addr": "3443534", + "user_agent": "chrome" + } + """.trimIndent() + ) + + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("_value" to "distinct_user_count", "docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "hot", + filter = null + ) + ) + + val m = randomBucketLevelMonitor( + triggers = listOf(trigger), + inputs = listOf( + SearchInput( + listOf(index), + SearchSourceBuilder().aggregation( + MultiTermsAggregationBuilder("hot") + .terms( + listOf( + MultiTermsValuesSourceConfig.Builder().setFieldName("ip_addr.keyword").build(), + MultiTermsValuesSourceConfig.Builder().setFieldName("user_agent.keyword").build() + ) + ) + .subAggregation(CardinalityAggregationBuilder("distinct_user_count").field("user_id.keyword")) + ) + ) + ) + ) + val monitor = createMonitor(m) + val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("hot")?.get("buckets") as List> + assertEquals("Incorrect search result", 2, buckets.size) + val distinctUserCountAgg1 = buckets.find { + it.get("key_as_string") == "12345678|chrome" + }!!.get("distinct_user_count") as Map + assertEquals(2, distinctUserCountAgg1.get("value")) + val distinctUserCountAgg2 = buckets.find { + it.get("key_as_string") == "3443534|chrome" + }!!.get("distinct_user_count") as Map + assertEquals(1, distinctUserCountAgg2.get("value")) + } + fun `test bucket-level monitor alert creation and completion`() { val testIndex = createTestIndex() insertSampleTimeSerializedData(