Skip to content

Commit

Permalink
Merge pull request #31 from frack113/add_fail_condition
Browse files Browse the repository at this point in the history
Add condition errors to check failure
  • Loading branch information
thomaspatzke committed Nov 11, 2023
2 parents 34ec3cc + 557bd3b commit 1e61c47
Show file tree
Hide file tree
Showing 16 changed files with 855 additions and 307 deletions.
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
repos:
- repo: https://github.com/psf/black
rev: 23.3.0
hooks:
- id: black
2 changes: 1 addition & 1 deletion print-coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
elif coverage >= 85.0:
print("COVERAGE_COLOR=orange")
else:
print("COVERAGE_COLOR=red")
print("COVERAGE_COLOR=red")
33 changes: 20 additions & 13 deletions sigma/analyze/attack.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
from sigma.collection import SigmaCollection
from collections import defaultdict

def score_count(rules : Iterable[SigmaRule]) -> int:

def score_count(rules: Iterable[SigmaRule]) -> int:
"""Return count of rules."""
return len(list(rules))

def score_max(rules : Iterable[SigmaRule]) -> int:

def score_max(rules: Iterable[SigmaRule]) -> int:
"""Return maximum rule level value."""
return max(map(lambda rule: rule.level.value if rule.level is not None else 0, rules))
return max(
map(lambda rule: rule.level.value if rule.level is not None else 0, rules)
)


rule_level_scores = {
None: 1,
Expand All @@ -19,24 +24,29 @@ def score_max(rules : Iterable[SigmaRule]) -> int:
SigmaLevel.HIGH: 8,
SigmaLevel.CRITICAL: 12,
}
def rule_score(rule : SigmaRule) -> int:


def rule_score(rule: SigmaRule) -> int:
"""Calculate rule score according to rule_level_scores."""
return rule_level_scores[rule.level]

def score_level(rules : Iterable[SigmaRule]) -> int:

def score_level(rules: Iterable[SigmaRule]) -> int:
return sum(map(rule_score, rules))


score_functions = {
"count": (score_count, "Count of rules"),
"max": (score_max, "Maximum severity value"),
"level": (score_level, "Summarized level score"),
}


def calculate_attack_scores(
rules : SigmaCollection,
score_function : Callable[[Iterable[SigmaRule]], int],
no_subtechniques : bool = False
) -> Dict[str, int]:
rules: SigmaCollection,
score_function: Callable[[Iterable[SigmaRule]], int],
no_subtechniques: bool = False,
) -> Dict[str, int]:
"""Generate MITRE™️ ATT&CK Navigator heatmap according to scoring function."""
attack_rules = defaultdict(list)
for rule in rules:
Expand All @@ -46,7 +56,4 @@ def calculate_attack_scores(
if no_subtechniques:
technique = technique.split(".")[0]
attack_rules[technique].append(rule)
return {
attack: score_function(rules)
for attack, rules in attack_rules.items()
}
return {attack: score_function(rules) for attack, rules in attack_rules.items()}
75 changes: 48 additions & 27 deletions sigma/cli/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,60 @@

from sigma.cli.rules import load_rules
from sigma.analyze.attack import score_functions, calculate_attack_scores
from sigma.data.mitre_attack import mitre_attack_techniques_tactics_mapping, mitre_attack_version
from sigma.data.mitre_attack import (
mitre_attack_techniques_tactics_mapping,
mitre_attack_version,
)


@click.group(name="analyze", help="Analyze Sigma rule sets")
def analyze_group():
pass


@analyze_group.command(
name="attack",
help="Create MITRE™️ ATT&CK heatmaps from Sigma rule set. Score functions are: " + ", ".join((
f"{definition[1]} ({func})"
for func, definition in score_functions.items()
))
)
name="attack",
help="Create MITRE™️ ATT&CK heatmaps from Sigma rule set. Score functions are: "
+ ", ".join(
(f"{definition[1]} ({func})" for func, definition in score_functions.items())
),
)
@click.option(
"--file-pattern", "-P",
"--file-pattern",
"-P",
default="*.yml",
show_default=True,
help="Pattern for file names to be included in recursion into directories.",
)
@click.option(
"--subtechniques/--no-subtechniques", "-s/-S",
"--subtechniques/--no-subtechniques",
"-s/-S",
default=True,
)
@click.option(
"--max-color", "-c",
"--max-color",
"-c",
default="#ff0000",
show_default=True,
help="Color used for maximum score."
help="Color used for maximum score.",
)
@click.option(
"--min-color", "-C",
"--min-color",
"-C",
default="#ffffff00",
show_default=True,
help="Color used for zero score."
help="Color used for zero score.",
)
@click.option(
"--max-score", "-m",
"--max-score",
"-m",
type=int,
default=None,
help="Set fixed maximum score. All scores above are rendered as maximum. Increases color scale resolution for scores below.",
)
@click.option(
"--min-score", "-M",
"--min-score",
"-M",
type=int,
default="0",
show_default=True,
Expand All @@ -66,21 +77,31 @@ def analyze_group():
required=True,
type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path),
)
def analyze_attack(file_pattern, subtechniques, max_color, min_color, max_score, min_score, function, output, input):
def analyze_attack(
file_pattern,
subtechniques,
max_color,
min_color,
max_score,
min_score,
function,
output,
input,
):
rules = load_rules(input, file_pattern)
score_function = score_functions[function][0]
scores = calculate_attack_scores(rules, score_function, not subtechniques)
layer_techniques = [
{
"techniqueID": technique,
"tactic": tactic,
"score": score,
"color": "",
"comment": "",
"enabled": True,
"metadata": [],
"links": [],
"showSubtechniques": False,
"tactic": tactic,
"score": score,
"color": "",
"comment": "",
"enabled": True,
"metadata": [],
"links": [],
"showSubtechniques": False,
}
for technique, score in scores.items()
for tactic in mitre_attack_techniques_tactics_mapping.get(technique, [])
Expand All @@ -90,7 +111,7 @@ def analyze_attack(file_pattern, subtechniques, max_color, min_color, max_score,
"versions": {
"attack": mitre_attack_version,
"navigator": "4.8.1",
"layer": "4.4"
"layer": "4.4",
},
"domain": "enterprise-attack",
"description": f"Sigma coverage heatmap generated by Sigma CLI with score function {function}",
Expand All @@ -100,8 +121,8 @@ def analyze_attack(file_pattern, subtechniques, max_color, min_color, max_score,
max_color,
],
"minValue": min_score,
"maxValue": max_score or max(scores.values())
"maxValue": max_score or max(scores.values()),
},
"techniques": layer_techniques,
}
json.dump(layer, output, indent=2)
json.dump(layer, output, indent=2)
Loading

0 comments on commit 1e61c47

Please sign in to comment.