Entropic Thoughts

Reading Notes: MPI and Message Passing Concurrency

This article is seven years old, but it was never published. I don’t know why.


mpi is not meant to be used. It is a low-level interface designed to build libraries upon. It is expensive to develop supercomputing libraries tailored to each specific high-performance number-crunching cluster system, so libraries are instead built on mpi, which is in turn implemented on each cluster. This means mpi is meant to be sandwiched between domain-specific libraries and technology-specific cluster implementations.

But we can use mpi directly. I did, and these were the things I learned. They may not be true universally, but they were true for the way I used Open mpi. (If you want more, this set of very short tutorials is one of the best resources I’ve found. I initially dismissed them because I misunderstood what they were, but they’re a great reference.)

Basic Concepts

There are two major architectural patterns supported by mpi:

Hierarchical
This is where we have one root process (often with rank 0) coordinating the work of several worker processes.
Decentralised
This is when all processes are independent and perform the same computations, except with some clever coordination between them. This pattern was the best fit for my task.

The mpi runtime (mpiexec) launches several identical instances of our program, and as the code runs, the instances will eventually execute the same call to, e.g., MPI_Allgather. When they do, the mpi implementation does some magic behind the scenes, and when execution moves on, every instance has a copy of the data from all other instances. Whenever one instance crashes, it brings all other instances down with it. So while instances are distinct, they operate as one unit.

To ensure necessary code is run, mpi requires initialisation (with MPI_Init(&argc, &argv)) and finalisation (with MPI_Finalize()).

I should also say that mpi processes are split into independent communication units (mpi itself calls these communicators). So a program consists of communication units, which consist of processes. (The code can then specify an informal hierarchy over these processes, but that’s not required or necessarily desired.) The default unit, which all new processes are members of, is called MPI_COMM_WORLD.

The number of processes in a unit can be read with MPI_Comm_size(comm, &size). (Here is where you may, for example, want comm to be MPI_COMM_WORLD.) The communication unit-unique id of the current process is called its rank and can be read with MPI_Comm_rank(comm, &rank). A process can be a member of multiple communication units, in which case its rank may be different in some or all of them.

Basic Operations

  • MPI_Send(start, count, type, rank, tag, comm)
    • Send message (blocking operation), where
      • start is a pointer to a buffer
      • count is how many elements to send from buffer
      • type is data type to read, which can be, e.g.,
        • Primitive types like MPI_INT or MPI_DOUBLE (in Fortran, MPI_INTEGER or MPI_DOUBLE_PRECISION)
        • An array of primitive types
        • Custom datatypes (e.g. array of pairs), definable with mpi functions
      • rank and comm identify recipient
      • tag allows the recipient to listen for particular types of message
  • MPI_Recv(start, count, type, rank, tag, comm, status)
    • Receive message (blocking). Takes the same arguments as MPI_Send, except that rank now identifies the sender, with one addition:
      • status is a pointer to an MPI_Status object, which will contain stuff like who sent the message and what it was tagged with.
      • count specifies how many elements we will accept, not how many we expect! The number actually received can be read from status with MPI_Get_count.
      • rank can be ignored by asking for messages from MPI_ANY_SOURCE
      • tag can be ignored by asking for messages tagged MPI_ANY_TAG
  • MPI_Barrier(comm)
    • When a process in a communication unit reaches the barrier, it blocks until all other processes in the same communication unit have called the barrier procedure. This can be used to create critical sections (I used this to make i/o thread safe) by looping through all rank numbers, having a barrier at the end of each iteration, and only letting the process whose rank matches the current iteration perform the critical operation.

Combined Operations

The mpi interface also specifies various abstractions built upon the basic send/receive. These have been useful for my task:

  • MPI_Bcast(…)
    • Sends a value to all processes. We specify the rank of a root process and a buffer. The value in the buffer of the root process gets filled into the buffers of the other processes in the same communication unit.
  • MPI_Scatter(…)
    • If a root possesses a collection of values, which can be split into sub-collections and processed individually, calling MPI_Scatter with the root process specified will divide the collection among the available processes in the communication unit.
  • MPI_Gather
    • In the opposite scenario, assemble sub-collections into a large collection in the root process.
      • MPI_Allgather
        • Does the same as the above but will assemble the large collection in all processes, not just the root process. Imagine it like gathering to the root process and then broadcasting the result to all other processes.
      • MPI_Reduce
        • If, on the other hand, we want to combine the elements of the collection (where “combine” can be e.g. to compute a total sum of the elements, or a logical conjunction of all elements), and that’s why we’re thinking about gathering them to a root process, we can instead use MPI_Reduce, which will do this combining operation efficiently.
        • MPI_Allreduce
          • If we want the result of this combination operation to be available to all processes, MPI_Allreduce is what we want. Again, we can imagine it like combining into a root process, and then broadcasting it to all other processes.

Caveats

One problem I ran into several times when writing mpi code was asymmetric execution of processes. What I mean is that each mpi process should, essentially, execute the exact same code. The reason for this is that if not all processes in a communication unit call, e.g., MPI_Allreduce, the processes that did call it will hang, waiting to rendezvous with the ones that never will.

If no condition in the code depends on the rank of the running process, this will not be a problem. However, sometimes we need to branch based on rank. When we do, we need to be very careful that all code paths call the same mpi procedures.