diff --git a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java index e7fe43106..eb189e363 100644 --- a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java +++ b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java @@ -52,6 +52,7 @@ import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; import org.opensearch.securityanalytics.action.*; +import org.opensearch.securityanalytics.correlation.alert.CorrelationAlertService; import org.opensearch.securityanalytics.correlation.index.codec.CorrelationCodecService; import org.opensearch.securityanalytics.correlation.index.mapper.CorrelationVectorFieldMapper; import org.opensearch.securityanalytics.correlation.index.query.CorrelationQueryBuilder; @@ -60,6 +61,7 @@ import org.opensearch.securityanalytics.logtype.LogTypeService; import org.opensearch.securityanalytics.mapper.IndexTemplateManager; import org.opensearch.securityanalytics.mapper.MapperService; +import org.opensearch.securityanalytics.model.CorrelationAlert; import org.opensearch.securityanalytics.model.CustomLogType; import org.opensearch.securityanalytics.model.ThreatIntelFeedData; import org.opensearch.securityanalytics.resthandler.*; @@ -171,7 +173,8 @@ public Collection createComponents(Client client, return List.of( detectorIndices, correlationIndices, correlationRuleIndices, ruleTopicIndices, customLogTypeIndices, ruleIndices, mapperService, indexTemplateManager, builtinLogTypeLoader, builtInTIFMetadataLoader, threatIntelFeedDataService, detectorThreatIntelService, - tifJobUpdateService, tifJobParameterService, threatIntelLockService); + tifJobUpdateService, tifJobParameterService, threatIntelLockService, + new CorrelationAlertService(client, clusterService, xContentRegistry)); } @Override @@ -239,6 +242,7 @@ public ScheduledJobParser getJobParser() { public List getNamedXContent() { return List.of( Detector.XCONTENT_REGISTRY, + CorrelationAlert.XCONTENT_REGISTRY, DetectorInput.XCONTENT_REGISTRY, Rule.XCONTENT_REGISTRY, CustomLogType.XCONTENT_REGISTRY, diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/alert/CorrelationAlertService.java b/src/main/java/org/opensearch/securityanalytics/correlation/alert/CorrelationAlertService.java new file mode 100644 index 000000000..a17b53046 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/correlation/alert/CorrelationAlertService.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.securityanalytics.correlation.alert; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.commons.alerting.model.Table; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.sort.FieldSortBuilder; +import org.opensearch.search.sort.SortBuilders; +import org.opensearch.search.sort.SortOrder; +import org.opensearch.securityanalytics.model.CorrelationAlert; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class CorrelationAlertService { + public static final String CORRELATION_ALERT_INDEX = ".opensearch-sap-correlations-alerts"; + private static final Logger log = LogManager.getLogger(CorrelationAlertService.class); + private final Client client; + private final ClusterService clusterService; + private final NamedXContentRegistry xContentRegistry; + + public CorrelationAlertService(Client client, ClusterService clusterService, NamedXContentRegistry xContentRegistry) { + this.client = client; + this.clusterService = clusterService; + this.xContentRegistry = xContentRegistry; + } + + public void getCorrelationAlerts(ActionListener listener, + Table table, + String severityLevel, + String alertState) { + try { + if (false == correlationAlertsIndexExists()) { + listener.onResponse(new CorrelationAlertsList(Collections.emptyList(), 0)); + } else { + FieldSortBuilder sortBuilder = SortBuilders + .fieldSort(table.getSortString()) + .order(SortOrder.fromString(table.getSortOrder())); + if (null != table.getMissing() && false == table.getMissing().isEmpty()) { + sortBuilder.missing(table.getMissing()); + } + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + + if (false == Objects.equals(severityLevel, "ALL")) { + queryBuilder.filter(QueryBuilders.termQuery("severity", severityLevel)); + } + if (false == Objects.equals(alertState, "ALL")) { + queryBuilder.filter(QueryBuilders.termQuery("state", alertState)); + } + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .version(true) + .seqNoAndPrimaryTerm(true) + .query(queryBuilder) + .sort(sortBuilder) + .size(table.getSize()) + .from(table.getStartIndex()); + + SearchRequest searchRequest = new SearchRequest(CORRELATION_ALERT_INDEX).source(searchSourceBuilder); + client.search(searchRequest, ActionListener.wrap( + searchResponse -> { + if (0 == searchResponse.getHits().getHits().length) { + listener.onResponse(new CorrelationAlertsList(Collections.emptyList(), 0)); + } else { + listener.onResponse( + new CorrelationAlertsList( + parseCorrelationAlerts(searchResponse), + searchResponse.getHits() != null && searchResponse.getHits().getTotalHits() != null ? + (int) searchResponse.getHits().getTotalHits().value : 0) + ); + } + }, + e -> { + log.error("Search request to fetch correlation alerts failed", e); + listener.onFailure(e); + } + )); + } + } catch (Exception e) { + log.error("Unexpected error when fetch correlation alerts", e); + listener.onFailure(e); + } + } + + public boolean correlationAlertsIndexExists() { + ClusterState clusterState = clusterService.state(); + return clusterState.getRoutingTable().hasIndex(CORRELATION_ALERT_INDEX); + } + + public List parseCorrelationAlerts(final SearchResponse response) throws IOException { + List alerts = new ArrayList<>(); + for (SearchHit hit : response.getHits()) { + XContentParser xcp = XContentType.JSON.xContent().createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString()); + CorrelationAlert correlationAlert = CorrelationAlert.docParse(xcp, hit.getId(), hit.getVersion()); + alerts.add(correlationAlert); + } + return alerts; + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/alert/CorrelationAlertsList.java b/src/main/java/org/opensearch/securityanalytics/correlation/alert/CorrelationAlertsList.java new file mode 100644 index 000000000..88cdd2cc3 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/correlation/alert/CorrelationAlertsList.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.securityanalytics.correlation.alert; + +import org.opensearch.securityanalytics.model.CorrelationAlert; + +import java.util.List; + +/** + * Wrapper class that holds list of correlation alerts and total number of alerts available. + * Useful for pagination. + */ +public class CorrelationAlertsList { + + private final List correlationAlertList; + private final Integer totalAlerts; + + public CorrelationAlertsList(List correlationAlertList, Integer totalAlerts) { + this.correlationAlertList = correlationAlertList; + this.totalAlerts = totalAlerts; + } + + public List getCorrelationAlertList() { + return correlationAlertList; + } + + public Integer getTotalAlerts() { + return totalAlerts; + } + +} diff --git a/src/main/java/org/opensearch/securityanalytics/model/CorrelationAlert.java b/src/main/java/org/opensearch/securityanalytics/model/CorrelationAlert.java new file mode 100644 index 000000000..11cd53997 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/model/CorrelationAlert.java @@ -0,0 +1,424 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.securityanalytics.model; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.commons.alerting.model.ActionExecutionResult; +import org.opensearch.commons.alerting.model.Alert; +import org.opensearch.commons.authuser.User; +import org.opensearch.core.ParseField; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * Model for docs store in .opensearch-sap-correlation-alerts index. + * Correlation alerts are created when a detector finding triggers correlation + */ +public class CorrelationAlert implements Writeable, ToXContentObject { + + private static final Logger log = LogManager.getLogger(CorrelationAlert.class); + private static final String ID_FIELD = "id"; + private static final String START_TIME_FIELD = "start_time"; + private static final String ACKNOWLEDGED_TIME_FIELD = "acknowledged_time"; + private static final String LAST_NOTIFICATION_TIME_FIELD = "last_notification_time"; + private static final String END_TIME_FIELD = "end_time"; + private static final String ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results"; + private static final String VERSION_FIELD = "version"; + private static final String SCHEMA_VERSION_FIELD = "schema_version"; + private static final String TRIGGER_ID_FIELD = "trigger_id"; + private static final String TRIGGER_NAME_FIELD = "trigger_name"; + private static final String ERROR_MESSAGE_FIELD = "error_message"; + private static final String CORRELATED_FINDING_IDS_FIELD = "correlated_finding_ids"; + private static final String CORRELATED_RULE_NAMES_FIELD = "correlated_rule_names"; + private static final String CORRELATION_RULE_IDS_FIELD = "correlation_rule_ids"; + private static final String USER_FIELD = "user"; + private static final String SEVERITY_FIELD = "severity"; + private static final String STATE_FIELD = "state"; + public static final String NO_ID = ""; + public static final Long NO_VERSION = 1L; + public static final NamedXContentRegistry.Entry XCONTENT_REGISTRY = new NamedXContentRegistry.Entry( + CorrelationAlert.class, + new ParseField(ID_FIELD), + xcp -> parse(xcp, null, null) + ); + + private String id; + private final Instant startTime; + private final Instant acknowledgedTime; + private final Instant lastNotificationTime; + private final Instant endTime; + private final List actionExecutionResults; + private Long version; + private final Long schemaVersion; + private final String triggerName; + private final String triggerId; + private final String errorMessage; + private final List correlatedFindingIds; + private final List correlationRuleNames; + private final List correlationIds; + private final User user; + private final String severity; + private final Alert.State state; + + + public CorrelationAlert(String id, Instant startTime, Instant acknowledgedTime, + Instant lastNotificationTime, Instant endTime, List correlationIds, + List actionExecutionResults, Long version, Long schemaVersion, + String triggerName, String triggerId, String errorMessage, + List correlatedFindingIds, List correlationRuleNames, User user, + String severity, Alert.State state) { + this.id = id; + this.startTime = startTime; + this.acknowledgedTime = acknowledgedTime; + this.lastNotificationTime = lastNotificationTime; + this.endTime = endTime; + this.correlationIds = correlationIds; + this.actionExecutionResults = actionExecutionResults; + this.version = version; + this.schemaVersion = schemaVersion; + this.triggerName = triggerName; + this.triggerId = triggerId; + this.errorMessage = errorMessage; + this.correlatedFindingIds = correlatedFindingIds; + this.correlationRuleNames = correlationRuleNames; + this.user = user; + this.severity = severity; + this.state = state; + } + + public CorrelationAlert(StreamInput sin) throws IOException { + this( + sin.readString(), + sin.readInstant(), + sin.readOptionalInstant(), + sin.readInstant(), + sin.readOptionalInstant(), + sin.readStringList(), + sin.readList(ActionExecutionResult::new), + sin.readLong(), + sin.readLong(), + sin.readString(), + sin.readString(), + sin.readString(), + sin.readStringList(), + sin.readStringList(), + sin.readBoolean() ? new User(sin) : null, + sin.readString(), + sin.readEnum(Alert.State.class) + ); + } + + public static CorrelationAlert docParse(XContentParser xcp, String id, Long version) throws IOException { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp); + CorrelationAlert correlationAlert = xcp.namedObject(CorrelationAlert.class, xcp.currentName(), null); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp); + + correlationAlert.setId(id); + correlationAlert.setVersion(version); + return correlationAlert; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + out.writeInstant(startTime); + out.writeOptionalInstant(acknowledgedTime); + out.writeInstant(lastNotificationTime); + out.writeOptionalInstant(endTime); + out.writeStringCollection(correlationIds); + out.writeCollection(actionExecutionResults); + out.writeLong(version); + out.writeLong(schemaVersion); + out.writeString(triggerName); + out.writeString(triggerId); + out.writeString(errorMessage); + out.writeStringCollection(correlatedFindingIds); + out.writeStringCollection(correlationRuleNames); + out.writeBoolean(user != null); + if (user != null) { + user.writeTo(out); + } + out.writeString(severity); + out.writeEnum(state); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return createXContentBuilder(builder, params, true); + } + + public XContentBuilder toXContentWithUser(XContentBuilder builder, Params params) throws IOException { + return createXContentBuilder(builder, params, false); + } + + private XContentBuilder createXContentBuilder(XContentBuilder builder, Params params, boolean secure) throws IOException { + builder.startObject() + .field(ID_FIELD, id) + .field(START_TIME_FIELD, startTime) + .field(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime) + .field(LAST_NOTIFICATION_TIME_FIELD, lastNotificationTime) + .field(END_TIME_FIELD, endTime) + .field(CORRELATED_FINDING_IDS_FIELD, correlatedFindingIds) + .field(ACTION_EXECUTION_RESULTS_FIELD, actionExecutionResults) + .field(VERSION_FIELD, version) + .field(SCHEMA_VERSION_FIELD, schemaVersion) + .field(TRIGGER_NAME_FIELD, triggerName) + .field(TRIGGER_ID_FIELD, triggerId) + .field(ERROR_MESSAGE_FIELD, errorMessage) + .field(CORRELATION_RULE_IDS_FIELD, correlationIds) + .field(CORRELATED_RULE_NAMES_FIELD, correlationRuleNames); + if (!secure) { + if (user == null) { + builder.nullField(USER_FIELD); + } else { + builder.field(USER_FIELD, user); + } + } + builder.field(SEVERITY_FIELD, severity); + builder.field(STATE_FIELD, state); + return builder; + } + + public static CorrelationAlert parse(XContentParser xcp, String id, Long version) throws IOException { + if (id == null) { + id = NO_ID; + } + if (version == null) { + version = NO_VERSION; + } + Instant startTime = null; + Instant acknowledgedTime = null; + Instant lastNotificationTime = null; + Instant endTime = null; + List actionExecutionResults = new ArrayList<>(); + Long schemaVersion = NO_VERSION; + String triggerName = ""; + String triggerId = ""; + String errorMessage = ""; + List correlatedFindingIds = new ArrayList<>(); + List correlationRuleNames = new ArrayList<>(); + List correlationIds = new ArrayList<>(); + User user = null; + String severity = ""; + Alert.State state = null; + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = xcp.currentName(); + xcp.nextToken(); + + switch (fieldName) { + case ID_FIELD: + id = xcp.text(); + break; + case START_TIME_FIELD: + if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + startTime = null; + } else if (xcp.currentToken().isValue()) { + startTime = Instant.ofEpochMilli(xcp.longValue()); + } else { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.getTokenLocation()); + startTime = null; + } + break; + case ACKNOWLEDGED_TIME_FIELD: + if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + acknowledgedTime = null; + } else if (xcp.currentToken().isValue()) { + acknowledgedTime = Instant.ofEpochMilli(xcp.longValue()); + } else { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.getTokenLocation()); + acknowledgedTime = null; + } + break; + case LAST_NOTIFICATION_TIME_FIELD: + if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + lastNotificationTime = null; + } else if (xcp.currentToken().isValue()) { + lastNotificationTime = Instant.ofEpochMilli(xcp.longValue()); + } else { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.getTokenLocation()); + lastNotificationTime = null; + } + break; + case END_TIME_FIELD: + if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + endTime = null; + } else if (xcp.currentToken().isValue()) { + endTime = Instant.ofEpochMilli(xcp.longValue()); + } else { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.getTokenLocation()); + endTime = null; + } + break; + case ACTION_EXECUTION_RESULTS_FIELD: + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + actionExecutionResults.add(ActionExecutionResult.parse(xcp)); + } + break; + case VERSION_FIELD: + version = xcp.longValue(); + break; + case SCHEMA_VERSION_FIELD: + schemaVersion = xcp.longValue(); + break; + case TRIGGER_ID_FIELD: + triggerId = xcp.text(); + break; + case TRIGGER_NAME_FIELD: + triggerName = xcp.text(); + break; + case ERROR_MESSAGE_FIELD: + errorMessage = xcp.text(); + break; + case CORRELATED_FINDING_IDS_FIELD: + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + String correlatedFindingId = xcp.text(); + correlatedFindingIds.add(correlatedFindingId); + } + break; + case CORRELATED_RULE_NAMES_FIELD: + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + String correlatedRuleName = xcp.text(); + correlationRuleNames.add(correlatedRuleName); + } + break; + case CORRELATION_RULE_IDS_FIELD: + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + String correlatedRuleName = xcp.text(); + correlationIds.add(correlatedRuleName); + } + break; + case USER_FIELD: + if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + user = null; + } else { + user = User.parse(xcp); + } + break; + case SEVERITY_FIELD: + severity = xcp.text(); + break; + case STATE_FIELD: + state = Alert.State.valueOf(xcp.text()); + break; + } + } + return new CorrelationAlert( + id, + startTime, + acknowledgedTime, + lastNotificationTime, + endTime, + correlationIds, + actionExecutionResults, + version, + schemaVersion, + triggerName, + triggerId, + errorMessage, + correlatedFindingIds, + correlationRuleNames, + user, + severity, + state + ); + } + + public String getId() { + return id; + } + + public Instant getStartTime() { + return startTime; + } + + public Instant getAcknowledgedTime() { + return acknowledgedTime; + } + + public Instant getLastNotificationTime() { + return lastNotificationTime; + } + + public Instant getEndTime() { + return endTime; + } + + public List getCorrelationIds() { + return correlationIds; + } + + public List getActionExecutionResults() { + return actionExecutionResults; + } + + public Long getVersion() { + return version; + } + + public Long getSchemaVersion() { + return schemaVersion; + } + + public String getTriggerName() { + return triggerName; + } + + public String getTriggerId() { + return triggerId; + } + + public String getErrorMessage() { + return errorMessage; + } + + public List getCorrelatedFindingIds() { + return correlatedFindingIds; + } + + public List getCorrelationRuleNames() { + return correlationRuleNames; + } + + public User getUser() { + return user; + } + + public String getSeverity() { + return severity; + } + + public Alert.State getState() { + return state; + } + + private void setVersion(Long version) { + this.version = version; + } + + private void setId(String id) { + this.id = id; + } +} diff --git a/src/main/resources/mappings/correlation_alert_mapping.json b/src/main/resources/mappings/correlation_alert_mapping.json new file mode 100644 index 000000000..d42eec24a --- /dev/null +++ b/src/main/resources/mappings/correlation_alert_mapping.json @@ -0,0 +1,112 @@ +{ + "mappings": { + "dynamic": "strict", + "_meta": { + "schema_version": 1 + }, + "properties": { + "acknowledged_time": { + "type": "date" + }, + "action_execution_results": { + "type": "nested", + "properties": { + "action_id": { + "type": "keyword" + }, + "last_execution_time": { + "type": "date" + }, + "throttled_count": { + "type": "integer" + } + } + }, + "end_time": { + "type": "date" + }, + "error_message": { + "type": "text" + }, + "correlated_finding_ids": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "last_notification_time": { + "type": "date" + }, + "correlation_id": { + "type": "keyword" + }, + "correlation_rule_names": { + "type": "keyword" + }, + "user": { + "properties": { + "backend_roles": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "custom_attribute_names": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "roles": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + } + } + }, + "schema_version": { + "type": "integer" + }, + "severity": { + "type": "keyword" + }, + "start_time": { + "type": "date" + }, + "state": { + "type": "keyword" + }, + "trigger_id": { + "type": "keyword" + }, + "trigger_name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "version": { + "type": "long" + } + } + } +}