Skip to content

Commit

Permalink
Allow attribute tagging to be disabled with -v
Browse files Browse the repository at this point in the history
  • Loading branch information
jshcodes committed Dec 30, 2022
1 parent 954f582 commit b1a34de
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 40 deletions.
72 changes: 38 additions & 34 deletions cs_misp_import/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,15 @@ def create_internal_reference() -> MISPObject:
return inter

@staticmethod
def int_ref_handler(evt, kc_name, kc_detail, ref_list, slg, act_name, int_ref, no_slug: bool = False):
def int_ref_handler(evt, kc_name, kc_detail, ref_list, slg, act_name, int_ref, verbose: bool = False):
misp_object = MISPObject("internal-reference")
misp_object.add_attribute("type", "Adversary detail", disable_correlation=True)
misp_object.add_attribute("identifier", kc_name.title(), disable_correlation=True)
if not isinstance(kc_detail, list):
kc_detail.replace("\t", "").replace(" ", "")
sum_id = misp_object.add_attribute("comment", kc_detail, disable_correlation=True)
ref_list.append(evt.add_object(misp_object))
if not no_slug:
if verbose:
evt.add_attribute_tag(f"CrowdStrike:adversary:{kc_name.lower().replace(' ', '-')}: {act_name}", sum_id.uuid)
evt.add_attribute_tag(f"CrowdStrike:adversary:{slg}: {kc_name.upper()}", sum_id.uuid)
int_ref.add_reference(misp_object.uuid, "Adversary detail")
Expand All @@ -185,6 +185,7 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent():
slug = details.get("slug", actor_name.lower().replace(" ", "-"))
actor_branch = actor_name.split(" ")[1].upper()
actor_region = ""
verbosity = self.import_settings["verbose_tags"]
if actor_name:
for act_reg in [adv for adv in dir(Adversary) if "__" not in adv]:
if act_reg in actor_branch:
Expand Down Expand Up @@ -213,7 +214,7 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent():
if not internal:
internal = self.create_internal_reference()

self.int_ref_handler(event, "Actor Type", act_type.title(), to_reference, slug, actor_name, internal, False)
self.int_ref_handler(event, "Actor Type", act_type.title(), to_reference, slug, actor_name, internal, verbosity)
event.add_tag(f"CrowdStrike:adversary:type: {act_type.upper()}")

# Adversary motives
Expand All @@ -224,7 +225,7 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent():
if not internal:
internal = self.create_internal_reference()

self.int_ref_handler(event, "Motivation", motive_list_string, to_reference, slug, actor_name, internal, False)
self.int_ref_handler(event, "Motivation", motive_list_string, to_reference, slug, actor_name, internal, verbosity)
for mname in mlist:
event.add_tag(f"CrowdStrike:adversary:motivation: {mname.upper()}")

Expand All @@ -236,7 +237,7 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent():
if not internal:
internal = self.create_internal_reference()

self.int_ref_handler(event, "Capability", cap_val, to_reference, slug, actor_name, internal, False)
self.int_ref_handler(event, "Capability", cap_val, to_reference, slug, actor_name, internal, verbosity)
event.add_tag(f"CrowdStrike:adversary:capability: {cap_val.upper()}")
# Set adversary event threat level based upon adversary capability
if "BELOW" in cap_val.upper() or "LOW" in cap_val.upper():
Expand All @@ -262,13 +263,13 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent():

# Kill chain - Objectives
if objectives:
self.int_ref_handler(event, "objectives", objectives, to_reference, slug, actor_name, internal)
self.int_ref_handler(event, "objectives", objectives, to_reference, slug, actor_name, internal, verbosity)
# Kill chain - Command and Control
if candc:
self.int_ref_handler(event, "command and control", candc, to_reference, slug, actor_name, internal)
self.int_ref_handler(event, "command and control", candc, to_reference, slug, actor_name, internal, verbosity)
# Kill chain - Delivery
if delivery:
self.int_ref_handler(event, "delivery", delivery, to_reference, slug, actor_name, internal)
self.int_ref_handler(event, "delivery", delivery, to_reference, slug, actor_name, internal, verbosity)
# Kill chain - Exploitation
if exploitation:
exploitation_object = MISPObject("internal-reference")
Expand All @@ -278,23 +279,24 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent():
exploits = exploitation.replace("\t", "").replace(" ", "").split("\r\n")
ex_id = exploitation_object.add_attribute("comment", exploitation.replace("\t", "").replace(" ", ""), disable_correlation=True)
to_reference.append(event.add_object(exploitation_object))
event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: EXPLOITATION", ex_id.uuid)
event.add_attribute_tag(f"CrowdStrike:adversary:exploitation: {actor_name}", ex_id.uuid)
for exptt in [exp for exp in exploits if exp]:
if exptt not in ["Unknown", "N/A"]:
for exploit in [a.strip() for a in exptt.split(",")]:
if len(exploit.split(" ")) <= 4:
event.add_attribute_tag(f"CrowdStrike:adversary:exploitation: {exploit.upper()}", ex_id.uuid)
if verbosity:
event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: EXPLOITATION", ex_id.uuid)
event.add_attribute_tag(f"CrowdStrike:adversary:exploitation: {actor_name}", ex_id.uuid)
for exptt in [exp for exp in exploits if exp]:
if exptt not in ["Unknown", "N/A"]:
for exploit in [a.strip() for a in exptt.split(",")]:
if len(exploit.split(" ")) <= 4:
event.add_attribute_tag(f"CrowdStrike:adversary:exploitation: {exploit.upper()}", ex_id.uuid)
internal.add_reference(exploitation_object.uuid, "Adversary detail")
# Kill chain - Installation
if installation:
self.int_ref_handler(event, "installation", installation, to_reference, slug, actor_name, internal)
self.int_ref_handler(event, "installation", installation, to_reference, slug, actor_name, internal, verbosity)
# Kill chain - Reconnaissance
if reconnaissance:
self.int_ref_handler(event, "reconnaissance", reconnaissance, to_reference, slug, actor_name, internal)
self.int_ref_handler(event, "reconnaissance", reconnaissance, to_reference, slug, actor_name, internal, verbosity)
# Kill chain - Weaponization
if weaponization:
self.int_ref_handler(event, "weaponization", weaponization, to_reference, slug, actor_name, internal)
self.int_ref_handler(event, "weaponization", weaponization, to_reference, slug, actor_name, internal, verbosity)

for ref in to_reference:
internal.add_reference(ref.uuid, "Adversary detail")
Expand All @@ -305,7 +307,7 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent():
if internal:
event.add_object(internal)
# Add the description tags
if details.get('description'):
if details.get('description') and verbosity:
event.add_attribute_tag(f"CrowdStrike:adversary:description: {actor_name}", desc_id.uuid)
event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: DESCRIPTION", desc_id.uuid)

Expand Down Expand Up @@ -339,10 +341,10 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent():
event.add_attribute_tag(f"CrowdStrike:adversary:branch: {actor_branch}", ta.uuid)
if had_timestamp:
event.add_object(timestamp_object)
if tsf:
if tsf and verbosity:
event.add_attribute_tag(f"CrowdStrike:adversary:first-seen: {actor_name}", tsf.uuid)
event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: FIRST SEEN", tsf.uuid)
if tsl:
if tsl and verbosity:
event.add_attribute_tag(f"CrowdStrike:adversary:last-seen: {actor_name}", tsl.uuid)
event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: LAST SEEN", tsl.uuid)
if actor.get('known_as') or actor.get("origins"):
Expand All @@ -351,19 +353,20 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent():
aliased = [a.strip() for a in actor.get("known_as").split(",")]
for alias in aliased:
kao = known_as_object.add_attribute('alias', alias, disable_correlation=True)
kao.add_tag(f"CrowdStrike:adversary:branch: {actor_branch}")
kao.add_tag(f"CrowdStrike:adversary:{slug}:alias: {alias.upper()}")
# Tag the aliases to the threat-actor attribution
event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:alias: {alias.upper()}", ta.uuid)
if verbosity:
kao.add_tag(f"CrowdStrike:adversary:branch: {actor_branch}")
kao.add_tag(f"CrowdStrike:adversary:{slug}:alias: {alias.upper()}")
event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:alias: {alias.upper()}", ta.uuid)
event.add_object(known_as_object)
for orig in actor.get("origins", []):
locale = orig.get("value")
if locale:
kar = event.add_attribute("country-of-residence", locale, disable_correlation=True)
event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:origin: {locale.upper()}", kar.uuid)
event.add_attribute_tag(f"CrowdStrike:adversary:origin: {locale.upper()}", kar.uuid)
event.add_tag(f"CrowdStrike:adversary:origin: {locale.upper()}")

if verbosity:
event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:origin: {locale.upper()}", kar.uuid)
event.add_attribute_tag(f"CrowdStrike:adversary:origin: {locale.upper()}", kar.uuid)

victim = None
# Adversary victim location
Expand All @@ -373,9 +376,9 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent():
if not victim:
victim = MISPObject("victim")
vic = victim.add_attribute('regions', region, disable_correlation=True)
vic.add_tag(f"CrowdStrike:target:location: {region.upper()}")
vic.add_tag(f"CrowdStrike:adversary:{slug}:target:location: {region.upper()}")
#vic.add_tag(f"CrowdStrike:adversary:{slug}:target: LOCATION")
if verbosity:
vic.add_tag(f"CrowdStrike:target:location: {region.upper()}")
vic.add_tag(f"CrowdStrike:adversary:{slug}:target:location: {region.upper()}")

# Adversary victim industry
if actor.get("target_industries"):
Expand All @@ -384,10 +387,11 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent():
if not victim:
victim = MISPObject("victim")
vic = victim.add_attribute('sectors', sector, disable_correlation=True)
vic.add_tag(f"CrowdStrike:adversary:{slug}:target:sector: {sector.upper()}")
#vic.add_tag(f"CrowdStrike:adversary:{slug}:target: SECTOR")
vic.add_tag(f"CrowdStrike:target:sector: {sector.upper()}")
event.add_object(victim)
if verbosity:
vic.add_tag(f"CrowdStrike:adversary:{slug}:target:sector: {sector.upper()}")
vic.add_tag(f"CrowdStrike:target:sector: {sector.upper()}")
if victim:
event.add_object(victim)

# TYPE Taxonomic tag, all events
if confirm_boolean_param(self.settings["TAGGING"].get("taxonomic_TYPE", False)):
Expand Down
13 changes: 8 additions & 5 deletions cs_misp_import/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ def add_actor_detail(self, report: dict, event: MISPEvent) -> MISPEvent:
att = event.add_attribute(**actor_att, disable_correlation=True)
for stem in actor_name:
for adversary in Adversary:
if adversary.name == stem.upper():
if adversary.name == stem.upper() and self.import_settings["verbose_tags"]:
# Can't cross-tag with this as we're using it for delete
event.add_attribute_tag(f"CrowdStrike:report:adversary:branch: {stem.upper()}", att.uuid)
event.add_tag(f"CrowdStrike:report:adversary: {actor.get('name')}")
Expand Down Expand Up @@ -392,7 +392,8 @@ def add_indicator_detail(self, event: MISPEvent, report_id: str, indicator_list:
ind_seen["last_seen"] = ind.get("published_date")

added = event.add_attribute(indicator_object.type, indicator_object.value, category=indicator_object.category, disable_correlation=True, **ind_seen)
event.add_attribute_tag(f"CrowdStrike:report:indicator:type: {indicator_object.type.upper()}", added.uuid)
if self.import_settings["verbose_tags"]:
event.add_attribute_tag(f"CrowdStrike:report:indicator:type: {indicator_object.type.upper()}", added.uuid)
# Event level only
#for tag in self.settings["CrowdStrike"]["indicators_tags"].split(","):
# event.add_attribute_tag(tag, added.uuid)
Expand All @@ -416,7 +417,8 @@ def add_victim_detail(self, report: dict, event: MISPEvent) -> MISPEvent:
if not victim:
victim = MISPObject("victim")
vic = victim.add_attribute('regions', country, disable_correlation=True)
vic.add_tag(f"CrowdStrike:target:location: {country.upper()}")
if self.import_settings["verbose_tags"]:
vic.add_tag(f"CrowdStrike:target:location: {country.upper()}")
# Also create a target-location attribute for this value (Too noisy?)
# reg = event.add_attribute('target-location', country)
# event.add_attribute_tag(f"CrowdStrike:target: {country.upper()}", reg.uuid)
Expand All @@ -429,7 +431,8 @@ def add_victim_detail(self, report: dict, event: MISPEvent) -> MISPEvent:
if not victim:
victim = MISPObject("victim")
vic = victim.add_attribute('sectors', sector, disable_correlation=True)
vic.add_tag(f"CrowdStrike:target:sector: {sector.upper()}")
if self.import_settings["verbose_tags"]:
vic.add_tag(f"CrowdStrike:target:sector: {sector.upper()}")
if victim:
event.add_object(victim)

Expand Down Expand Up @@ -472,7 +475,7 @@ def add_report_content(self, report: dict, event: MISPEvent, details: dict, repo
# Event level only
#for tag in self.settings["CrowdStrike"]["reports_tags"].split(","):
# event.add_attribute_tag(tag, att.uuid)
if att.value not in ["text", "Full Report", "Report", report_id]:
if att.value not in ["text", "Full Report", "Report", report_id] and self.import_settings["verbose_tags"]:
event.add_attribute_tag(f"CrowdStrike:report:{report_id.lower().replace('-',': ')}", att.uuid)
#if report_tag:
# event.add_attribute_tag(f"CrowdStrike:report: {report_tag.upper()}", att.uuid)
Expand Down
4 changes: 3 additions & 1 deletion misp_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def parse_command_line() -> Namespace:
parser.add_argument("-p", "--publish", dest="publish", help="Publish events upon creation.", action="store_true", required=False, default=False)
parser.add_argument("-t", "--type", "--report_type", "--indicator_type", "--adversary_type", dest="type", help="Import only this type.", required=False, default=False)
parser.add_argument("-c", "--config", dest="config_file", help="Path to local configuration file", required=False)
parser.add_argument("-v", "--verbose_tagging", dest="verbose", action="store_false", help="Disable verbose tagging.", required=False, default=True)
parser.add_argument("-nd", "--no_dupe_check",
dest="no_dupe_check",
help="Enable or disable duplicate checking on import, defaults to False.",
Expand Down Expand Up @@ -305,7 +306,8 @@ def main():
"no_banners": args.no_banner,
"no_dupe_check": args.no_dupe_check,
"type": args.type,
"publish": args.publish
"publish": args.publish,
"verbose_tags": args.verbose
}

if not import_settings["unknown_mapping"]:
Expand Down

0 comments on commit b1a34de

Please sign in to comment.