Reading Notes: MPI and Message Passing Concurrency
This article is seven years old, but it was never published. I don’t know why.
mpi is not meant to be used. It is a low-level interface designed to build libraries upon: it is expensive to develop supercomputing libraries tailored to each specific high-performance number-crunching cluster system, so libraries are built on mpi instead, and mpi is then implemented on the high-performance clusters. This means mpi is meant to be sandwiched between domain-specific libraries and technology-specific cluster implementations.
But we can use mpi directly. I did, and these were the things I learned. They may not be true universally, but they were true for the way I used Open MPI. If you want more, this set of very short tutorials is one of the best resources I’ve found. I initially dismissed them because I misunderstood what they were, but they’re a great reference.
Basic Concepts
There are two major architectural patterns supported by mpi:
- Hierarchical
  - This is where we have one root process (often with rank 0) coordinating the work of several worker processes.
- Decentralised
  - This is when all processes are independent and perform the same computations, with some clever coordination between them. This pattern was the best fit for my task.
The mpi runtime (mpiexec) launches several identical instances of our program, and as the code runs, the instances will eventually execute the same call to, e.g., MPI_Allgather. When they do, the mpi implementation does some magic behind the scenes, and when execution moves on, every instance has a copy of the data on all other instances. Whenever one instance crashes, it brings all other instances down with it. So while instances are distinct, they operate as one unit.
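To make that concrete, here is a minimal sketch of the kind of program mpiexec launches: every instance contributes one value, and after the MPI_Allgather call each instance holds everyone’s contributions. The fixed-size buffer and the cap of 64 instances are assumptions made just for this sketch.

```c
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Every identical instance contributes one value: its own rank. */
    int mine = rank;
    int all[64];                       /* assumes at most 64 instances */
    MPI_Allgather(&mine, 1, MPI_INT, all, 1, MPI_INT, MPI_COMM_WORLD);

    /* After the call returns, every instance holds the same full array. */
    printf("instance %d sees:", rank);
    for (int i = 0; i < size; i++)
        printf(" %d", all[i]);
    printf("\n");

    MPI_Finalize();
    return 0;
}
```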
To ensure necessary code is run, mpi requires initialisation (with MPI_Init(&argc, &argv)) and finalisation (with MPI_Finalize()).
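As a sketch, the smallest well-formed mpi program is just these two calls wrapped around whatever work we want to do (the file name and process count below are arbitrary choices for the example):

```c
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);     /* must run before any other mpi call */
    printf("hello from one of the instances\n");
    MPI_Finalize();             /* must run after the last mpi call   */
    return 0;
}
```

Compiled with mpicc and launched with, e.g., mpiexec -n 4 ./hello, this prints the greeting four times, once per instance.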
I should also say that mpi processes are split into independent communication units. So a program consists of communication units, which consist of processes. (The code can then specify an informal hierarchy over these processes, but that’s not required or necessarily desired.) The default unit, which all new processes are members of, is called MPI_COMM_WORLD.
The number of processes in a unit can be read with MPI_Comm_size(comm, &size). (Here is where you may, for example, want comm to be MPI_COMM_WORLD.) Each process also has an id that is unique within a communication unit (a process can be a member of multiple communication units, in which case the id may be different in some or all of them); this id is called its rank and can be read with MPI_Comm_rank(comm, &rank).
Basic Operations
MPI_Send(start, count, type, rank, tag, comm)
- Send a message (blocking operation), where
  - start is a pointer to a buffer
  - count is how many elements to send from the buffer
  - type is the data type to read, which can be, e.g.,
    - primitive types like MPI_INT or MPI_DOUBLE_PRECISION
    - an array of primitive types
    - custom datatypes (e.g. an array of pairs), definable with mpi functions
  - rank and comm identify the recipient
  - tag allows the recipient to listen for particular types of message
MPI_Recv(start, count, type, rank, tag, comm, status)
- Receive a message (blocking), with the same arguments as MPI_Send plus one addition:
  - status is a pointer to an MPI_Status object, which will contain things like who sent the message and what it was tagged with
  - count specifies how many elements we will accept, not how many we expect!
  - rank can be ignored by asking for messages from MPI_ANY_SOURCE
  - tag can be ignored by asking for messages tagged MPI_ANY_TAG
  - (a sketch of a matching send/receive pair follows this list)
MPI_Barrier(comm)
- When a process in a communication unit reaches the barrier, it blocks until all other processes in the same communication unit have called the barrier procedure. This can be used to create critical sections (I used this to make i/o thread safe) by looping through all rank numbers, having a barrier at the end of each iteration, and only letting the process whose rank matches the current iteration perform the critical operation. (A sketch of this pattern also follows this list.)
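Here is a minimal sketch of a matching MPI_Send/MPI_Recv pair. It assumes the program is launched with at least two processes; the tag value and payload are arbitrary.

```c
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    const int TAG = 7;                 /* arbitrary tag chosen for the sketch */

    if (rank == 0) {
        int payload = 42;
        /* Blocking send of one MPI_INT to the process with rank 1. */
        MPI_Send(&payload, 1, MPI_INT, 1, TAG, MPI_COMM_WORLD);
    } else if (rank == 1) {
        int received;
        MPI_Status status;
        /* Accept at most one MPI_INT, from any sender, with any tag. */
        MPI_Recv(&received, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG,
                 MPI_COMM_WORLD, &status);
        printf("got %d from rank %d (tag %d)\n",
               received, status.MPI_SOURCE, status.MPI_TAG);
    }

    MPI_Finalize();
    return 0;
}
```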
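And a sketch of the barrier-based critical section described above, assuming rank and size have been obtained as before: each process performs the critical operation (here, printing) only on the iteration matching its rank, and the barrier keeps everyone in step.

```c
/* Round-robin critical section: one process at a time performs the
 * critical operation, the rest wait at the barrier. */
for (int turn = 0; turn < size; turn++) {
    if (turn == rank) {
        printf("process %d doing its i/o without interference\n", rank);
        fflush(stdout);
    }
    MPI_Barrier(MPI_COMM_WORLD);
}
```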
Combined Operations
The mpi interface also specifies various abstractions built upon the basic send/receive. These have been useful for my task:
MPI_Bcast(…)
- Sends a value to all processes. We specify the rank of a root process and a buffer. The value in the buffer of the root process gets filled into the buffers of the other processes in the same communication unit.
MPI_Scatter(…)
- If a root process possesses a collection of values which can be split into sub-collections and processed individually, calling MPI_Scatter with the root process specified will divide the collection among the available processes in the communication unit.
MPI_Gather
- In the opposite scenario, assemble sub-collections into a large collection in the root process.
MPI_Allgather
- Does the same as the above, but assembles the large collection in all processes, not just the root process. Imagine it like gathering to the root process and then broadcasting the result to all other processes.
MPI_Reduce
- If, on the other hand, we want to combine the elements of the collection (where “combine” can be, e.g., computing the total sum of the elements, or the logical conjunction of all elements) and that’s why we’re thinking about gathering them to a root process, we can instead use MPI_Reduce, which will do this combining operation efficiently.
MPI_Allreduce
- If we want the result of this combination operation to be available to all processes, MPI_Allreduce is what we want. Again, we can imagine it like combining into a root process and then broadcasting the result to all other processes.
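To see how these fit together, here is a sketch that scatters a collection from the root, lets each process sum its slice, and then uses MPI_Allreduce so every process ends up with the grand total. The slice size and the cap on the number of processes are assumptions made for the sketch.

```c
#include <mpi.h>
#include <stdio.h>

#define PER_PROC 4                      /* elements per process, chosen for the sketch */

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Only the root (rank 0) fills the full collection: 0, 1, 2, ... */
    int full[64 * PER_PROC];            /* assumes at most 64 processes */
    if (rank == 0)
        for (int i = 0; i < size * PER_PROC; i++)
            full[i] = i;

    /* Scatter: every process receives its own PER_PROC-element slice. */
    int slice[PER_PROC];
    MPI_Scatter(full, PER_PROC, MPI_INT, slice, PER_PROC, MPI_INT,
                0, MPI_COMM_WORLD);

    /* Each process works on its slice independently. */
    int local_sum = 0;
    for (int i = 0; i < PER_PROC; i++)
        local_sum += slice[i];

    /* Allreduce: combine the partial sums so every process gets the total. */
    int total = 0;
    MPI_Allreduce(&local_sum, &total, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

    printf("process %d: local sum %d, global total %d\n", rank, local_sum, total);

    MPI_Finalize();
    return 0;
}
```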
Caveats
One problem I ran into several times when writing mpi code was asymmetric execution of processes. What I mean is that each mpi process should, essentially, execute the exact same code. The reason for this is that if not all processes in a communication unit call, e.g., MPI_Allreduce, the processes that did call it will hang, waiting to rendezvous with the ones that never will.
If no condition in the code depends on the rank of the running process, this will not be a problem. However, sometimes we need to branch based on rank. When we do, we need to be very careful that all code paths call the same mpi procedures.
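Here is a sketch of the trap and the fix, assuming rank has been obtained as before and local stands in for some per-process result:

```c
int local = rank;   /* stand-in for some per-process result */
int total = 0;

/* WRONG: only rank 0 reaches the collective call, so it blocks forever
 * waiting for the other processes to join the MPI_Allreduce. */
if (rank == 0) {
    MPI_Allreduce(&local, &total, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    printf("total: %d\n", total);
}

/* RIGHT: every process calls the collective; only the code that
 * follows it branches on rank. */
MPI_Allreduce(&local, &total, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
if (rank == 0)
    printf("total: %d\n", total);
```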