Skip to content

Commit

Permalink
Merge pull request #53 from SigmaHQ/analyze_logsource
Browse files Browse the repository at this point in the history
Feat: Analyze logsources
  • Loading branch information
thomaspatzke committed Jul 17, 2024
2 parents 22f0ec0 + e33ebea commit 90dabf8
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 79 deletions.
110 changes: 37 additions & 73 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,23 @@ pysigma = "^0.11.9"
colorama = "^0.4.6"

[tool.poetry.dev-dependencies]
pytest = "^6.2.2"
pytest-cov = "^2.11.1"
pytest = "^7.3.0"
pytest-cov = "^4.0.0"
defusedxml = "^0.7.1"

[tool.poetry.scripts]
sigma = "sigma.cli.main:main"

[tool.pytest.ini_options]
python_paths = ["."]
testpaths = ["tests"]
minversion = "6.0"
python_files = "test_*.py"
addopts = "--cov=sigma --cov-report term --cov-report xml:cov.xml"
testpaths = [
"tests",
]
filterwarnings = [
'ignore:Unverified HTTPS request'
]

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
55 changes: 55 additions & 0 deletions sigma/analyze/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import copy
from typing import Dict, List
from sigma.rule import SigmaRule, SigmaLevel
from sigma.collection import SigmaCollection


rule_level_mapping = {
None: "None",
SigmaLevel.INFORMATIONAL: "Informational",
SigmaLevel.LOW: "Low",
SigmaLevel.MEDIUM: "Medium",
SigmaLevel.HIGH: "High",
SigmaLevel.CRITICAL: "Critical",
}

template_stat_detail = {
"Overall": 0,
"Critical": 0,
"High": 0,
"Medium": 0,
"Low": 0,
"Informational": 0,
"None": 0,
}


def format_row(row: str, column_widths: List) -> str:
"""Format rows for table."""
return " | ".join(
f"{str(item).ljust(width)}" for item, width in zip(row, column_widths)
)


def get_rulelevel_mapping(rule: SigmaRule) -> int:
"""Calculate rule score according to rule_level_scores."""
return rule_level_mapping[rule.level]


def create_logsourcestats(rules: SigmaCollection) -> Dict[str, int]:
"""
Iterate through all the rules and count SigmaLevel grouped by
Logsource Category Name.
"""
stats = {}

for rule in rules:
if hasattr(rule, "logsource"):
# Create stats key for logsource category.
if not rule.logsource.category in stats:
stats[rule.logsource.category] = copy.deepcopy(template_stat_detail)

stats[rule.logsource.category]["Overall"] += 1
stats[rule.logsource.category][get_rulelevel_mapping(rule)] += 1

return stats
56 changes: 56 additions & 0 deletions sigma/cli/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
mitre_attack_techniques_tactics_mapping,
mitre_attack_version,
)
from sigma.analyze.stats import create_logsourcestats, format_row


@click.group(name="analyze", help="Analyze Sigma rule sets")
Expand Down Expand Up @@ -126,3 +127,58 @@ def analyze_attack(
"techniques": layer_techniques,
}
json.dump(layer, output, indent=2)

@analyze_group.command(name="logsource", help="Create stats about logsources.")
@click.option(
"--file-pattern",
"-P",
default="*.yml",
show_default=True,
help="Pattern for file names to be included in recursion into directories.",
)
@click.option(
"--sort-by",
"-k",
type=str,
default="Overall",
show_default=True,
help="Sort by column.",
)
@click.argument(
"output",
type=click.File("w"),
)
@click.argument(
"input",
nargs=-1,
required=True,
type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path),
)
def analyze_logsource(
file_pattern,
sort_by,
output,
input,
):
rules = load_rules(input, file_pattern)
stats = create_logsourcestats(rules)

# Extract column header
headers = ["Logsource"] + list(next(iter(stats.values())).keys())

# Prepare rows
rows = [[key] + list(value.values()) for key, value in stats.items()]
sort_index = headers.index(sort_by)
rows.sort(key=lambda x: x[sort_index], reverse=True)

# Determine col width
column_widths = [
max(len(str(item)) for item in column) for column in zip(*([headers] + rows))
]

# Print table
print("-+-".join("-" * width for width in column_widths), file=output)
print(format_row(headers, column_widths), file=output)
print("-+-".join("-" * width for width in column_widths), file=output)
for row in rows:
print(format_row(row, column_widths), file=output)
Loading

0 comments on commit 90dabf8

Please sign in to comment.