Skip to content

Commit

Permalink
Refactor start sampling (#33)
Browse files Browse the repository at this point in the history
* switch to NDJSON

* Playing with UX

* Aggregate samples as we go

* fixup: rm debug statement

* Pretty it up a bit

* Fix: full command should be space seperated

* Periods look cleaner than hyphens for iso8601

Fixes: #32

* Reformat output_prefix each time its used

We want to include whatever PID we are monitoring in the prefix, not
just the PID of the top level duct process.

Fixes: #34

* Dry up maxing/aggregation logic

* Logs parent dir is useful, __repr__ unused

* Write all pids in sample into a single file

* Update src/duct.py

Co-authored-by: Yaroslav Halchenko <[email protected]>

---------

Co-authored-by: Yaroslav Halchenko <[email protected]>
  • Loading branch information
asmacdo and yarikoptic committed May 30, 2024
1 parent 090b4f0 commit f786261
Showing 1 changed file with 114 additions and 91 deletions.
205 changes: 114 additions & 91 deletions src/duct.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env python3
import argparse
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
import json
import os
Expand All @@ -11,31 +10,63 @@
import sys
import threading
import time
from typing import Any, DefaultDict, Dict, List, Optional, TextIO, Tuple, Union
from typing import Any, Dict, Optional, TextIO, Tuple, Union

__version__ = "0.0.1"
ENV_PREFIXES = ("PBS_", "SLURM_", "OSG")


class Colors:
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m" # Reset to default color
BOLD = "\033[1m"
UNDERLINE = "\033[4m"


class Report:
"""Top level report"""

start_time: float
command: str
session_id: int
gpus: Optional[list]
unaggregated_samples: List[Dict]
number: int
system_info: Dict[str, Any] # Use more specific types if possible

def __init__(self, command: str, session_id: int) -> None:
def __init__(
self,
command: str,
arguments,
session_id: int,
output_prefix: str,
process,
datetime_filesafe,
) -> None:
self.start_time = time.time()
self.command = command
self._command = command
self.arguments = arguments
self.session_id = session_id
self.gpus = []
self.unaggregated_samples = []
self.number = 0
self.system_info = {}
self.output_prefix = output_prefix
self.max_values = defaultdict(dict)
self.process = process
self._sample = defaultdict(dict)
self.datetime_filesafe = datetime_filesafe

@property
def command(self):
return " ".join([self._command] + self.arguments)

@property
def elapsed_time(self):
return time.time() - self.start_time

def collect_environment(self):
self.env = (
Expand Down Expand Up @@ -72,6 +103,14 @@ def get_system_info(self):
except subprocess.CalledProcessError:
self.gpus = ["Failed to query GPU info"]

def update_max_resources(self, maxes, sample):
for pid in sample:
if pid in maxes:
for key, value in sample[pid].items():
maxes[pid][key] = max(maxes[pid].get(key, value), value)
else:
maxes[pid] = sample[pid]

def collect_sample(self):
process_data = {}
try:
Expand All @@ -88,6 +127,7 @@ def collect_sample(self):
for line in output.splitlines()[1:]:
if line:
pid, pcpu, pmem, rss, vsz, etime, cmd = line.split(maxsplit=6)

process_data[pid] = {
# %CPU
"pcpu": float(pcpu),
Expand All @@ -97,57 +137,52 @@ def collect_sample(self):
"rss": int(rss),
# Virtual Memory size
"vsz": int(vsz),
"timestamp": datetime.now().astimezone().isoformat(),
}
except subprocess.CalledProcessError:
process_data["error"] = "Failed to query process data"

self.unaggregated_samples.append(process_data)

def aggregate_samples(self):
max_values = {}
while self.unaggregated_samples:
sample = self.unaggregated_samples.pop()
for pid, metrics in sample.items():
if pid not in max_values:
max_values[
pid
] = metrics.copy() # Make a copy of the metrics for the first entry
else:
# Update each metric to the maximum found so far
for key in metrics:
max_values[pid][key] = max(max_values[pid][key], metrics[key])
return max_values

def __repr__(self):
return json.dumps(
{
"Command": self.command,
"System": self.system_info,
"ENV": self.env,
"GPU": self.gpus,
}
)


@dataclass
class SubReport:
"""Group of aggregated statestics on a session"""

number: int = 0

pids_dummy: DefaultDict[Any, List[Any]] = field(
default_factory=lambda: defaultdict(list)
)
session_data = None
elapsed_time = None

def serialize(self):
return {
"Subreport Number": self.number,
"Number": self.number,
"Elapsed Time": self.elapsed_time,
"Session Data": self.session_data,
}
return process_data

def write_pid_samples(self):
resource_stats_log_path = f"{self.output_prefix}usage.json"
with open(resource_stats_log_path, "a") as resource_statistics_log:
resource_statistics_log.write(json.dumps(self._sample) + "\n")

def print_max_values(self):
for pid, maxes in self.max_values.items():
print(f"PID {pid} Maximum Values: {maxes}")

def finalize(self):
if not self.process.returncode:
print(Colors.OKGREEN)
else:
print(Colors.FAIL)

print("-----------------------------------------------------")
print(" duct report")
print("-----------------------------------------------------")
print(f"Exit Code: {self.process.returncode}")
print(f"{Colors.OKCYAN}Command: {self.command}")
print(f"Wall Clock Time: {self.elapsed_time}")
print(f"Number of Processes: {len(self.max_values)}")
for pid, values in self.max_values.items():
values.pop("timestamp") # Meaningless
print(f" {pid} Max Usage: {values}")


def monitor_process(report, process, report_interval, sample_interval):
while True:
if process.poll() is not None: # the passthrough command has finished
break
# print(f"Resource stats log path: {resource_stats_log_path}")
sample = report.collect_sample()
report.update_max_resources(report._sample, sample)
if report.elapsed_time >= (report.number + 1) * report_interval:
report.write_pid_samples()
report.update_max_resources(report.max_values, report._sample)
report._sample = defaultdict(dict) # Reset sample
report.number += 1
time.sleep(sample_interval)


def create_and_parse_args():
Expand Down Expand Up @@ -259,28 +294,6 @@ def close(self):
self.infile.close()


def monitor_process(report, process, report_interval, sample_interval, output_prefix):
while True:
if process.poll() is not None: # the passthrough command has finished
break

elapsed_time = time.time() - report.start_time
resource_stats_log_path = "{output_prefix}usage.json"
if elapsed_time >= (report.number + 1) * report_interval:
aggregated = report.aggregate_samples()
for pid, pinfo in aggregated.items():
with open(
resource_stats_log_path.format(
output_prefix=output_prefix, pid=pid
),
"a",
) as resource_statistics_log:
pinfo["elapsed_time"] = elapsed_time
resource_statistics_log.write(json.dumps(aggregated))
report.number += 1
time.sleep(sample_interval)


def prepare_outputs(
capture_outputs: str, outputs: str, output_prefix: str
) -> Tuple[Union[TextIO, TailPipe, int], Union[TextIO, TailPipe, int]]:
Expand Down Expand Up @@ -309,17 +322,6 @@ def prepare_outputs(
return stdout, stderr


def format_output_prefix(output_prefix_template: str) -> str:
datenow = datetime.now()
f_kwargs = {
# 'pure' iso 8601 does not make good filenames
"datetime": datenow.isoformat(),
"datetime_filesafe": datenow.strftime("%Y-%m-%dT%H-%M-%S"),
"pid": os.getpid(),
}
return output_prefix_template.format(**f_kwargs)


def ensure_directories(path: str) -> None:
if path.endswith(os.sep): # If it ends in "/" (for linux) treat as a dir
os.makedirs(path, exist_ok=True)
Expand All @@ -333,7 +335,11 @@ def ensure_directories(path: str) -> None:
def main():
"""A wrapper to execute a command, monitor and log the process details."""
args = create_and_parse_args()
formatted_output_prefix = format_output_prefix(args.output_prefix)
datetime_filesafe = datetime.now().strftime("%Y.%m.%dT%H.%M.%S")
duct_pid = os.getpid()
formatted_output_prefix = args.output_prefix.format(
datetime_filesafe=datetime_filesafe, pid=duct_pid
)
ensure_directories(formatted_output_prefix)
stdout, stderr = prepare_outputs(
args.capture_outputs, args.outputs, formatted_output_prefix
Expand All @@ -347,21 +353,33 @@ def main():
else:
stderr_file = stderr

full_command = " ".join([str(args.command)] + args.arguments)
print(f"{Colors.OKCYAN}-----------------------------------------------------")
print(f"duct is executing {full_command}...")
print()
print(f"Log files will be written to {formatted_output_prefix}")
print(f"-----------------------------------------------------{Colors.ENDC}")
process = subprocess.Popen(
[str(args.command)] + args.arguments,
stdout=stdout_file,
stderr=stderr_file,
preexec_fn=os.setsid,
)
session_id = os.getsid(process.pid) # Get session ID of the new process
report = Report(args.command, session_id)
report = Report(
args.command,
args.arguments,
session_id,
formatted_output_prefix,
process,
datetime_filesafe,
)
if args.record_types in ["all", "processes-samples"]:
monitoring_args = [
report,
process,
args.report_interval,
args.sample_interval,
formatted_output_prefix,
]
monitoring_thread = threading.Thread(
target=monitor_process, args=monitoring_args
Expand All @@ -372,20 +390,25 @@ def main():
if args.record_types in ["all", "system-summary"]:
report.collect_environment()
report.get_system_info()
system_info_path = f"{formatted_output_prefix}info.json"
system_info_path = f"{args.output_prefix}info.json".format(
pid=duct_pid, datetime_filesafe=datetime_filesafe
)
with open(system_info_path, "a") as system_logs:
report.end_time = time.time()
report.run_time_seconds = f"{report.end_time - report.start_time}"
report.get_system_info()
system_logs.write(str(report))

process.wait()
report.process = process
if isinstance(stdout, TailPipe):
stdout_file.close()
stdout.close()
if isinstance(stderr, TailPipe):
stderr_file.close()
stderr.close()
report.finalize()
print(f"Log files location: {report.output_prefix}")


if __name__ == "__main__":
Expand Down

0 comments on commit f786261

Please sign in to comment.