coll/han: Implement MPI_Alltoallv in han using SMSC and XPMEM #12747

Merged
merged 1 commit into from
Aug 19, 2024

Conversation

lrbison
Contributor

@lrbison lrbison commented Aug 12, 2024

Extending the previous han MPI_Alltoall algorithm, this change adds MPI_Alltoallv to han as a hierarchy-aware algorithm that uses XPMEM via the SMSC module to read data directly from ranks on the same host.

This provides a significant speed-up over the basic implementation for small messages, since many messages can be coalesced and packed into fewer sends.

Introduces two MCA parameters (example invocation after the list):

  • coll_han_alltoallv_smsc_avg_send_limit
  • coll_han_alltoallv_smsc_noncontig_limit
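
For anyone trying this out, both knobs use the usual MCA syntax; the values below are placeholders for illustration, not tuned recommendations:

mpirun --mca coll_han_alltoallv_smsc_avg_send_limit 16384 \
       --mca coll_han_alltoallv_smsc_noncontig_limit 64 \
       -n 128 ./osu_alltoallv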

@lrbison
Contributor Author

lrbison commented Aug 12, 2024

Here is some motivating data on a 32-host EFA cluster of hpc7g.16xlarge instances:

[image: Alltoallv benchmark results on the 32-host cluster]

In particular, this change makes Alltoallv much faster at small message sizes. The largest improvement is expected when messages are small and there are many ranks on a single host.

Consider the scenario of 2 hosts (A and B) with 64 ranks per host transferring 4-byte messages. In total the collective will send 128*128 = 16K messages. The existing basic algorithm sends 64*64 = 4K local messages via shared memory on each of host A and host B, plus another 4K from host A to host B and a final 4K from host B to host A. The new algorithm, after performing an Allgather, sends one message per rank per host: host A sends 64 messages to host B and vice versa (the other 64 messages per host are "send-to-self" sends). This results in significantly less overhead and contention than the basic algorithm.
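
To make the counting above concrete, here is a quick back-of-the-envelope sketch (illustrative only, not code from the PR):

#include <stdio.h>

int main(void)
{
    /* Message counts for 2 hosts x 64 ranks per host, as in the example above. */
    int hosts = 2, ppn = 64;
    int nranks = hosts * ppn;                              /* 128 ranks              */
    int basic_total = nranks * nranks;                     /* 16384 (16K) sends      */
    int basic_internode = hosts * (hosts - 1) * ppn * ppn; /* 8192 cross the network */
    int han_internode = hosts * (hosts - 1) * ppn;         /* 128: one packed send
                                                              per rank per remote host */
    printf("basic: %d total, %d inter-node; han: %d inter-node\n",
           basic_total, basic_internode, han_internode);
    return 0;
}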

However, once the message size is large enough, the extra copy incurred by the pack-and-send method starts to give mixed results, so I have implemented a size-dependent fallback to the previous implementation. In fact, I find that the threshold I picked is close to the right answer for small clusters, but could still leave some performance on the table for large clusters (like the 32-node one shown above, which could gain 30% between 8KB and 64KB message sizes).
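
A minimal sketch of the fallback decision, under the assumption that it compares the average bytes per send against coll_han_alltoallv_smsc_avg_send_limit (the PR's exact heuristic may differ):

#include <stdbool.h>
#include <stddef.h>

/* Returns true when the SMSC pack-and-send path should be used; above the
 * limit, fall back to the previous implementation. Names are illustrative. */
static bool use_smsc_path(size_t total_send_bytes, int comm_size,
                          size_t avg_send_limit)
{
    size_t avg_send = total_send_bytes / (size_t) comm_size;
    return avg_send <= avg_send_limit;
}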

Comment on lines 34 to 38
#if 1
#define DEBUG_PRINT(...) do{ fprintf( stdout, __VA_ARGS__ ); } while( false )
#else
#define DEBUG_PRINT(...) do{ } while ( false )
#endif
Contributor Author

Oops, I will remove these in a future revision.

Member

@bosilca bosilca left a comment

I have left quite a few comments in the PR. In addition to them, I would like to understand why it is necessary to heavily manipulate datatypes. If I understand the code, you are aggregating on the source for the same destination, so overall from the same architecture to the same architecture. Thus, you can cut most of the datatype use and simply rely on packed data. Or I might have missed something in the algorithm, in which case please help me understand.

ompi/mca/coll/han/coll_han_alltoall.c (outdated review comments, resolved)
}
buf += length;

if( type->opt_desc.used ) {
Member

The opt_desc is only valid in homogeneous cases. Otherwise, the desc shall be used.

Contributor Author

Any insight you can offer here would be appreciated. I admit I don't understand all the corner cases for datatypes and how they are stored.

We are only serializing and de-serializing these datatypes on the low communicator (ranks on the same host), so that particular exchange is guaranteed to be homogeneous.

What precisely does homogeneous mean here? Does it mean Open MPI was compiled to support non-homogeneous runs? Or does it mean that at runtime we have detected a non-homogeneous configuration?

If there is a flag that all ranks will have set, I can easily check that flag and fall back to the basic implementation.

I am also attempting to use the non-opt desc in the following block, but I don't think any of my testing has exercised that path yet.

ompi/mca/coll/han/coll_han_alltoallv.c (outdated review comments, resolved)
Extension of the previous han MPI_Alltoall algorithm, this change adds
MPI_Alltoallv to han for a hierarchy-aware algorithm which uses XPMEM
via the SMSC module in order to directly read data from ranks on the
same host.

This provides significant speed-up over the basic implementation when
small messages are used, as many messages can be coalesced and packed
into fewer sends.

Introduces MCA parameters:
 - coll_han_alltoallv_smsc_avg_send_limit
 - coll_han_alltoallv_smsc_noncontig_limit

Signed-off-by: Luke Robison <[email protected]>
@lrbison
Contributor Author

lrbison commented Aug 13, 2024

I would like to understand why it is necessary to heavily manipulate datatypes

@bosilca

Sure. The motivation is to have every rank on one host be able to mmap local sendbufs and therefore bypass a bunch of local messaging, and coalesce sends into big enough chunks to max out the NIC. I am not attempting to change data types, just to share enough of the datatype description among the ranks in the low communicator so that opal_convertor_pack can be called.

I chose to assign the packing work to the sending rank, which means that the sending rank must understand how to pack not just its own send datatype, but also the other low-comm ranks' send datatypes. To do this I leaned on some code you and Joseph shared a few months back. Using that, I can serialize the opal datatypes of each rank, share them among the low-comm, and deserialize each of them on the other side without any modification. Now that the sending rank understands the datatypes of all low-comm ranks, it packs the data from each of them into a series of buffers and sends those buffers with the MPI_PACKED datatype.
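
For reference, a minimal sketch of that pack step, assuming the peer's send buffer has already been mapped through SMSC/XPMEM and its serialized datatype has been deserialized into peer_dtype (function and variable names here are mine, not the PR's actual helpers):

#include <sys/uio.h>
#include "opal/datatype/opal_convertor.h"

static size_t pack_peer_chunk(const opal_datatype_t *peer_dtype, size_t count,
                              const void *mapped_peer_sendbuf,
                              void *bounce_buf, size_t bounce_len)
{
    /* The node-local exchange is homogeneous, so a local-architecture
     * convertor is sufficient. */
    opal_convertor_t *conv = opal_convertor_create(opal_local_arch, 0);
    opal_convertor_prepare_for_send(conv, peer_dtype, count, mapped_peer_sendbuf);

    struct iovec iov = { .iov_base = bounce_buf, .iov_len = bounce_len };
    uint32_t iov_count = 1;
    size_t max_data = bounce_len;
    opal_convertor_pack(conv, &iov, &iov_count, &max_data);

    OBJ_RELEASE(conv);
    return max_data;   /* bytes packed into the bounce buffer this round */
}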

On the receiving side, the data is unpacked using only the user-provided recv datatype since all data received is of the same type.

Some alternatives:

  • Rather than have the sender do packing work, instead expose the send buffers and make it the job of the originating ranks to pack their data into the sender's waiting bounce buffers. This is similar to "push mode" in my alltoall PR. It requires additional synchronization which will be even more costly here where I use many small in-flight buffers rather than one big buffer as in alltoall.
  • send() the data between low-comm ranks and ignore SMSC. But this incurs lots of additional message overhead which I found to be costly, and the intermediate rank must still know how many bytes it must receive especially if we intend to post any of those recvs in parallel.
  • send() the data directly to where it goes. That's just coll_basic_alltoallv.

@bosilca
Member

bosilca commented Aug 14, 2024

Rather than have the sender do packing work, instead expose the send buffers and make it the job of the originating ranks to pack their data into the sender's waiting bounce buffers. This is similar to "push mode" in my alltoall PR. It requires additional synchronization which will be even more costly here where I use many small in-flight buffers rather than one big buffer as in alltoall.

Yes, but instead of sequentializing the datatype creation and packing, you can get them working in parallel. Technically, each process gets the packed size of the local datatype (mixed with the count), then does a scan on the local comm to get its starting point, and then all local processes do the pack onto the target architecture in parallel. I understand that all processes will have to do this for a different set of data, but at least this will save the time of creating a bunch of datatypes on each process.

Here I assume that even if the job is heterogeneous, all processes on the same node are homogeneous. This makes the convertor identical on all processes that participate in a local packing and then unpacking. A similar approach can be used on the receiver side, allowing all processes to unpack in parallel.
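
A minimal sketch of that scan step in plain MPI (low_comm here stands for the node-local communicator; this is an illustration of the idea, not the PR's code):

#include <mpi.h>

static MPI_Aint packed_start_offset(MPI_Aint my_packed_bytes, MPI_Comm low_comm)
{
    int low_rank;
    MPI_Aint my_offset = 0;

    MPI_Comm_rank(low_comm, &low_rank);
    /* Exclusive prefix sum of packed sizes gives each rank its starting
     * offset in the shared pack buffer. */
    MPI_Exscan(&my_packed_bytes, &my_offset, 1, MPI_AINT, MPI_SUM, low_comm);
    if (0 == low_rank) {
        my_offset = 0;   /* MPI_Exscan leaves rank 0's result undefined */
    }
    return my_offset;
}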

@lrbison
Contributor Author

lrbison commented Aug 14, 2024

I probably should write up a diagram to help visualize how the data movement happens. For now, here is some text: consider M ranks per host. The algorithm is executed in rounds, with as many rounds as hosts. Consider a particular round in which the host with rank 0 is sending to another host whose first rank is X, where X = M*<integer>. Simultaneously, rank X will be packing and sending back to rank 0.
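
A rough sketch of that round structure (names are mine; the (my_host + round) % nhosts pairing is only an illustration of one possible schedule, not necessarily the PR's):

static void alltoallv_rounds(int nhosts, int M, int my_host, int my_local_rank)
{
    for (int round = 0; round < nhosts; round++) {
        int peer_host = (my_host + round) % nhosts;
        int peer_rank = peer_host * M + my_local_rank;   /* "X + K" in the text */
        /* Pack the data that all M local ranks have destined for peer_host,
         * reading their send buffers directly via SMSC/XPMEM, then send the
         * packed chunks to peer_rank, which unpacks into its own recv buffer. */
        (void) peer_rank;
    }
}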

Yes, but instead of sequentialization the datatype creation and packing...

Data movement is already in parallel. While rank 0 is packing data from ranks 0:M to send to rank X+0, rank K is also packing data from ranks 0:M to send to rank X+K (for K on the same host as rank 0, and X being the first rank on some remote host).

You are right, though, that we are making a bunch of extra opal datatypes. However, I had some debug code that skipped all the actual packing and sending, and the remaining overhead was quite small (I don't have the numbers), so I concluded that this overhead is not worth worrying about.

On receiving in parallel: the sender (rank K) has already done the gather work across ranks, collecting the data that ranks 0-M need to get to the receiver (X+K). The receiver therefore only needs to unpack and "scatter" it across its own recv buffer, according to the offsets the user provided.

Technically, each process gets the packed size of the local datatype (mixed with the count), then do a scan on the local comm to get the starting point

This is true, but it is made trickier by the fact that my implementation doesn't allocate enough packing buffers to hold all the data; instead it continually reuses a set of 8 buffers of 128KB each. This means we'll have to notify local ranks each time a new buffer is ready for packing.

Here I assume that even if the job is heterogeneous, all processes on the same node are homogeneous

Agreed. But does that mean that opt_desc is necessarily available?

@lrbison
Contributor Author

lrbison commented Aug 14, 2024

Here is a quick picture:

[image: diagram of the data movement between ranks]

@bosilca
Member

bosilca commented Aug 15, 2024

What happened to Rank 2? The unlucky number is supposed to be 13!?!?!

@bosilca
Member

bosilca commented Aug 15, 2024

Agreed. But does that mean that opt_desc is necessarily available?

Yes, if an optimized description exists then opt_desc will point to it; otherwise opt_desc will point back to desc. We can argue about what an "optimized description" is; for me it is anything that leads to more regular or fewer memory copies.
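
A tiny sketch of that rule as it applies to the serialization path discussed above (field names as in opal_datatype_t; purely illustrative):

#include "opal/datatype/opal_datatype.h"

/* opt_desc can be used unconditionally for the homogeneous node-local
 * exchange: when no optimized description was built, it refers back to the
 * canonical desc anyway. */
static const dt_type_desc_t *description_to_serialize(const opal_datatype_t *type)
{
    return &type->opt_desc;
}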

@bosilca bosilca merged commit f2efd55 into open-mpi:main Aug 19, 2024
13 of 14 checks passed