From b1a34de6df74f6f2504a5f6900fd39c4d577b442 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Fri, 30 Dec 2022 17:54:30 -0500 Subject: [PATCH] Allow attribute tagging to be disabled with -v --- cs_misp_import/actors.py | 72 +++++++++++++++++++++------------------ cs_misp_import/reports.py | 13 ++++--- misp_import.py | 4 ++- 3 files changed, 49 insertions(+), 40 deletions(-) diff --git a/cs_misp_import/actors.py b/cs_misp_import/actors.py index d4706e2..ae1c033 100644 --- a/cs_misp_import/actors.py +++ b/cs_misp_import/actors.py @@ -150,7 +150,7 @@ def create_internal_reference() -> MISPObject: return inter @staticmethod - def int_ref_handler(evt, kc_name, kc_detail, ref_list, slg, act_name, int_ref, no_slug: bool = False): + def int_ref_handler(evt, kc_name, kc_detail, ref_list, slg, act_name, int_ref, verbose: bool = False): misp_object = MISPObject("internal-reference") misp_object.add_attribute("type", "Adversary detail", disable_correlation=True) misp_object.add_attribute("identifier", kc_name.title(), disable_correlation=True) @@ -158,7 +158,7 @@ def int_ref_handler(evt, kc_name, kc_detail, ref_list, slg, act_name, int_ref, n kc_detail.replace("\t", "").replace(" ", "") sum_id = misp_object.add_attribute("comment", kc_detail, disable_correlation=True) ref_list.append(evt.add_object(misp_object)) - if not no_slug: + if verbose: evt.add_attribute_tag(f"CrowdStrike:adversary:{kc_name.lower().replace(' ', '-')}: {act_name}", sum_id.uuid) evt.add_attribute_tag(f"CrowdStrike:adversary:{slg}: {kc_name.upper()}", sum_id.uuid) int_ref.add_reference(misp_object.uuid, "Adversary detail") @@ -185,6 +185,7 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): slug = details.get("slug", actor_name.lower().replace(" ", "-")) actor_branch = actor_name.split(" ")[1].upper() actor_region = "" + verbosity = self.import_settings["verbose_tags"] if actor_name: for act_reg in [adv for adv in dir(Adversary) if "__" not in adv]: if act_reg in actor_branch: @@ -213,7 +214,7 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): if not internal: internal = self.create_internal_reference() - self.int_ref_handler(event, "Actor Type", act_type.title(), to_reference, slug, actor_name, internal, False) + self.int_ref_handler(event, "Actor Type", act_type.title(), to_reference, slug, actor_name, internal, verbosity) event.add_tag(f"CrowdStrike:adversary:type: {act_type.upper()}") # Adversary motives @@ -224,7 +225,7 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): if not internal: internal = self.create_internal_reference() - self.int_ref_handler(event, "Motivation", motive_list_string, to_reference, slug, actor_name, internal, False) + self.int_ref_handler(event, "Motivation", motive_list_string, to_reference, slug, actor_name, internal, verbosity) for mname in mlist: event.add_tag(f"CrowdStrike:adversary:motivation: {mname.upper()}") @@ -236,7 +237,7 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): if not internal: internal = self.create_internal_reference() - self.int_ref_handler(event, "Capability", cap_val, to_reference, slug, actor_name, internal, False) + self.int_ref_handler(event, "Capability", cap_val, to_reference, slug, actor_name, internal, verbosity) event.add_tag(f"CrowdStrike:adversary:capability: {cap_val.upper()}") # Set adversary event threat level based upon adversary capability if "BELOW" in cap_val.upper() or "LOW" in cap_val.upper(): @@ -262,13 +263,13 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): # Kill chain - Objectives if objectives: - self.int_ref_handler(event, "objectives", objectives, to_reference, slug, actor_name, internal) + self.int_ref_handler(event, "objectives", objectives, to_reference, slug, actor_name, internal, verbosity) # Kill chain - Command and Control if candc: - self.int_ref_handler(event, "command and control", candc, to_reference, slug, actor_name, internal) + self.int_ref_handler(event, "command and control", candc, to_reference, slug, actor_name, internal, verbosity) # Kill chain - Delivery if delivery: - self.int_ref_handler(event, "delivery", delivery, to_reference, slug, actor_name, internal) + self.int_ref_handler(event, "delivery", delivery, to_reference, slug, actor_name, internal, verbosity) # Kill chain - Exploitation if exploitation: exploitation_object = MISPObject("internal-reference") @@ -278,23 +279,24 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): exploits = exploitation.replace("\t", "").replace(" ", "").split("\r\n") ex_id = exploitation_object.add_attribute("comment", exploitation.replace("\t", "").replace(" ", ""), disable_correlation=True) to_reference.append(event.add_object(exploitation_object)) - event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: EXPLOITATION", ex_id.uuid) - event.add_attribute_tag(f"CrowdStrike:adversary:exploitation: {actor_name}", ex_id.uuid) - for exptt in [exp for exp in exploits if exp]: - if exptt not in ["Unknown", "N/A"]: - for exploit in [a.strip() for a in exptt.split(",")]: - if len(exploit.split(" ")) <= 4: - event.add_attribute_tag(f"CrowdStrike:adversary:exploitation: {exploit.upper()}", ex_id.uuid) + if verbosity: + event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: EXPLOITATION", ex_id.uuid) + event.add_attribute_tag(f"CrowdStrike:adversary:exploitation: {actor_name}", ex_id.uuid) + for exptt in [exp for exp in exploits if exp]: + if exptt not in ["Unknown", "N/A"]: + for exploit in [a.strip() for a in exptt.split(",")]: + if len(exploit.split(" ")) <= 4: + event.add_attribute_tag(f"CrowdStrike:adversary:exploitation: {exploit.upper()}", ex_id.uuid) internal.add_reference(exploitation_object.uuid, "Adversary detail") # Kill chain - Installation if installation: - self.int_ref_handler(event, "installation", installation, to_reference, slug, actor_name, internal) + self.int_ref_handler(event, "installation", installation, to_reference, slug, actor_name, internal, verbosity) # Kill chain - Reconnaissance if reconnaissance: - self.int_ref_handler(event, "reconnaissance", reconnaissance, to_reference, slug, actor_name, internal) + self.int_ref_handler(event, "reconnaissance", reconnaissance, to_reference, slug, actor_name, internal, verbosity) # Kill chain - Weaponization if weaponization: - self.int_ref_handler(event, "weaponization", weaponization, to_reference, slug, actor_name, internal) + self.int_ref_handler(event, "weaponization", weaponization, to_reference, slug, actor_name, internal, verbosity) for ref in to_reference: internal.add_reference(ref.uuid, "Adversary detail") @@ -305,7 +307,7 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): if internal: event.add_object(internal) # Add the description tags - if details.get('description'): + if details.get('description') and verbosity: event.add_attribute_tag(f"CrowdStrike:adversary:description: {actor_name}", desc_id.uuid) event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: DESCRIPTION", desc_id.uuid) @@ -339,10 +341,10 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): event.add_attribute_tag(f"CrowdStrike:adversary:branch: {actor_branch}", ta.uuid) if had_timestamp: event.add_object(timestamp_object) - if tsf: + if tsf and verbosity: event.add_attribute_tag(f"CrowdStrike:adversary:first-seen: {actor_name}", tsf.uuid) event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: FIRST SEEN", tsf.uuid) - if tsl: + if tsl and verbosity: event.add_attribute_tag(f"CrowdStrike:adversary:last-seen: {actor_name}", tsl.uuid) event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: LAST SEEN", tsl.uuid) if actor.get('known_as') or actor.get("origins"): @@ -351,19 +353,20 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): aliased = [a.strip() for a in actor.get("known_as").split(",")] for alias in aliased: kao = known_as_object.add_attribute('alias', alias, disable_correlation=True) - kao.add_tag(f"CrowdStrike:adversary:branch: {actor_branch}") - kao.add_tag(f"CrowdStrike:adversary:{slug}:alias: {alias.upper()}") # Tag the aliases to the threat-actor attribution - event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:alias: {alias.upper()}", ta.uuid) + if verbosity: + kao.add_tag(f"CrowdStrike:adversary:branch: {actor_branch}") + kao.add_tag(f"CrowdStrike:adversary:{slug}:alias: {alias.upper()}") + event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:alias: {alias.upper()}", ta.uuid) event.add_object(known_as_object) for orig in actor.get("origins", []): locale = orig.get("value") if locale: kar = event.add_attribute("country-of-residence", locale, disable_correlation=True) - event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:origin: {locale.upper()}", kar.uuid) - event.add_attribute_tag(f"CrowdStrike:adversary:origin: {locale.upper()}", kar.uuid) event.add_tag(f"CrowdStrike:adversary:origin: {locale.upper()}") - + if verbosity: + event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:origin: {locale.upper()}", kar.uuid) + event.add_attribute_tag(f"CrowdStrike:adversary:origin: {locale.upper()}", kar.uuid) victim = None # Adversary victim location @@ -373,9 +376,9 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): if not victim: victim = MISPObject("victim") vic = victim.add_attribute('regions', region, disable_correlation=True) - vic.add_tag(f"CrowdStrike:target:location: {region.upper()}") - vic.add_tag(f"CrowdStrike:adversary:{slug}:target:location: {region.upper()}") - #vic.add_tag(f"CrowdStrike:adversary:{slug}:target: LOCATION") + if verbosity: + vic.add_tag(f"CrowdStrike:target:location: {region.upper()}") + vic.add_tag(f"CrowdStrike:adversary:{slug}:target:location: {region.upper()}") # Adversary victim industry if actor.get("target_industries"): @@ -384,10 +387,11 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): if not victim: victim = MISPObject("victim") vic = victim.add_attribute('sectors', sector, disable_correlation=True) - vic.add_tag(f"CrowdStrike:adversary:{slug}:target:sector: {sector.upper()}") - #vic.add_tag(f"CrowdStrike:adversary:{slug}:target: SECTOR") - vic.add_tag(f"CrowdStrike:target:sector: {sector.upper()}") - event.add_object(victim) + if verbosity: + vic.add_tag(f"CrowdStrike:adversary:{slug}:target:sector: {sector.upper()}") + vic.add_tag(f"CrowdStrike:target:sector: {sector.upper()}") + if victim: + event.add_object(victim) # TYPE Taxonomic tag, all events if confirm_boolean_param(self.settings["TAGGING"].get("taxonomic_TYPE", False)): diff --git a/cs_misp_import/reports.py b/cs_misp_import/reports.py index ab05f51..fb158d0 100644 --- a/cs_misp_import/reports.py +++ b/cs_misp_import/reports.py @@ -351,7 +351,7 @@ def add_actor_detail(self, report: dict, event: MISPEvent) -> MISPEvent: att = event.add_attribute(**actor_att, disable_correlation=True) for stem in actor_name: for adversary in Adversary: - if adversary.name == stem.upper(): + if adversary.name == stem.upper() and self.import_settings["verbose_tags"]: # Can't cross-tag with this as we're using it for delete event.add_attribute_tag(f"CrowdStrike:report:adversary:branch: {stem.upper()}", att.uuid) event.add_tag(f"CrowdStrike:report:adversary: {actor.get('name')}") @@ -392,7 +392,8 @@ def add_indicator_detail(self, event: MISPEvent, report_id: str, indicator_list: ind_seen["last_seen"] = ind.get("published_date") added = event.add_attribute(indicator_object.type, indicator_object.value, category=indicator_object.category, disable_correlation=True, **ind_seen) - event.add_attribute_tag(f"CrowdStrike:report:indicator:type: {indicator_object.type.upper()}", added.uuid) + if self.import_settings["verbose_tags"]: + event.add_attribute_tag(f"CrowdStrike:report:indicator:type: {indicator_object.type.upper()}", added.uuid) # Event level only #for tag in self.settings["CrowdStrike"]["indicators_tags"].split(","): # event.add_attribute_tag(tag, added.uuid) @@ -416,7 +417,8 @@ def add_victim_detail(self, report: dict, event: MISPEvent) -> MISPEvent: if not victim: victim = MISPObject("victim") vic = victim.add_attribute('regions', country, disable_correlation=True) - vic.add_tag(f"CrowdStrike:target:location: {country.upper()}") + if self.import_settings["verbose_tags"]: + vic.add_tag(f"CrowdStrike:target:location: {country.upper()}") # Also create a target-location attribute for this value (Too noisy?) # reg = event.add_attribute('target-location', country) # event.add_attribute_tag(f"CrowdStrike:target: {country.upper()}", reg.uuid) @@ -429,7 +431,8 @@ def add_victim_detail(self, report: dict, event: MISPEvent) -> MISPEvent: if not victim: victim = MISPObject("victim") vic = victim.add_attribute('sectors', sector, disable_correlation=True) - vic.add_tag(f"CrowdStrike:target:sector: {sector.upper()}") + if self.import_settings["verbose_tags"]: + vic.add_tag(f"CrowdStrike:target:sector: {sector.upper()}") if victim: event.add_object(victim) @@ -472,7 +475,7 @@ def add_report_content(self, report: dict, event: MISPEvent, details: dict, repo # Event level only #for tag in self.settings["CrowdStrike"]["reports_tags"].split(","): # event.add_attribute_tag(tag, att.uuid) - if att.value not in ["text", "Full Report", "Report", report_id]: + if att.value not in ["text", "Full Report", "Report", report_id] and self.import_settings["verbose_tags"]: event.add_attribute_tag(f"CrowdStrike:report:{report_id.lower().replace('-',': ')}", att.uuid) #if report_tag: # event.add_attribute_tag(f"CrowdStrike:report: {report_tag.upper()}", att.uuid) diff --git a/misp_import.py b/misp_import.py index 09e7a05..d1edb45 100644 --- a/misp_import.py +++ b/misp_import.py @@ -77,6 +77,7 @@ def parse_command_line() -> Namespace: parser.add_argument("-p", "--publish", dest="publish", help="Publish events upon creation.", action="store_true", required=False, default=False) parser.add_argument("-t", "--type", "--report_type", "--indicator_type", "--adversary_type", dest="type", help="Import only this type.", required=False, default=False) parser.add_argument("-c", "--config", dest="config_file", help="Path to local configuration file", required=False) + parser.add_argument("-v", "--verbose_tagging", dest="verbose", action="store_false", help="Disable verbose tagging.", required=False, default=True) parser.add_argument("-nd", "--no_dupe_check", dest="no_dupe_check", help="Enable or disable duplicate checking on import, defaults to False.", @@ -305,7 +306,8 @@ def main(): "no_banners": args.no_banner, "no_dupe_check": args.no_dupe_check, "type": args.type, - "publish": args.publish + "publish": args.publish, + "verbose_tags": args.verbose } if not import_settings["unknown_mapping"]: