Skip to content

Commit

Permalink
update fl integration test mlcube example
Browse files Browse the repository at this point in the history
  • Loading branch information
hasan7n committed Sep 3, 2024
1 parent dee5152 commit 26b4337
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 83 deletions.
36 changes: 4 additions & 32 deletions examples/fl/fl/project/aggregator.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,19 @@
from utils import (
get_aggregator_fqdn,
prepare_node_cert,
prepare_ca_cert,
prepare_plan,
prepare_cols_list,
prepare_init_weights,
create_workspace,
get_weights_path,
)

import os
import shutil
from subprocess import check_call
from distutils.dir_util import copy_tree


def start_aggregator(
input_weights,
node_cert_folder,
ca_cert_folder,
output_logs,
output_weights,
plan_path,
collaborators,
report_path,
):

workspace_folder = os.path.join(output_logs, "workspace")
create_workspace(workspace_folder)
prepare_plan(plan_path, workspace_folder)
prepare_cols_list(collaborators, workspace_folder)
prepare_init_weights(input_weights, workspace_folder)
fqdn = get_aggregator_fqdn(workspace_folder)
prepare_node_cert(node_cert_folder, "server", f"agg_{fqdn}", workspace_folder)
prepare_ca_cert(ca_cert_folder, workspace_folder)
def start_aggregator(workspace_folder, output_logs, output_weights, report_path):

check_call(["fx", "aggregator", "start"], cwd=workspace_folder)

# TODO: check how to copy logs during runtime.
# perhaps investigate overriding plan entries?

# NOTE: logs and weights are copied, even if target folders are not empty
copy_tree(os.path.join(workspace_folder, "logs"), output_logs)
if os.path.exists(os.path.join(workspace_folder, "logs")):
copy_tree(os.path.join(workspace_folder, "logs"), output_logs)

# NOTE: conversion fails since openfl needs sample data...
# weights_paths = get_weights_path(fl_workspace)
Expand All @@ -56,5 +28,5 @@ def start_aggregator(
# Cleanup
shutil.rmtree(workspace_folder, ignore_errors=True)

with open(report_path, 'w') as f:
with open(report_path, "w") as f:
f.write("IsDone: 1")
43 changes: 20 additions & 23 deletions examples/fl/fl/project/collaborator.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
from utils import (
get_collaborator_cn,
prepare_node_cert,
prepare_ca_cert,
prepare_plan,
create_workspace,
)
import os
from utils import get_collaborator_cn
import shutil
from subprocess import check_call


def start_collaborator(
data_path,
labels_path,
node_cert_folder,
ca_cert_folder,
plan_path,
output_logs,
):
workspace_folder = os.path.join(output_logs, "workspace")
create_workspace(workspace_folder)
prepare_plan(plan_path, workspace_folder)
def start_collaborator(workspace_folder):
cn = get_collaborator_cn()
prepare_node_cert(node_cert_folder, "client", f"col_{cn}", workspace_folder)
prepare_ca_cert(ca_cert_folder, workspace_folder)

# set log files
check_call(["fx", "collaborator", "start", "-n", cn], cwd=workspace_folder)
check_call(
[os.environ.get("OPENFL_EXECUTABLE", "fx"), "collaborator", "start", "-n", cn],
cwd=workspace_folder,
)

# Cleanup
shutil.rmtree(workspace_folder, ignore_errors=True)


def check_connectivity(workspace_folder):
    """Run the OpenFL connectivity check for this collaborator node.

    Invokes the ``fx`` CLI (path overridable via the OPENFL_EXECUTABLE
    environment variable) from inside *workspace_folder*; raises
    ``CalledProcessError`` if the check exits non-zero.
    """
    fx_executable = os.environ.get("OPENFL_EXECUTABLE", "fx")
    command = [
        fx_executable,
        "collaborator",
        "connectivity_check",
        "-n",
        get_collaborator_cn(),
    ]
    check_call(command, cwd=workspace_folder)
5 changes: 4 additions & 1 deletion examples/fl/fl/project/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ def collaborator_pre_training_hook(
ca_cert_folder,
plan_path,
output_logs,
workspace_folder,
):
cn = get_collaborator_cn()
workspace_folder = os.path.join(output_logs, "workspace")

target_data_folder = os.path.join(workspace_folder, "data", cn)
os.makedirs(target_data_folder, exist_ok=True)
Expand Down Expand Up @@ -69,6 +69,7 @@ def collaborator_post_training_hook(
ca_cert_folder,
plan_path,
output_logs,
workspace_folder,
):
pass

Expand All @@ -82,6 +83,7 @@ def aggregator_pre_training_hook(
plan_path,
collaborators,
report_path,
workspace_folder,
):
pass

Expand All @@ -95,5 +97,6 @@ def aggregator_post_training_hook(
plan_path,
collaborators,
report_path,
workspace_folder,
):
pass
47 changes: 24 additions & 23 deletions examples/fl/fl/project/mlcube.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
"""MLCube handler file"""

import os
import shutil
import typer
from collaborator import start_collaborator
from collaborator import start_collaborator, check_connectivity
from aggregator import start_aggregator
from plan import generate_plan
from hooks import (
Expand All @@ -12,22 +10,11 @@
collaborator_pre_training_hook,
collaborator_post_training_hook,
)
from utils import generic_setup, generic_teardown, setup_collaborator, setup_aggregator

app = typer.Typer()


def _setup(output_logs):
tmp_folder = os.path.join(output_logs, ".tmp")
os.makedirs(tmp_folder, exist_ok=True)
# TODO: this should be set before any code imports tempfile
os.environ["TMPDIR"] = tmp_folder


def _teardown(output_logs):
tmp_folder = os.path.join(output_logs, ".tmp")
shutil.rmtree(tmp_folder, ignore_errors=True)


@app.command("train")
def train(
data_path: str = typer.Option(..., "--data_path"),
Expand All @@ -37,32 +24,37 @@ def train(
plan_path: str = typer.Option(..., "--plan_path"),
output_logs: str = typer.Option(..., "--output_logs"),
):
_setup(output_logs)
collaborator_pre_training_hook(
workspace_folder = generic_setup(output_logs)
setup_collaborator(
data_path=data_path,
labels_path=labels_path,
node_cert_folder=node_cert_folder,
ca_cert_folder=ca_cert_folder,
plan_path=plan_path,
output_logs=output_logs,
workspace_folder=workspace_folder,
)
start_collaborator(
check_connectivity(workspace_folder)
collaborator_pre_training_hook(
data_path=data_path,
labels_path=labels_path,
node_cert_folder=node_cert_folder,
ca_cert_folder=ca_cert_folder,
plan_path=plan_path,
output_logs=output_logs,
workspace_folder=workspace_folder,
)
start_collaborator(workspace_folder=workspace_folder)
collaborator_post_training_hook(
data_path=data_path,
labels_path=labels_path,
node_cert_folder=node_cert_folder,
ca_cert_folder=ca_cert_folder,
plan_path=plan_path,
output_logs=output_logs,
workspace_folder=workspace_folder,
)
_teardown(output_logs)
generic_teardown(output_logs)


@app.command("start_aggregator")
Expand All @@ -76,8 +68,8 @@ def start_aggregator_(
collaborators: str = typer.Option(..., "--collaborators"),
report_path: str = typer.Option(..., "--report_path"),
):
_setup(output_logs)
aggregator_pre_training_hook(
workspace_folder = generic_setup(output_logs)
setup_aggregator(
input_weights=input_weights,
node_cert_folder=node_cert_folder,
ca_cert_folder=ca_cert_folder,
Expand All @@ -86,8 +78,9 @@ def start_aggregator_(
plan_path=plan_path,
collaborators=collaborators,
report_path=report_path,
workspace_folder=workspace_folder,
)
start_aggregator(
aggregator_pre_training_hook(
input_weights=input_weights,
node_cert_folder=node_cert_folder,
ca_cert_folder=ca_cert_folder,
Expand All @@ -96,6 +89,13 @@ def start_aggregator_(
plan_path=plan_path,
collaborators=collaborators,
report_path=report_path,
workspace_folder=workspace_folder,
)
start_aggregator(
workspace_folder=workspace_folder,
output_logs=output_logs,
output_weights=output_weights,
report_path=report_path,
)
aggregator_post_training_hook(
input_weights=input_weights,
Expand All @@ -106,8 +106,9 @@ def start_aggregator_(
plan_path=plan_path,
collaborators=collaborators,
report_path=report_path,
workspace_folder=workspace_folder,
)
_teardown(output_logs)
generic_teardown(output_logs)


@app.command("generate_plan")
Expand Down
50 changes: 50 additions & 0 deletions examples/fl/fl/project/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,56 @@
import shutil


def generic_setup(output_logs):
    """Prepare per-run scratch space under *output_logs*.

    Creates a ``.tmp`` folder and points TMPDIR at it, then creates and
    initializes the OpenFL ``workspace`` folder.

    Returns:
        Path to the workspace folder.
    """
    tmp_dir = os.path.join(output_logs, ".tmp")
    os.makedirs(tmp_dir, exist_ok=True)
    # NOTE: this must happen before any code imports tempfile, otherwise
    # the override may not take effect.
    os.environ["TMPDIR"] = tmp_dir

    workspace_folder = os.path.join(output_logs, "workspace")
    os.makedirs(workspace_folder, exist_ok=True)
    create_workspace(workspace_folder)
    return workspace_folder


def setup_collaborator(
    data_path,
    labels_path,
    node_cert_folder,
    ca_cert_folder,
    plan_path,
    output_logs,
    workspace_folder,
):
    """Stage the collaborator workspace: copy in the FL plan and install
    the client node certificate and CA certificate.

    NOTE(review): ``data_path``, ``labels_path`` and ``output_logs`` are
    not used here; they are kept for interface compatibility with callers.
    """
    prepare_plan(plan_path, workspace_folder)
    common_name = get_collaborator_cn()
    prepare_node_cert(
        node_cert_folder, "client", "col_" + common_name, workspace_folder
    )
    prepare_ca_cert(ca_cert_folder, workspace_folder)


def setup_aggregator(
    input_weights,
    node_cert_folder,
    ca_cert_folder,
    output_logs,
    output_weights,
    plan_path,
    collaborators,
    report_path,
    workspace_folder,
):
    """Stage the aggregator workspace: plan, collaborator list, initial
    weights, and the server-side certificates.

    NOTE(review): ``output_logs``, ``output_weights`` and ``report_path``
    are not used here; kept for interface compatibility with callers.
    """
    # The plan is staged first; the FQDN lookup below presumably reads it
    # from the workspace — keep this ordering.
    prepare_plan(plan_path, workspace_folder)
    prepare_cols_list(collaborators, workspace_folder)
    prepare_init_weights(input_weights, workspace_folder)
    aggregator_fqdn = get_aggregator_fqdn(workspace_folder)
    server_cert_name = "agg_" + aggregator_fqdn
    prepare_node_cert(node_cert_folder, "server", server_cert_name, workspace_folder)
    prepare_ca_cert(ca_cert_folder, workspace_folder)


def generic_teardown(output_logs):
    """Remove the ``.tmp`` scratch folder created by generic_setup.

    Safe to call even if the folder no longer exists (errors ignored).
    """
    shutil.rmtree(os.path.join(output_logs, ".tmp"), ignore_errors=True)


def create_workspace(fl_workspace):
plan_folder = os.path.join(fl_workspace, "plan")
workspace_config = os.path.join(fl_workspace, ".workspace")
Expand Down
17 changes: 13 additions & 4 deletions examples/fl/fl/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,19 @@ COL1="medperf mlcube run --mlcube ./mlcube_col1 --task train -e MEDPERF_PARTICIP
COL2="medperf mlcube run --mlcube ./mlcube_col2 --task train -e [email protected]"
COL3="medperf mlcube run --mlcube ./mlcube_col3 --task train -e [email protected]"

gnome-terminal -- bash -c "$AGG; bash"
gnome-terminal -- bash -c "$COL1; bash"
gnome-terminal -- bash -c "$COL2; bash"
gnome-terminal -- bash -c "$COL3; bash"
# gnome-terminal -- bash -c "$AGG; bash"
# gnome-terminal -- bash -c "$COL1; bash"
# gnome-terminal -- bash -c "$COL2; bash"
# gnome-terminal -- bash -c "$COL3; bash"
rm agg.log col1.log col2.log col3.log
$AGG >>agg.log &
sleep 6
$COL1 >>col1.log &
sleep 6
$COL2 >>col2.log &
sleep 6
$COL3 >>col3.log &
wait

# docker run --env [email protected] --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace/data:/mlcube_io0:ro --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace/labels:/mlcube_io1:ro --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace/node_cert:/mlcube_io2:ro --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace/ca_cert:/mlcube_io3:ro --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace:/mlcube_io4:ro --volume /home/hasan/work/medperf_ws/medperf/examples/fl/fl/mlcube_col1/workspace/logs:/mlcube_io5 -it --entrypoint bash mlcommons/medperf-fl:1.0.0
# python /mlcube_project/mlcube.py train --data_path=/mlcube_io0 --labels_path=/mlcube_io1 --node_cert_folder=/mlcube_io2 --ca_cert_folder=/mlcube_io3 --plan_path=/mlcube_io4/plan.yaml --output_logs=/mlcube_io5

0 comments on commit 26b4337

Please sign in to comment.