From 71dcd0432e2773ef74fc89770257475420652dfe Mon Sep 17 00:00:00 2001 From: Thomas Patzke Date: Sun, 8 Oct 2023 19:18:00 +0300 Subject: [PATCH] Eql backend (#32) * Renamed current lucene backend * Tests: Refactored connect test to support xpack.security * Rename tests to lucene tests * Feature: Elasticsearch EQL Backend * Nocover for overriden function * Fix typo * Fix override * Tests: Import backends directly * Removed imports Thanks to the autoloader * Support for case sensitiveness --------- Co-authored-by: Hendrik --- sigma/backends/elasticsearch/__init__.py | 5 - .../elasticsearch/elasticsearch_eql.py | 408 +++++++++++++ ...asticsearch.py => elasticsearch_lucene.py} | 0 tests/test_backend_elasticsearch_eql.py | 572 ++++++++++++++++++ .../test_backend_elasticsearch_eql_connect.py | 562 +++++++++++++++++ ...y => test_backend_elasticsearch_lucene.py} | 2 +- ...t_backend_elasticsearch_lucene_connect.py} | 116 ++-- tests/test_pipelines_windows.py | 2 +- tests/test_pipelines_zeek.py | 2 +- 9 files changed, 1621 insertions(+), 48 deletions(-) create mode 100644 sigma/backends/elasticsearch/elasticsearch_eql.py rename sigma/backends/elasticsearch/{elasticsearch.py => elasticsearch_lucene.py} (100%) create mode 100644 tests/test_backend_elasticsearch_eql.py create mode 100644 tests/test_backend_elasticsearch_eql_connect.py rename tests/{test_backend_elasticsearch.py => test_backend_elasticsearch_lucene.py} (99%) rename tests/{test_backend_elasticsearch_connect.py => test_backend_elasticsearch_lucene_connect.py} (77%) diff --git a/sigma/backends/elasticsearch/__init__.py b/sigma/backends/elasticsearch/__init__.py index 9b4052c..e69de29 100644 --- a/sigma/backends/elasticsearch/__init__.py +++ b/sigma/backends/elasticsearch/__init__.py @@ -1,5 +0,0 @@ -from .elasticsearch import LuceneBackend - -backends = { - "elasticsearch": LuceneBackend, -} diff --git a/sigma/backends/elasticsearch/elasticsearch_eql.py b/sigma/backends/elasticsearch/elasticsearch_eql.py new file 
import re
import json
from typing import ClassVar, Dict, List, Optional, Pattern, Tuple, Union, Any

from sigma.conversion.state import ConversionState
from sigma.rule import SigmaRule
from sigma.conversion.base import TextQueryBackend
from sigma.conversion.deferred import DeferredQueryExpression
from sigma.conditions import (
    ConditionItem,
    ConditionAND,
    ConditionOR,
    ConditionNOT,
    ConditionFieldEqualsValueExpression,
)
from sigma.types import SigmaCompareExpression, SigmaNull, SpecialChars, SigmaNumber
import ipaddress
import sigma

# Index patterns used when the caller does not supply their own. Kept at module
# level so the constructor never shares a mutable default list between instances.
_DEFAULT_INDEX_NAMES: List[str] = [
    "apm-*-transaction*",
    "auditbeat-*",
    "endgame-*",
    "filebeat-*",
    "logs-*",
    "packetbeat-*",
    "traces-apm*",
    "winlogbeat-*",
    "-*elastic-cloud-logs-*",
]


class EqlBackend(TextQueryBackend):
    """
    Elasticsearch event query language backend. Generates query strings described here in the
    Elasticsearch documentation:

    https://www.elastic.co/guide/en/elasticsearch/reference/current/eql.html
    """

    # A descriptive name of the backend
    name: ClassVar[str] = "Elasticsearch EQL"
    # Output formats provided by the backend as name -> description mapping.
    # The name should match to finalize_output_<name>.
    formats: ClassVar[Dict[str, str]] = {
        "default": "Plain Elasticsearch EQL queries.",
        "eqlapi": "Plain EQL queries ready for '/_eql/search' API endpoint.",
        "siem_rule": "Elasticsearch EQL queries as SIEM Rule.",
        "siem_rule_ndjson": "Elasticsearch EQL Query as SIEM Rules in NDJSON Format.",
    }
    # Does the backend require that a processing pipeline is provided?
    requires_pipeline: ClassVar[bool] = True

    # Operator precedence: tuple of Condition{AND,OR,NOT} in order of precedence.
    # The backend generates grouping if required.
    precedence: ClassVar[Tuple[ConditionItem, ConditionItem, ConditionItem]] = (
        ConditionNOT,
        ConditionOR,
        ConditionAND,
    )
    # Expression for precedence override grouping as format string with {expr} placeholder
    group_expression: ClassVar[str] = "({expr})"
    parenthesize: bool = True

    # Generated query tokens
    token_separator: str = " "  # separator inserted between all boolean operators
    or_token: ClassVar[str] = "or"
    and_token: ClassVar[str] = "and"
    not_token: ClassVar[str] = "not"
    # Token inserted between field and value (without separator)
    eq_token: ClassVar[str] = ":"

    # Field names are quoted with backticks when they start with a digit,
    # contain whitespace, or are exactly "-".
    field_quote: ClassVar[str] = "`"
    field_quote_pattern: ClassVar[Pattern] = re.compile(r"^\d.*|.*\s.*|-")
    field_quote_pattern_negation: ClassVar[bool] = False
    # String output
    # Fields
    # No quoting of field names
    # Escaping
    # Character to escape particular parts defined in field_escape_pattern.
    # field_escape: ClassVar[str] = ""
    # All matches of this pattern are prepended with the string contained in field_escape.
    # field_escape_pattern: ClassVar[Pattern] = re.compile("[\\s*]")

    # Values
    # string quoting character (added as escaping character)
    str_quote: ClassVar[str] = '"'
    str_quote_pattern: ClassVar[Pattern] = re.compile(r"^$|.*")
    str_quote_pattern_negation: ClassVar[bool] = False
    # Escaping character for special characters inside string
    escape_char: ClassVar[str] = "\\"
    # Character used as multi-character wildcard
    wildcard_multi: ClassVar[str] = "*"
    # Character used as single-character wildcard
    wildcard_single: ClassVar[str] = "?"
    # Characters quoted in addition to wildcards and string quote
    # add_escaped: ClassVar[str] = '+-=&|!(){}[]<>^"~*?:\\/ '
    add_escaped: ClassVar[str] = '\n\r\t\\"'
    bool_values: ClassVar[
        Dict[bool, str]
    ] = {  # Values to which boolean values are mapped.
        True: "true",
        False: "false",
    }

    # Regular expressions
    # Regular expression query as format string with placeholders {field} and {regex}
    re_expression: ClassVar[str] = '{field} regex~ "{regex}"'
    # Character used for escaping in regular expressions
    re_escape_char: ClassVar[str] = "\\"
    re_escape: ClassVar[Tuple[str]] = ("/",)
    # Don't escape the escape char
    re_escape_escape_char: ClassVar[bool] = False

    # Case sensitive string matching expression. String is quoted/escaped like a normal string.
    # Placeholders {field} and {value} are replaced with field name and quoted/escaped string.
    case_sensitive_match_expression: ClassVar[Optional[str]] = '{field} == {value}'

    # cidr expressions
    # CIDR expression query as format string with placeholders {field} and {network}/{prefixlen}
    cidr_expression: ClassVar[str] = 'cidrMatch({field}, "{network}/{prefixlen}")'

    # Numeric comparison operators
    # Compare operation query as format string with placeholders {field}, {operator} and {value}
    compare_op_expression: ClassVar[str] = "{field} {operator} {value}"
    # Mapping between CompareOperators elements and strings used as replacement
    # for {operator} in compare_op_expression
    compare_operators: ClassVar[Dict[SigmaCompareExpression.CompareOperators, str]] = {
        SigmaCompareExpression.CompareOperators.LT: "<",
        SigmaCompareExpression.CompareOperators.LTE: "<=",
        SigmaCompareExpression.CompareOperators.GT: ">",
        SigmaCompareExpression.CompareOperators.GTE: ">=",
    }

    # Null/None expressions
    # Expression for field has null value as format string with {field} placeholder for field
    # name. The leading "?" makes the field reference optional in EQL, so documents without
    # the field do not error out.
    field_null_expression: ClassVar[str] = "?{field} == null"

    # Field value in list, e.g. "field in (value list)" or "field containsall (value list)"
    # Convert OR as in-expression
    convert_or_as_in: ClassVar[bool] = True
    # Convert AND as in-expression
    convert_and_as_in: ClassVar[bool] = False
    # Values in list can contain wildcards. If set to False (default)
    # only plain values are converted into in-expressions.
    in_expressions_allow_wildcards: ClassVar[bool] = True
    # Expression for field in list of values as format string with
    # placeholders {field}, {op} and {list}
    field_in_list_expression: ClassVar[str] = "{field}{op}({list})"
    # Operator used to convert OR into in-expressions. Must be set if convert_or_as_in is set
    or_in_operator: ClassVar[str] = " like~ "
    # List element separator
    list_separator: ClassVar[str] = ", "

    # Value not bound to a field
    # Expression for string value not bound to a field as format string with placeholder {value}
    unbound_value_str_expression: ClassVar[str] = '"{value}"'
    # Expression for number value not bound to a field as format string with placeholder {value}
    unbound_value_num_expression: ClassVar[str] = "{value}"

    def __init__(
        self,
        processing_pipeline: Optional[
            "sigma.processing.pipeline.ProcessingPipeline"
        ] = None,
        collect_errors: bool = False,
        index_names: Optional[List[str]] = None,
        schedule_interval: int = 5,
        schedule_interval_unit: str = "m",
        **kwargs,
    ):
        """Initialize the backend.

        :param processing_pipeline: pipeline applied before conversion (required by
            pySigma for this backend, see ``requires_pipeline``).
        :param collect_errors: collect conversion errors instead of raising them.
        :param index_names: index patterns emitted into SIEM rule output; defaults to
            the standard Elastic index patterns. A ``None``/empty value selects the
            defaults. (Previously this parameter used a mutable default list, which
            made the stored list a shared object across default-constructed
            instances; the ``None`` sentinel keeps behavior but removes the sharing.)
        :param schedule_interval: SIEM rule schedule interval; falsy values fall back to 5.
        :param schedule_interval_unit: unit for the interval; falsy values fall back to "m".
        """
        super().__init__(processing_pipeline, collect_errors, **kwargs)
        # "or" fallbacks preserve the original lenient handling of falsy arguments.
        self.index_names = index_names or list(_DEFAULT_INDEX_NAMES)
        self.schedule_interval = schedule_interval or 5
        self.schedule_interval_unit = schedule_interval_unit or "m"
        # Mapping from Sigma rule level names to Elastic SIEM risk scores.
        self.severity_risk_mapping = {
            "INFORMATIONAL": 1,
            "LOW": 21,
            "MEDIUM": 47,
            "HIGH": 73,
            "CRITICAL": 99,
        }

    @staticmethod
    def _is_field_null_condition(cond: ConditionItem) -> bool:
        """Return True if *cond* compares a field against the Sigma null value."""
        return isinstance(cond, ConditionFieldEqualsValueExpression) and isinstance(
            cond.value, SigmaNull
        )

    def is_ip(self, value: Any) -> bool:
        """Return True if *value* (a Sigma value; stringified) parses as an IP address.

        Note: the original annotation claimed *value* was a
        ``ConditionFieldEqualsValueExpression``; callers actually pass ``cond.value``.
        """
        try:
            ipaddress.ip_address(str(value))
            return True
        except ValueError:
            return False

    def convert_condition_field_eq_val_str(
        self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
    ) -> Union[str, DeferredQueryExpression]:  # pragma: no cover
        """Conversion of field = string value expressions.

        Empty strings and IP address values are emitted with a strict ``==``
        comparison instead of the default ``:`` operator.
        """
        if cond.value.convert() == "" or self.is_ip(cond.value):
            return "{field}=={value}".format(
                field=self.escape_and_quote_field(cond.field),
                value=self.convert_value_str(cond.value, state),
            )
        return super().convert_condition_field_eq_val_str(cond, state)

    def convert_condition_not(
        self, cond: ConditionNOT, state: ConversionState
    ) -> Union[str, DeferredQueryExpression]:
        """Convert a negated null check into the EQL form ``?field != null``."""
        if EqlBackend._is_field_null_condition(cond.args[0]):
            return f"?{cond.args[0].field} != null"

        return super().convert_condition_not(cond, state)

    def compare_precedence(self, outer: ConditionItem, inner: ConditionItem) -> bool:
        """Override precedence check for null field conditions."""
        if isinstance(inner, ConditionNOT) and EqlBackend._is_field_null_condition(
            inner.args[0]
        ):
            # inner will turn into "?field != null", no parentheses needed
            return True

        if EqlBackend._is_field_null_condition(inner):
            # inner will turn into "?field == null", force parentheses
            return False

        return super().compare_precedence(outer, inner)

    def finalize_query_default(
        self, rule: SigmaRule, query: str, index: int, state: ConversionState
    ) -> str:
        """Wrap the converted condition in a minimal EQL query ("any where ...")."""
        return f"any where {query}"

    def finalize_output_default(self, queries: List[str]) -> List[str]:
        """Return the plain EQL query strings, one per converted rule."""
        return list(queries)

    def finalize_query_eqlapi(
        self, rule: SigmaRule, query: str, index: int, state: ConversionState
    ) -> Dict:
        """
        Create EQL Queries ready to be used against the '_eql/search' API Endpoint.
        """
        return {"query": f"any where {query}"}

    def finalize_output_eqlapi(self, queries: List[Dict]) -> List[Dict]:
        """Return the '_eql/search'-ready request bodies, one per converted rule."""
        return list(queries)

    def finalize_query_siem_rule(
        self, rule: SigmaRule, query: str, index: int, state: ConversionState
    ) -> Dict:
        """
        Create SIEM Rules in JSON Format. These rules could be imported into Kibana using the
        Create Rule API https://www.elastic.co/guide/en/kibana/8.6/create-rule-api.html
        This API (and generated data) is NOT the same as importing Detection Rules via:
        Kibana -> Security -> Alerts -> Manage Rules -> Import
        If you want to have a nice importable NDJSON File for the Security Rule importer
        use pySigma Format 'siem_rule_ndjson' instead.
        """

        siem_rule = {
            "name": f"SIGMA - {rule.title}",
            "tags": [f"{n.namespace}-{n.name}" for n in rule.tags],
            "consumer": "siem",
            "enabled": True,
            "throttle": None,
            "schedule": {
                "interval": f"{self.schedule_interval}{self.schedule_interval_unit}"
            },
            "params": {
                "author": [rule.author] if rule.author is not None else [],
                "description": rule.description
                if rule.description is not None
                else "No description",
                "ruleId": str(rule.id),
                "falsePositives": rule.falsepositives,
                "from": f"now-{self.schedule_interval}{self.schedule_interval_unit}",
                "immutable": False,
                "license": "DRL",
                "outputIndex": "",
                "meta": {
                    "from": "1m",
                },
                "maxSignals": 100,
                "riskScore": self.severity_risk_mapping[rule.level.name]
                if rule.level is not None
                else 21,
                "riskScoreMapping": [],
                "severity": str(rule.level.name).lower()
                if rule.level is not None
                else "low",
                "severityMapping": [],
                "threat": [],
                "to": "now",
                "references": rule.references,
                "version": 1,
                "exceptionsList": [],
                "relatedIntegrations": [],
                "requiredFields": [],
                "setup": "",
                # NOTE(review): type/language are reported as "query"/"lucene" even
                # though the generated query is EQL — kept as-is to preserve current
                # output; confirm whether "eql" is intended here.
                "type": "query",
                "language": "lucene",
                "index": self.index_names,
                "query": f"any where {query}",
                "filters": [],
            },
            "rule_type_id": "siem.queryRule",
            "notify_when": "onActiveAlert",
            "actions": [],
        }
        return siem_rule

    def finalize_output_siem_rule(self, queries: List[Dict]) -> List[Dict]:
        """Return the SIEM rule dictionaries, one per converted rule."""
        return list(queries)

    def finalize_query_siem_rule_ndjson(
        self, rule: SigmaRule, query: str, index: int, state: ConversionState
    ) -> Dict:
        """
        Generating SIEM/Detection Rules in NDJSON Format. Compatible with

        https://www.elastic.co/guide/en/security/8.6/rules-ui-management.html#import-export-rules-ui
        """

        siem_rule = {
            "id": str(rule.id),
            "name": f"SIGMA - {rule.title}",
            "tags": [f"{n.namespace}-{n.name}" for n in rule.tags],
            "enabled": True,
            "throttle": "no_actions",
            "interval": f"{self.schedule_interval}{self.schedule_interval_unit}",
            "author": [rule.author] if rule.author is not None else [],
            "description": rule.description
            if rule.description is not None
            else "No description",
            "rule_id": str(rule.id),
            "false_positives": rule.falsepositives,
            "from": f"now-{self.schedule_interval}{self.schedule_interval_unit}",
            "immutable": False,
            "license": "DRL",
            "output_index": "",
            "meta": {
                "from": "1m",
            },
            "max_signals": 100,
            "risk_score": self.severity_risk_mapping[rule.level.name]
            if rule.level is not None
            else 21,
            "risk_score_mapping": [],
            "severity": str(rule.level.name).lower()
            if rule.level is not None
            else "low",
            "severity_mapping": [],
            "threat": [],
            "to": "now",
            "references": rule.references,
            "version": 1,
            "exceptions_list": [],
            "related_integrations": [],
            "required_fields": [],
            "setup": "",
            # NOTE(review): same as siem_rule output — "query"/"lucene" kept for an
            # EQL query to preserve current output; confirm intended.
            "type": "query",
            "language": "lucene",
            "index": self.index_names,
            "query": f"any where {query}",
            "filters": [],
            "actions": [],
        }
        return siem_rule

    def finalize_output_siem_rule_ndjson(self, queries: List[Dict]) -> List[Dict]:
        """Return the NDJSON-importable rule dictionaries, one per converted rule."""
        return list(queries)
sigma.backends.elasticsearch.elasticsearch_eql import EqlBackend +from sigma.collection import SigmaCollection + + +@pytest.fixture(name="eql_backend") +def fixture_eql_backend(): + return EqlBackend() + + +def test_eql_and_expression(eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: valueA + fieldB: valueB + condition: sel + """ + ) + + assert eql_backend.convert(rule) == [ + 'any where fieldA:"valueA" and fieldB:"valueB"' + ] + + +def test_eql_and_expression_empty_string(eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: valueA + fieldB: '' + condition: sel + """ + ) + + assert eql_backend.convert(rule) == ['any where fieldA:"valueA" and fieldB==""'] + + +def test_eql_or_expression(eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel1: + fieldA: valueA + sel2: + fieldB: valueB + condition: 1 of sel* + """ + ) + assert eql_backend.convert(rule) == ['any where fieldA:"valueA" or fieldB:"valueB"'] + + +def test_eql_and_or_expression(eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: + - valueA1 + - valueA2 + fieldB: + - valueB1 + - valueB2 + condition: sel + """ + ) + assert eql_backend.convert(rule) == [ + 'any where (fieldA like~ ("valueA1", "valueA2")) and (fieldB like~ ("valueB1", "valueB2"))' + ] + + +def test_eql_or_and_expression(eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel1: + fieldA: valueA1 
+ fieldB: valueB1 + sel2: + fieldA: valueA2 + fieldB: valueB2 + condition: 1 of sel* + """ + ) + assert eql_backend.convert(rule) == [ + 'any where (fieldA:"valueA1" and fieldB:"valueB1") or (fieldA:"valueA2" and fieldB:"valueB2")' + ] + + +def test_eql_in_expression(eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: + - valueA + - valueB + - valueC* + condition: sel + """ + ) + assert eql_backend.convert(rule) == [ + 'any where fieldA like~ ("valueA", "valueB", "valueC*")' + ] + + +def test_eql_in_expression_empty_string(eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: + - valueA + - '' + condition: sel + """ + ) + assert eql_backend.convert(rule) == ['any where fieldA like~ ("valueA", "")'] + + +def test_eql_regex_query(eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|re: foo.*bar + fieldB: foo + condition: sel + """ + ) + assert eql_backend.convert(rule) == [ + 'any where fieldA regex~ "foo.*bar" and fieldB:"foo"' + ] + + +def test_eql_regex_query_escaped_input(eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|re: 127\.0\.0\.1:[1-9]\d{3} + fieldB: foo + fieldC|re: foo/bar + condition: sel + """ + ) + assert eql_backend.convert(rule) == [ + 'any where fieldA regex~ "127\.0\.0\.1:[1-9]\d{3}" and fieldB:"foo" and fieldC regex~ "foo\\/bar"' + ] + + +def test_eql_cidr_query(eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: 
test_product + detection: + sel: + field|cidr: 192.168.0.0/16 + condition: sel + """ + ) + assert eql_backend.convert(rule) == ['any where cidrMatch(field, "192.168.0.0/16")'] + + +def test_eql_field_name_with_whitespace(eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + field name: value + condition: sel + """ + ) + assert eql_backend.convert(rule) == ['any where `field name`:"value"'] + + +def test_eql_not_filter_null_and(eql_backend: EqlBackend): + """Test for DSL output with embedded query string query.""" + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + selection: + FieldA|endswith: 'valueA' + filter_1: + FieldB: null + filter_2: + FieldB: '' + condition: selection and not filter_1 and not filter_2 + """ + ) + + assert eql_backend.convert(rule) == [ + 'any where FieldA:"*valueA" and ?FieldB != null and (not FieldB=="")' + ] + + +def test_eql_filter_null_and(eql_backend: EqlBackend): + """Test for DSL output with embedded query string query.""" + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + selection: + FieldA|endswith: 'valueA' + filter_1: + FieldB: null + filter_2: + FieldB: '' + condition: selection and filter_1 and not filter_2 + """ + ) + + assert eql_backend.convert(rule) == [ + 'any where FieldA:"*valueA" and (?FieldB == null) and (not FieldB=="")' + ] + + +def test_eql_not_filter_null_or(eql_backend: EqlBackend): + """Test for DSL output with embedded query string query.""" + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + selection: + FieldA|endswith: 'valueA' + filter_1: + FieldB: null + filter_2: + FieldB: '' + condition: 
selection and (not filter_1 or not filter_2) + """ + ) + + assert eql_backend.convert(rule) == [ + 'any where FieldA:"*valueA" and (?FieldB != null or (not FieldB==""))' + ] + + +def test_eql_filter_null_or(eql_backend: EqlBackend): + """Test for DSL output with embedded query string query.""" + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + selection: + FieldA|endswith: 'valueA' + filter_1: + FieldB: null + filter_2: + FieldB: '' + condition: selection and (filter_1 or not filter_2) + """ + ) + + assert eql_backend.convert(rule) == [ + 'any where FieldA:"*valueA" and ((?FieldB == null) or (not FieldB==""))' + ] + + +def test_eql_filter_not_or_null(eql_backend: EqlBackend): + """Test for DSL output with embedded query string query.""" + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + selection: + FieldA|endswith: 'valueA' + filter_1: + FieldB: null + filter_2: + FieldB: '' + condition: selection and not 1 of filter_* + """ + ) + + assert eql_backend.convert(rule) == [ + 'any where FieldA:"*valueA" and (not ((?FieldB == null) or FieldB==""))' + ] + + +def test_eql_filter_not(eql_backend: EqlBackend): + """Test for DSL output with embedded query string query.""" + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + filter: + Field: null + condition: not filter + """ + ) + + assert eql_backend.convert(rule) == ["any where ?Field != null"] + + +def test_eql_angle_brackets(eql_backend: EqlBackend): + """Test for DSL output with < or > in the values""" + rule = SigmaCollection.from_yaml( + r""" + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + selection_cmd: + - OriginalFileName: 'Cmd.exe' + - Image|endswith: '\cmd.exe' + 
selection_cli: + - CommandLine|contains: '<' + - CommandLine|contains: '>' + condition: all of selection_* + """ + ) + + assert eql_backend.convert(rule) == [ + r'any where (OriginalFileName:"Cmd.exe" or Image:"*\\cmd.exe") and (CommandLine like~ ("*<*", "*>*"))' + ] + +def test_elasticsearch_eqlapi(eql_backend: EqlBackend): + """Test for NDJSON output with embedded query string query.""" + rule = SigmaCollection.from_yaml( + """ + title: Test + id: c277adc0-f0c4-42e1-af9d-fab062992156 + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: valueA + fieldB: valueB + condition: sel + """ + ) + result = eql_backend.convert(rule, output_format="eqlapi") + assert result[0] == { + "query": "any where fieldA:\"valueA\" and fieldB:\"valueB\"" + } + +def test_elasticsearch_siemrule_eql(eql_backend: EqlBackend): + """Test for NDJSON output with embedded query string query.""" + rule = SigmaCollection.from_yaml( + """ + title: Test + id: c277adc0-f0c4-42e1-af9d-fab062992156 + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: valueA + fieldB: valueB + condition: sel + """ + ) + result = eql_backend.convert(rule, output_format="siem_rule") + assert result[0] == { + "name": "SIGMA - Test", + "tags": [], + "consumer": "siem", + "enabled": True, + "throttle": None, + "schedule": {"interval": "5m"}, + "params": { + "author": [], + "description": "No description", + "ruleId": "c277adc0-f0c4-42e1-af9d-fab062992156", + "falsePositives": [], + "from": "now-5m", + "immutable": False, + "license": "DRL", + "outputIndex": "", + "meta": { + "from": "1m", + }, + "maxSignals": 100, + "riskScore": 21, + "riskScoreMapping": [], + "severity": "low", + "severityMapping": [], + "threat": [], + "to": "now", + "references": [], + "version": 1, + "exceptionsList": [], + "relatedIntegrations": [], + "requiredFields": [], + "setup": "", + "type": "query", + "language": "lucene", + "index": [ + 
"apm-*-transaction*", + "auditbeat-*", + "endgame-*", + "filebeat-*", + "logs-*", + "packetbeat-*", + "traces-apm*", + "winlogbeat-*", + "-*elastic-cloud-logs-*", + ], + "query": 'any where fieldA:"valueA" and fieldB:"valueB"', + "filters": [], + }, + "rule_type_id": "siem.queryRule", + "notify_when": "onActiveAlert", + "actions": [], + } + + +def test_elasticsearch_siemrule_eql_ndjson(eql_backend: EqlBackend): + """Test for NDJSON output with embedded query string query.""" + rule = SigmaCollection.from_yaml( + """ + title: Test + id: c277adc0-f0c4-42e1-af9d-fab062992156 + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: valueA + fieldB: valueB + condition: sel + """ + ) + result = eql_backend.convert(rule, output_format="siem_rule_ndjson") + assert result[0] == { + "id": "c277adc0-f0c4-42e1-af9d-fab062992156", + "name": "SIGMA - Test", + "tags": [], + "interval": "5m", + "enabled": True, + "description": "No description", + "risk_score": 21, + "severity": "low", + "license": "DRL", + "output_index": "", + "meta": { + "from": "1m", + }, + "author": [], + "false_positives": [], + "from": "now-5m", + "rule_id": "c277adc0-f0c4-42e1-af9d-fab062992156", + "max_signals": 100, + "risk_score_mapping": [], + "severity_mapping": [], + "threat": [], + "to": "now", + "references": [], + "version": 1, + "exceptions_list": [], + "immutable": False, + "related_integrations": [], + "required_fields": [], + "setup": "", + "type": "query", + "language": "lucene", + "index": [ + "apm-*-transaction*", + "auditbeat-*", + "endgame-*", + "filebeat-*", + "logs-*", + "packetbeat-*", + "traces-apm*", + "winlogbeat-*", + "-*elastic-cloud-logs-*", + ], + "query": 'any where fieldA:"valueA" and fieldB:"valueB"', + "filters": [], + "throttle": "no_actions", + "actions": [], + } + + +def test_elasticsearch_siem_rule_output(eql_backend: EqlBackend): + """Test for output format siem_rule.""" + # TODO: implement a test for the output format 
+ pass + + +def test_elasticsearch_siem_rule_ndjson_output(eql_backend: EqlBackend): + """Test for output format siem_rule.""" + # TODO: implement a test for the output format + pass diff --git a/tests/test_backend_elasticsearch_eql_connect.py b/tests/test_backend_elasticsearch_eql_connect.py new file mode 100644 index 0000000..4a33893 --- /dev/null +++ b/tests/test_backend_elasticsearch_eql_connect.py @@ -0,0 +1,562 @@ +import time +import pytest +import requests +import urllib3 +from requests.auth import HTTPBasicAuth +from sigma.backends.elasticsearch.elasticsearch_eql import EqlBackend +from sigma.collection import SigmaCollection + +urllib3.disable_warnings() + +pytest.es_url = "" +pytest.es_creds = HTTPBasicAuth("sigmahq", "sigmahq") + + +def es_available_test(): + state = False + # Try {es_url} without auth + try: + if not state: + response = requests.get("http://localhost:9200", timeout=120) + if response.status_code == 200: + pytest.es_url = "http://localhost:9200" + pytest.es_creds = False + state = True + except requests.exceptions.ConnectionError: + state = False + + # Try https://localhost:9200 without auth + try: + if not state: + response = requests.get("https://localhost:9200", timeout=120, verify=False) + if response.status_code == 200: + pytest.es_url = "https://localhost:9200" + pytest.es_creds = False + state = True + except requests.exceptions.ConnectionError: + state = False + + # Try https://localhost:9200 with auth + try: + if not state: + response = requests.get( + "https://localhost:9200", + timeout=120, + verify=False, + auth=("sigmahq", "sigmahq"), + ) + if response.status_code == 200: + pytest.es_url = "https://localhost:9200" + pytest.es_creds = HTTPBasicAuth("sigmahq", "sigmahq") + state = True + except requests.exceptions.ConnectionError: + state = False + + return state + + +@pytest.fixture(scope="class", name="prepare_es_data") +@pytest.mark.skipif( + es_available_test is False, reason="ES not available... Skipping tests..." 
+) +def fixture_prepare_es_data(): + if es_available_test(): + requests.delete( + f"{pytest.es_url}/test-index", + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.put( + f"{pytest.es_url}/test-index", + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.put( + f"{pytest.es_url}/test-index/_mapping", + timeout=120, + verify=False, + auth=pytest.es_creds, + json={ + "properties": { + "ipfield": {"type": "ip"}, + "textFieldA": {"type": "text"}, + "keywordFieldA": {"type": "keyword"}, + }, + "dynamic_templates": [ + {"default": {"match": "*", "mapping": {"type": "keyword"}}} + ], + }, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "fieldA": "valueA", "fieldB": "valueB"}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "fieldA": "otherisempty", "fieldB": ""}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "fieldK": "dot.value"}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "fieldA": "valueA1", "fieldB": "valueB1"}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "fieldA": "valueA2", "fieldB": "valueB2"}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={ + "@timestamp": "1696587400", + "fieldA": "foosamplebar", + "fieldB": "foo", + }, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "ipfield": "192.168.1.1"}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + 
f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "ipfield": "10.5.5.5"}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "field name": "value"}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "textFieldA": "value with spaces"}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "textFieldA": "value2 with spaces"}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "keywordFieldA": "value with spaces"}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={"@timestamp": "1696587400", "keywordFieldA": "value2 with spaces"}, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={ + "@timestamp": "1696587400", + "OriginalFileName": "Cmd.exe", + "Image": "c:\\windows\\system32\\cmd.exe", + "CommandLine": "something < someother", + }, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={ + "@timestamp": "1696587400", + "OriginalFileName": "Cmd.exe", + "Image": "c:\\windows\\system32\\cmd.exe", + "CommandLine": "something > someother", + }, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + requests.post( + f"{pytest.es_url}/test-index/_doc/", + json={ + "@timestamp": "1696587400", + "OriginalFileName": "Cmd.exe", + "Image": "c:\\windows\\system32\\cmd.exe", + "CommandLine": "without angle bracket", + }, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + # Wait a bit for Documents to be indexed + time.sleep(1) + + 
+@pytest.fixture(name="eql_backend") +def fixture_eql_backend(): + return EqlBackend() + + +@pytest.mark.skipif(es_available_test() is False, reason="ES not available") +class TestConnectElasticsearch: + """ + Test Class for Elasticsearch Backend + """ + + def query_backend_hits(self, query, num_wanted=0): + result = requests.post( + f"{pytest.es_url}/test-index/_eql/search", + json=query, + timeout=120, + verify=False, + auth=pytest.es_creds, + ) + assert result.status_code == 200 + rjson = result.json() + assert "hits" in rjson + assert "total" in rjson["hits"] + assert rjson["hits"]["total"]["value"] == num_wanted + return rjson + + def test_connect_eql_and_expression(self, prepare_es_data, eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: valueA + fieldB: valueB + condition: sel + """ + ) + + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) + + def test_connect_eql_and_expression_empty_string( + self, prepare_es_data, eql_backend: EqlBackend + ): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: otherisempty + fieldB: '' + condition: sel + """ + ) + + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) + + def test_connect_eql_or_expression(self, prepare_es_data, eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel1: + fieldA: valueA + sel2: + fieldB: valueB + condition: 1 of sel* + """ + ) + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) + + def test_connect_eql_and_or_expression( + 
self, prepare_es_data, eql_backend: EqlBackend + ): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: + - valueA1 + - valueA2 + fieldB: + - valueB1 + - valueB2 + condition: sel + """ + ) + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=2) + + def test_connect_eql_or_and_expression( + self, prepare_es_data, eql_backend: EqlBackend + ): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel1: + fieldA: valueA1 + fieldB: valueB1 + sel2: + fieldA: valueA2 + fieldB: valueB2 + condition: 1 of sel* + """ + ) + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=2) + + def test_connect_eql_in_expression(self, prepare_es_data, eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: + - valueA + - valueB + - valueC* + condition: sel + """ + ) + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) + + def test_connect_eql_in_expression_empty_string( + self, prepare_es_data, eql_backend: EqlBackend + ): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: + - otherisempty + - '' + condition: sel + """ + ) + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) + + def test_connect_eql_regex_query(self, prepare_es_data, eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: 
test_product + detection: + sel: + fieldA|re: foo.*bar + fieldB: foo + condition: sel + """ + ) + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) + + def test_connect_eql_cidr_query(self, prepare_es_data, eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + ipfield|cidr: 192.168.0.0/16 + condition: sel + """ + ) + + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) + + def test_connect_eql_ip_query(self, prepare_es_data, eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + ipfield: 192.168.1.1 + condition: sel + """ + ) + + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) + + def test_connect_eql_field_name_with_whitespace( + self, prepare_es_data, eql_backend: EqlBackend + ): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + field name: value + condition: sel + """ + ) + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) + + def test_connect_eql_dot_value(self, prepare_es_data, eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldK: dot.value + condition: sel + """ + ) + + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) + + def test_connect_eql_space_value_text( + self, prepare_es_data, eql_backend: EqlBackend + ): + """Test for output 
format siem_rule.""" + # WONTFIX: EQL won't work on text fields! + # See also: https://www.elastic.co/guide/en/elasticsearch/reference/current/eql-syntax.html#eql-text-fields + pass + + def test_connect_eql_space_value_keyword( + self, prepare_es_data, eql_backend: EqlBackend + ): + rule = SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + keywordFieldA: 'value with spaces' + condition: sel + """ + ) + + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) + + def test_connect_eql_angle_brackets(self, prepare_es_data, eql_backend: EqlBackend): + rule = SigmaCollection.from_yaml( + r""" + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + selection_cmd: + - OriginalFileName: 'Cmd.exe' + - Image|endswith: '\cmd.exe' + selection_cli: + - CommandLine|contains: '<' + - CommandLine|contains: '>' + condition: all of selection_* + """ + ) + + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=2) + + def test_connect_eql_angle_brackets_single( + self, prepare_es_data, eql_backend: EqlBackend + ): + rule = SigmaCollection.from_yaml( + r""" + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + selection_cmd: + - OriginalFileName: 'Cmd.exe' + - Image|endswith: '\cmd.exe' + selection_cli: + CommandLine|contains: '<' + condition: all of selection_* + """ + ) + + result_dsl = eql_backend.convert(rule, output_format="eqlapi")[0] + self.query_backend_hits(result_dsl, num_wanted=1) diff --git a/tests/test_backend_elasticsearch.py b/tests/test_backend_elasticsearch_lucene.py similarity index 99% rename from tests/test_backend_elasticsearch.py rename to tests/test_backend_elasticsearch_lucene.py index 83f59b5..8dab880 100644 --- 
a/tests/test_backend_elasticsearch.py +++ b/tests/test_backend_elasticsearch_lucene.py @@ -1,5 +1,5 @@ import pytest -from sigma.backends.elasticsearch import LuceneBackend +from sigma.backends.elasticsearch.elasticsearch_lucene import LuceneBackend from sigma.collection import SigmaCollection diff --git a/tests/test_backend_elasticsearch_connect.py b/tests/test_backend_elasticsearch_lucene_connect.py similarity index 77% rename from tests/test_backend_elasticsearch_connect.py rename to tests/test_backend_elasticsearch_lucene_connect.py index 7d0455c..7e9598b 100644 --- a/tests/test_backend_elasticsearch_connect.py +++ b/tests/test_backend_elasticsearch_lucene_connect.py @@ -1,25 +1,61 @@ import time import pytest import requests -from sigma.backends.elasticsearch import LuceneBackend +import urllib3 +from requests.auth import HTTPBasicAuth +from sigma.backends.elasticsearch.elasticsearch_lucene import LuceneBackend from sigma.collection import SigmaCollection +urllib3.disable_warnings() + +pytest.es_url='' +pytest.es_creds = HTTPBasicAuth('sigmahq', 'sigmahq') def es_available_test(): + state = False + # Try {es_url} without auth + try: + if not state: + response = requests.get('http://localhost:9200', timeout=120) + if response.status_code == 200: + pytest.es_url='http://localhost:9200' + pytest.es_creds=False + state = True + except requests.exceptions.ConnectionError: + state = False + + # Try https://localhost:9200 without auth try: - requests.get('http://localhost:9200/', timeout=120) + if not state: + response = requests.get('https://localhost:9200', timeout=120, verify=False) + if response.status_code == 200: + pytest.es_url='https://localhost:9200' + pytest.es_creds=False + state = True except requests.exceptions.ConnectionError: - return False - return True + state = False + + # Try https://localhost:9200 with auth + try: + if not state: + response=requests.get('https://localhost:9200', timeout=120, verify=False, auth=('sigmahq', 'sigmahq')) + if 
response.status_code == 200: + pytest.es_url='https://localhost:9200' + pytest.es_creds=HTTPBasicAuth('sigmahq', 'sigmahq') + state = True + except requests.exceptions.ConnectionError: + state = False + + return state @pytest.fixture(scope="class", name="prepare_es_data") @pytest.mark.skipif(es_available_test is False, reason="ES not available... Skipping tests...") def fixture_prepare_es_data(): if es_available_test(): - requests.delete('http://localhost:9200/test-index', timeout=120) - requests.put("http://localhost:9200/test-index", timeout=120) - requests.put("http://localhost:9200/test-index/_mapping", timeout=120, json={ + requests.delete(f'{pytest.es_url}/test-index', timeout=120, verify=False, auth=pytest.es_creds) + requests.put(f"{pytest.es_url}/test-index", timeout=120, verify=False, auth=pytest.es_creds) + requests.put(f"{pytest.es_url}/test-index/_mapping", timeout=120, verify=False, auth=pytest.es_creds, json={ "properties": { "ipfield": { "type": "ip" @@ -43,38 +79,38 @@ def fixture_prepare_es_data(): ] } ) - requests.post("http://localhost:9200/test-index/_doc/", - json={"fieldA": "valueA", "fieldB": "valueB"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"fieldA": "otherisempty", "fieldB": ""}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"fieldK": "dot.value"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"fieldA": "valueA1", "fieldB": "valueB1"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"fieldA": "valueA2", "fieldB": "valueB2"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"fieldA": "foosamplebar", "fieldB": "foo"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"ipfield": "192.168.1.1"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"ipfield": "10.5.5.5"}, timeout=120) - 
requests.post("http://localhost:9200/test-index/_doc/", - json={"field name": "value"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"textFieldA": "value with spaces"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"textFieldA": "value2 with spaces"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"keywordFieldA": "value with spaces"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"keywordFieldA": "value2 with spaces"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"OriginalFileName": "Cmd.exe", "CommandLine": "something < someother"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"OriginalFileName": "Cmd.exe", "CommandLine": "something > someother"}, timeout=120) - requests.post("http://localhost:9200/test-index/_doc/", - json={"OriginalFileName": "Cmd.exe", "CommandLine": "without angle bracket"}, timeout=120) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"fieldA": "valueA", "fieldB": "valueB"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"fieldA": "otherisempty", "fieldB": ""}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"fieldK": "dot.value"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"fieldA": "valueA1", "fieldB": "valueB1"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"fieldA": "valueA2", "fieldB": "valueB2"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"fieldA": "foosamplebar", "fieldB": "foo"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"ipfield": 
"192.168.1.1"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"ipfield": "10.5.5.5"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"field name": "value"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"textFieldA": "value with spaces"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"textFieldA": "value2 with spaces"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"keywordFieldA": "value with spaces"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"keywordFieldA": "value2 with spaces"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"OriginalFileName": "Cmd.exe", "CommandLine": "something < someother"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"OriginalFileName": "Cmd.exe", "CommandLine": "something > someother"}, timeout=120, verify=False, auth=pytest.es_creds) + requests.post(f"{pytest.es_url}/test-index/_doc/", + json={"OriginalFileName": "Cmd.exe", "CommandLine": "without angle bracket"}, timeout=120, verify=False, auth=pytest.es_creds) # Wait a bit for Documents to be indexed time.sleep(1) @@ -92,7 +128,7 @@ class TestConnectElasticsearch: def query_backend_hits(self, query, num_wanted=0): result = requests.post( - 'http://localhost:9200/test-index/_search', json=query, timeout=120) + f'{pytest.es_url}/test-index/_search', json=query, timeout=120, verify=False, auth=pytest.es_creds) assert result.status_code == 200 rjson = result.json() assert 'hits' in rjson diff --git a/tests/test_pipelines_windows.py b/tests/test_pipelines_windows.py index 
b4fba41..c422827 100644 --- a/tests/test_pipelines_windows.py +++ b/tests/test_pipelines_windows.py @@ -1,4 +1,4 @@ -from sigma.backends.elasticsearch import LuceneBackend +from sigma.backends.elasticsearch.elasticsearch_lucene import LuceneBackend from sigma.pipelines.elasticsearch.windows import ecs_windows, ecs_windows_old from sigma.collection import SigmaCollection from sigma.rule import SigmaRule diff --git a/tests/test_pipelines_zeek.py b/tests/test_pipelines_zeek.py index d2a5bd5..4cf6475 100644 --- a/tests/test_pipelines_zeek.py +++ b/tests/test_pipelines_zeek.py @@ -1,5 +1,5 @@ import pytest -from sigma.backends.elasticsearch import LuceneBackend +from sigma.backends.elasticsearch.elasticsearch_lucene import LuceneBackend from sigma.pipelines.elasticsearch.zeek import ecs_zeek_beats, ecs_zeek_corelight, zeek_raw from sigma.collection import SigmaCollection