Skip to content

Commit

Permalink
Support for bug bounty reports
Browse files Browse the repository at this point in the history
  • Loading branch information
Maffooch committed Sep 19, 2024
1 parent 460e8b1 commit 459891c
Show file tree
Hide file tree
Showing 13 changed files with 481 additions and 63 deletions.
2 changes: 1 addition & 1 deletion dojo/settings/.settings.dist.py.sha256sum
Original file line number Diff line number Diff line change
@@ -1 +1 @@
702d74c8bc703d11c03cf5b3f7c4319ad0cdeaef68db6426d1112c59e59365a6
6daac4dfbf815ecca2ad25d380b6f5277e2dc245aab9b5d443abd0f36d74fa1a
1 change: 1 addition & 0 deletions dojo/settings/settings.dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,7 @@ def saml2_attrib_map_format(dict):
"Legitify Scan": ["title", "endpoints", "severity"],
"ThreatComposer Scan": ["title", "description"],
"Invicti Scan": ["title", "description", "severity"],
"HackerOne Cases": ["title", "severity"],
}

# Override the hardcoded settings here via the env var
Expand Down
223 changes: 168 additions & 55 deletions dojo/tools/h1/parser.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
import csv
import hashlib
import io
import json
from contextlib import suppress
from datetime import datetime
from typing import ClassVar

from dojo.models import Finding
from dateutil import parser as date_parser
from django.core.files.uploadedfile import TemporaryUploadedFile

from dojo.models import Finding, Test

__author__ = "Kirill Gotsman"


class VerboseJSONHackerOneParser:
class HackerOneVulnerabilityDisclosureProgram:
"""
Verbose JSON format of HackerOne cases
Vulnerability Disclosure Program HackerOne reports
"""
def get_findings(self, tree, test):

def get_vulnerability_disclosure_json_findings(self, tree, test):
"""
Converts a HackerOne reports to a DefectDojo finding
"""
Expand All @@ -21,7 +29,8 @@ def get_findings(self, tree, test):
# Get all relevant data
date = content["attributes"]["created_at"]
date = datetime.strftime(
datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ"), "%Y-%m-%d",
datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ"),
"%Y-%m-%d",
)
# Build the title of the Dojo finding
title = "#" + content["id"] + " " + content["attributes"]["title"]
Expand All @@ -30,21 +39,15 @@ def get_findings(self, tree, test):

# References
try:
issue_tracker_id = content["attributes"][
"issue_tracker_reference_id"
]
issue_tracker_url = content["attributes"][
"issue_tracker_reference_url"
]
issue_tracker_id = content["attributes"]["issue_tracker_reference_id"]
issue_tracker_url = content["attributes"]["issue_tracker_reference_url"]
references = f"[{issue_tracker_id}]({issue_tracker_url})\n"
except Exception:
references = ""

# Build the severity of the Dojo finding
try:
severity = content["relationships"]["severity"]["data"][
"attributes"
]["rating"].capitalize()
severity = content["relationships"]["severity"]["data"]["attributes"]["rating"].capitalize()
if severity not in ["Low", "Medium", "High", "Critical"]:
severity = "Info"
except Exception:
Expand All @@ -64,9 +67,7 @@ def get_findings(self, tree, test):
# Set CWE of the Dojo finding
try:
cwe = int(
content["relationships"]["weakness"]["data"]["attributes"][
"external_id"
][4:],
content["relationships"]["weakness"]["data"]["attributes"]["external_id"][4:],
)
except Exception:
cwe = 0
Expand Down Expand Up @@ -104,11 +105,10 @@ def get_findings(self, tree, test):
def build_description(self, content):
date = content["attributes"]["created_at"]
date = datetime.strftime(
datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ"), "%Y-%m-%d",
datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ"),
"%Y-%m-%d",
)
reporter = content["relationships"]["reporter"]["data"]["attributes"][
"username"
]
reporter = content["relationships"]["reporter"]["data"]["attributes"]["username"]
triaged_date = content["attributes"]["triaged_at"]

# Build the description of the Dojo finding
Expand All @@ -125,9 +125,7 @@ def build_description(self, content):

# Try to grab CVSS
try:
cvss = content["relationships"]["severity"]["data"]["attributes"][
"score"
]
cvss = content["relationships"]["severity"]["data"]["attributes"]["score"]
description += f"CVSS: {cvss}\n"
except Exception:
pass
Expand All @@ -139,32 +137,125 @@ def build_description(self, content):

# Try to grab weakness if it's there
try:
weakness_title = content["relationships"]["weakness"]["data"][
"attributes"
]["name"]
weakness_desc = content["relationships"]["weakness"]["data"][
"attributes"
]["description"]
weakness_title = content["relationships"]["weakness"]["data"]["attributes"]["name"]
weakness_desc = content["relationships"]["weakness"]["data"]["attributes"]["description"]
description += f"\n##Weakness: {weakness_title}\n{weakness_desc}"
except Exception:
pass

return description


class JSONHackerOneParser:
"""Parse the JSON format"""
def get_findings(self, tree, test):
return []
class HackerOneBugBountyProgram:
"""Bug Bounty Program HackerOne reports."""

fields_to_label: ClassVar[dict[str, str]] = {
"id": "ID",
"weakness": "Weakness Category",
"substate": "Substate",
"reporter": "Reporter",
"assigned": "Assigned To",
"public": "Public",
"triageted_at": "Triaged On",
"closed_at": "Closed On",
"awarded_at": "Awarded On",
"bounty": "Bounty Price",
"bonus": "Bonus",
"first_response_at": "First Response On",
"source": "Source",
"reference": "Reference",
"reference_url": "Reference URL",
"structured_scope": "Structured Scope",
"structured_scope_reference": "Structured Scope Reference",
"original_report_id": "Original Report ID",
"collaborating_users": "Collaboration Users",
"duplicate_report_ids": "Duplicate Report IDs",
}

def get_bug_bounty_program_json_findings(self, dict_list: dict, test: Test) -> list[Finding]:
return self.parse_findings(dict_list, test)

class CSVHackerOneParser:
"""Parse the CSV format"""
def get_findings(self, file, test):
return []
def get_bug_bounty_program_csv_findings(self, dict_list: dict, test: Test) -> list[Finding]:
return self.parse_findings(dict_list, test)

def parse_findings(self, dict_list: list[dict], test: Test) -> list[Finding]:
"""Return a list of findings generated by the submitted report."""
findings = []
for entry in dict_list:
status_dict = self.determine_status(entry)
finding = Finding(
title=entry.get("title"),
severity=self.convert_severity(entry),
description=self.parse_description(entry),
date=date_parser.parse(entry.get("reported_at")),
dynamic_finding=True,
test=test,
**status_dict,
)
# Add vulnerability IDs if they are present
if (cve_str := entry.get("cve_ids")) is not None and len(cve_str) > 0:
finding.unsaved_vulnerability_ids = [cve_str]
# Add the finding the the list
findings.append(finding)
return findings

def determine_status(self, row) -> dict:
"""Generate a dict of status meta to fully represent that state of the finding
Possible states currently supported are open and closed. In the event that neither
of those options are present, the open status will be the default, and returned
"""
default_status = {
"active": True,
}
# Open status -> active = True
# Closed status -> is_mitigated = True + timestamp
if (status := row.get("state")) is not None:
if status == "open":
return default_status
if status == "closed":
return {
"is_mitigated": True,
"active": False,
"mitigated": date_parser.parse(row.get("closed_at")),
}
return default_status

class H1Parser:
def convert_severity(self, entry: dict) -> str:
"""Convert the severity from the parser from the string value, or CVSS score."""
# Try to use the string severity first
if (severity := entry.get("severity_rating")) is not None:
if severity in ["critical", "high", "medium", "low"]:
return severity.capitalize()
# Fall back to "severity_score" which I assume is CVSS Score
if (severity_score := entry.get("severity_score")) is not None:
with suppress(ValueError):
severity_score = float(severity_score)
if severity_score >= 9.0:
return "Critical"
if severity_score >= 7.0:
return "High"
if severity_score >= 4.0:
return "Medium"
if severity_score > 0.0:
return "Low"
# Default to Info in all cases (assuming we reach this)
return "Info"

def parse_description(self, entry: dict) -> str:
"""Build the description from the mapping set in the fields_to_label var."""
# Iterate over the items and build the string
description = ""
for field, label in self.fields_to_label.items():
if (value := entry.get(field)) is not None and len(value) > 0:
description += f"**{label}**: {value}\n"
return description


class H1Parser(
HackerOneVulnerabilityDisclosureProgram,
HackerOneBugBountyProgram,
):
"""
A class that can be used to parse the Get All Reports JSON export from HackerOne API.
"""
Expand All @@ -178,32 +269,54 @@ def get_label_for_scan_types(self, scan_type):
def get_description_for_scan_types(self, scan_type):
return "Import HackerOne cases findings in JSON format."

def get_json_tree(self, file):
# Load the contents of the JSON file into a dictionary
def get_findings(self, file: TemporaryUploadedFile, test: Test) -> list[Finding]:
"""Return the list of findings generated from the uploaded report."""
# first determine which format to pase
file_name = file.name
if str(file_name).endswith(".json"):
return self.determine_json_format(file, test)
elif str(file_name).endswith(".csv"):
return self.determine_csv_format(file, test)
else:
msg = "Filename extension not recognized. Use .json or .csv"
raise ValueError(msg)

def get_json_tree(self, file: TemporaryUploadedFile) -> dict:
"""Extract the CSV file into a iterable that represents a dict."""
data = file.read()
try:
tree = json.loads(str(data, "utf-8"))
except Exception:
tree = json.loads(data)
return tree

def get_findings(self, file, test):
# first determine which format to pase
if str(file.name).endswith(".json"):
return self.determine_json_format(file, test)
elif str(file.name).endswith(".csv"):
return CSVHackerOneParser().get_findings(file, test)
else:
msg = "Filename extension not recognized. Use .json or .csv"
raise ValueError(msg)

def determine_json_format(self, file, test):
def determine_json_format(self, file: TemporaryUploadedFile, test: Test) -> list[Finding]:
"""Evaluate the format of the JSON report that was uploaded to determine which parser to use."""
tree = self.get_json_tree(file)
# Check for some root elements
if "finding" in tree:
return JSONHackerOneParser().get_findings(tree, test)
if "findings" in tree:
return self.get_bug_bounty_program_json_findings(tree.get("findings", []), test)
if "data" in tree:
return VerboseJSONHackerOneParser().get_findings(tree, test)
return self.get_vulnerability_disclosure_json_findings(tree, test)
else:
msg = "This JSON format is not supported"
raise ValueError(msg)

def get_csv_reader(self, file: TemporaryUploadedFile) -> csv.DictReader:
"""Extract the CSV file into a iterable that represents a dict."""
if file is None:
return ()
content = file.read()
if isinstance(content, bytes):
content = content.decode("utf-8")
return csv.DictReader(io.StringIO(content), delimiter=",", quotechar='"')

def determine_csv_format(self, file: TemporaryUploadedFile, test: Test) -> list[Finding]:
"""Evaluate the format of the CSV report that was uploaded to determine which parser to use."""
reader = self.get_csv_reader(file)
# Check for some root elements
if "bounty" in reader.fieldnames:
return self.get_bug_bounty_program_csv_findings(reader, test)
else:
msg = "This CSV format is not supported"
raise ValueError(msg)
5 changes: 5 additions & 0 deletions unittests/scans/h1/bug_bounty_many.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
id,title,severity_rating,severity_score,state,substate,weakness,reported_at,first_response_at,triaged_at,closed_at,awarded_at,assigned,reporter,source,bounty,bonus,public,reference,reference_url,structured_scope,structured_scope_reference,original_report_id,cve_ids,collaborating_users,duplicate_report_ids
2501687,Sensitive Account Balance Information Exposure via example's DaviPlata Payment Link Integration,medium,,open,triaged,Information Disclosure,2024-05-12 04:05:27 UTC,2024-05-14 22:14:16 UTC,2024-08-28 19:35:16 UTC,,2024-08-28 19:40:24 UTC,Group example.co Team,reporter,,400.0,,no,,,1489537348,,,"",,
2710467,Acceso no autorizado a soporte premium sin pagar,critical,9.1,open,new,,2024-09-10 15:38:20 UTC,,,,,,reporter,,,,no,,,example.co,,,"",,
2682608,XSS - stg.pse.mock.example.co,none,0.0,closed,duplicate,,2024-08-25 07:27:18 UTC,2024-08-27 18:19:23 UTC,,2024-08-27 18:19:23 UTC,,,reporter,,,,no,,,,,2311675,"",,
2616856,example.co/File creation via HTTP method PUT,critical,,closed,duplicate,,2024-07-22 17:54:36 UTC,2024-07-22 20:57:56 UTC,,2024-07-22 20:57:56 UTC,,,reporter,,,,no,,,example.co,,2597854,CVE-2017-12615,,
Loading

0 comments on commit 459891c

Please sign in to comment.