Skip to content

Commit

Permalink
Allow output_prefix to be a directory or file-prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
asmacdo committed May 1, 2024
1 parent 9a827e3 commit a908b4f
Showing 1 changed file with 41 additions and 19 deletions.
60 changes: 41 additions & 19 deletions src/duct.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,6 @@ def serialize(self):


def create_and_parse_args():
now = datetime.now()
# 'pure' iso 8601 does not make good filenames
file_safe_iso = now.strftime("%Y-%m-%d.%H-%M-%S")
parser = argparse.ArgumentParser(
description="Gathers metrics on a command and all its child processes."
)
Expand All @@ -157,8 +154,8 @@ def create_and_parse_args():
parser.add_argument(
"--output-prefix",
type=str,
default=os.getenv("DUCT_OUTPUT_PREFIX", f".duct/run-logs/{file_safe_iso}"),
help="Directory in which all logs will be saved.",
default=os.getenv("DUCT_OUTPUT_PREFIX", "{file_safe_iso}"),
help="`path+fileprefix` or directory in which all logs will be saved.",
)
parser.add_argument(
"--report-interval",
Expand Down Expand Up @@ -246,12 +243,16 @@ def monitor_process(
break

elapsed_time = time.time() - report.start_time
report.collect_sample()
sep = "" if output_prefix.endswith(os.sep) else "-"
resource_stats_log_path = "{output_prefix}{sep}{pid}-resource-usage.json"
if elapsed_time >= (report.number + 1) * report_interval:
aggregated = report.aggregate_samples()
for pid, pinfo in aggregated.items():
with open(
f"{output_prefix}/{pid}_resource_usage.json", "a"
resource_stats_log_path.format(
output_prefix=output_prefix, sep=sep, pid=pid
),
"a",
) as resource_statistics_log:
pinfo["elapsed_time"] = elapsed_time
resource_statistics_log.write(json.dumps(aggregated))
Expand All @@ -261,20 +262,20 @@ def monitor_process(

def prepare_outputs(capture_outputs, outputs, output_prefix):
if capture_outputs in ["all", "stdout"] and outputs in ["all", "stdout"]:
stdout = TeeStream(f"{output_prefix}/stdout.txt")
stdout = TeeStream(f"{output_prefix}stdout")
stdout.start()
elif capture_outputs in ["all", "stdout"] and outputs in ["none", "stderr"]:
stdout = open(f"{output_prefix}/stdout.txt")
stdout = open(f"{output_prefix}stdout")
elif capture_outputs in ["none", "stderr"] and outputs in ["all", "stdout"]:
stdout = subprocess.PIPE
else:
stdout = subprocess.DEVNULL

if capture_outputs in ["all", "stderr"] and outputs in ["all", "stderr"]:
stderr = TeeStream(f"{output_prefix}/stderr.txt")
stderr = TeeStream(f"{output_prefix}stderr")
stderr.start()
elif capture_outputs in ["all", "stderr"] and outputs in ["none", "stdout"]:
stderr = open(f"{output_prefix}/stderr.txt")
stderr = open(f"{output_prefix}/stderr")
elif capture_outputs in ["none", "stdout"] and outputs in [
"all",
"stderr",
Expand All @@ -285,15 +286,37 @@ def prepare_outputs(capture_outputs, outputs, output_prefix):
return stdout, stderr


def format_output_prefix(output_prefix_template):
f_kwargs = {}
if "file_safe_iso" in output_prefix_template:
# 'pure' iso 8601 does not make good filenames
f_kwargs["file_safe_iso"] = datetime.now().strftime("%Y-%m-%d.%H-%M-%S")
if f_kwargs:
return output_prefix_template.format(**f_kwargs)
else:
return output_prefix_template


def ensure_directories(path):
if path.endswith(os.sep): # If it ends in "/" (for linux) treat as a dir
os.makedirs(path, exist_ok=True)
else:
# Path does not end with a separator, treat the last part as a filename
directory = os.path.dirname(path)
if directory: # If there's a directory part, create it
os.makedirs(directory, exist_ok=True)


def main():
"""A wrapper to execute a command, monitor and log the process details."""
args = create_and_parse_args()
os.makedirs(args.output_prefix, exist_ok=True)
formatted_output_prefix = format_output_prefix(args.output_prefix)
ensure_directories(formatted_output_prefix)
stdout, stderr = prepare_outputs(
args.capture_outputs, args.outputs, args.output_prefix
args.capture_outputs, args.outputs, formatted_output_prefix
)
process = subprocess.Popen(
[str(args.command)] + args.arguments.copy(),
[str(args.command)] + args.arguments,
stdout=stdout,
stderr=stderr,
preexec_fn=os.setsid,
Expand All @@ -311,7 +334,7 @@ def main():
process,
args.report_interval,
args.sample_interval,
args.output_prefix,
formatted_output_prefix,
]
monitoring_thread = threading.Thread(
target=monitor_process, args=monitoring_args
Expand All @@ -320,10 +343,9 @@ def main():
monitoring_thread.join()

if args.record_types in ["all", "system-summary"]:
with open(
f"{args.output_prefix}/system-report.session-{report.session_id}.json",
"a",
) as system_logs:
sep = "" if formatted_output_prefix.endswith(os.sep) else "-"
system_info_path = f"{formatted_output_prefix}{sep}info.json"
with open(system_info_path, "a") as system_logs:
report.end_time = time.time()
report.run_time_seconds = f"{report.end_time - report.start_time}"
report.get_system_info()
Expand Down

0 comments on commit a908b4f

Please sign in to comment.