diff --git a/tmt/base.py b/tmt/base.py index 69e0d6cf04..242e8af3a4 100644 --- a/tmt/base.py +++ b/tmt/base.py @@ -13,6 +13,7 @@ import tempfile import time from collections.abc import Iterable, Iterator, Sequence +from concurrent.futures import Future, ThreadPoolExecutor, as_completed from typing import ( TYPE_CHECKING, Any, @@ -44,6 +45,7 @@ import tmt.log import tmt.plugins import tmt.plugins.plan_shapers +import tmt.queue import tmt.result import tmt.steps import tmt.steps.discover @@ -58,6 +60,7 @@ import tmt.utils.jira from tmt.checks import Check from tmt.lint import LinterOutcome, LinterReturn +from tmt.queue import Queue from tmt.result import Result, ResultInterpret from tmt.utils import ( Command, @@ -82,6 +85,8 @@ import tmt.steps.discover import tmt.steps.provision.local + from ._compat.typing import Self + T = TypeVar('T') @@ -1700,6 +1705,9 @@ class Plan( 'gate', ] + def step_logger(self, step_name: str) -> tmt.log.Logger: + return self._logger.descend(logger_name=step_name) + def __init__( self, *, @@ -1752,32 +1760,32 @@ def __init__( # Initialize test steps self.discover = tmt.steps.discover.Discover( - logger=logger.descend(logger_name='discover'), + logger=self.step_logger('discover'), plan=self, data=self.node.get('discover') ) self.provision = tmt.steps.provision.Provision( - logger=logger.descend(logger_name='provision'), + logger=self.step_logger('provision'), plan=self, data=self.node.get('provision') ) self.prepare = tmt.steps.prepare.Prepare( - logger=logger.descend(logger_name='prepare'), + logger=self.step_logger('prepare'), plan=self, data=self.node.get('prepare') ) self.execute = tmt.steps.execute.Execute( - logger=logger.descend(logger_name='execute'), + logger=self.step_logger('execute'), plan=self, data=self.node.get('execute') ) self.report = tmt.steps.report.Report( - logger=logger.descend(logger_name='report'), + logger=self.step_logger('report'), plan=self, data=self.node.get('report') ) self.finish = tmt.steps.finish.Finish( - logger=logger.descend(logger_name='finish'), + logger=self.step_logger('finish'), plan=self, data=self.node.get('finish') ) @@ -3406,6 +3414,102 @@ class RunData(SerializableContainer): ) +@dataclasses.dataclass +class PlanTask(tmt.queue.GuestlessTask[None]): + """ A task to run a plan """ + + plans: list[Plan] + + #: Plan that was executed. + plan: Optional[Plan] + + # Custom yet trivial `__init__` is necessary, see note in `tmt.queue.Task`. + def __init__(self, logger: tmt.log.Logger, plans: list[Plan], **kwargs: Any) -> None: + super().__init__(logger, **kwargs) + + self.plans = plans + self.plan = None + + @property + def name(self) -> str: + return fmf.utils.listed([plan.name for plan in self.plans]) + + def go(self) -> Iterator['Self']: + """ + Perform the task. + + Called by :py:class:`Queue` machinery to accomplish the task. It expects + the child class would implement :py:meth:`run`, with ``go`` taking care + of task/queue interaction. + + :yields: instances of the same class, describing invocations of the + task and their outcome. For each guest, one instance would be + yielded. + """ + + multiple_plans = len(self.plans) > 1 + + new_loggers = tmt.queue.prepare_loggers(self.logger, [plan.name for plan in self.plans]) + old_loggers: dict[str, tmt.log.Logger] = {} + + with ThreadPoolExecutor(max_workers=len(self.plans)) as executor: + futures: dict[Future[None], Plan] = {} + + for plan in self.plans: + old_loggers[plan.name] = plan._logger + new_logger = new_loggers[plan.name] + + plan.inject_logger(new_logger) + + for step_name in tmt.steps.STEPS: + getattr(plan, step_name).inject_logger(plan.step_logger(step_name)) + + if multiple_plans: + new_logger.info('started', color='cyan') + + # Submit each task/guest combination (save the guest & logger + # for later)... + futures[executor.submit(plan.go)] = plan + + # ... and then sit and wait as they get delivered to us as they + # finish. Unpack the guest and logger, so we could preserve logging + # and prepare the right outcome package. + for future in as_completed(futures): + plan = futures[future] + + old_logger = old_loggers[plan.name] + new_logger = new_loggers[plan.name] + + if multiple_plans: + new_logger.info('finished', color='cyan') + + # `Future.result()` will either 1. reraise an exception the + # callable raised, if any, or 2. return whatever the callable + # returned - which is `None` in our case, therefore we can + # ignore the return value. + try: + result = future.result() + + except SystemExit as exc: + task = dataclasses.replace(self, result=None, exc=None, requested_exit=exc) + + except Exception as exc: + task = dataclasses.replace(self, result=None, exc=exc, requested_exit=None) + + else: + task = dataclasses.replace(self, result=result, exc=None, requested_exit=None) + + task.plan = plan + + yield task + + # Don't forget to restore the original logger. + plan.inject_logger(old_logger) + + for step_name in tmt.steps.STEPS: + getattr(plan, step_name).inject_logger(plan.step_logger(step_name)) + + class Run(tmt.utils.Common): """ Test run, a container of plans """ @@ -3615,9 +3719,9 @@ def plans(self) -> Sequence[Plan]: return self._plans @functools.cached_property - def plan_queue(self) -> Sequence[Plan]: + def plan_staging_queue(self) -> Sequence[Plan]: """ - A list of plans remaining to be executed. + A list of plans remaining to be queued by run and executed. It is being populated via :py:attr:`plans`, but eventually, :py:meth:`go` will remove plans from it as they get processed. @@ -3636,7 +3740,7 @@ def swap_plans(self, plan: Plan, *others: Plan) -> None: """ plans = cast(list[Plan], self.plans) - plan_queue = cast(list[Plan], self.plan_queue) + plan_queue = cast(list[Plan], self.plan_staging_queue) if plan in plan_queue: plan_queue.remove(plan) @@ -3805,27 +3909,49 @@ def go(self) -> None: self.verbose(f"Found {listed(self.plans, 'plan')}.") self.save() - # Iterate over plans - crashed_plans: list[tuple[Plan, Exception]] = [] + queue: Queue[PlanTask] = Queue( + 'plans', + self._logger.descend(logger_name=f'{self}.queue')) - while self.plan_queue: - plan = cast(list[Plan], self.plan_queue).pop(0) + failed_tasks: list[PlanTask] = [] - try: - plan.go() + def _enqueue_new_plans() -> None: + staging_queue = self.plan_staging_queue[:] - except Exception as exc: - if self.opt('on-plan-error') == 'quit': - raise tmt.utils.GeneralError( - 'plan failed', - causes=[exc]) + if not staging_queue: + return + + queue.enqueue_task(PlanTask( + self._logger, + cast(list[Plan], staging_queue) + )) + + for plan in staging_queue: + cast(list[Plan], self.plan_staging_queue).remove(plan) - crashed_plans.append((plan, exc)) + _enqueue_new_plans() - if crashed_plans: + for outcome in queue.run(stop_on_error=False): + _enqueue_new_plans() + + if outcome.exc: + outcome.logger.fail(str(outcome.exc)) + + failed_tasks.append(outcome) + continue + + if failed_tasks: raise tmt.utils.GeneralError( 'plan failed', - causes=[exc for _, exc in crashed_plans]) + causes=[outcome.exc for outcome in failed_tasks if outcome.exc is not None]) + +# crashed_plans: list[tuple[Plan, Exception]] = [] + +# except Exception as exc: +# if self.opt('on-plan-error') == 'quit': +# raise tmt.utils.GeneralError( +# 'plan failed', +# causes=[exc]) # Update the last run id at the very end # (override possible runs created during execution) diff --git a/tmt/queue.py b/tmt/queue.py index 63107a1198..95253fb424 100644 --- a/tmt/queue.py +++ b/tmt/queue.py @@ -1,5 +1,7 @@ import dataclasses import functools +import itertools +import queue from collections.abc import Iterator from concurrent.futures import Future, ThreadPoolExecutor, as_completed from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar @@ -33,6 +35,8 @@ class Task(Generic[TaskResultT]): to their defaults "manually". """ + id: Optional[int] + #: A logger to use for logging events related to the outcome. logger: Logger @@ -292,26 +296,31 @@ def go(self) -> Iterator['Self']: guest.inject_logger(old_logger) -class Queue(list[TaskT]): +class Queue(queue.Queue[TaskT]): """ Queue class for running tasks """ + _task_counter: 'itertools.count[int]' + def __init__(self, name: str, logger: Logger) -> None: super().__init__() self.name = name self._logger = logger + self._task_counter = itertools.count(start=1) def enqueue_task(self, task: TaskT) -> None: """ Put new task into a queue """ - self.append(task) + task.id = next(self._task_counter) + + self.put(task) self._logger.info( - f'queued {self.name} task #{len(self)}', + f'queued {self.name} task #{task.id}', task.name, color='cyan') - def run(self) -> Iterator[TaskT]: + def run(self, stop_on_error: bool = True) -> Iterator[TaskT]: """ Start crunching the queued tasks. @@ -319,11 +328,17 @@ def run(self) -> Iterator[TaskT]: combination a :py:class:`Task` instance is yielded. """ - for i, task in enumerate(self): + while True: + try: + task = self.get_nowait() + + except queue.Empty: + return + self._logger.info('') self._logger.info( - f'{self.name} task #{i + 1}', + f'{self.name} task #{task.id}', task.name, color='cyan') @@ -336,5 +351,5 @@ def run(self) -> Iterator[TaskT]: yield outcome # TODO: make this optional - if failed_tasks: + if failed_tasks and stop_on_error: return diff --git a/tmt/steps/provision/__init__.py b/tmt/steps/provision/__init__.py index 0d33ad6048..02f1670078 100644 --- a/tmt/steps/provision/__init__.py +++ b/tmt/steps/provision/__init__.py @@ -2274,6 +2274,7 @@ def go(self) -> Iterator['ProvisionTask']: except SystemExit as exc: yield ProvisionTask( + id=None, logger=new_logger, result=None, guest=None, @@ -2284,6 +2285,7 @@ def go(self) -> Iterator['ProvisionTask']: except Exception as exc: yield ProvisionTask( + id=None, logger=new_logger, result=None, guest=None, @@ -2294,6 +2296,7 @@ def go(self) -> Iterator['ProvisionTask']: else: yield ProvisionTask( + id=None, logger=new_logger, result=None, guest=phase.guest(), @@ -2316,6 +2319,7 @@ def enqueue( phases: list[ProvisionPlugin[ProvisionStepData]], logger: Logger) -> None: self.enqueue_task(ProvisionTask( + id=None, logger=logger, result=None, guest=None,