Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add distributed locking to jobs in alerting #1542

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
Expand Down Expand Up @@ -259,6 +260,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
): Collection<Any> {
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
val settings = environment.settings()
val lockService = LockService(client, clusterService)
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
val alertService = AlertService(client, xContentRegistry, alertIndices)
val triggerService = TriggerService(scriptService)
Expand All @@ -277,6 +279,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerJvmStats(JvmStats.jvmStats())
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerLockService(lockService)
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
Expand All @@ -301,7 +304,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
settings
)

DeleteMonitorService.initialize(client)
DeleteMonitorService.initialize(client, lockService)

return listOf(
sweeper,
Expand All @@ -311,7 +314,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
docLevelMonitorQueries,
destinationMigrationCoordinator,
alertService,
triggerService
triggerService,
lockService
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
Expand Down Expand Up @@ -60,5 +61,6 @@ data class MonitorRunnerExecutionContext(
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
@Volatile var docLevelMonitorShardFetchSize: Int =
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
@Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES
@Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES,
@Volatile var lockService: LockService? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
import org.opensearch.alerting.core.JobRunner
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.model.destination.DestinationContextFactory
Expand Down Expand Up @@ -242,6 +244,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerLockService(lockService: LockService): MonitorRunnerService {
monitorCtx.lockService = lockService
return this
}

// Updates destination settings when the reload API is called so that new keystore values are visible
fun reloadDestinationSettings(settings: Settings) {
monitorCtx.destinationSettings = loadDestinationSettings(settings)
Expand Down Expand Up @@ -313,36 +320,64 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
when (job) {
is Workflow -> {
launch {
monitorCtx.client!!.suspendUntil<Client, ExecuteWorkflowResponse> {
monitorCtx.client!!.execute(
ExecuteWorkflowAction.INSTANCE,
ExecuteWorkflowRequest(
false,
TimeValue(periodEnd.toEpochMilli()),
job.id,
job,
TimeValue(periodStart.toEpochMilli())
),
it
var lock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing workflow ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
monitorCtx.client!!.suspendUntil<Client, ExecuteWorkflowResponse> {
monitorCtx.client!!.execute(
ExecuteWorkflowAction.INSTANCE,
ExecuteWorkflowRequest(
false,
TimeValue(periodEnd.toEpochMilli()),
job.id,
job,
TimeValue(periodStart.toEpochMilli())
),
it
)
}
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
}
}
}
is Monitor -> {
launch {
val executeMonitorRequest = ExecuteMonitorRequest(
false,
TimeValue(periodEnd.toEpochMilli()),
job.id,
job,
TimeValue(periodStart.toEpochMilli())
)
monitorCtx.client!!.suspendUntil<Client, ExecuteMonitorResponse> {
monitorCtx.client!!.execute(
ExecuteMonitorAction.INSTANCE,
executeMonitorRequest,
it
var lock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
val executeMonitorRequest = ExecuteMonitorRequest(
false,
TimeValue(periodEnd.toEpochMilli()),
job.id,
job,
TimeValue(periodStart.toEpochMilli())
)
monitorCtx.client!!.suspendUntil<Client, ExecuteMonitorResponse> {
monitorCtx.client!!.execute(
ExecuteMonitorAction.INSTANCE,
executeMonitorRequest,
it
)
}
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH
Expand All @@ -48,11 +50,14 @@ object DeleteMonitorService :
private val log = LogManager.getLogger(this.javaClass)

private lateinit var client: Client
private lateinit var lockService: LockService

fun initialize(
client: Client,
lockService: LockService
) {
DeleteMonitorService.client = client
DeleteMonitorService.lockService = lockService
}

/**
Expand All @@ -64,6 +69,7 @@ object DeleteMonitorService :
val deleteResponse = deleteMonitor(monitor.id, refreshPolicy)
deleteDocLevelMonitorQueriesAndIndices(monitor)
deleteMetadata(monitor)
deleteLock(monitor)
return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)
}

Expand Down Expand Up @@ -147,6 +153,10 @@ object DeleteMonitorService :
}
}

private suspend fun deleteLock(monitor: Monitor) {
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(monitor.id), it) }
}

/**
* Checks if the monitor is part of the workflow
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.WorkflowMetadata
import org.opensearch.alerting.opensearchapi.addFilter
Expand Down Expand Up @@ -73,6 +75,7 @@ class TransportDeleteWorkflowAction @Inject constructor(
val clusterService: ClusterService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry,
val lockService: LockService
) : HandledTransportAction<ActionRequest, DeleteWorkflowResponse>(
AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest
),
Expand Down Expand Up @@ -180,6 +183,12 @@ class TransportDeleteWorkflowAction @Inject constructor(
} catch (t: Exception) {
log.error("Failed to delete delegate monitor metadata. But proceeding with workflow deletion $workflowId", t)
}
try {
// Delete the workflow lock
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(workflowId), it) }
} catch (t: Exception) {
log.error("Failed to delete workflow lock for $workflowId")
}
actionListener.onResponse(deleteWorkflowResponse)
} else {
actionListener.onFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
Expand Down Expand Up @@ -473,6 +474,86 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1"))
}

fun `test monitor run generates no error alerts with versionconflictengineexception with locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "__lag-monitor-test__",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
Thread.sleep(240000)

val inputMap = HashMap<String, Any>()
inputMap["searchString"] = monitor.name

val responseMap = getAlerts(inputMap).asMap()
val alerts = (responseMap["alerts"] as ArrayList<Map<String, Any>>)
alerts.forEach {
assertTrue(it["error_message"] == null)
}
}

@AwaitsFix(bugUrl = "")
fun `test monitor run generate lock and monitor delete removes lock`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
OpenSearchTestCase.waitUntil({
val response = client().makeRequest("HEAD", LockService.LOCK_INDEX_NAME)
return@waitUntil (response.restStatus().status == 200)
}, 240, TimeUnit.SECONDS)

var response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
var responseMap = entityAsMap(response)
var noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
assertEquals(1, noOfLocks)

deleteMonitor(monitor)
refreshIndex(LockService.LOCK_INDEX_NAME)
response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
responseMap = entityAsMap(response)
noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
assertEquals(0, noOfLocks)
}

fun `test execute monitor with tag as trigger condition generates alerts and findings`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.junit.annotations.TestLogging
import java.time.Instant
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.Collections
import java.util.Locale
Expand Down Expand Up @@ -1185,4 +1187,45 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
assertEquals("Findings saved for test monitor", 1, findings.size)
}

fun `test workflow run generates no error alerts with versionconflictengineexception with locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "__lag-monitor-test__",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = false,
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)
createWorkflow(
randomWorkflow(
monitorIds = listOf(monitor.id),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
Thread.sleep(240000)

val alerts = searchAlerts(monitor)
alerts.forEach {
assertTrue(it.errorMessage == null)
}
}
}
Loading
Loading