Run plans in parallel #3265

Draft · wants to merge 1 commit into base: poc-split-plan-by-max

172 changes: 149 additions & 23 deletions tmt/base.py
@@ -13,6 +13,7 @@
import tempfile
import time
from collections.abc import Iterable, Iterator, Sequence
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import (
TYPE_CHECKING,
Any,
@@ -44,6 +45,7 @@
import tmt.log
import tmt.plugins
import tmt.plugins.plan_shapers
import tmt.queue
import tmt.result
import tmt.steps
import tmt.steps.discover
@@ -58,6 +60,7 @@
import tmt.utils.jira
from tmt.checks import Check
from tmt.lint import LinterOutcome, LinterReturn
from tmt.queue import Queue
from tmt.result import Result, ResultInterpret
from tmt.utils import (
Command,
@@ -82,6 +85,8 @@
import tmt.steps.discover
import tmt.steps.provision.local

from ._compat.typing import Self


T = TypeVar('T')

@@ -1700,6 +1705,9 @@ class Plan(
'gate',
]

def step_logger(self, step_name: str) -> tmt.log.Logger:
return self._logger.descend(logger_name=step_name)

def __init__(
self,
*,
@@ -1752,32 +1760,32 @@ def __init__(

# Initialize test steps
self.discover = tmt.steps.discover.Discover(
logger=logger.descend(logger_name='discover'),
logger=self.step_logger('discover'),
plan=self,
data=self.node.get('discover')
)
self.provision = tmt.steps.provision.Provision(
logger=logger.descend(logger_name='provision'),
logger=self.step_logger('provision'),
plan=self,
data=self.node.get('provision')
)
self.prepare = tmt.steps.prepare.Prepare(
logger=logger.descend(logger_name='prepare'),
logger=self.step_logger('prepare'),
plan=self,
data=self.node.get('prepare')
)
self.execute = tmt.steps.execute.Execute(
logger=logger.descend(logger_name='execute'),
logger=self.step_logger('execute'),
plan=self,
data=self.node.get('execute')
)
self.report = tmt.steps.report.Report(
logger=logger.descend(logger_name='report'),
logger=self.step_logger('report'),
plan=self,
data=self.node.get('report')
)
self.finish = tmt.steps.finish.Finish(
logger=logger.descend(logger_name='finish'),
logger=self.step_logger('finish'),
plan=self,
data=self.node.get('finish')
)
@@ -3406,6 +3414,102 @@ class RunData(SerializableContainer):
)


@dataclasses.dataclass
class PlanTask(tmt.queue.GuestlessTask[None]):
""" A task to run a plan """

plans: list[Plan]

#: Plan that was executed.
plan: Optional[Plan]

# Custom yet trivial `__init__` is necessary, see note in `tmt.queue.Task`.
def __init__(self, logger: tmt.log.Logger, plans: list[Plan], **kwargs: Any) -> None:
super().__init__(logger, **kwargs)

self.plans = plans
self.plan = None

@property
def name(self) -> str:
return fmf.utils.listed([plan.name for plan in self.plans])

def go(self) -> Iterator['Self']:
"""
Perform the task.

Called by :py:class:`Queue` machinery to accomplish the task: each plan
is submitted to a thread pool and executed there, with ``go`` taking care
of task/queue interaction.

:yields: instances of the same class, describing invocations of the
task and their outcome. For each plan, one instance would be
yielded.
"""

multiple_plans = len(self.plans) > 1

new_loggers = tmt.queue.prepare_loggers(self.logger, [plan.name for plan in self.plans])
old_loggers: dict[str, tmt.log.Logger] = {}

with ThreadPoolExecutor(max_workers=len(self.plans)) as executor:
futures: dict[Future[None], Plan] = {}

for plan in self.plans:
old_loggers[plan.name] = plan._logger
new_logger = new_loggers[plan.name]

plan.inject_logger(new_logger)

for step_name in tmt.steps.STEPS:
getattr(plan, step_name).inject_logger(plan.step_logger(step_name))

if multiple_plans:
new_logger.info('started', color='cyan')

# Submit each plan for execution and remember which future
# belongs to which plan...
futures[executor.submit(plan.go)] = plan

# ... and then sit and wait as they get delivered to us as they
# finish. Look up the plan and its loggers, so we could preserve
# logging and prepare the right outcome package.
for future in as_completed(futures):
plan = futures[future]

old_logger = old_loggers[plan.name]
new_logger = new_loggers[plan.name]

if multiple_plans:
new_logger.info('finished', color='cyan')

# `Future.result()` will either 1. reraise an exception the
# callable raised, if any, or 2. return whatever the callable
# returned - which is `None` in our case, therefore we can
# ignore the return value.
try:
result = future.result()

except SystemExit as exc:
task = dataclasses.replace(self, result=None, exc=None, requested_exit=exc)

except Exception as exc:
task = dataclasses.replace(self, result=None, exc=exc, requested_exit=None)

else:
task = dataclasses.replace(self, result=result, exc=None, requested_exit=None)

task.plan = plan

yield task

# Don't forget to restore the original logger.
plan.inject_logger(old_logger)

for step_name in tmt.steps.STEPS:
getattr(plan, step_name).inject_logger(plan.step_logger(step_name))


class Run(tmt.utils.Common):
""" Test run, a container of plans """

@@ -3615,9 +3719,9 @@ def plans(self) -> Sequence[Plan]:
return self._plans

@functools.cached_property
def plan_queue(self) -> Sequence[Plan]:
def plan_staging_queue(self) -> Sequence[Plan]:
"""
A list of plans remaining to be executed.
A list of plans remaining to be enqueued and executed by the run.

It is being populated via :py:attr:`plans`, but eventually,
:py:meth:`go` will remove plans from it as they get processed.
@@ -3636,7 +3740,7 @@ def swap_plans(self, plan: Plan, *others: Plan) -> None:
"""

plans = cast(list[Plan], self.plans)
plan_queue = cast(list[Plan], self.plan_queue)
plan_queue = cast(list[Plan], self.plan_staging_queue)

if plan in plan_queue:
plan_queue.remove(plan)
@@ -3805,27 +3909,49 @@ def go(self) -> None:
self.verbose(f"Found {listed(self.plans, 'plan')}.")
self.save()

# Iterate over plans
crashed_plans: list[tuple[Plan, Exception]] = []
queue: Queue[PlanTask] = Queue(
'plans',
self._logger.descend(logger_name=f'{self}.queue'))

while self.plan_queue:
plan = cast(list[Plan], self.plan_queue).pop(0)
failed_tasks: list[PlanTask] = []

try:
plan.go()
def _enqueue_new_plans() -> None:
staging_queue = self.plan_staging_queue[:]

except Exception as exc:
if self.opt('on-plan-error') == 'quit':
raise tmt.utils.GeneralError(
'plan failed',
causes=[exc])
if not staging_queue:
return

queue.enqueue_task(PlanTask(
self._logger,
cast(list[Plan], staging_queue)
))

for plan in staging_queue:
cast(list[Plan], self.plan_staging_queue).remove(plan)

crashed_plans.append((plan, exc))
_enqueue_new_plans()

if crashed_plans:
for outcome in queue.run(stop_on_error=False):
_enqueue_new_plans()

if outcome.exc:
outcome.logger.fail(str(outcome.exc))

failed_tasks.append(outcome)
continue

if failed_tasks:
raise tmt.utils.GeneralError(
'plan failed',
causes=[exc for _, exc in crashed_plans])
causes=[outcome.exc for outcome in failed_tasks if outcome.exc is not None])

# crashed_plans: list[tuple[Plan, Exception]] = []

# except Exception as exc:
# if self.opt('on-plan-error') == 'quit':
# raise tmt.utils.GeneralError(
# 'plan failed',
# causes=[exc])

# Update the last run id at the very end
# (override possible runs created during execution)
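The fan-out in `PlanTask.go` above follows a small, reusable pattern: submit each plan's `go()` to a thread pool, then yield one outcome per plan as the futures complete, turning any exception into data on the outcome instead of letting it escape. Below is a minimal, self-contained sketch of that pattern in plain Python; `Outcome` and `run_in_parallel` are illustrative stand-ins for the task dataclass and `Plan.go`, not tmt API.

```python
import dataclasses
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterator, Optional


@dataclasses.dataclass
class Outcome:
    # Stand-in for the PlanTask fields: which item ran and how it ended.
    name: str
    exc: Optional[Exception] = None


def run_in_parallel(jobs: dict[str, Callable[[], None]]) -> Iterator[Outcome]:
    """Submit every job at once, yield outcomes as they finish."""
    with ThreadPoolExecutor(max_workers=len(jobs)) as executor:
        futures = {executor.submit(func): name for name, func in jobs.items()}

        for future in as_completed(futures):
            name = futures[future]
            try:
                future.result()  # re-raises whatever the job raised
            except Exception as exc:
                yield Outcome(name=name, exc=exc)
            else:
                yield Outcome(name=name)


if __name__ == '__main__':
    def ok() -> None:
        pass

    def boom() -> None:
        raise RuntimeError('provisioning failed')

    for outcome in run_in_parallel({'plan/ok': ok, 'plan/boom': boom}):
        print(outcome)
```

As in `PlanTask.go`, a failing job does not abort the loop: its exception is attached to the yielded outcome and the caller decides whether to stop or keep collecting failures.
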
29 changes: 22 additions & 7 deletions tmt/queue.py
@@ -1,5 +1,7 @@
import dataclasses
import functools
import itertools
import queue
from collections.abc import Iterator
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar
@@ -33,6 +35,8 @@ class Task(Generic[TaskResultT]):
to their defaults "manually".
"""

id: Optional[int]

#: A logger to use for logging events related to the outcome.
logger: Logger

@@ -292,38 +296,49 @@ def go(self) -> Iterator['Self']:
guest.inject_logger(old_logger)


class Queue(list[TaskT]):
class Queue(queue.Queue[TaskT]):
""" Queue class for running tasks """

_task_counter: 'itertools.count[int]'

def __init__(self, name: str, logger: Logger) -> None:
super().__init__()

self.name = name
self._logger = logger
self._task_counter = itertools.count(start=1)

def enqueue_task(self, task: TaskT) -> None:
""" Put new task into a queue """

self.append(task)
task.id = next(self._task_counter)

self.put(task)

self._logger.info(
f'queued {self.name} task #{len(self)}',
f'queued {self.name} task #{task.id}',
task.name,
color='cyan')

def run(self) -> Iterator[TaskT]:
def run(self, stop_on_error: bool = True) -> Iterator[TaskT]:
"""
Start crunching the queued tasks.

Tasks are executed in order; for each task/guest combination,
a :py:class:`Task` instance is yielded.
"""

for i, task in enumerate(self):
while True:
try:
task = self.get_nowait()

except queue.Empty:
return

self._logger.info('')

self._logger.info(
f'{self.name} task #{i + 1}',
f'{self.name} task #{task.id}',
task.name,
color='cyan')

@@ -336,5 +351,5 @@ def run(self) -> Iterator[TaskT]:
yield outcome

# TODO: make this optional
if failed_tasks:
if failed_tasks and stop_on_error:
return
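
Switching `Queue` from `list[TaskT]` to `queue.Queue[TaskT]` means callers may keep enqueuing tasks while `run()` is already draining the queue, which is what the reworked `Run.go` relies on when it calls `_enqueue_new_plans()` inside the `queue.run(stop_on_error=False)` loop. A rough, self-contained sketch of that drain-until-empty shape follows; `TaskQueue` and its tuple tasks are illustrative, not tmt's classes.

```python
import itertools
import queue
from typing import Iterator


class TaskQueue(queue.Queue):
    """Tiny illustration: a FIFO of task names with sequential ids."""

    def __init__(self) -> None:
        super().__init__()
        self._task_counter = itertools.count(start=1)

    def enqueue_task(self, name: str) -> None:
        task_id = next(self._task_counter)
        self.put((task_id, name))

    def run(self) -> Iterator[tuple[int, str]]:
        # Drain until empty; anything enqueued while earlier tasks were
        # being yielded is still picked up before the loop ends.
        while True:
            try:
                task = self.get_nowait()
            except queue.Empty:
                return
            yield task


if __name__ == '__main__':
    tasks = TaskQueue()
    tasks.enqueue_task('first batch of plans')

    for task_id, name in tasks.run():
        print(f'task #{task_id}: {name}')
        # Simulate a plan shaper producing more work mid-run.
        if task_id == 1:
            tasks.enqueue_task('plans produced while running')
```

The second task is enqueued from inside the consuming loop and is still processed, because `run()` only stops once `get_nowait()` reports the queue as empty.
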
4 changes: 4 additions & 0 deletions tmt/steps/provision/__init__.py
@@ -2274,6 +2274,7 @@ def go(self) -> Iterator['ProvisionTask']:

except SystemExit as exc:
yield ProvisionTask(
id=None,
logger=new_logger,
result=None,
guest=None,
@@ -2284,6 +2285,7 @@

except Exception as exc:
yield ProvisionTask(
id=None,
logger=new_logger,
result=None,
guest=None,
@@ -2294,6 +2296,7 @@

else:
yield ProvisionTask(
id=None,
logger=new_logger,
result=None,
guest=phase.guest(),
@@ -2316,6 +2319,7 @@
phases: list[ProvisionPlugin[ProvisionStepData]],
logger: Logger) -> None:
self.enqueue_task(ProvisionTask(
id=None,
logger=logger,
result=None,
guest=None,
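The provision changes are mechanical: `Task` gained an `id: Optional[int]` field, so every `ProvisionTask(...)` call now passes `id=None` as a placeholder and `Queue.enqueue_task` stamps the real number from `itertools.count`. A toy sketch of that placeholder-then-number pattern, using hypothetical names rather than tmt's classes:

```python
import dataclasses
import itertools
from typing import Optional

_counter = itertools.count(start=1)


@dataclasses.dataclass
class Task:
    #: Filled in by the queue at enqueue time, not by the constructor.
    id: Optional[int]
    name: str


def enqueue(task: Task) -> Task:
    # Stamp the sequential id, mirroring Queue.enqueue_task above.
    task.id = next(_counter)
    return task


print(enqueue(Task(id=None, name='provision: virtual')).id)  # -> 1
print(enqueue(Task(id=None, name='provision: local')).id)    # -> 2
```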