Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sample static analyzers -> visualizer #2404

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions api_app/visualizers_manager/visualizers/static_sample_analyzers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

from logging import getLogger
from typing import Dict, List

from api_app.analyzers_manager.models import AnalyzerReport
from api_app.visualizers_manager.classes import Visualizer
from api_app.visualizers_manager.decorators import (
visualizable_error_handler_with_params,
)

logger = getLogger(__name__)


class StaticSampleAnalyzers(Visualizer):
@classmethod
def update(cls) -> bool:
pass

@visualizable_error_handler_with_params("Yara Signatures")
def _yara_signatures(self):
yara_report = self.analyzer_reports().get(config__name="Yara")
signatures = [
match["match"]
for matches in yara_report.report.values()
for match in matches
if match.get("match", None)
]
disable_signatures = not signatures
return self.VList(
name=self.Base(value="Yara Signatures", disable=disable_signatures),
value=[
self.Base(value=value, disable=disable_signatures)
for value in signatures
],
start_open=True,
disable=disable_signatures,
)

@visualizable_error_handler_with_params("ClamAV Signatures")
def _clamav_signatures(self):
clamav_report = self.analyzer_reports().get(config__name="ClamAV")
detections = clamav_report.report.get("detections", [])
disable_signatures = not detections
return self.VList(
name=self.Base(value="ClamAV Signatures", disable=disable_signatures),
value=[
self.Base(value=value, disable=disable_signatures)
for value in detections
],
disable=disable_signatures,
)

@visualizable_error_handler_with_params("Mraptor")
def _mraptor(self, analyzer_report):
report = analyzer_report.report
mraptor = report.get("mraptor", "")
mraptor_match = False
if mraptor == "suspicious":
mraptor_match = True
disable_signatures = not mraptor_match
return self.Title(
name=self.Base(value="Mraptor", disable=disable_signatures),
value=self.Base(
value="found" if mraptor_match else "", disable=disable_signatures
),
disable=disable_signatures,
)

@visualizable_error_handler_with_params("Extracted CVEs")
def _cves(self, analyzer_report):
report = analyzer_report.report
cves = report.get("extracted_CVEs", "")
extracted_cves = []
for cve_item in cves:
extracted_cves.extend(cve_item.get("CVEs", []))
disable_signatures = not extracted_cves
return self.VList(
name=self.Base(value="Extracted CVEs", disable=disable_signatures),
value=[
self.Base(value=value, disable=disable_signatures)
for value in extracted_cves
],
disable=disable_signatures,
)

@visualizable_error_handler_with_params("PDF Info")
def _pdf_info(self):
try:
analyzer_report = self.analyzer_reports().get(config__name="PDF_Info")
except AnalyzerReport.DoesNotExist:
pdf_report = self.Title(
self.Base(
value="PDF Info",
link="",
),
self.Base(value="Not executed"),
disable=True,
)
return pdf_report
else:
report = analyzer_report.report
peepdf = report.get("peepdf", [])
uris = []
for stat in peepdf.get("stats", []):
uris.extend(stat.get("uris", []))

disable_signatures = not uris
return self.VList(
name=self.Base(
value="Extracted URLs from PDFs", disable=disable_signatures
),
value=[
self.Base(value=value, disable=disable_signatures) for value in uris
],
disable=disable_signatures,
)

@visualizable_error_handler_with_params("CAPA Info")
def _capa_info(self):
try:
analyzer_report = self.analyzer_reports().get(config__name="CAPA_Info")
except AnalyzerReport.DoesNotExist:
virustotal_report = self.Title(
self.Base(
value="CAPA Info",
link="",
),
self.Base(value="Not executed"),
disable=True,
)
return virustotal_report
else:
report = analyzer_report.report
capa_rules = report.get("rules", {}).keys()
disable_signatures = not capa_rules
return self.VList(
name=self.Base(value="CAPA Signatures", disable=disable_signatures),
value=[
self.Base(value=value, disable=disable_signatures)
for value in capa_rules
],
disable=disable_signatures,
)

@visualizable_error_handler_with_params("Blint Info")
def _blint_info(self):
try:
analyzer_report = self.analyzer_reports().get(config__name="Blint")
except AnalyzerReport.DoesNotExist:
virustotal_report = self.Title(
self.Base(
value="Blint Info",
link="",
),
self.Base(value="Not executed"),
disable=True,
)
return virustotal_report
else:
report = analyzer_report.report
findings = report.get("findings", [])
titles = [f.get("title", "") for f in findings]
disable_signatures = not findings
return self.VList(
name=self.Base(value="Blint Signatures", disable=disable_signatures),
value=[
self.Base(value=value, disable=disable_signatures)
for value in titles
],
disable=disable_signatures,
)

def run(self) -> List[Dict]:
page1 = self.Page(name="Signatures Info")
page2 = self.Page(name="Docs Info")
page3 = self.Page(name="Executables Info")

h1 = self.HList(value=[self._yara_signatures(), self._clamav_signatures()])
page1.add_level(
self.Level(position=1, size=self.LevelSize.S_3, horizontal_list=h1)
)

doc_info = self.analyzer_reports().get(config__name="Doc_Info")

h2 = self.HList(
value=[self._mraptor(doc_info), self._cves(doc_info), self._pdf_info()]
)
page2.add_level(
self.Level(position=1, size=self.LevelSize.S_3, horizontal_list=h2)
)

h3 = self.HList(value=[self._capa_info(), self._blint_info()])
page3.add_level(
self.Level(position=1, size=self.LevelSize.S_3, horizontal_list=h3)
)

return [page1.to_dict(), page2.to_dict(), page3.to_dict()]

@classmethod
def _monkeypatch(cls):
patches = []
return super()._monkeypatch(patches=patches)
3 changes: 2 additions & 1 deletion api_app/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ def disconnect(self, close_code) -> None:

def send_job(self, event) -> None:
job_data = event["job"]
logger.debug(f"job data: {job_data}")
# this is too much noisy
# logger.debug(f"job data: {job_data}")
self.send_json(content=job_data)
if job_data["status"] in Status.final_statuses():
logger.debug("job sent to the client and terminated, close ws")
Expand Down
Loading