Skip to content

Commit

Permalink
Merge branch 'main' into binyli/perfgate
Browse files Browse the repository at this point in the history
  • Loading branch information
Binyang2014 committed Jun 16, 2023
2 parents f2536fe + 8410fcd commit 5894a59
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 28 deletions.
31 changes: 20 additions & 11 deletions .azure-pipelines/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ pr:
branches:
include:
- main
drafts: false

pool:
name: mscclpp
Expand Down Expand Up @@ -35,10 +34,10 @@ steps:
script: |
set -e
export PATH=/usr/local/mpi/bin:$PATH
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allgather_test_perf -b 1K -e 1G -f 2
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allgather_test_perf -b 1K -e 1G -f 2 -k 1
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allgather_test_perf -b 1K -e 1G -f 2 -k 2
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allgather_test_perf -b 1K -e 1G -f 2 -k 3
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allgather_test_perf -b 1K -e 1G -f 2 -o output.jsonl
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allgather_test_perf -b 1K -e 1G -f 2 -k 1 -o output.jsonl
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allgather_test_perf -b 1K -e 1G -f 2 -k 2 -o output.jsonl
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allgather_test_perf -b 1K -e 1G -f 2 -k 3 -o output.jsonl
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
Expand All @@ -49,7 +48,7 @@ steps:
script: |
set -e
export PATH=/usr/local/mpi/bin:$PATH
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/sendrecv_test_perf -b 1K -e 1G -f 2
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/sendrecv_test_perf -b 1K -e 1G -f 2 -o output.jsonl
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
Expand All @@ -60,9 +59,9 @@ steps:
script: |
set -e
export PATH=/usr/local/mpi/bin:$PATH
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 1
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 2
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -o output.jsonl
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 1 -o output.jsonl
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 2 -o output.jsonl
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
Expand All @@ -73,6 +72,16 @@ steps:
script: |
set -e
export PATH=/usr/local/mpi/bin:$PATH
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/alltoall_test_perf -b 1K -e 1G -f 2
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/alltoall_test_perf -b 1K -e 1G -f 2 -k 1
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/alltoall_test_perf -b 1K -e 1G -f 2 -o output.jsonl
mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/alltoall_test_perf -b 1K -e 1G -f 2 -k 1 -o output.jsonl
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
name: CheckPerfNumber
displayName: Check collective primitives performance
inputs:
targetType: 'inline'
script: |
set -e
python3 test/mscclpp-test/check_perf_result.py --perf-file output.jsonl --baseline-file test/mscclpp-test/perf_ndmv4.jsonl
workingDirectory: '$(System.DefaultWorkingDirectory)'
2 changes: 2 additions & 0 deletions .azure-pipelines/multi-nodes-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ steps:
HOSTFILE=$(System.DefaultWorkingDirectory)/test/mscclpp-test/deploy/hostfile
SSH_OPTION="StrictHostKeyChecking=no"
KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
rm -rf output/*
mkdir -p output
touch output/mscclpp-it-000000
tail -f output/mscclpp-it-000000 &
Expand All @@ -85,6 +86,7 @@ steps:
HOSTFILE=$(System.DefaultWorkingDirectory)/test/mscclpp-test/deploy/hostfile
SSH_OPTION="StrictHostKeyChecking=no"
KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
rm -rf output/*
mkdir -p output
touch output/mscclpp-it-000000
tail -f output/mscclpp-it-000000 &
Expand Down
1 change: 0 additions & 1 deletion .azure-pipelines/ut.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ pr:
branches:
include:
- main
drafts: false

jobs:
- job: UnitTest
Expand Down
31 changes: 22 additions & 9 deletions test/mscclpp-test/allgather_test.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ __device__ void allgather0(mscclpp::channel::SimpleDeviceChannel devChan, int ra
}

__device__ void localAllGather(mscclpp::channel::SimpleDeviceChannel devChan, int rank, int worldSize,
int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size) {
int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size,
bool flushAfterSignal = true) {
// this allgather algorithm works as follows:
// Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode
// and waits for data from GPU rank (i-1) % nranksPerNode
Expand All @@ -42,7 +43,8 @@ __device__ void localAllGather(mscclpp::channel::SimpleDeviceChannel devChan, in
for (int i = 1; i < nranksPerNode; i++) {
if ((remoteRank % nranksPerNode) == ((rank + i) % nranksPerNode)) {
// put your data to GPU (rank+i) % nranksPerNode and signal in one call
if ((threadIdx.x % 32) == 0) devChan.putWithSignalAndFlush(offset, size);
if (flushAfterSignal && (threadIdx.x % 32) == 0) devChan.putWithSignalAndFlush(offset, size);
if (!flushAfterSignal && (threadIdx.x % 32) == 0) devChan.putWithSignal(offset, size);
}
// wait for the data from GPU (rank-i) % nranksPerNode to arrive
if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) {
Expand Down Expand Up @@ -76,37 +78,48 @@ __device__ void allgather2(mscclpp::channel::SimpleDeviceChannel devChan, int ra
// local allgather
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
nelemsPerGPU * sizeof(int));
nelemsPerGPU * sizeof(int), false);
}
// cross-node exchange
if (remoteRank % nranksPerNode == rank % nranksPerNode) {
// opposite side
if ((threadIdx.x % 32) == 0)
devChan.putWithSignalAndFlush(rank * nelemsPerGPU * sizeof(int),
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
devChan.putWithSignal(rank * nelemsPerGPU * sizeof(int),
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
if ((threadIdx.x % 32) == 0) devChan.wait();
}

// sync here to make sure IB flush does not block the CUDA IPC traffic
__syncthreads();
// since all CUDA IPC share the same CUDA stream, only need to flush one of devChans
if ((remoteRank % nranksPerNode == rank % nranksPerNode) ||
(remoteRank / nranksPerNode == rank / nranksPerNode && rank % nranksPerNode == 0)) {
if ((threadIdx.x % 32) == 0) devChan.flush();
}
__syncthreads();

// Step 2
// local allgather
int otherNghr = (rank + nranksPerNode) % worldSize;
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
localAllGather(devChan, rank, worldSize, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int),
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int), false);
}

// cross-node exchange
if (remoteRank % nranksPerNode == rank % nranksPerNode) {
// opposite side
if ((threadIdx.x % 32) == 0)
devChan.putWithSignalAndFlush(
(rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
nelemsPerGPU / pipelineSize * sizeof(int));
devChan.putWithSignal((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
nelemsPerGPU / pipelineSize * sizeof(int));
if ((threadIdx.x % 32) == 0) devChan.wait();
}

__syncthreads();
if ((remoteRank % nranksPerNode == rank % nranksPerNode) ||
(remoteRank / nranksPerNode == rank / nranksPerNode && rank % nranksPerNode == 0)) {
if ((threadIdx.x % 32) == 0) devChan.flush();
}
__syncthreads();

// Step 3
Expand Down
14 changes: 7 additions & 7 deletions test/mscclpp-test/check_perf_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ def load_perf_file(perf_fine: str) -> dict:
with open(perf_fine, 'r') as f:
for line in f:
data = json.loads(line)
res[(data['name'], data['kernel'], data['ranks']),
data['ranksPerNode'], data['size']] = {'algBw': data['algBw'], 'busBw': data['busBw'], 'time': data['time']}
res[(data['name'], data['kernel'], data['ranks'],
data['ranksPerNode'], data['size'])] = {'algBw': data['algBw'], 'busBw': data['busBw'], 'time': data['time']}
if ('target' in data):
res[(data['name'], data['kernel'], data['ranks']),
data['ranksPerNode'], data['size']]['target'] = data['target']
res[(data['name'], data['kernel'], data['ranks'],
data['ranksPerNode'], data['size'])]['target'] = data['target']
return res


Expand All @@ -25,16 +25,16 @@ def check_perf_result(perf_result: dict, baseline: dict, time_threshold: float,
continue
if baseline[key]['target'] == 'latency':
if abs(value['time'] - baseline[key]['time']) / baseline[key]['time'] > time_threshold:
logging.error('time %f not match baseline %f with threshold %f',
logging.error('%s: time %f not match baseline %f with threshold %f', str(key),
value['time'], baseline[key]['time'], time_threshold)
res = False
elif baseline[key]['target'] == 'bandwidth':
if abs(value['algBw'] - baseline[key]['algBw']) / baseline[key]['algBw'] > bandwidth_threshold:
logging.error('algBw %f not match baseline %f with threshold %f',
logging.error('%s: algBw %f not match baseline %f with threshold %f', str(key),
value['algBw'], baseline[key]['algBw'], bandwidth_threshold)
res = False
if abs(value['busBw'] - baseline[key]['busBw']) / baseline[key]['busBw'] > bandwidth_threshold:
logging.error('busBw %f not match baseline %f with threshold %f',
logging.error('%s: busBw %f not match baseline %f with threshold %f', str(key),
value['busBw'], baseline[key]['busBw'], bandwidth_threshold)
res = False
return res
Expand Down

0 comments on commit 5894a59

Please sign in to comment.