Skip to content

Commit

Permalink
Merge branch 'cylondata:main' into uva-cs-cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
arupcsedu committed Nov 25, 2023
2 parents 843ce21 + 6913f1c commit ee107c0
Show file tree
Hide file tree
Showing 35 changed files with 1,414 additions and 873 deletions.
16 changes: 12 additions & 4 deletions .github/workflows/conda-cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
- os: ubuntu-20.04
gcc: 9
ucc: "master"
ucx: "override-remote-address3"

steps:
- uses: actions/checkout@v2
Expand All @@ -39,7 +40,7 @@ jobs:
- uses: conda-incubator/setup-miniconda@v2
with:
activate-environment: cylon_dev
environment-file: conda/environments/cylon.yml
environment-file: conda/environments/cylon_NoUCX.yml

- name: Activate conda
run: conda activate cylon_dev
Expand All @@ -51,17 +52,24 @@ jobs:
cmake .. -DBUILD_SHARED_LIBS=1 -DUSE_MPI=1 -DCMAKE_INSTALL_PREFIX=$HOME/gloo/install
make install
- name: Install UCX
run: |
git clone --single-branch -b ${{ matrix.ucx }} https://github.com/mstaylor/ucx.git $HOME/ucx
cd $HOME/ucx
./autogen.sh
./configure --prefix=$HOME/ucx/install --without-go
make install
- name: Install UCC
run: |
git clone --single-branch -b ${{ matrix.ucc }} https://github.com/openucx/ucc.git $HOME/ucc
cd $HOME/ucc
echo "conda ucx: $(conda list | grep ucx)"
./autogen.sh
./configure --prefix=$HOME/ucc/install --with-ucx=$CONDA/envs/cylon_dev
./configure --prefix=$HOME/ucc/install --with-ucx=$HOME/ucx/install
make install
- name: Build cylon, pycylon and run cpp test
run: python build.py -cmake-flags="-DCYLON_UCX=1 -DCYLON_GLOO=1 -DGLOO_INSTALL_PREFIX=$HOME/gloo/install -DCYLON_UCC=1 -DUCC_INSTALL_PREFIX=$HOME/ucc/install" -ipath="$HOME/cylon/install" --cpp --python --test
run: python build.py -cmake-flags="-DCYLON_UCX=1 -DCYLON_GLOO=1 -DGLOO_INSTALL_PREFIX=$HOME/gloo/install -DCYLON_UCC=1 -DUCC_INSTALL_PREFIX=$HOME/ucc/install -DUCX_INSTALL_PREFIX=$HOME/ucx/install" -ipath="$HOME/cylon/install" --cpp --python --test

- name: Run pytest
run: python build.py -ipath="$HOME/cylon/install" --pytest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
fail-fast: false
matrix:
include:
- os: macos-11
- os: macos-latest

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion aws/scripts/Join_Weak_Scaling.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 19,
"execution_count": 8,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
Expand Down
473 changes: 473 additions & 0 deletions aws/scripts/Join_Weak_Scaling_round2.ipynb

Large diffs are not rendered by default.

71 changes: 65 additions & 6 deletions aws/scripts/cylon_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def cylon_join(data=None):
df1 = DataFrame(pd.DataFrame(data1).add_prefix("col"))
df2 = DataFrame(pd.DataFrame(data2).add_prefix("col"))

timing = {'scaling': [], 'world': [], 'rows': [], 'max_value': [], 'rank': [], 'avg_t':[], 'tot_l':[]}

for i in range(data['it']):
env.barrier()
StopWatch.start(f"join_{i}_{data['host']}_{data['rows']}_{data['it']}")
Expand All @@ -100,14 +102,28 @@ def cylon_join(data=None):
if env.rank == 0:
avg_t = sum_t / env.world_size
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l)
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l, file=open(data['output_summary_filename'], 'a'))
timing['scaling'].append(data['scaling'])
timing['world'].append(env.world_size)
timing['rows'].append(num_rows)
timing['max_value'].append(max_val)
timing['rank'].append(i)
timing['avg_t'].append(avg_t)
timing['tot_l'].append(tot_l)
StopWatch.stop(f"join_{i}_{data['host']}_{data['rows']}_{data['it']}")

StopWatch.stop(f"join_total_{data['host']}_{data['rows']}_{data['it']}")

if env.rank == 0:
StopWatch.benchmark(tag=str(data), filename=data['output_scaling_filename'])
upload_file(file_name=data['output_scaling_filename'], bucket=data['s3_bucket'], object_name=data['s3_stopwatch_object_name'])


if os.path.exists(data['output_summary_filename']):
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='a', index=False, header=False)
else:
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='w', index=False, header=True)


upload_file(file_name=data['output_summary_filename'], bucket=data['s3_bucket'],
object_name=data['s3_summary_object_name'])

Expand Down Expand Up @@ -151,6 +167,8 @@ def cylon_sort(data=None):
if env.rank == 0:
print("Task# ", data['task'])

timing = {'scaling': [], 'world': [], 'rows': [], 'max_value': [], 'rank': [], 'avg_t': [], 'tot_l': []}

for i in range(data['it']):
env.barrier()
StopWatch.start(f"sort_{i}_{data['host']}_{data['rows']}_{data['it']}")
Expand All @@ -166,8 +184,15 @@ def cylon_sort(data=None):
if env.rank == 0:
avg_t = sum_t / env.world_size
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l)
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l,
file=open(data['output_summary_filename'], 'a'))
timing['scaling'].append(data['scaling'])
timing['world'].append(env.world_size)
timing['rows'].append(num_rows)
timing['max_value'].append(max_val)
timing['rank'].append(i)
timing['avg_t'].append(avg_t)
timing['tot_l'].append(tot_l)
#print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l,
# file=open(data['output_summary_filename'], 'a'))


StopWatch.stop(f"sort_{i}_{data['host']}_{data['rows']}_{data['it']}")
Expand All @@ -178,9 +203,14 @@ def cylon_sort(data=None):
StopWatch.benchmark(tag=str(data), filename=data['output_scaling_filename'])
upload_file(file_name=data['output_scaling_filename'], bucket=data['s3_bucket'],
object_name=data['s3_stopwatch_object_name'])

if os.path.exists(data['output_summary_filename']):
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='a', index=False, header=False)
else:
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='w', index=False, header=True)

upload_file(file_name=data['output_summary_filename'], bucket=data['s3_bucket'],
object_name=data['s3_summary_object_name'])
redis_context.clearDB()


def cylon_slice(data=None):
Expand Down Expand Up @@ -222,6 +252,7 @@ def cylon_slice(data=None):
if env.rank == 0:
print("Task# ", data['task'])

timing = {'scaling': [], 'world': [], 'rows': [], 'max_value': [], 'rank': [], 'avg_t': [], 'tot_l': []}
for i in range(data['it']):
env.barrier()
StopWatch.start(f"slice_{i}_{data['host']}_{data['rows']}_{data['it']}")
Expand All @@ -239,8 +270,15 @@ def cylon_slice(data=None):
if env.rank == 0:
avg_t = sum_t / env.world_size
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l)
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l,
file=open(data['output_summary_filename'], 'a'))
#print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l,
# file=open(data['output_summary_filename'], 'a'))
timing['scaling'].append(data['scaling'])
timing['world'].append(env.world_size)
timing['rows'].append(num_rows)
timing['max_value'].append(max_val)
timing['rank'].append(i)
timing['avg_t'].append(avg_t)
timing['tot_l'].append(tot_l)
StopWatch.stop(f"slice_{i}_{data['host']}_{data['rows']}_{data['it']}")

StopWatch.stop(f"slice_total_{data['host']}_{data['rows']}_{data['it']}")
Expand All @@ -249,6 +287,12 @@ def cylon_slice(data=None):
StopWatch.benchmark(tag=str(data), filename=data['output_scaling_filename'])
upload_file(file_name=data['output_scaling_filename'], bucket=data['s3_bucket'],
object_name=data['s3_stopwatch_object_name'])

if os.path.exists(data['output_summary_filename']):
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='a', index=False, header=False)
else:
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='w', index=False, header=True)

upload_file(file_name=data['output_summary_filename'], bucket=data['s3_bucket'],
object_name=data['s3_summary_object_name'])

Expand All @@ -257,28 +301,43 @@ def cylon_slice(data=None):

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="cylon scaling")

parser.add_argument('-n', dest='rows', type=int, **environ_or_required('ROWS'))

parser.add_argument('-i', dest='it', type=int, **environ_or_required('PARTITIONS')) #10

parser.add_argument('-u', dest='unique', type=float, **environ_or_required('UNIQUENESS'), help="unique factor") #0.9

parser.add_argument('-s', dest='scaling', type=str, **environ_or_required('SCALING'), choices=['s', 'w'],
help="s=strong w=weak") #w

parser.add_argument('-o', dest='operation', type=str, **environ_or_required('CYLON_OPERATION'), choices=['join', 'sort', 'slice'],
help="s=strong w=weak") # w

parser.add_argument('-w', dest='world_size', type=int, help="world size", **environ_or_required('WORLD_SIZE'))

parser.add_argument("-r", dest='redis_host', type=str, help="redis address, default to 127.0.0.1",
**environ_or_required('REDIS_HOST')) #127.0.0.1

parser.add_argument("-p1", dest='redis_port', type=int, help="name of redis port", **environ_or_required('REDIS_PORT')) #6379

parser.add_argument('-f1', dest='output_scaling_filename', type=str, help="Output filename for scaling results",
**environ_or_required('OUTPUT_SCALING_FILENAME'))

parser.add_argument('-f2', dest='output_summary_filename', type=str, help="Output filename for scaling summary results",
**environ_or_required('OUTPUT_SUMMARY_FILENAME'))

parser.add_argument('-b', dest='s3_bucket', type=str, help="S3 Bucket Name", **environ_or_required('S3_BUCKET'))

parser.add_argument('-o1', dest='s3_stopwatch_object_name', type=str, help="S3 Object Name", **environ_or_required('S3_STOPWATCH_OBJECT_NAME'))

parser.add_argument('-o2', dest='s3_summary_object_name', type=str, help="S3 Object Name",
**environ_or_required('S3_SUMMARY_OBJECT_NAME'))

args = vars(parser.parse_args())

args['host'] = "aws"

if args['operation'] == 'join':
print("executing cylon join operation")
cylon_join(args)
Expand Down
Loading

0 comments on commit ee107c0

Please sign in to comment.