Skip to content

Commit

Permalink
[BucketLevelMonitor] Multi-term agg support (#964)
Browse files Browse the repository at this point in the history
* added handling of multi-term agg in bucketlevel monitors

Signed-off-by: Petar Dzepina <[email protected]>

* added handling of multi-term agg in bucketlevel monitors

Signed-off-by: Petar Dzepina <[email protected]>

* added more asserts

Signed-off-by: Petar Dzepina <[email protected]>

---------

Signed-off-by: Petar Dzepina <[email protected]>
  • Loading branch information
petardz authored Jul 11, 2023
1 parent a5ad3b9 commit a3db266
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ class TriggerService(val scriptService: ScriptService) {
val keyField = Aggregation.CommonFields.KEY.preferredName
val keyValuesList = mutableListOf<String>()
when {
bucket[keyField] is List<*> && bucket.containsKey(Aggregation.CommonFields.KEY_AS_STRING.preferredName) ->
keyValuesList.add(bucket[Aggregation.CommonFields.KEY_AS_STRING.preferredName] as String)
bucket[keyField] is String -> keyValuesList.add(bucket[keyField] as String)
// In the case where the key field is an Int
bucket[keyField] is Int -> keyValuesList.add(bucket[keyField].toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder
import org.opensearch.search.aggregations.bucket.terms.MultiTermsAggregationBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder
import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig
import org.opensearch.search.builder.SearchSourceBuilder
import java.net.URLEncoder
import java.time.Instant
Expand Down Expand Up @@ -1179,6 +1182,89 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
assertEquals("Incorrect search result", 2, buckets.size)
}

fun `test execute bucket-level monitor returns search result with multi term agg`() {
val index = "test_index_1234"
indexDoc(
index,
"1",
"""{"user_id": "1",
"ip_addr": "12345678",
"user_agent": "chrome"
}
""".trimIndent()
)
indexDoc(
index,
"2",
"""{"user_id": "2",
"ip_addr": "12345678",
"user_agent": "chrome"
}
""".trimIndent()
)
indexDoc(
index,
"3",
"""{"user_id": "2",
"ip_addr": "3443534",
"user_agent": "chrome"
}
""".trimIndent()
)

val triggerScript = """
params.docCount > 0
""".trimIndent()

var trigger = randomBucketLevelTrigger()
trigger = trigger.copy(
bucketSelector = BucketSelectorExtAggregationBuilder(
name = trigger.id,
bucketsPathsMap = mapOf("_value" to "distinct_user_count", "docCount" to "_count"),
script = Script(triggerScript),
parentBucketPath = "hot",
filter = null
)
)

val m = randomBucketLevelMonitor(
triggers = listOf(trigger),
inputs = listOf(
SearchInput(
listOf(index),
SearchSourceBuilder().aggregation(
MultiTermsAggregationBuilder("hot")
.terms(
listOf(
MultiTermsValuesSourceConfig.Builder().setFieldName("ip_addr.keyword").build(),
MultiTermsValuesSourceConfig.Builder().setFieldName("user_agent.keyword").build()
)
)
.subAggregation(CardinalityAggregationBuilder("distinct_user_count").field("user_id.keyword"))
)
)
)
)
val monitor = createMonitor(m)
val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
val output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val buckets = searchResult.stringMap("aggregations")?.stringMap("hot")?.get("buckets") as List<Map<String, Any>>
assertEquals("Incorrect search result", 2, buckets.size)
val distinctUserCountAgg1 = buckets.find {
it.get("key_as_string") == "12345678|chrome"
}!!.get("distinct_user_count") as Map<String, Integer>
assertEquals(2, distinctUserCountAgg1.get("value"))
val distinctUserCountAgg2 = buckets.find {
it.get("key_as_string") == "3443534|chrome"
}!!.get("distinct_user_count") as Map<String, Integer>
assertEquals(1, distinctUserCountAgg2.get("value"))
}

fun `test bucket-level monitor alert creation and completion`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedData(
Expand Down

0 comments on commit a3db266

Please sign in to comment.