Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dependency-mgmt): format *.py files under ci/src/dependencies #1750

Merged
merged 3 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ def get_open_findings_for_repo_and_scanner(
) -> Dict[Tuple[str, str, str, str], Finding]:
return {}

def get_deleted_findings(
self, repository: str, scanner: str, dependency_id: str
) -> List[Finding]:
def get_deleted_findings(self, repository: str, scanner: str, dependency_id: str) -> List[Finding]:
return []

def commit_has_block_exception(self, commit_type: CommitType, commit_hash: str) -> bool:
Expand Down
4 changes: 1 addition & 3 deletions ci/src/dependencies/data_source/finding_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ def get_open_findings_for_repo_and_scanner(
raise NotImplementedError

@abc.abstractmethod
def get_deleted_findings(
self, repository: str, scanner: str, dependency_id: str
) -> List[Finding]:
def get_deleted_findings(self, repository: str, scanner: str, dependency_id: str) -> List[Finding]:
"""Retrieve deleted findings with the given properties from data source, returns an empty list if no deleted findings exist or the data source doesn't support retrieval of deleted findings."""
raise NotImplementedError

Expand Down
99 changes: 59 additions & 40 deletions ci/src/dependencies/data_source/jira_finding_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ class JiraFindingDataSource(FindingDataSource):
risk_assessors: List[User]
app_owner_msg_subscriber: AppOwnerMsgSubscriber

def __init__(self, subscribers: List[FindingDataSourceSubscriber], app_owner_msg_subscriber: AppOwnerMsgSubscriber = ConsoleLoggerAppOwnerMsgSubscriber(), custom_jira: Optional[JIRA] = None):
def __init__(
self,
subscribers: List[FindingDataSourceSubscriber],
app_owner_msg_subscriber: AppOwnerMsgSubscriber = ConsoleLoggerAppOwnerMsgSubscriber(),
custom_jira: Optional[JIRA] = None,
):
logging.debug(f"JiraFindingDataSource({subscribers},{custom_jira})")
self.subscribers = subscribers
self.jira = (
Expand All @@ -98,9 +103,7 @@ def __init__(self, subscribers: List[FindingDataSourceSubscriber], app_owner_msg
# Remove the unnecessary text strings from the description of the Linux kernel CNA CVEs
@staticmethod
def __filter_linux_kernel_cna_cves(vuln_description: str) -> str:
filter_strings = [
"In the Linux kernel, the following vulnerability has been resolved: "
]
filter_strings = ["In the Linux kernel, the following vulnerability has been resolved: "]
for filter_string in filter_strings:
vuln_description = vuln_description.replace(filter_string, "")
return vuln_description
Expand Down Expand Up @@ -135,29 +138,29 @@ def __jira_to_finding_vulnerabilities(vulnerability_table: str) -> Optional[List
# nested square brackets are not supported
parts: List[str] = []
is_link = False
parsed = ''
parsed = ""
for c in row:
if c == '[':
if c == "[":
if is_link:
# nested links are not supported
return None
else:
is_link = True
parsed += c
elif c == ']':
elif c == "]":
is_link = False
parsed += c
elif c == '|':
elif c == "|":
if is_link:
parsed += c
else:
parts.append(parsed)
parsed = ''
parsed = ""
else:
parsed += c
parts.append(parsed)

if len(parts) > 1 and parts[1].startswith('[') and parts[1].endswith(']'):
if len(parts) > 1 and parts[1].startswith("[") and parts[1].endswith("]"):
# jira has changed the vulnerability id to a wiki markup link, e.g.
# [https://avd.aquasec.com/nvd/cve-2023-35823|https://avd.aquasec.com/nvd/cve-2023-35823]
# change it back to https://avd.aquasec.com/nvd/cve-2023-35823
Expand All @@ -169,9 +172,21 @@ def __jira_to_finding_vulnerabilities(vulnerability_table: str) -> Optional[List

if len(parts) == 5:
# backwards compatibility for entries that don't have risk column
res.append(Vulnerability(id=parts[1], name=parts[2], description=parts[3], score=int(parts[4]), risk_note=JIRA_VULNERABILITY_TABLE_RISK_NOTE_MIGRATION_LABEL))
res.append(
Vulnerability(
id=parts[1],
name=parts[2],
description=parts[3],
score=int(parts[4]),
risk_note=JIRA_VULNERABILITY_TABLE_RISK_NOTE_MIGRATION_LABEL,
)
)
elif len(parts) == 6:
res.append(Vulnerability(id=parts[1], name=parts[2], description=parts[3], score=int(parts[4]), risk_note=parts[5]))
res.append(
Vulnerability(
id=parts[1], name=parts[2], description=parts[3], score=int(parts[4]), risk_note=parts[5]
)
)
else:
# unexpected format
return None
Expand Down Expand Up @@ -385,16 +400,20 @@ def __finding_diff_to_jira(finding_old: Optional[Finding], finding_new: Finding)
summary_update_needed = True
if finding_old is None or finding_old.vulnerable_dependency != finding_new.vulnerable_dependency:
res[JIRA_FINDING_TO_CUSTOM_FIELD.get("vulnerable_dependency_id")[0]] = finding_new.vulnerable_dependency.id
res[
JIRA_FINDING_TO_CUSTOM_FIELD.get("vulnerable_dependency_version")[0]
] = finding_new.vulnerable_dependency.version
res[JIRA_FINDING_TO_CUSTOM_FIELD.get("vulnerable_dependency_version")[0]] = (
finding_new.vulnerable_dependency.version
)
summary_update_needed = True
dep_update_needed = True
patch_version_update_needed = True
if finding_old is None or finding_old.vulnerabilities != finding_new.vulnerabilities or finding_new.vulnerabilities[0].risk_note == JIRA_VULNERABILITY_TABLE_RISK_NOTE_MIGRATION_LABEL:
res[
JIRA_FINDING_TO_CUSTOM_FIELD.get("vulnerabilities")[0]
] = JiraFindingDataSource.__finding_to_jira_vulnerabilities(finding_new.vulnerabilities)
if (
finding_old is None
or finding_old.vulnerabilities != finding_new.vulnerabilities
or finding_new.vulnerabilities[0].risk_note == JIRA_VULNERABILITY_TABLE_RISK_NOTE_MIGRATION_LABEL
):
res[JIRA_FINDING_TO_CUSTOM_FIELD.get("vulnerabilities")[0]] = (
JiraFindingDataSource.__finding_to_jira_vulnerabilities(finding_new.vulnerabilities)
)
patch_version_update_needed = True
if finding_old is None or finding_old.first_level_dependencies != finding_new.first_level_dependencies:
dep_update_needed = True
Expand All @@ -417,9 +436,9 @@ def __finding_diff_to_jira(finding_old: Optional[Finding], finding_new: Finding)
owning_teams.append(JIRA_OWNER_GROUP_BY_TEAM[team])
res[JIRA_FINDING_TO_CUSTOM_FIELD.get("owning_teams")[0]] = owning_teams
if finding_old is None or finding_old.patch_responsible != finding_new.patch_responsible:
res[
JIRA_FINDING_TO_CUSTOM_FIELD.get("patch_responsible")[0]
] = JiraFindingDataSource.__finding_to_jira_users(finding_new.patch_responsible)
res[JIRA_FINDING_TO_CUSTOM_FIELD.get("patch_responsible")[0]] = (
JiraFindingDataSource.__finding_to_jira_users(finding_new.patch_responsible)
)
if finding_old is None or finding_old.due_date != finding_new.due_date:
res[JIRA_FINDING_TO_CUSTOM_FIELD.get("due_date")[0]] = JiraFindingDataSource.__finding_to_jira_due_date(
finding_new.due_date
Expand All @@ -428,16 +447,16 @@ def __finding_diff_to_jira(finding_old: Optional[Finding], finding_new: Finding)
res[JIRA_FINDING_TO_CUSTOM_FIELD.get("score")[0]] = None if finding_new.score == -1 else finding_new.score

if summary_update_needed:
res[
"summary"
] = f"[{finding_new.repository}][{finding_new.scanner}] Vulnerability in {finding_new.vulnerable_dependency.name} {finding_new.vulnerable_dependency.version}"[
:100
res["summary"] = (
f"[{finding_new.repository}][{finding_new.scanner}] Vulnerability in {finding_new.vulnerable_dependency.name} {finding_new.vulnerable_dependency.version}"[
:100
]
)
all_deps: List[Dependency] = [finding_new.vulnerable_dependency] + finding_new.first_level_dependencies
if dep_update_needed:
res[
JIRA_FINDING_TO_CUSTOM_FIELD.get("dependencies")[0]
] = JiraFindingDataSource.__finding_to_jira_dependencies(all_deps)
res[JIRA_FINDING_TO_CUSTOM_FIELD.get("dependencies")[0]] = (
JiraFindingDataSource.__finding_to_jira_dependencies(all_deps)
)
if patch_version_update_needed:
(
res[JIRA_FINDING_TO_CUSTOM_FIELD.get("patch_versions")[0]],
Expand Down Expand Up @@ -558,9 +577,7 @@ def get_open_findings_for_repo_and_scanner(
res[finding.id()] = deepcopy(finding)
return res

def get_deleted_findings(
self, repository: str, scanner: str, dependency_id: str
) -> List[Finding]:
def get_deleted_findings(self, repository: str, scanner: str, dependency_id: str) -> List[Finding]:
cache_key = (repository, scanner, dependency_id)
if cache_key in self.deleted_findings_cached:
return deepcopy(list(map(lambda x: x[0], self.deleted_findings_cached[cache_key])))
Expand All @@ -581,7 +598,11 @@ def get_deleted_findings(
result = []
for issue in issues:
finding: Finding = self.__jira_to_finding(issue)
if finding.repository == repository and finding.scanner == scanner and finding.vulnerable_dependency.id == dependency_id:
if (
finding.repository == repository
and finding.scanner == scanner
and finding.vulnerable_dependency.id == dependency_id
):
result.append((finding, issue))
self.deleted_findings_cached[cache_key] = result
return deepcopy(list(map(lambda x: x[0], result)))
Expand All @@ -608,7 +629,9 @@ def __does_exceed_character_limit(finding: Finding, fields_to_update: Dict[str,
for field_name, field_value in fields_to_update.items():
try:
if len(field_value) > 32700:
logging.warning(f"field {field_name} in finding {finding.id()} exceeds character limit with {len(field_value)} characters")
logging.warning(
f"field {field_name} in finding {finding.id()} exceeds character limit with {len(field_value)} characters"
)
does_exceed = True
except TypeError:
pass # some types don't have a length
Expand Down Expand Up @@ -707,10 +730,6 @@ def get_risk_assessor(self) -> List[User]:
self.risk_assessors = self.__jira_to_finding_users(assessors)
return self.risk_assessors
except RuntimeError:
logging.error(
"could not determine risk assessors by ticket\nusing default risk assessors instead"
)
logging.debug(
f"could not determine risk assessors by ticket, reason:\n{traceback.format_exc()}"
)
logging.error("could not determine risk assessors by ticket\nusing default risk assessors instead")
logging.debug(f"could not determine risk assessors by ticket, reason:\n{traceback.format_exc()}")
return JIRA_DEFAULT_RISK_ASSESSORS
Loading
Loading