diff --git a/src/con_duct/__main__.py b/src/con_duct/__main__.py index 8bfd3ec..ffe8895 100644 --- a/src/con_duct/__main__.py +++ b/src/con_duct/__main__.py @@ -13,7 +13,7 @@ import sys import threading import time -from typing import IO, Any, TextIO +from typing import IO, Any, Optional, TextIO from . import __version__ ENV_PREFIXES = ("PBS_", "SLURM_", "OSG") @@ -162,19 +162,34 @@ def prepare_paths(self, clobber: bool, capture_outputs: Outputs) -> None: @dataclass class Averages: - rss: float = 0.0 - vsz: float = 0.0 - pmem: float = 0.0 - pcpu: float = 0.0 + rss: Optional[float] = None + vsz: Optional[float] = None + pmem: Optional[float] = None + pcpu: Optional[float] = None num_samples: int = 0 def update(self: Averages, other: Sample) -> None: assert_num(other.total_rss, other.total_vsz, other.total_pmem, other.total_pcpu) - self.num_samples += 1 - self.rss += (other.total_rss - self.rss) / self.num_samples - self.vsz += (other.total_vsz - self.vsz) / self.num_samples - self.pmem += (other.total_pmem - self.pmem) / self.num_samples - self.pcpu += (other.total_pcpu - self.pcpu) / self.num_samples + if not self.num_samples: + self.num_samples += 1 + self.rss = other.total_rss + self.vsz = other.total_vsz + self.pmem = other.total_pmem + self.pcpu = other.total_pcpu + else: + assert self.rss is not None + assert self.vsz is not None + assert self.pmem is not None + assert self.pcpu is not None + assert other.total_rss is not None + assert other.total_vsz is not None + assert other.total_pmem is not None + assert other.total_pcpu is not None + self.num_samples += 1 + self.rss += (other.total_rss - self.rss) / self.num_samples + self.vsz += (other.total_vsz - self.vsz) / self.num_samples + self.pmem += (other.total_pmem - self.pmem) / self.num_samples + self.pcpu += (other.total_pcpu - self.pcpu) / self.num_samples @classmethod def from_sample(cls, sample: Sample) -> Averages: @@ -194,17 +209,17 @@ def from_sample(cls, sample: Sample) -> Averages: class Sample: stats: dict[int, ProcessStats] = field(default_factory=dict) averages: Averages = field(default_factory=Averages) - total_rss: int = 0 - total_vsz: int = 0 - total_pmem: float = 0.0 - total_pcpu: float = 0.0 + total_rss: Optional[int] = None + total_vsz: Optional[int] = None + total_pmem: Optional[float] = None + total_pcpu: Optional[float] = None timestamp: str = "" # TS of last sample collected def add_pid(self, pid: int, stats: ProcessStats) -> None: - self.total_rss += stats.rss - self.total_vsz += stats.vsz - self.total_pmem += stats.pmem - self.total_pcpu += stats.pcpu + self.total_rss = self.total_rss or 0 + stats.rss + self.total_vsz = self.total_vsz or 0 + stats.vsz + self.total_pmem = self.total_vsz or 0 + stats.pmem + self.total_pcpu = self.total_pcpu or 0 + stats.pcpu self.stats[pid] = stats self.timestamp = max(self.timestamp, stats.timestamp) @@ -218,10 +233,14 @@ def max(self: Sample, other: Sample) -> Sample: output.add_pid(pid, mine) else: output.add_pid(pid, other.stats[pid]) - output.total_pmem = max(self.total_pmem, other.total_pmem) - output.total_pcpu = max(self.total_pcpu, other.total_pcpu) - output.total_rss = max(self.total_rss, other.total_rss) - output.total_vsz = max(self.total_vsz, other.total_vsz) + assert other.total_pmem is not None + assert other.total_pcpu is not None + assert other.total_rss is not None + assert other.total_vsz is not None + output.total_pmem = max(self.total_pmem or 0, other.total_pmem) + output.total_pcpu = max(self.total_pcpu or 0, other.total_pcpu) + output.total_rss = max(self.total_rss or 0, other.total_rss) + output.total_vsz = max(self.total_vsz or 0, other.total_vsz) return output def for_json(self) -> dict[str, Any]: @@ -312,7 +331,7 @@ def get_system_info(self) -> None: except subprocess.CalledProcessError: self.gpus = None - def collect_sample(self) -> Sample: + def collect_sample(self) -> Optional[Sample]: assert self.session_id is not None sample = Sample() try: @@ -341,8 +360,8 @@ def collect_sample(self) -> Sample: timestamp=datetime.now().astimezone().isoformat(), ), ) - except subprocess.CalledProcessError: - pass + except subprocess.CalledProcessError: # when session_id has no processes + return None return sample def write_subreport(self) -> None: @@ -359,30 +378,14 @@ def execution_summary(self) -> dict[str, Any]: "command": self.command, "logs_prefix": self.log_paths.prefix, "wall_clock_time": self.elapsed_time, - "peak_rss": ( - self.max_values.total_rss if self.max_values.stats else "unknown" - ), - "average_rss": ( - self.averages.rss if self.averages.num_samples >= 1 else "unknown" - ), - "peak_vsz": ( - self.max_values.total_vsz if self.max_values.stats else "unknown" - ), - "average_vsz": ( - self.averages.vsz if self.averages.num_samples >= 1 else "unknown" - ), - "peak_pmem": ( - self.max_values.total_pmem if self.max_values.stats else "unknown" - ), - "average_pmem": ( - self.averages.pmem if self.averages.num_samples >= 1 else "unknown" - ), - "peak_pcpu": ( - self.max_values.total_pcpu if self.max_values.stats else "unknown" - ), - "average_pcpu": ( - self.averages.pcpu if self.averages.num_samples >= 1 else "unknown" - ), + "peak_rss": self.max_values.total_rss, + "average_rss": self.averages.rss, + "peak_vsz": self.max_values.total_vsz, + "average_vsz": self.averages.vsz, + "peak_pmem": self.max_values.total_pmem, + "average_pmem": self.averages.pmem, + "peak_pcpu": self.max_values.total_pcpu, + "average_pcpu": self.averages.pcpu, "num_samples": self.averages.num_samples, "num_reports": self.number, } @@ -403,7 +406,10 @@ def dump_json(self) -> str: @cached_property def execution_summary_formatted(self) -> str: - return self.summary_format.format_map(self.execution_summary) + human_readable = { + k: "unknown" if v is None else v for k, v in self.execution_summary.items() + } + return self.summary_format.format_map(human_readable) @dataclass @@ -543,6 +549,10 @@ def monitor_process( break sample = report.collect_sample() # Report averages should be updated prior to sample aggregation + if ( + sample is None + ): # passthrough has probably finished before sample could be collected + continue report.averages.update(sample) if report.current_sample is None: sample.averages = Averages.from_sample(sample) diff --git a/test/test_report.py b/test/test_report.py index 0132f2c..47a26d7 100644 --- a/test/test_report.py +++ b/test/test_report.py @@ -1,7 +1,14 @@ from __future__ import annotations from datetime import datetime +from unittest import mock import pytest -from con_duct.__main__ import Averages, ProcessStats, Sample +from con_duct.__main__ import ( + EXECUTION_SUMMARY_FORMAT, + Averages, + ProcessStats, + Report, + Sample, +) stat0 = ProcessStats( pcpu=0.0, pmem=0, rss=0, vsz=0, timestamp="2024-06-11T10:09:37-04:00" @@ -102,7 +109,7 @@ def test_averages_three_samples() -> None: (0, 0.0, 0, 0.0), (2.5, 3.5, 8192, 16384), (100.0, 99.9, 65536, 131072), - ] + ], ) def test_process_stats_green(pcpu: float, pmem: float, rss: int, vsz: int) -> None: # Assert does not raise @@ -123,7 +130,7 @@ def test_process_stats_green(pcpu: float, pmem: float, rss: int, vsz: int) -> No (1, 2, "one", 4), (1, 2, 3, "value"), ("2", "fail", "or", "more"), - ] + ], ) def test_process_stats_red(pcpu: float, pmem: float, rss: int, vsz: int) -> None: with pytest.raises(AssertionError): @@ -134,3 +141,18 @@ def test_process_stats_red(pcpu: float, pmem: float, rss: int, vsz: int) -> None vsz=vsz, timestamp=datetime.now().astimezone().isoformat(), ) + + +@mock.patch("con_duct.__main__.LogPaths") +@mock.patch("con_duct.__main__.subprocess.Popen") +def test_execution_summary_formatted( + mock_popen: mock.MagicMock, mock_log_paths: mock.MagicMock +) -> None: + mock_log_paths.prefix = "mock_prefix" + report = Report( + "_cmd", [], None, mock_popen, mock_log_paths, EXECUTION_SUMMARY_FORMAT, False + ) + + output = report.execution_summary_formatted + assert "None" not in output + assert "unknown" in output