diff --git a/examples/fl/fl/project/aggregator.py b/examples/fl/fl/project/aggregator.py
index c0bbeafa1..8a1e7f283 100644
--- a/examples/fl/fl/project/aggregator.py
+++ b/examples/fl/fl/project/aggregator.py
@@ -1,39 +1,10 @@
-from utils import (
-    get_aggregator_fqdn,
-    prepare_node_cert,
-    prepare_ca_cert,
-    prepare_plan,
-    prepare_cols_list,
-    prepare_init_weights,
-    create_workspace,
-    get_weights_path,
-)
-
 import os
 import shutil
 from subprocess import check_call
 from distutils.dir_util import copy_tree
 
 
-def start_aggregator(
-    input_weights,
-    node_cert_folder,
-    ca_cert_folder,
-    output_logs,
-    output_weights,
-    plan_path,
-    collaborators,
-    report_path,
-):
-
-    workspace_folder = os.path.join(output_logs, "workspace")
-    create_workspace(workspace_folder)
-    prepare_plan(plan_path, workspace_folder)
-    prepare_cols_list(collaborators, workspace_folder)
-    prepare_init_weights(input_weights, workspace_folder)
-    fqdn = get_aggregator_fqdn(workspace_folder)
-    prepare_node_cert(node_cert_folder, "server", f"agg_{fqdn}", workspace_folder)
-    prepare_ca_cert(ca_cert_folder, workspace_folder)
+def start_aggregator(workspace_folder, output_logs, output_weights, report_path):
 
     check_call(["fx", "aggregator", "start"], cwd=workspace_folder)
 
@@ -41,7 +12,8 @@ def start_aggregator(
     # perhaps investigate overriding plan entries?
 
     # NOTE: logs and weights are copied, even if target folders are not empty
-    copy_tree(os.path.join(workspace_folder, "logs"), output_logs)
+    if os.path.exists(os.path.join(workspace_folder, "logs")):
+        copy_tree(os.path.join(workspace_folder, "logs"), output_logs)
 
     # NOTE: conversion fails since openfl needs sample data...
     # weights_paths = get_weights_path(fl_workspace)
@@ -56,5 +28,5 @@ def start_aggregator(
     # Cleanup
     shutil.rmtree(workspace_folder, ignore_errors=True)
 
-    with open(report_path, 'w') as f:
+    with open(report_path, "w") as f:
         f.write("IsDone: 1")
diff --git a/examples/fl/fl/project/collaborator.py b/examples/fl/fl/project/collaborator.py
index 38c5048b6..fb4cdd1c2 100644
--- a/examples/fl/fl/project/collaborator.py
+++ b/examples/fl/fl/project/collaborator.py
@@ -1,32 +1,29 @@
-from utils import (
-    get_collaborator_cn,
-    prepare_node_cert,
-    prepare_ca_cert,
-    prepare_plan,
-    create_workspace,
-)
 import os
+from utils import get_collaborator_cn
 import shutil
 from subprocess import check_call
 
 
-def start_collaborator(
-    data_path,
-    labels_path,
-    node_cert_folder,
-    ca_cert_folder,
-    plan_path,
-    output_logs,
-):
-    workspace_folder = os.path.join(output_logs, "workspace")
-    create_workspace(workspace_folder)
-    prepare_plan(plan_path, workspace_folder)
+def start_collaborator(workspace_folder):
     cn = get_collaborator_cn()
-    prepare_node_cert(node_cert_folder, "client", f"col_{cn}", workspace_folder)
-    prepare_ca_cert(ca_cert_folder, workspace_folder)
-
-    # set log files
-    check_call(["fx", "collaborator", "start", "-n", cn], cwd=workspace_folder)
+    check_call(
+        [os.environ.get("OPENFL_EXECUTABLE", "fx"), "collaborator", "start", "-n", cn],
+        cwd=workspace_folder,
+    )
 
     # Cleanup
     shutil.rmtree(workspace_folder, ignore_errors=True)
+
+
+def check_connectivity(workspace_folder):
+    cn = get_collaborator_cn()
+    check_call(
+        [
+            os.environ.get("OPENFL_EXECUTABLE", "fx"),
+            "collaborator",
+            "connectivity_check",
+            "-n",
+            cn,
+        ],
+        cwd=workspace_folder,
+    )
diff --git a/examples/fl/fl/project/hooks.py b/examples/fl/fl/project/hooks.py
index dd3960ba4..9dc59582f 100644
--- a/examples/fl/fl/project/hooks.py
+++ b/examples/fl/fl/project/hooks.py
@@ -30,9 +30,9 @@ def collaborator_pre_training_hook(
     ca_cert_folder,
     plan_path,
     output_logs,
+    workspace_folder,
 ):
     cn = get_collaborator_cn()
-    workspace_folder = os.path.join(output_logs, "workspace")
     target_data_folder = os.path.join(workspace_folder, "data", cn)
     os.makedirs(target_data_folder, exist_ok=True)
 
@@ -69,6 +69,7 @@ def collaborator_post_training_hook(
     ca_cert_folder,
     plan_path,
     output_logs,
+    workspace_folder,
 ):
     pass
 
@@ -82,6 +83,7 @@ def aggregator_pre_training_hook(
     plan_path,
     collaborators,
     report_path,
+    workspace_folder,
 ):
     pass
 
@@ -95,5 +97,6 @@ def aggregator_post_training_hook(
     plan_path,
     collaborators,
     report_path,
+    workspace_folder,
 ):
     pass
diff --git a/examples/fl/fl/project/mlcube.py b/examples/fl/fl/project/mlcube.py
index 9e4a7e728..064440e95 100644
--- a/examples/fl/fl/project/mlcube.py
+++ b/examples/fl/fl/project/mlcube.py
@@ -1,9 +1,7 @@
 """MLCube handler file"""
 
-import os
-import shutil
 import typer
-from collaborator import start_collaborator
+from collaborator import start_collaborator, check_connectivity
 from aggregator import start_aggregator
 from plan import generate_plan
 from hooks import (
@@ -12,22 +10,11 @@
     collaborator_pre_training_hook,
     collaborator_post_training_hook,
 )
+from utils import generic_setup, generic_teardown, setup_collaborator, setup_aggregator
 
 app = typer.Typer()
 
 
-def _setup(output_logs):
-    tmp_folder = os.path.join(output_logs, ".tmp")
-    os.makedirs(tmp_folder, exist_ok=True)
-    # TODO: this should be set before any code imports tempfile
-    os.environ["TMPDIR"] = tmp_folder
-
-
-def _teardown(output_logs):
-    tmp_folder = os.path.join(output_logs, ".tmp")
-    shutil.rmtree(tmp_folder, ignore_errors=True)
-
-
 @app.command("train")
 def train(
     data_path: str = typer.Option(..., "--data_path"),
@@ -37,23 +24,27 @@ def train(
     plan_path: str = typer.Option(..., "--plan_path"),
     output_logs: str = typer.Option(..., "--output_logs"),
 ):
-    _setup(output_logs)
-    collaborator_pre_training_hook(
+    workspace_folder = generic_setup(output_logs)
+    setup_collaborator(
         data_path=data_path,
         labels_path=labels_path,
         node_cert_folder=node_cert_folder,
         ca_cert_folder=ca_cert_folder,
         plan_path=plan_path,
         output_logs=output_logs,
+        workspace_folder=workspace_folder,
     )
-    start_collaborator(
+    check_connectivity(workspace_folder)
+    collaborator_pre_training_hook(
         data_path=data_path,
         labels_path=labels_path,
         node_cert_folder=node_cert_folder,
         ca_cert_folder=ca_cert_folder,
         plan_path=plan_path,
         output_logs=output_logs,
+        workspace_folder=workspace_folder,
     )
+    start_collaborator(workspace_folder=workspace_folder)
     collaborator_post_training_hook(
         data_path=data_path,
         labels_path=labels_path,
@@ -61,8 +52,9 @@ def train(
         ca_cert_folder=ca_cert_folder,
         plan_path=plan_path,
         output_logs=output_logs,
+        workspace_folder=workspace_folder,
     )
-    _teardown(output_logs)
+    generic_teardown(output_logs)
 
 
 @app.command("start_aggregator")
@@ -76,8 +68,8 @@ def start_aggregator_(
     collaborators: str = typer.Option(..., "--collaborators"),
     report_path: str = typer.Option(..., "--report_path"),
 ):
-    _setup(output_logs)
-    aggregator_pre_training_hook(
+    workspace_folder = generic_setup(output_logs)
+    setup_aggregator(
         input_weights=input_weights,
         node_cert_folder=node_cert_folder,
         ca_cert_folder=ca_cert_folder,
@@ -86,8 +78,9 @@ def start_aggregator_(
         plan_path=plan_path,
         collaborators=collaborators,
         report_path=report_path,
+        workspace_folder=workspace_folder,
     )
-    start_aggregator(
+    aggregator_pre_training_hook(
         input_weights=input_weights,
         node_cert_folder=node_cert_folder,
         ca_cert_folder=ca_cert_folder,
@@ -96,6 +89,13 @@ def start_aggregator_(
         plan_path=plan_path,
         collaborators=collaborators,
         report_path=report_path,
+        workspace_folder=workspace_folder,
+    )
+    start_aggregator(
+        workspace_folder=workspace_folder,
+        output_logs=output_logs,
+        output_weights=output_weights,
+        report_path=report_path,
     )
     aggregator_post_training_hook(
         input_weights=input_weights,
@@ -106,8 +106,9 @@ def start_aggregator_(
         plan_path=plan_path,
         collaborators=collaborators,
         report_path=report_path,
+        workspace_folder=workspace_folder,
     )
-    _teardown(output_logs)
+    generic_teardown(output_logs)
 
 
 @app.command("generate_plan")
diff --git a/examples/fl/fl/project/utils.py b/examples/fl/fl/project/utils.py
index d92add606..a0da69a16 100644
--- a/examples/fl/fl/project/utils.py
+++ b/examples/fl/fl/project/utils.py
@@ -3,6 +3,56 @@
 import shutil
 
 
+def generic_setup(output_logs):
+    tmpfolder = os.path.join(output_logs, ".tmp")
+    os.makedirs(tmpfolder, exist_ok=True)
+    # NOTE: this should be set before any code imports tempfile
+    os.environ["TMPDIR"] = tmpfolder
+    workspace_folder = os.path.join(output_logs, "workspace")
+    os.makedirs(workspace_folder, exist_ok=True)
+    create_workspace(workspace_folder)
+    return workspace_folder
+
+
+def setup_collaborator(
+    data_path,
+    labels_path,
+    node_cert_folder,
+    ca_cert_folder,
+    plan_path,
+    output_logs,
+    workspace_folder,
+):
+    prepare_plan(plan_path, workspace_folder)
+    cn = get_collaborator_cn()
+    prepare_node_cert(node_cert_folder, "client", f"col_{cn}", workspace_folder)
+    prepare_ca_cert(ca_cert_folder, workspace_folder)
+
+
+def setup_aggregator(
+    input_weights,
+    node_cert_folder,
+    ca_cert_folder,
+    output_logs,
+    output_weights,
+    plan_path,
+    collaborators,
+    report_path,
+    workspace_folder,
+):
+    prepare_plan(plan_path, workspace_folder)
+    prepare_cols_list(collaborators, workspace_folder)
+    prepare_init_weights(input_weights, workspace_folder)
+    fqdn = get_aggregator_fqdn(workspace_folder)
+    prepare_node_cert(node_cert_folder, "server", f"agg_{fqdn}", workspace_folder)
+    prepare_ca_cert(ca_cert_folder, workspace_folder)
+
+
+def generic_teardown(output_logs):
+    tmp_folder = os.path.join(output_logs, ".tmp")
+    shutil.rmtree(tmp_folder, ignore_errors=True)
+
+
 def create_workspace(fl_workspace):
     plan_folder = os.path.join(fl_workspace, "plan")
     workspace_config = os.path.join(fl_workspace, ".workspace")
diff --git a/examples/fl/fl/test.sh b/examples/fl/fl/test.sh
index 95bd5b673..ae856d794 100644
--- a/examples/fl/fl/test.sh
+++ b/examples/fl/fl/test.sh
@@ -13,10 +13,19 @@ COL1="medperf mlcube run --mlcube ./mlcube_col1 --task train -e MEDPERF_PARTICIP
 COL2="medperf mlcube run --mlcube ./mlcube_col2 --task train -e MEDPERF_PARTICIPANT_LABEL=col2@example.com"
 COL3="medperf mlcube run --mlcube ./mlcube_col3 --task train -e MEDPERF_PARTICIPANT_LABEL=col3@example.com"
 
-gnome-terminal -- bash -c "$AGG; bash"
-gnome-terminal -- bash -c "$COL1; bash"
-gnome-terminal -- bash -c "$COL2; bash"
-gnome-terminal -- bash -c "$COL3; bash"
+# gnome-terminal -- bash -c "$AGG; bash"
+# gnome-terminal -- bash -c "$COL1; bash"
+# gnome-terminal -- bash -c "$COL2; bash"
+# gnome-terminal -- bash -c "$COL3; bash"
+rm agg.log col1.log col2.log col3.log
+$AGG >>agg.log &
+sleep 6
+$COL1 >>col1.log &
+sleep 6
+$COL2 >>col2.log &
+sleep 6
+$COL3 >>col3.log &
+wait
 
 # docker run --env MEDPERF_PARTICIPANT_LABEL=col1@example.com --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace/data:/mlcube_io0:ro --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace/labels:/mlcube_io1:ro --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace/node_cert:/mlcube_io2:ro --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace/ca_cert:/mlcube_io3:ro --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace:/mlcube_io4:ro --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace/logs:/mlcube_io5 -it --entrypoint bash mlcommons/medperf-fl:1.0.0
 # python /mlcube_project/mlcube.py train --data_path=/mlcube_io0 --labels_path=/mlcube_io1 --node_cert_folder=/mlcube_io2 --ca_cert_folder=/mlcube_io3 --plan_path=/mlcube_io4/plan.yaml --output_logs=/mlcube_io5