diff --git a/lisa/messages.py b/lisa/messages.py
index 9112d161d0..add59a3fb1 100644
--- a/lisa/messages.py
+++ b/lisa/messages.py
@@ -125,6 +125,22 @@ class PerfMessage(MessageBase):
     role: str = ""
     test_result_id: str = ""
+    # Sysbench common params
+    min_latency_ms: float = 0.00
+    max_latency_ms: float = 0.00
+    avg_latency_ms: float = 0.00
+    percentile_95_latency_ms: float = 0.00
+    sum_latency_ms: float = 0.00
+    total_time: float = 0.00
+    events_avg: float = 0.00
+    events_stddev: float = 0.00
+    execution_time_avg: float = 0.00
+    execution_time_stddev: float = 0.00
+    total_events: Decimal = Decimal(0)
+    threads: int = 0
+    events: int = 0
+    time_limit_sec: int = 0
+
 
 
 T = TypeVar("T", bound=PerfMessage)
 
@@ -149,6 +165,12 @@
     )
 
 
+@dataclass
+class CPUPerformanceMessage(PerfMessage):
+    cpu_max_prime: Decimal = Decimal(0)
+    cpu_speed: Decimal = Decimal(0)
+
+
 @dataclass
 class DiskPerformanceMessage(PerfMessage):
     disk_setup_type: DiskSetupType = DiskSetupType.raw
@@ -168,6 +190,38 @@ class DiskPerformanceMessage(PerfMessage):
     randwrite_iops: Decimal = Decimal(0)
     randwrite_lat_usec: Decimal = Decimal(0)
 
+    # Sysbench FileIO params
+    read_io_per_sec: Decimal = Decimal(0)
+    write_io_per_sec: Decimal = Decimal(0)
+    read_mib_per_sec: Decimal = Decimal(0)
+    write_mib_per_sec: Decimal = Decimal(0)
+    fsyncs_per_sec: Decimal = Decimal(0)
+    file_fsync_all: str = ""
+    file_fsync_end: str = ""
+    total_file: int = 0
+    file_total_size_in_gb: int = 0
+    file_async_backlog: int = 0
+    file_fsync_freq: int = 0
+    file_merged_requests: int = 0
+    file_rw_ratio: float = 0.0
+    file_ops: str = ""
+    file_io_mode: str = ""
+    file_fsync_mode: str = ""
+
+
+@dataclass
+class MemoryPerformanceMessage(PerfMessage):
+    total_operations: Decimal = Decimal(0)
+    total_mib_transferred: float = 0.0  # fractional in sysbench output
+    block_size_in_kb: int = 0
+    memory_total_size_in_gb: int = 0
+    mib_per_second: float = 0.0
+    operations_per_second: float = 0.0
+    scope: str = ""
+    hugetlb_on: bool = False
+    access_mode: str = ""
+    operation: str = ""
+
 
 @dataclass
 class NetworkLatencyPerformanceMessage(PerfMessage):
diff --git a/lisa/tools/__init__.py b/lisa/tools/__init__.py
index 4dc2e237ce..4caf8e0836 100644
--- a/lisa/tools/__init__.py
+++ b/lisa/tools/__init__.py
@@ -102,6 +102,7 @@
 from .strace import Strace
 from .stress_ng import StressNg
 from .swap import Swap
+from .sysbench import Sysbench
 from .sysctl import Sysctl
 from .systemd_analyze import SystemdAnalyze
 from .tar import Tar
@@ -225,6 +226,7 @@
     "Strace",
     "StressNg",
     "Swap",
+    "Sysbench",
     "Sysctl",
     "SystemdAnalyze",
     "Tar",
diff --git a/lisa/tools/sysbench.py b/lisa/tools/sysbench.py
new file mode 100644
index 0000000000..59868df82c
--- /dev/null
+++ b/lisa/tools/sysbench.py
@@ -0,0 +1,481 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
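+
+"""Sysbench tool wrapper for LISA.
+
+Runs the sysbench cpu, memory, and fileio benchmarks and publishes the
+parsed results as CPUPerformanceMessage, MemoryPerformanceMessage, and
+DiskPerformanceMessage respectively.
+"""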
+
+import inspect
+import re
+from decimal import Decimal
+from typing import Any, Dict, List, cast
+
+from lisa import notifier
+from lisa.executable import Tool
+from lisa.messages import (
+    CPUPerformanceMessage,
+    DiskPerformanceMessage,
+    MemoryPerformanceMessage,
+    create_perf_message,
+)
+from lisa.operating_system import Posix
+from lisa.util import LisaException
+from lisa.util.process import ExecutableResult
+
+
+class Sysbench(Tool):
+    @property
+    def command(self) -> str:
+        return "sysbench"
+
+    @property
+    def can_install(self) -> bool:
+        return True
+
+    def install(self) -> bool:
+        posix_os: Posix = cast(Posix, self.node.os)
+        posix_os.install_packages("sysbench")
+
+        return self._check_exists()
+
+    def __run_sysbench_test(
+        self,
+        test_type: str,
+        threads: int,
+        events: int,
+        time_limit: int,
+        validation: str,
+        extra_args: str = "",
+    ) -> ExecutableResult:
+        args = (
+            f"{test_type} run --threads={threads} --events={events}"
+            f" --validate={validation} --percentile=95"
+            f" --verbosity=5 --histogram=on"
+            f" --time={time_limit} --debug=on {extra_args}"
+        )
+
+        result: ExecutableResult = self.run(
+            args,
+            expected_exit_code=0,
+            expected_exit_code_failure_message="Sysbench Test failed",
+        )
+        return result
+
+    def run_cpu_perf(
+        self,
+        test_result: Any,
+        threads: int = 1,
+        events: int = 0,
+        cpu_max_prime: int = 10000,
+        time_limit: int = 10,
+    ) -> None:
+        extra_args: str = f" --cpu-max-prime={cpu_max_prime}"
+        perf_msg: Dict[Any, Any] = {}
+        testcase_name: str = inspect.stack()[1][3]
+        res = self.__run_sysbench_test(
+            test_type="cpu",
+            threads=threads,
+            events=events,
+            time_limit=time_limit,
+            validation="on",
+            extra_args=extra_args,
+        )
+        perf_msg = self.__process_cpu_perf_result(res.stdout)
+        # record the run parameters on the message as well
+        perf_msg["cpu_max_prime"] = cpu_max_prime
+        perf_msg["threads"] = threads
+        perf_msg["events"] = events
+        perf_msg["time_limit_sec"] = time_limit
+        perf_msg["test_case_name"] = testcase_name
+
+        self.__send_subtest_msg(
+            message_type=CPUPerformanceMessage,
+            test_name=testcase_name,
+            test_result=test_result,
+            other_fields=perf_msg,
+        )
+
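+    # Illustrative usage (the test-case side is not part of this diff, and
+    # assumes the usual LISA tool accessor):
+    #     node.tools[Sysbench].run_cpu_perf(test_result, threads=4)
+    # which executes roughly:
+    #     sysbench cpu run --threads=4 --events=0 --validate=on
+    #     --percentile=95 --verbosity=5 --histogram=on --time=10
+    #     --debug=on --cpu-max-prime=10000
+    # and posts a CPUPerformanceMessage with the parsed statistics.
+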
+    def run_fileio_perf(
+        self,
+        operation: str,
+        test_result: Any,
+        threads: int = 1,
+        events: int = 0,
+        time_limit: int = 10,
+        file_io_mode: str = "sync",
+        file_fsync_all: str = "off",
+        file_fsync_mode: str = "fsync",
+        file_fsync_end: bool = False,
+        total_file: int = 128,
+        block_size_in_kb: int = 16,
+        file_total_size_in_gb: int = 2,
+        file_async_backlog: int = 128,
+        file_fsync_freq: int = 100,
+        file_merged_requests: int = 0,
+        file_rw_ratio: float = 1.5,
+    ) -> None:
+        total_size = file_total_size_in_gb * 1024 * 1024 * 1024
+        block_size = block_size_in_kb * 1024
+        valid_io_mode: List[str] = ["sync", "async", "mmap"]
+        valid_fsync_mode: List[str] = ["fsync", "fdatasync"]
+        valid_test_mode: Dict[str, str] = {
+            "seqwr": "write",
+            "seqrd": "read",
+            "rndrd": "read",
+            "rndwr": "write",
+            "seqrewr": "all",
+            "rndrw": "all",
+        }
+        perf_msg: Dict[Any, Any] = {}
+        testcase_name: str = inspect.stack()[1][3]
+        fsync_end: str = "on" if file_fsync_end else "off"
+
+        if operation not in valid_test_mode:
+            raise LisaException(
+                f"Invalid test_mode. Valid test_mode: {valid_test_mode.keys()}"
+            )
+        elif file_io_mode not in valid_io_mode:
+            raise LisaException(
+                f"Invalid file_io_mode. Valid file_io_mode: {valid_io_mode}"
+            )
+        elif file_fsync_mode not in valid_fsync_mode:
+            raise LisaException(
+                f"Invalid file_fsync_mode. Valid file_fsync_mode: {valid_fsync_mode}"
+            )
+
+        extra_args: str = (
+            f" --file-test-mode={operation} --file-num={total_file}"
+            f" --file-block-size={block_size} --file-total-size={total_size}"
+            f" --file-io-mode={file_io_mode} --file-async-backlog={file_async_backlog}"
+            f" --file-fsync-freq={file_fsync_freq} --file-fsync-all={file_fsync_all}"
+            f" --file-fsync-end={fsync_end} --file-fsync-mode={file_fsync_mode}"
+            f" --file-merged-requests={file_merged_requests}"
+            f" --file-rw-ratio={file_rw_ratio}"
+        )
+
+        # create the test files before the run, remove them afterwards
+        self.run(
+            f"fileio prepare --file-total-size={total_size} --file-num={total_file}",
+            force_run=True,
+        )
+
+        res = self.__run_sysbench_test(
+            test_type="fileio",
+            threads=threads,
+            events=events,
+            time_limit=time_limit,
+            validation="off",
+            extra_args=extra_args,
+        )
+
+        self.run(
+            f"fileio cleanup --file-total-size={total_size} --file-num={total_file}",
+            force_run=True,
+        )
+
+        perf_msg = self.__process_fileio_perf_result(
+            res.stdout,
+            valid_test_mode.get(operation, ""),
+        )
+        perf_msg["file_fsync_all"] = file_fsync_all
+        # store the normalized "on"/"off" string, matching the str-typed
+        # file_fsync_end message field (the raw argument is a bool)
+        perf_msg["file_fsync_end"] = fsync_end
+        perf_msg["total_file"] = total_file
+        perf_msg["file_total_size_in_gb"] = file_total_size_in_gb
+        perf_msg["file_async_backlog"] = file_async_backlog
+        perf_msg["file_fsync_freq"] = file_fsync_freq
+        perf_msg["file_merged_requests"] = file_merged_requests
+        perf_msg["file_rw_ratio"] = file_rw_ratio
+        perf_msg["file_ops"] = operation
+        perf_msg["file_io_mode"] = file_io_mode
+        perf_msg["file_fsync_mode"] = file_fsync_mode
+        perf_msg["threads"] = threads
+        perf_msg["events"] = events
+        perf_msg["time_limit_sec"] = time_limit
+        perf_msg["test_case_name"] = testcase_name
+
+        self.__send_subtest_msg(
+            message_type=DiskPerformanceMessage,
+            test_name=testcase_name,
+            test_result=test_result,
+            other_fields=perf_msg,
+        )
+
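+    # run_fileio_perf above follows sysbench's three-step fileio lifecycle:
+    # "fileio prepare" creates the test files, "fileio run" executes the
+    # selected --file-test-mode, and "fileio cleanup" deletes the files.
+    # With the defaults (128 files, 2 GB total) the prepare step is:
+    #     sysbench fileio prepare --file-total-size=2147483648 --file-num=128
+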
+ f"Valid memory_access_mode: {valid_mem_access_mode}" + ) + + if memory_hugetlb: + hugetlb = "on" + + extra_args: str = ( + f" --memory-block-size={block_size} --memory-total-size={total_mem_size}" + f" --memory-scope={memory_scope} --memory-hugetlb={hugetlb}" + f" --memory-oper={memory_oper} --memory-access-mode={memory_access_mode}" + ) + + res = self.__run_sysbench_test( + test_type="memory", + threads=threads, + events=events, + time_limit=time_limit, + validation="on", + extra_args=extra_args, + ) + + perf_msg = self.__process_memory_perf_result(res.stdout) + perf_msg["block_size_in_kb"] = memory_block_size_in_kb + perf_msg["memory_total_size_in_gb"] = memory_total_size_in_gb + perf_msg["scope"] = memory_scope + perf_msg["hugetlb_on"] = memory_hugetlb + perf_msg["access_mode"] = memory_access_mode + perf_msg["operation"] = memory_oper + perf_msg["threads"] = threads + perf_msg["events"] = events + perf_msg["time_limit_sec"] = time_limit + perf_msg["test_case_name"] = inspect.stack()[1][3] + + self.__send_subtest_msg( + message_type=MemoryPerformanceMessage, + test_name=testcase_name, + test_result=test_result, + other_fields=perf_msg, + ) + + def __process_perf_result( + self, + data: str, + ) -> Dict[Any, Any]: + # Sample Output + # ================ + # General statistics: + # total time: 10.0005s + # total number of events: 27617 + + # Latency (ms): + # min: 0.33 + # avg: 0.36 + # max: 10.14 + # 95th percentile: 0.43 + # sum: 9988.94 + + # Threads fairness: + # events (avg/stddev): 27617.0000/0.00 + # execution time (avg/stddev): 9.9889/0.00 + + # DEBUG: Verbose per-thread statistics: + + # DEBUG: thread # 0: min: 0.0003s avg: 0.0004s max: 0.0101s events: 27617 + # DEBUG: total time taken by event execution: 9.9889s + + non_debug_pattern = r"^(?!.*DEBUG: ).*$" + + result: Dict[Any, Any] = {} + + # Find all non-DEBUG lines in the text + non_debug_lines = "\n".join( + re.findall( + non_debug_pattern, + data, + re.MULTILINE, + ) + ) + + # Extract total time using regular expression + total_time_pattern = re.compile(r"total time:\s+(?P[\d.]+s)") + match = total_time_pattern.search(non_debug_lines) + if match: + result["total_time"] = match.group("total_time") + + # Extract total number of events using regular expression + total_events_pattern = re.compile( + r"total number of events:\s+(?P\d+)" + ) + match = total_events_pattern.search(non_debug_lines) + if match: + result["total_events"] = match.group("total_events") + + # Extract latency information using regular expressions + latency_param = "latency_ms" + latency_metrics_pattern = re.compile( + r"(?Pmin|avg|max|95th percentile|sum):\s+(?P[\d.]+)" + ) + matches = latency_metrics_pattern.findall(non_debug_lines) + for match in matches: + if match: + metric = match[0].strip().replace(" ", "_") + if match[0] == "95th percentile": + metric = "percentile_95" + value = float(match[1]) + result[f"{metric}_{latency_param}"] = value + + thread_events_pattern = re.compile( + r"events \(avg/stddev\):\s+(?P[\d.]+)/(?P[\d.]+)" + ) + thread_events = thread_events_pattern.search(non_debug_lines) + if thread_events: + result["events_avg"] = thread_events.group("events_avg") + result["events_stddev"] = thread_events.group("events_stddev") + + thread_exec_time_pattern = re.compile( + r"execution time \(avg/stddev\):\s+(?P[\d.]+)/(?P[\d.]+)" + ) + exec_time = thread_exec_time_pattern.search(non_debug_lines) + if exec_time: + result["execution_time_avg"] = exec_time.group("avg") + result["execution_time_stddev"] = exec_time.group("std_dev") + + return 
+    def __process_memory_perf_result(
+        self,
+        data: str,
+    ) -> Dict[Any, Any]:
+        result: Dict[Any, Any] = self.__process_perf_result(data)
+
+        # default to zero when a value is missing from the output
+        total_operations: Decimal = Decimal(0)
+        operations_per_second: float = 0.0
+        total_mib_transferred: float = 0.0
+        mib_per_second: float = 0.0
+
+        # Extract Total operations and operations per second
+        # Sample Output
+        # Total operations: 65730837 (6571036.01 per second)
+        total_operations_pattern = re.compile(
+            r"Total operations: (?P<operation>\d+) \((?P<per_sec>[\d.]+) per second\)"
+        )
+        match = total_operations_pattern.search(data)
+        if match:
+            total_operations = Decimal(match.group("operation"))
+            operations_per_second = float(match.group("per_sec"))
+
+        # Extract Total MiB transferred and MiB per second
+        # Sample Output
+        # 64190.27 MiB transferred (6417.03 MiB/sec)
+        total_mib_transferred_pattern = re.compile(
+            r"(?P<total_mib>[\d.]+) MiB transferred \((?P<per_sec>[\d.]+) MiB/sec\)"
+        )
+        match = total_mib_transferred_pattern.search(data)
+        if match:
+            total_mib_transferred = float(match.group("total_mib"))
+            mib_per_second = float(match.group("per_sec"))
+
+        result["total_operations"] = total_operations
+        result["operations_per_second"] = operations_per_second
+        result["total_mib_transferred"] = total_mib_transferred
+        result["mib_per_second"] = mib_per_second
+
+        return result
+
+    def __process_cpu_perf_result(
+        self,
+        data: str,
+    ) -> Dict[Any, Any]:
+        result: Dict[Any, Any] = self.__process_perf_result(data)
+
+        # Extract CPU speed; refer to the sample output below
+        # CPU speed:
+        #     events per second: 2770.19
+        cpu_speed_pattern = re.compile(
+            r"events per second:\s+(?P<event_per_sec>[\d.]+)"
+        )
+        match = cpu_speed_pattern.search(data)
+        if match:
+            result["cpu_speed"] = Decimal(match.group("event_per_sec"))
+
+        return result
+
+    def __process_fileio_perf_result(
+        self,
+        data: str,
+        ops: str,
+    ) -> Dict[Any, Any]:
+        result: Dict[Any, Any] = self.__process_perf_result(data)
+
+        # Sample Output
+        # ================
+        # File operations:
+        #     reads/s:        2717.15
+        #     writes/s:       1811.43
+        #     fsyncs/s:       45.39
+
+        # Throughput:
+        #     read, MiB/s:    42.46
+        #     written, MiB/s: 28.30
+
+        if ops in ("write", "all"):
+            reg_ex_io_per_sec = re.compile(r"writes/s:\s+(?P<write_per_sec>[\d.]+)")
+            io_per_sec = reg_ex_io_per_sec.search(data)
+            if io_per_sec:
+                result["write_io_per_sec"] = Decimal(
+                    io_per_sec.group("write_per_sec")
+                )
+            else:
+                result["write_io_per_sec"] = Decimal(0)
+        if ops in ("read", "all"):
+            reg_ex_io_per_sec = re.compile(r"reads/s:\s+(?P<read_per_sec>[\d.]+)")
+            io_per_sec = reg_ex_io_per_sec.search(data)
+            if io_per_sec:
+                result["read_io_per_sec"] = Decimal(io_per_sec.group("read_per_sec"))
+            else:
+                result["read_io_per_sec"] = Decimal(0)
+
+        reg_ex_fsyncs_per_sec = re.compile(r"fsyncs/s:\s+(?P<fsyncs_per_sec>[\d.]+)")
+        fsyncs_per_sec = reg_ex_fsyncs_per_sec.search(data)
+        if fsyncs_per_sec:
+            result["fsyncs_per_sec"] = Decimal(
+                fsyncs_per_sec.group("fsyncs_per_sec")
+            )
+        else:
+            result["fsyncs_per_sec"] = Decimal(0)
+
+        if ops in ("write", "all"):
+            reg_ex_mib_per_sec = re.compile(
+                r"written, MiB/s:\s+(?P<wr_mib_per_sec>[\d.]+)"
+            )
+            mib_per_sec = reg_ex_mib_per_sec.search(data)
+            if mib_per_sec:
+                result["write_mib_per_sec"] = Decimal(
+                    mib_per_sec.group("wr_mib_per_sec")
+                )
+            else:
+                result["write_mib_per_sec"] = Decimal(0)
+        if ops in ("read", "all"):
+            reg_ex_mib_per_sec = re.compile(
+                r"read, MiB/s:\s+(?P<rd_mib_per_sec>[\d.]+)"
+            )
+            mib_per_sec = reg_ex_mib_per_sec.search(data)
+            if mib_per_sec:
+                result["read_mib_per_sec"] = Decimal(
+                    mib_per_sec.group("rd_mib_per_sec")
+                )
+            else:
+                result["read_mib_per_sec"] = Decimal(0)
+
+        return result
+
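+    # All three run_*_perf helpers funnel their parsed dict into
+    # __send_subtest_msg below. create_perf_message (from lisa.messages) is
+    # expected to copy the other_fields entries onto the matching dataclass
+    # fields of the requested message type before it is handed to the
+    # notifier.
+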
+    def __send_subtest_msg(
+        self,
+        message_type: Any,
+        test_result: Any,
+        test_name: str,
+        other_fields: Dict[str, Any],
+    ) -> None:
+        subtest_msg = create_perf_message(
+            message_type,
+            self.node,
+            test_result,
+            test_name,
+            other_fields,
+        )
+        self._log.debug(f"sending subtest message: {subtest_msg}")
+        notifier.notify(subtest_msg)