[Develop][Draft] Group test starccm and openfoam to share the same cluster #6321

Open · wants to merge 16 commits into develop
114 changes: 112 additions & 2 deletions tests/integration-tests/conftest.py
@@ -21,6 +21,7 @@
from functools import partial
from itertools import product
from shutil import copyfile
from time import sleep
from traceback import format_tb
from typing import Any, Dict, List, Optional, Union

@@ -366,6 +367,103 @@ def _setup_custom_logger(log_file):
logger.addHandler(file_handler)


class SharedClusterDetectionTimeoutError(Exception):
    """Custom exception raised when shared cluster detection times out."""


class ClusterManager:
    """Manage clusters for the session-wide shared cluster fixture.

    Defined as a module-level class (rather than a function local to the fixture)
    so the factory stays picklable and avoids "AttributeError: Can't pickle local object".
    """

def __init__(self, request, factory):
self.request = request
self.factory = factory

def cluster_factory(
self,
cluster_config,
region,
instance,
os,
scheduler,
upper_case_cluster_name=False,
custom_cli_credentials=None,
**kwargs,
):
"""Create cluster or use existing cluster."""
cluster_key = f"{region}-{instance}-{os}-{scheduler}"
request = self.request
factory = self.factory
        if cluster_key in request.session.shared_existing_clusters_started_to_create:
            logging.info("Eligible to reuse a shared cluster; starting detection.")
            # Creation has already been kicked off for this key; poll the session
            # registry (up to 40 retries x 60 seconds) until the cluster is registered.
            for retry in range(40):
                if cluster_key in request.session.shared_existing_clusters:
                    logging.info(
                        f"Shared cluster {request.session.shared_existing_clusters[cluster_key].name} detected."
                    )
                    return request.session.shared_existing_clusters[cluster_key]
                logging.info(f"Shared cluster not detected yet. Retrying... ({retry + 1}/40)")
                sleep(60)
            raise SharedClusterDetectionTimeoutError(
                "Timeout: failed to detect the shared cluster within the allowed retries."
            )
        logging.info(f"Creating shared cluster for {region}, {instance}, {os}, {scheduler}.")

request.session.shared_existing_clusters_started_to_create.add(cluster_key)
cluster_config = _write_config_to_outdir(request, cluster_config, "clusters_configs")
cluster = Cluster(
name=(
request.config.getoption("cluster")
if request.config.getoption("cluster")
else "integ-tests-{0}{1}{2}".format(
random_alphanumeric().upper() if upper_case_cluster_name else random_alphanumeric(),
"-" if request.config.getoption("stackname_suffix") else "",
request.config.getoption("stackname_suffix"),
)
),
config_file=cluster_config,
ssh_key=request.config.getoption("key_path"),
region=region,
custom_cli_credentials=custom_cli_credentials,
)
if not request.config.getoption("cluster"):
cluster.creation_response = factory.create_cluster(cluster, **kwargs)
request.session.shared_existing_clusters[cluster_key] = cluster
return cluster
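
For context, the pickling constraint that motivates this class can be reproduced in isolation. A minimal standalone sketch (standard library only; `make_factory` and `Manager` are illustrative names, not part of the test suite): a function defined inside another function is a "local object" that pickle rejects, while a bound method of a module-level class serializes fine.

    import pickle

    def make_factory():
        def cluster_factory():  # defined inside a function: a "local object"
            return "cluster"
        return cluster_factory

    class Manager:
        def cluster_factory(self):  # bound method of a module-level class
            return "cluster"

    pickle.dumps(Manager().cluster_factory)  # succeeds: resolvable by qualified name
    try:
        pickle.dumps(make_factory())
    except AttributeError as err:
        print(err)  # Can't pickle local object 'make_factory.<locals>.cluster_factory'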


@xdist_session_fixture(autouse=True)
@pytest.mark.usefixtures("setup_credentials")
def shared_clusters_factory(request):
"""
    Define a fixture to manage the creation and destruction of session-scoped shared clusters.

The configs used to create clusters are dumped to output_dir/clusters_configs/{test_name}.config
"""
factory = ClustersFactory(delete_logs_on_success=request.config.getoption("delete_logs_on_success"))

if not hasattr(request.session, "shared_existing_clusters"):
        logging.info("Initializing session registries: shared_existing_clusters and shared_existing_clusters_started_to_create")
request.session.shared_existing_clusters = {}
request.session.shared_existing_clusters_started_to_create = set()

manager = ClusterManager(request, factory)

yield manager

if not request.config.getoption("no_delete"):
try:
test_passed = request.node.rep_call.passed
except AttributeError:
test_passed = False
factory.destroy_all_clusters(test_passed=test_passed)


@pytest.fixture(scope="class")
@pytest.mark.usefixtures("setup_credentials")
def clusters_factory(request, region):
@@ -509,9 +607,21 @@ def _write_config_to_outdir(request, config, config_dir):
out_dir = request.config.getoption("output_dir")

# Sanitize config file name to make it Windows compatible
# class scope request.node.nodeid example:
# 'dcv/test_dcv.py::test_dcv_configuration[eu-west-1-c5.xlarge-centos7-slurm-8443-0.0.0.0/0-/shared]'
# module scope request.node.nodeid example:
# 'performance_tests/test_starccm_and_openfoam.py'
# TODO: Find a better way to name module_scope_test/session_scope_test
logging.info(f"request.node.nodeid: {request.node.nodeid}")
nodeid_parts = request.node.nodeid.split("::")
if len(nodeid_parts) == 2:
test_file, test_name = nodeid_parts
elif len(nodeid_parts) == 1:
test_file = nodeid_parts[0]
test_name = "module_scope_test" + random_alphanumeric()
else:
raise ValueError(f"Unexpected nodeid format: {request.node.nodeid}")

config_file_name = "{0}-{1}".format(test_file, test_name.replace("/", "_"))
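    # e.g. 'performance_tests/test_starccm_and_openfoam.py-module_scope_test<suffix>' for the
    # module-scope case above (illustrative; the suffix comes from random_alphanumeric())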

os.makedirs(
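For orientation, a consuming test module might use the new fixture roughly as follows. This is a hypothetical sketch: the test names, bodies, and config details are illustrative, and `pcluster_config_reader` is assumed to be the suite's existing config fixture; only `shared_clusters_factory` and its `cluster_factory` signature come from the diff above.

    # Both tests request the same (region, instance, os, scheduler) tuple, so the
    # first call creates the shared cluster and the second detects and reuses it.
    def test_starccm(shared_clusters_factory, pcluster_config_reader, region, instance, os, scheduler):
        cluster = shared_clusters_factory.cluster_factory(
            pcluster_config_reader(), region, instance, os, scheduler
        )
        # ... run STAR-CCM+ jobs against `cluster` ...

    def test_openfoam(shared_clusters_factory, pcluster_config_reader, region, instance, os, scheduler):
        cluster = shared_clusters_factory.cluster_factory(
            pcluster_config_reader(), region, instance, os, scheduler
        )
        # ... run OpenFOAM jobs against `cluster` ...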
27 changes: 27 additions & 0 deletions tests/integration-tests/tests/performance_tests/common.py
@@ -27,6 +27,7 @@
PYTEST_PARAMETERIZE_VALUES = [(NUM_COMPUTE_NODES, 1)]
TEST_RUNNER_SCRIPT = "/shared/assets/workloads/scale-test/run-scale-test.sh"
ROUND_UP_FACTOR = 100_000_000
PERF_TEST_DIFFERENCE_TOLERANCE = 3

METRICS = [
dict(name="jobRunTime", unit="ms"),
@@ -222,3 +223,29 @@ def write_results_to_output_dir(
paths["baseline"]["statistics.json"],
paths[candidate_configuration]["statistics.json"],
)


def perf_test_difference(observed_value, baseline_value):
    """Return the percentage difference of observed_value relative to baseline_value."""
    percentage_difference = 100 * (observed_value - baseline_value) / baseline_value
    return percentage_difference


def _log_output_performance_difference(node, performance_degradation, observed_value, baseline_value):
    """Log the observed-vs-baseline difference and record degradations above tolerance."""
    percentage_difference = perf_test_difference(observed_value, baseline_value)
    if percentage_difference < 0:
        outcome = "improvement"
    elif percentage_difference == 0:
        outcome = "matching baseline"
    elif percentage_difference <= PERF_TEST_DIFFERENCE_TOLERANCE:
        outcome = "degradation (within tolerance)"
    else:
        outcome = "degradation (above tolerance)"
        # Only above-tolerance regressions are recorded for the caller to act on.
        performance_degradation[node] = {
            "baseline": baseline_value,
            "observed": observed_value,
            "percentage_difference": percentage_difference,
        }
    logging.info(
        f"Nodes: {node}, Baseline: {baseline_value} seconds, Observed: {observed_value} seconds, "
        f"Percentage difference: {percentage_difference}%, Outcome: {outcome}"
    )
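
A quick worked example of the tolerance check (illustrative numbers, not from any baseline):

    # 100 * (312 - 300) / 300 = 4.0% -> above the 3% tolerance, recorded as degradation
    assert perf_test_difference(observed_value=312, baseline_value=300) == 4.0
    # 100 * (306 - 300) / 300 = 2.0% -> within tolerance, logged but not recorded
    assert perf_test_difference(observed_value=306, baseline_value=300) == 2.0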
