Skip to content

Commit

Permalink
Merge pull request #159 from matejak/rhel9-polish
Browse files Browse the repository at this point in the history
Polish some aspects of the addon
  • Loading branch information
jan-cerny authored Jul 2, 2021
2 parents 56a1530 + 7046f39 commit 16c6045
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 243 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ container-test:
podman build --tag $(CONTAINER_NAME) --file tests/Dockerfile
podman run --volume .:/oscap-anaconda-addon:Z $(CONTAINER_NAME) make test

test: runpylint unittest
test: unittest runpylint

runpylint:
@echo "***Running pylint checks***"
Expand Down
11 changes: 7 additions & 4 deletions org_fedora_oscap/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ def N_(string): return string

SSG_DIR = "/usr/share/xml/scap/ssg/content/"

# Make it easy to change e.g. by sed substitution in spec files
# First name is the canonical addon name, rest are adapters
ADDON_NAMES = ["com_redhat_oscap", "org_fedora_oscap"]

COMPLAIN_ABOUT_NON_CANONICAL_NAMES = True

# Enable patches that set the content name at package-time
DEFAULT_SSG_CONTENT_NAME = ""
SSG_CONTENT = DEFAULT_SSG_CONTENT_NAME
Expand Down Expand Up @@ -572,10 +578,7 @@ def get_preinst_content_path(data):
return data.content_path

if data.content_type == "datastream":
return utils.join_paths(
INSTALLATION_CONTENT_DIR,
get_content_name(data)
)
return get_raw_preinst_content_path(data)

return utils.join_paths(
INSTALLATION_CONTENT_DIR,
Expand Down
28 changes: 17 additions & 11 deletions org_fedora_oscap/content_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pathlib
import shutil
from glob import glob
from typing import List

from pyanaconda.core import constants
from pyanaconda.threading import threadMgr
Expand Down Expand Up @@ -174,6 +175,7 @@ def finish_content_fetch(self, fetching_thread_name, fingerprint, report_callbac

def _verify_fingerprint(self, dest_filename, fingerprint=""):
if not fingerprint:
log.info("No fingerprint provided, skipping integrity check")
return

hash_obj = utils.get_hashing_algorithm(fingerprint)
Expand All @@ -186,6 +188,7 @@ def _verify_fingerprint(self, dest_filename, fingerprint=""):
)
msg = _(f"Integrity check of the content failed - {hash_obj.name} hash didn't match")
raise content_handling.ContentCheckError(msg)
log.info(f"Integrity check passed using {hash_obj.name} hash")

def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
threadMgr.wait(wait_for)
Expand Down Expand Up @@ -263,9 +266,10 @@ def get_preferred_content(self, content):
return preferred_content

def get_preferred_tailoring(self, content):
if self._addon_data.tailoring_path:
if self._addon_data.tailoring_path != str(content.tailoring.relative_to(content.root)):
msg = f"Expected a tailoring {self.tailoring_path}, but it couldn't be found"
tailoring_path = self._addon_data.tailoring_path
if tailoring_path:
if tailoring_path != str(content.tailoring.relative_to(content.root)):
msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
raise content_handling.ContentHandlingError(msg)
return content.tailoring

Expand All @@ -279,12 +283,12 @@ class ObtainedContent:
"""
def __init__(self, root):
self.labelled_files = dict()
self.datastream = ""
self.xccdf = ""
self.ovals = []
self.tailoring = ""
self.archive = ""
self.verified = ""
self.datastream = None # type: Pathlib.Path
self.xccdf = None # type: Pathlib.Path
self.ovals = [] # type: List[Pathlib.Path]
self.tailoring = None # type: Pathlib.Path
self.archive = None # type: Pathlib.Path
self.verified = None # type: Pathlib.Path
self.root = pathlib.Path(root)

def record_verification(self, path):
Expand All @@ -305,14 +309,16 @@ def add_content_archive(self, fname):

def _assign_content_type(self, attribute_name, new_value):
old_value = getattr(self, attribute_name)
if old_value:
if old_value and old_value != new_value:
msg = (
f"When dealing with {attribute_name}, "
f"there was already the {old_value.name} when setting the new {new_value.name}")
raise content_handling.ContentHandlingError(msg)
setattr(self, attribute_name, new_value)

def add_file(self, fname, label):
def add_file(self, fname, label=None):
if not label:
label = content_handling.identify_files([fname])[fname]
path = pathlib.Path(fname)
if label == content_handling.CONTENT_TYPES["TAILORING"]:
self._assign_content_type("tailoring", path)
Expand Down
1 change: 1 addition & 0 deletions org_fedora_oscap/cpioarchive.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def close(self):
self.file.close()

def __next__(self):
# pylint: disable = E1102
return self.next()

def next(self):
Expand Down
5 changes: 4 additions & 1 deletion org_fedora_oscap/gui/spokes/oscap.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ def __init__(self, data, storage, payload):
self.initialization_controller.init_done.connect(self._all_anaconda_spokes_initialized)

self.content_bringer = content_discovery.ContentBringer(self._policy_data)
self._content_handler = None

def _all_anaconda_spokes_initialized(self):
log.debug("OSCAP addon: Anaconda init_done signal triggered")
Expand Down Expand Up @@ -669,7 +670,8 @@ def _resolve_rootpw_issues(self, messages, report_only):
self.__class__, common.MESSAGE_TYPE_WARNING, msg.text)
messages.append(msg)

if not report_only:
passwords_can_be_fixed = False
if not report_only and passwords_can_be_fixed:
users_proxy = USERS.get_proxy()

self.__old_root_pw = users_proxy.RootPassword
Expand Down Expand Up @@ -1125,6 +1127,7 @@ def on_profile_clicked(self, widget, event, *args):
return

# if a profile is double-clicked, we should switch to it
# pylint: disable = E1101
if event.type == Gdk.EventType._2BUTTON_PRESS:
self._switch_profile()

Expand Down
152 changes: 66 additions & 86 deletions org_fedora_oscap/service/installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,92 +25,86 @@

from org_fedora_oscap import common, data_fetch, rule_handling, utils
from org_fedora_oscap.common import _, get_packages_data, set_packages_data
from org_fedora_oscap.content_handling import ContentCheckError
from org_fedora_oscap import content_discovery

log = logging.getLogger(__name__)


REQUIRED_PACKAGES = ("openscap", "openscap-scanner",)


class FetchContentTask(Task):
"""The installation task for fetching the content."""
def _handle_error(exception):
log.error("Failed to fetch and initialize SCAP content!")

def __init__(self, policy_data, file_path, content_path):
"""Create a task."""
super().__init__()
self._policy_data = policy_data
self._file_path = file_path
self._content_path = content_path

@property
def name(self):
return "Fetch the content"
if isinstance(exception, ContentCheckError):
msg = _("The integrity check of the security content failed.")
terminate(msg)
elif (
isinstance(exception, common.OSCAPaddonError)
or isinstance(exception, data_fetch.DataFetchError)):
msg = _("There was an error fetching and loading the security content:\n" +
f"{str(exception)}")
terminate(msg)

def run(self):
"""Run the task."""
# Is the content available?
if os.path.exists(self._content_path):
log.debug("Content is already available. Skip.")
return
else:
msg = _("There was an unexpected problem with the supplied content.")
terminate(msg)

if os.path.exists(self._file_path):
log.debug("Content is already available. Skip.")
return

try:
data_fetch.fetch_data(
self._policy_data.content_url,
self._file_path,
self._policy_data.certificates
)
def terminate(message):
message += "\n" + _("The installation should be aborted.")
raise NonCriticalInstallationError(message)

# RPM is an archive at this phase
if self._policy_data.content_type in ("archive", "rpm"):
# extract the content
common.extract_data(
self._file_path,
common.INSTALLATION_CONTENT_DIR,
[self._policy_data.content_path]
)

except (common.OSCAPaddonError, data_fetch.DataFetchError) as e:
log.error("Failed to fetch SCAP content!")

raise NonCriticalInstallationError(_(
"There was an error fetching the security content:\n%s\n"
"The installation should be aborted."
) % e)


class CheckFingerprintTask(Task):
"""The installation task for checking the fingerprint."""
class PrepareValidContent(Task):
"""The installation task for fetching the content."""

def __init__(self, policy_data, file_path):
def __init__(self, policy_data, file_path, content_path):
"""Create a task."""
super().__init__()
self._policy_data = policy_data
self._file_path = file_path
self._content_path = content_path
self.content_bringer = content_discovery.ContentBringer(policy_data)

@property
def name(self):
return "Check the fingerprint"
return "Fetch the content, and optionally perform check or archive extraction"

def run(self):
"""Run the task."""
if not self._policy_data.fingerprint:
log.debug("No fingerprint is provided. Skip.")
return

hash_obj = utils.get_hashing_algorithm(self._policy_data.fingerprint)
digest = utils.get_file_fingerprint(self._file_path, hash_obj)

if digest != self._policy_data.fingerprint:
log.error("Failed to fetch and initialize SCAP content!")
# Is the content available?
fetching_thread_name = None
if not os.path.exists(self._content_path):
# content not available/fetched yet
fetching_thread_name = self.content_bringer.fetch_content(
_handle_error, self._policy_data.certificates)

content_dest = None
if self._policy_data.content_type != "scap-security-guide":
content_dest = self._file_path

content = self.content_bringer.finish_content_fetch(
fetching_thread_name, self._policy_data.fingerprint,
lambda msg: log.info(msg), content_dest, _handle_error)

if not content:
# this shouldn't happen because error handling is supposed to
# terminate the addon before finish_content_fetch returns
_handle_error(Exception())

remote_content_was_present = (
not fetching_thread_name
and self._policy_data.content_type != "scap-security-guide")
if remote_content_was_present:
content.add_file(self._content_path)

raise NonCriticalInstallationError(_(
"The integrity check of the security content failed.\n" +
"The installation should be aborted."
))
try:
# just check that preferred content exists
_ = self.content_bringer.get_preferred_content(content)
except Exception as exc:
terminate(str(exc))


class EvaluateRulesTask(Task):
Expand All @@ -134,41 +128,27 @@ def run(self):

def _initialize_rules(self):
try:
rules = common.get_fix_rules_pre(
self._policy_data.profile_id,
self._content_path,
self._policy_data.datastream_id,
self._policy_data.xccdf_id,
self._tailoring_path
)

# parse and store rules
rule_data = rule_handling.RuleData()

for rule in rules.splitlines():
rule_data.new_rule(rule)

rule_data = rule_handling.get_rule_data_from_content(
self._policy_data.profile_id, self._content_path,
self._policy_data.datastream_id, self._policy_data.xccdf_id,
self._tailoring_path)
return rule_data

except common.OSCAPaddonError as e:
log.error("Failed to load SCAP content!")
_handle_error(e)

raise NonCriticalInstallationError(_(
"There was an error loading the security content:\n%s\n"
"The installation should be aborted."
) % e)

def _evaluate_rules(self, rule_data):
# evaluate rules, do automatic fixes and stop if something that cannot
# be fixed automatically is wrong
all_messages = rule_data.eval_rules(None, None)
fatal_messages = [m for m in all_messages if m.type == common.MESSAGE_TYPE_FATAL]

fatal_messages = [message for message in all_messages
if message.type == common.MESSAGE_TYPE_FATAL]
if any(fatal_messages):
raise NonCriticalInstallationError(_(
"There was a wrong configuration detected:\n%s\n"
"The installation should be aborted."
) % "\n".join(message.text for message in fatal_messages))
msg_lines = [_("Wrong configuration detected!")]
msg_lines.extend([m.text for m in fatal_messages])
terminate("\n".join(msg_lines))
return

# add packages needed on the target system to the list of packages
# that are requested to be installed
Expand Down
Loading

0 comments on commit 16c6045

Please sign in to comment.