
Introduction to MPI

Message Passing Interface

A library for writing distributed applications.

Outline

  • Basic Concepts
  • Basic MPI Functions
  • Point-to-point Communication
  • Collective Communications
  • Timing

Intermediate/Advanced Topics

  • Third Party Libraries
  • Dynamic Memory Allocation
  • Asynchronous Communication
  • Domain Decomposition
  • Convergence and Global Communications
  • Testing

Basic Concepts

Threads vs Processes

Both processes and threads are independent sequences of execution. The typical difference is that threads (of the same process) run in a shared memory space, while processes run in separate memory spaces.

-- StackOverflow

MPI runs many programs (processes) and easily configures communications amongst them.

mpirun

This program starts all the programs that should run. Typically you provide it your program and it runs it n times.

Here I show how mpirun just runs other programs for you.

In [2]:
! echo '-ls-'; ls -lart; echo
! echo '-date-'; date; echo
! echo '-mpirun-'; mpirun -n 1 ls -lart : -n 1 date; echo
-ls-
total 1312
drwxr-sr-x 15 yannpaul scv  32768 May 15 11:14 ..
drwxr-sr-x  2 yannpaul scv    512 May 15 12:57 .ipynb_checkpoints
drwxr-sr-x  5 yannpaul scv    512 May 20 12:35 RISE
drwxr-sr-x  5 yannpaul scv    512 May 29 15:30 intro
-rw-r--r--  1 yannpaul scv     59 May 29 15:31 .gitignore
-rw-r--r--  1 yannpaul scv    229 Jun  1 10:51 Makefile
-rw-r--r--  1 yannpaul scv 334420 Jun  1 10:52 slides.html
-rw-r--r--  1 yannpaul scv   1049 Jun  1 10:57 conda_requirements.txt
-rw-r--r--  1 yannpaul scv      7 Jun  1 10:58 requirements.txt
-rw-r--r--  1 yannpaul scv    661 Jun  1 11:33 README
drwxr-sr-x  2 yannpaul scv    512 Jun  1 11:53 legacy
drwxr-sr-x  8 yannpaul scv  32768 Jun  3 16:53 .git
drwxr-sr-x  2 yannpaul scv    512 Sep 18 11:57 __pycache__
drwxr-sr-x 14 yannpaul scv  32768 Sep 18 12:07 present_env
-rw-r--r--  1 yannpaul scv   2809 Sep 18 15:13 examples.py
-rw-r--r--  1 yannpaul scv   2481 Sep 18 15:13 examples.pyc
-rw-r--r--  1 yannpaul scv    328 Sep 18 15:32 #html_template.tpl#
-rw-r--r--  1 yannpaul scv   6145 Sep 18 16:10 Untitled.ipynb
-rw-r--r--  1 yannpaul scv    448 Sep 21 09:52 example.tpl
-rw-r--r--  1 yannpaul scv 394862 Sep 21 09:53 introduction_to_mpi.html
-rw-r--r--  1 yannpaul scv 217826 Sep 21 09:54 introduction_to_mpi.ipynb
drwxr-sr-x  9 yannpaul scv  32768 Sep 21 09:54 .

-date-
Mon Sep 21 09:56:37 EDT 2015

-mpirun-
Mon Sep 21 09:56:37 EDT 2015
total 1312
drwxr-sr-x 15 yannpaul scv  32768 May 15 11:14 ..
drwxr-sr-x  2 yannpaul scv    512 May 15 12:57 .ipynb_checkpoints
drwxr-sr-x  5 yannpaul scv    512 May 20 12:35 RISE
drwxr-sr-x  5 yannpaul scv    512 May 29 15:30 intro
-rw-r--r--  1 yannpaul scv     59 May 29 15:31 .gitignore
-rw-r--r--  1 yannpaul scv    229 Jun  1 10:51 Makefile
-rw-r--r--  1 yannpaul scv 334420 Jun  1 10:52 slides.html
-rw-r--r--  1 yannpaul scv   1049 Jun  1 10:57 conda_requirements.txt
-rw-r--r--  1 yannpaul scv      7 Jun  1 10:58 requirements.txt
-rw-r--r--  1 yannpaul scv    661 Jun  1 11:33 README
drwxr-sr-x  2 yannpaul scv    512 Jun  1 11:53 legacy
drwxr-sr-x  8 yannpaul scv  32768 Jun  3 16:53 .git
drwxr-sr-x  2 yannpaul scv    512 Sep 18 11:57 __pycache__
drwxr-sr-x 14 yannpaul scv  32768 Sep 18 12:07 present_env
-rw-r--r--  1 yannpaul scv   2809 Sep 18 15:13 examples.py
-rw-r--r--  1 yannpaul scv   2481 Sep 18 15:13 examples.pyc
-rw-r--r--  1 yannpaul scv    328 Sep 18 15:32 #html_template.tpl#
-rw-r--r--  1 yannpaul scv   6145 Sep 18 16:10 Untitled.ipynb
-rw-r--r--  1 yannpaul scv    448 Sep 21 09:52 example.tpl
-rw-r--r--  1 yannpaul scv 394862 Sep 21 09:53 introduction_to_mpi.html
-rw-r--r--  1 yannpaul scv 217826 Sep 21 09:54 introduction_to_mpi.ipynb
drwxr-sr-x  9 yannpaul scv  32768 Sep 21 09:54 .

This is a silly example, because the running programs don't take care of mpirun's communication protocols. We'll learn today how to interact with mpirun to get information about the other processes. When using mpirun you don't have to worry about where your processes are executed; mpirun can place them on multiple machines depending on how it is configured.

libmpi

You incorporate the MPI library in your code, allowing it to interact with mpirun and other instances of your application.

Why MPI?

Basically the same answers as to "Why parallelize my code?"

  • Divide and conquer
  • Do bigger runs
  • Do higher quality runs
  • Access more memory
  • Better portability*

Specific to MPI:

  • Abstract away communication technologies
  • Long track record of use

Implementations

One specification, many implementations:

  • OpenMPI (on SCC)
  • MPICH2 (maintained by ANL)
  • MVAPICH2 (derivative of MPICH2)
  • Vendor implementations (Intel, SGI, IBM, …)

MPI Basics

Accessing the library

C:

#include "mpi.h"

C++:

#include "boost/mpi/environment.hpp"
#include "boost/mpi/communicator.hpp"

namespace mpi = boost::mpi;

boost::mpi

Fortran:

include "mpif.h"

Python:

from mpi4py import MPI

MPI_Init and MPI_Finalize

MPI_Init -- called before any other MPI library routine. Initializes communication with mpirun.

MPI_Finalize -- called at the end of the MPI part of the code. Signals to mpirun that this process is all done.

Typically they are the first and last things called in an application.

Note: Python (mpi4py) takes care of calls to these routines for you when you import the library and when your program ends.
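
A minimal C skeleton (not one of the course files) showing where these two calls sit; the error-code check is optional and illustrates the integer return values discussed on the next slide:

#include <mpi.h>

int main(int argc, char* argv[]) {
    /* MPI_Init must come before any other MPI routine */
    int ierr = MPI_Init(&argc, &argv);
    if (ierr != MPI_SUCCESS) return 1;   /* C routines return integer error codes */

    /* ... the rest of the application, including other MPI calls ... */

    MPI_Finalize();   /* signals to mpirun that this process is done with MPI */
    return 0;
}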

MPI routines

  • C, C++, and Python are case sensitive, so routine names must be typed exactly, e.g. MPI_Init()
  • Fortran is not case sensitive
  • For C and C++, the return values are integers and they correspond to error codes
  • For Fortran, the routines are all subroutines and the error code is returned through an extra argument
  • For Python, the entire interface is object oriented and errors are raised as exceptions

Messaging Basics

Communication between processes involves:

  • data, an array of standard types or user defined structured types
  • message envelope -- source, destination, tag, communicator; more on all this soon
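
A hedged sketch (not from the course files) of how the data and the envelope map onto the MPI_Send/MPI_Recv arguments; the buffer size and tag are arbitrary choices. Run with at least 2 processes:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char* argv[]) {
    int rank, tag = 99;
    float values[10] = {0};
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        MPI_Send(values, 10, MPI_FLOAT,    /* data: buffer, count, datatype */
                 1, tag, MPI_COMM_WORLD);  /* envelope: destination, tag, communicator */
    } else if (rank == 1) {
        MPI_Recv(values, 10, MPI_FLOAT,    /* data */
                 0, tag, MPI_COMM_WORLD,   /* envelope: source, tag, communicator */
                 &status);                 /* status reports the actual source and tag */
        printf("Process 1 received 10 floats from process 0\n");
    }

    MPI_Finalize();
    return 0;
}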

Modes of Communication

  • Point-to-point Communication

    • blocking - wait till communication is done
    • non-blocking - let it happen in the background, go on with other work
  • Collective Communication - several forms of organized, group communications

Standard Data Types

C Type            MPI Name
int               MPI_INT
unsigned int      MPI_UNSIGNED
float             MPI_FLOAT
double            MPI_DOUBLE
char              MPI_CHAR

Fortran Type      MPI Name
INTEGER           MPI_INTEGER
REAL              MPI_REAL
DOUBLE PRECISION  MPI_DOUBLE_PRECISION
COMPLEX           MPI_COMPLEX
CHARACTER         MPI_CHARACTER

Python (mpi4py) inspects your data and handles this information for you.

MPI Data Types

MPI_PACKED, MPI_BYTE -- fine-grained control of data transfer

User-defined types, e.g. structures.

Example 1: Integration

How do we perform this integral in parallel?

$\int\limits_a^b \cos(x)\, dx$

We split it into the sum of integrals over parts of the interval ($p$ domains):

$\sum\limits_{i=0}^{p-1}\int\limits_{a_i}^{a_{i+1}} \cos(x)\, dx$

Each integral can be approximated numerically using the midpoint rule ($n$ points per domain):

$\sum\limits_{i=0}^{p-1}\sum\limits_{j=0}^{n-1} \cos(a_{ij}) \cdot h$

Where $h$ is the step size and $a_{ij}$ is the midpoint position:

$h = (b - a) / (n \cdot p)$

$a_{ij} = a + i \cdot n \cdot h + (j + 0.5) \cdot h$

Serial Version

File: intro/basics/F77/example1.f

13       implicit none
14       integer n, p, i, j
15       real h, integral_sum, a, b, integral, pi, ai
16 
17       pi = acos(-1.0)   !  = 3.14159...
18       a = 0.0           ! lower limit of integration
19       b = pi*1./2.      ! upper limit of integration
20       p = 4             ! number of processes (partitions)
21       n = 500           ! number of increment within each process
22       h = (b-a)/n/p     ! length of increment
23 
24       integral_sum = 0.0      ! stores answer to the integral
25       do i=0,p-1        ! sum of integrals over all processes
26         ai = a + i*n*h  ! lower limit of integration for partition i
27         integral_sum = integral_sum + integral(ai,h,n)
28       enddo
29 
30       print *,'The integral_sum =', integral_sum
31 
32       end
33       
34       real function integral(ai, h, n)
35       implicit none
36       integer n, j
37       real h, ai, aij
38 
39       integral = 0.0                ! initialize integral
40       do j=0,n-1                    ! sum integrals
41         aij = ai +(j+0.5)*h         ! abscissa mid-point
42         integral = integral + cos(aij)*h
43       enddo
44 
45       return

File: intro/basics/C/example1.c

20       int n, p, i, j, ierr;
21       float h, integral_sum, a, b, pi, my_int, ai;
22 
23       pi = acos(-1.0);  /* = 3.14159... */
24       a = 0.;           /* lower limit of integration */
25       b = pi*1./2.;     /* upper limit of integration */
26       p = 4;            /* number of processes (partitions) */
27       n = 500;          /* number of increment within each process */
28       h = (b-a)/n/p;    /* length of increment */
29 
30       integral_sum = 0.0;
31       
32       /* sum of integrals over all processes */
33       for (i=0; i<p; i++)  {
34         ai = a + i*n*h; /* lower limit of integration for partition i */
35 	integral_sum += integral(ai,h,n);
36       }
37 
38       printf("The integral sum =%f\n",integral_sum);
39 }
40 
41 float integral(float ai, float h, int n) {
42       int j;
43       float aij, integ;
44 
45       integ = 0.0;                 /* initialize */
46       for (j=0;j<n;j++) {          /* sum integrals */
47         aij = ai + (j+0.5)*h;      /* mid-point */
48         integ += cos(aij)*h;
49       }
50       return integ;

File: intro/basics/C++/example1.cc

16   int n, p, i;
17   float h, result, a, b, pi, ai;
18 
19   pi = acos(-1.0);  /* = 3.14159... */
20   a = 0.;           /* lower limit of integration */
21   b = pi*1./2.;     /* upper limit of integration */
22   p = 4;
23   n = 500;          /* number of increment within each process */
24   h = (b-a)/n/p;    /* length of increment */
25   
26   result = 0.0;
27   for (i=0; i<p; i++)  {
28     ai = a + i*n*h;  /* lower limit of integration for partition myid */
29     result += integral(ai, h, n, cos_wrapper);
30   }
31 
32     std::cout << "The result = "
33 	      << std::setiosflags(std::ios::fixed)
34 	      << std::setprecision(4) << result << std::endl;
35 }

File: intro/basics/python/example1.py

 1 from math import cos, pi
 2 
 3 def main():
 4     lower_limit = 0.0       
 5     upper_limit = pi * 0.5  
 6     npartitions = 4
 7     partition_nsteps = 500
 8     step = (upper_limit - lower_limit) / float(partition_nsteps * npartitions)
 9 
10     integral_sum = 0.0
11     for i in range(npartitions):
12         start = lower_limit + i * partition_nsteps * step
13         integral_sum += integral(start, step, partition_nsteps)
14 
15     print "The integral sum =", integral_sum
16     
17 
18 def integral(start, step, nsteps):
19     total = 0
20     for i in range(nsteps):
21         pos = start + (i + 0.5) * step
22         total += cos(pos) * step
23     return total
24 
25 if __name__ == "__main__":

Parallel Integration

This is an example of Single Program Multiple Data (SPMD) parallelization. The single program is the integral function. The multiple data are the limits of each partition.

To parallelize this application we'll use:

MPI_Init, MPI_Finalize -- start things up and then stop them

MPI_Comm_rank -- what is the ID (called rank) of the current process

MPI_Comm_size -- how many processes are running

MPI_Send, MPI_Recv -- send and receive a buffer of information

MPI uses communicators to organize who is getting what. Each communicator is associated with a Group of processes.

MPI_COMM_WORLD is a communicator (id or object depending on language) that represents all the processes. It's there by default.

Later we'll discuss making your own Groups and their corresponding Communicators.

File: intro/basics/F77/example1_1.f

15       implicit none
16       integer n, p, i, j, proc, ierr, master, myid, tag, comm
17       real h, a, b, integral, pi, ai, my_int, integral_sum
18       include "mpif.h"  ! brings in pre-defined MPI constants, ...
19       integer status(MPI_STATUS_SIZE)  ! size defined in mpif.h
20       data master/0/    ! processor 0 collects integral sums from other processors
21 
22       comm = MPI_COMM_WORLD      
23       call MPI_Init(ierr)                        ! starts MPI
24       call MPI_Comm_rank(comm, myid, ierr)      ! get current proc ID
25       call MPI_Comm_size(comm, p, ierr)         ! get number of procs

File: intro/basics/C/example1_1.c

 1 #include <mpi.h>
 2 #include <math.h>
 3 #include <stdio.h>
 4 
 5 float integral(float ai, float h, int n);
 6 
 7 void main(int argc, char* argv[]){
 8 /*###########################################################################
 9  #                                                                             
10  # This is an MPI example on parallel integration to demonstrate the use of:   
11  #                                                                             
12  # * MPI_Init, MPI_Comm_rank, MPI_Comm_size, MPI_Finalize                      
13  # * MPI_Recv, MPI_Send                                                        
14  #                                                                             
15  # Dr. Kadin Tseng                                                             
16  # Scientific Computing and Visualization                                      
17  # Boston University                                                           
18  # 1998                                                                        
19  #                                                                             
20  # Updated 2015 by Yann Tambouret
21  #
22  ###########################################################################*/
23 
24       int n, p, myid, tag, proc, ierr;
25       float h, integral_sum, a, b, ai, pi, my_int;
26       int master = 0;  /* processor performing total sum */
27       MPI_Comm comm;
28       MPI_Status status;
29 
30       comm = MPI_COMM_WORLD;      
31       ierr = MPI_Init(&argc,&argv);           /* starts MPI */
32       MPI_Comm_rank(comm, &myid);            /* get current process id */
33       MPI_Comm_size(comm, &p);               /* get number of processes */

File: intro/basics/C++/example1_1.cc

 3 #include "boost/mpi/environment.hpp"
 4 #include "boost/mpi/communicator.hpp"
 5 #include <iostream>
 6 #include <iomanip>
 7 #include "integral.h"
 8 #include <math.h>
 9 
10 namespace mpi = boost::mpi;
11 
12 float cos_wrapper(float pos) {
13   // wrap cos because it's double precision
14   return cos(pos);
15 }
16 
17 int main() {
18 
19   int n, p, i;
20   float h, result, a, b, pi, ai, myint;
21   
22   pi = acos(-1.0);  /* = 3.14159... */
23   a = 0.;           /* lower limit of integration */
24   b = pi*1./2.;     /* upper limit of integration */
25   n = 500;          /* number of increment within each process */
26 
27   int myid;
28   float my_int;
29   int tag=123, master=0;
30   mpi::environment env;
31   mpi::communicator world;
32   p = world.size();
33   myid = world.rank();

File: intro/basics/python/example1_1.py

1 from mpi4py import MPI
2 from math import cos, pi
3 
4 def main():
5     master = 0
6     comm = MPI.COMM_WORLD
7     myid = comm.Get_rank()
8     npartitions = comm.Get_size()

File: intro/basics/F77/example1_1.f

27       pi = acos(-1.0)   !  = 3.14159...
28       a = 0.0           ! lower limit of integration
29       b = pi/2.         ! upper limit of integration
30       n = 500           ! number of increment within each process
31       tag = 123         ! set the tag to identify this particular job
32       h = (b-a)/n/p     ! length of increment
33 
34       ai = a + myid*n*h ! lower limit of integration for partition myid
35       my_int = integral(ai, h, n) 
36       write(*,"('Process ',i2,' has the partial sum of',f10.6)")

File: intro/basics/C/example1_1.c

35       pi = acos(-1.0);  /* = 3.14159... */
36       a = 0.;           /* lower limit of integration */
37       b = pi*1./2.;     /* upper limit of integration */
38       n = 500;          /* number of increment within each process */
39       tag = 123;        /* set the tag to identify this particular job */
40       h = (b-a)/n/p;    /* length of increment */
41 
42       ai = a + myid*n*h;  /* lower limit of integration for partition myid */
43       my_int = integral(ai, h, n);   /* 0<=myid<=p-1 */

File: intro/basics/C++/example1_1.cc

37   my_int = integral(ai, h, n, cos_wrapper);

File: intro/basics/python/example1_1.py

10     lower_limit = 0.0
11     upper_limit = pi * 0.5  
12     tag = 123
13     partition_nsteps = 500
14     step = (upper_limit - lower_limit) / float(partition_nsteps * npartitions)
15 
16     start = lower_limit + myid * partition_nsteps * step
17     my_integral = integral(start, step, partition_nsteps)
18     print "Process {0:2d} has the partial sum of {1:10.6f}".format(
19         myid, my_integral)

File: intro/basics/F77/example1_1.f

39       call MPI_Send(  
40      &     my_int, 1, MPI_REAL,   ! buffer, size, datatype
41      &     master,     ! where to send message
42      &     tag,         ! message tag

File: intro/basics/C/example1_1.c

47       MPI_Send( 
48               &my_int, 1, MPI_FLOAT, 
49               master,        /* message destination */
50               tag,           /* message tag */
51               comm);

File: intro/basics/C++/example1_1.cc

39   world.send(0, tag, my_int);

File: intro/basics/python/example1_1.py

21     comm.send(my_integral, dest=master, tag=tag)

File: intro/basics/F77/example1_1.f

45       if(myid .eq. master) then      ! do following only on master ...
46         integral_sum = 0.0           ! initialize integral_sum to zero
47         do proc=0,p-1   ! loop on processors to collect local sum
48           call MPI_Recv( 
49      &       my_int, 1, MPI_REAL, 
50      &       proc,     ! message source
51      &       tag,         ! message tag
52      &       comm, status, ierr)        ! status reports source, tag
53           integral_sum = integral_sum + my_int   ! sum my_int from processors
54         enddo
55         print *,'The integral =',integral_sum
56       endif

File: intro/basics/C/example1_1.c

53       if(myid == master) {  /* Receives serialized */
54         integral_sum = 0.0;
55         for (proc=0;proc<p;proc++) {
56           MPI_Recv( 
57                 &my_int, 1, MPI_FLOAT, 
58                 proc,        /* message source */
59                 tag,         /* message tag */
60                 comm, &status);     /* status reports source, tag */
61           integral_sum += my_int;
62         }
63         printf("The Integral =%f\n",integral_sum); /* sum of my_int */
64       }

File: intro/basics/C++/example1_1.cc

40   if (myid == master) {
41     result = 0;
42     for (i=0; i<p; i++) {
43       world.recv(i, tag, myint);
44       result += myint;
45     }
46     std::cout << "The result = "
47 	      << std::setiosflags(std::ios::fixed)
48 	      << std::setprecision(4) << result << std::endl;
49   }

File: intro/basics/python/example1_1.py

23     if myid == master:
24         integral_sum = 0.0
25         for i in range(npartitions):
26             integral_sum += comm.recv(source=i, tag=tag)
27         print "The integral sum =", integral_sum

What value should master be?

Hint: What does master represent, and then what value can it be?

Why might this code fail?

Hint 1: go through the code as if your ID is 0.

Hint 2: What needs to happen for an MPI_Send to be successful?

Self Messaging

You can send messages to yourself. When you send data, a temporary buffer is normally used under the hood; since that is not strictly necessary when sending to yourself, it has traditionally been up to the MPI vendor whether or not to actually buffer self-messages. Historically this made self-messaging flaky, but it is now part of the MPI standard and should be supported universally.
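
A hedged sketch (not from the course files) of a process messaging itself; using MPI_Sendrecv avoids relying on whether a blocking self-send is buffered:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char* argv[]) {
    int rank, tag = 7, out, in;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    out = rank * 10;
    /* send 'out' to ourselves and receive it into 'in' in a single call */
    MPI_Sendrecv(&out, 1, MPI_INT, rank, tag,
                 &in,  1, MPI_INT, rank, tag,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    printf("Process %d sent %d to itself and received %d\n", rank, out, in);

    MPI_Finalize();
    return 0;
}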

How to Compile

In [45]:
!mpif77 -o intro/basics/F77/example1_1.x intro/basics/F77/example1_1.f
In [46]:
!mpif77 --show
gfortran -I/usr/local/IT/mpi/openmpi-1.6.4/gcc-4.4/include -pthread -Wl,-rpath -Wl,/usr/local/IT/mpi/openmpi-1.6.4/gcc-4.4/lib -L/usr/local/IT/mpi/openmpi-1.6.4/gcc-4.4/lib -lmpi_f77 -lmpi -ldl -lm -lnuma -Wl,--export-dynamic -lrt -lnsl -lutil -lm -ldl
Language      MPI compiler
C             mpicc
Fortran 77    mpif77
Fortran 90    mpif90
C++           mpicxx, mpic++, mpiCC
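
The C and C++ examples compile the same way with the corresponding wrapper; a hedged example (the output paths are my choice, and on SCC the C++ version also needs the Boost MPI and serialization libraries):

mpicc -o intro/basics/C/example1_1.x intro/basics/C/example1_1.c
mpicxx -o intro/basics/C++/example1_1.x intro/basics/C++/example1_1.cc -lboost_mpi -lboost_serialization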

How to Run/Example Output

  1. mpirun
  2. -np 4 -- replace '4' with the number of processes you want.

    MPI will take care of where they run. There's an automatic way of placing them on different nodes and there's a ton of options for controlling where they end up.

  3. intro/basics/F77/example1_1.x -- your application

In [47]:
!mpirun -np 4 intro/basics/F77/example1_1.x
Process  0 has the partial sum of  0.382684
Process  1 has the partial sum of  0.324423
Process  2 has the partial sum of  0.216773
Process  3 has the partial sum of  0.076120
 The integral =   1.0000001    

Python:

mpirun -np 4 python intro/basics/python/example1_1.py

Note:

The standard python on SCC is missing mpi4py, so you need to load a module to access this library, for example:

module load anaconda/2.2.0

C++:

You need the properly compiled boost library, which on SCC is:

module load boost/1.58.0

Practice 1: hello mpi world

Use MPI_Comm_rank and MPI_Comm_size to figure out the process ID and the number of processes, then print out a hello message:

Hello world from 0 of 4
Hello world from 1 of 4
Hello world from 2 of 4
Hello world from 3 of 4

You will need:

  1. to access mpi library (import, include, etc.)
  2. to Initialize and Finalize (if required by the language)
  3. to use MPI_Comm_rank to get the ID of the current process
  4. to use MPI_Comm_size to get the number of processes

Saving time Sending

Does master need to send info to itself?

File: intro/basics/F77/example1_2.f

15       implicit none
16       integer n, p, i, j, proc, ierr, master, myid, tag, comm
17       real h, a, b, integral, pi, ai, my_int, integral_sum
18       include "mpif.h"  ! brings in pre-defined MPI constants, ...
19       integer status(MPI_STATUS_SIZE)  ! size defined in mpif.h
20       data master/0/    ! processor 0 collects integral sums from other processors
21 
22       comm = MPI_COMM_WORLD      
23       call MPI_Init(ierr)                        ! starts MPI
24       call MPI_Comm_rank(comm, myid, ierr)      ! get current proc ID
25       call MPI_Comm_size(comm, p, ierr)         ! get number of procs
26 
27       pi = acos(-1.0)   !  = 3.14159...
28       a = 0.0           ! lower limit of integration
29       b = pi/2.         ! upper limit of integration
30       n = 500           ! number of increment within each process
31       tag = 123         ! set the tag to identify this particular job
32       h = (b-a)/n/p     ! length of increment
33 
34       ai = a + myid*n*h ! lower limit of integration for partition myid
35       my_int = integral(ai, h, n) 
36       write(*,"('Process ',i2,' has the partial sum of',f10.6)")
37      &  myid,my_int
38 
39       if(myid .eq. master) then      ! do following only on master ...
40         integral_sum = my_int           !starts with value on master 
41         do proc=1,p-1   ! loop on procs to collect local sums, exclude master
42           call MPI_Recv( 
43      &       my_int, 1, MPI_REAL, 
44      &       proc,        ! message source
45      &       tag,         ! message tag
46      &       comm, status, ierr)    ! status reports source, tag
47           integral_sum = integral_sum + my_int   ! sum my_int from processors
48         enddo
49         print *,'The integral =',integral_sum
50       else
51         call MPI_Send(  
52      &     my_int, 1, MPI_REAL,   ! buffer, size, datatype
53      &     master,     ! where to send message
54      &     tag,         ! message tag
55      &     comm, ierr)
56       endif
57 
58       call MPI_Finalize(ierr)                          ! MPI finish up ...

File: intro/basics/C/example1_2.c

30       comm = MPI_COMM_WORLD;      
31       ierr = MPI_Init(&argc,&argv);           /* starts MPI */
32       MPI_Comm_rank(comm, &myid);            /* get current process id */
33       MPI_Comm_size(comm, &p);               /* get number of processes */
34 
35       pi = acos(-1.0);  /* = 3.14159... */
36       a = 0.;           /* lower limit of integration */
37       b = pi*1./2.;     /* upper limit of integration */
38       n = 500;          /* number of increment within each process */
39       tag = 123;        /* set the tag to identify this particular job */
40       h = (b-a)/n/p;    /* length of increment */
41 
42       ai = a + myid*n*h;  /* lower limit of integration for partition myid */
43       my_int = integral(ai, h, n);   /* 0<=myid<=p-1 */
44 
45       printf("Process %d has the partial integral of %f\n", myid,my_int);
46 
47       if(myid == master) {  /* Receives serialized */
48         integral_sum = my_int;
49         for (proc=1;proc<p;proc++) {
50           MPI_Recv(&my_int, 1, MPI_FLOAT, 
51 		   proc,        /* message source */
52 		   tag,         /* message tag */
53 		   comm,
54 		   &status);    /* status reports source, tag */
55           integral_sum += my_int;
56         }
57         printf("The Integral =%f\n",integral_sum); /* sum of my_int */
58       } else {
59 	MPI_Send(&my_int, 1, MPI_FLOAT, 
60 		 master,        /* message destination */
61 		 tag,           /* message tag */
62 		 comm);
63       }
64       
65       MPI_Finalize();                        /* let MPI finish up ... */

File: intro/basics/C++/example1_2.cc

22   pi = acos(-1.0);  /* = 3.14159... */
23   a = 0.;           /* lower limit of integration */
24   b = pi*1./2.;     /* upper limit of integration */
25   n = 500;          /* number of increment within each process */
26 
27   int myid;
28   float my_int;
29   mpi::environment env;
30   mpi::communicator world;
31   p = world.size();
32   myid = world.rank();
33   h = (b-a)/n/p;    /* length of increment */
34   ai = a + myid*n*h;  /* lower limit of integration for partition myid */
35 
36   my_int = integral(ai, h, n, cos_wrapper);
37 
38   if (myid == master) {
39     result = my_int;
40     for (i=1; i<p; i++) {
41       world.recv(i, tag, myint);
42       result += myint;
43     }
44     std::cout << "The result = "
45 	      << std::setiosflags(std::ios::fixed)
46 	      << std::setprecision(4) << result << std::endl;
47   } else {
48     world.send(0, tag, my_int);
49   }

File: intro/basics/python/example1_2.py

 4 def main():
 5     master = 0
 6     comm = MPI.COMM_WORLD
 7     myid = comm.Get_rank()
 8     npartitions = comm.Get_size()
 9 
10     lower_limit = 0.0
11     upper_limit = pi * 0.5  
12     tag = 123
13     partition_nsteps = 500
14     step = (upper_limit - lower_limit) / float(partition_nsteps * npartitions)
15 
16     start = lower_limit + myid * partition_nsteps * step
17     my_integral = integral(start, step, partition_nsteps)
18     print "Process {0:2d} has the partial sum of {1:10.6f}".format(
19         myid, my_integral)
20 
21     if myid == master:
22         integral_sum = my_integral
23         for i in range(1, npartitions):
24             integral_sum += comm.recv(source=i, tag=tag)
25         print "The integral sum =", integral_sum
26     else:
27         comm.send(my_integral, dest=master, tag=tag)

Practice 2: hello mpi world, from master

  1. Like practice 1, find out id and number of processes
  2. Send from master to all the other processes a message: "Hello world from master!"
  3. Have each process print out the message with its id: "From 2, received message: 'Hello world from master!'"

How can we speed up the collecting of results?

Hint: what order are the results collected?

Saving time Receiving, Saving time Sending

Asynchronous or non-blocking communication is possible. Normally when you send or receive, the call waits until the operation is complete. You can use alternative routines that return right away, so you can continue to work while the data is sent or received behind the scenes.

Old Method    Non-blocking Method
MPI_Send      MPI_Isend
MPI_Recv      MPI_Irecv

You should be careful not to use the buffers involved in these non-blocking communications while the operations are still ongoing.

You can use MPI_Probe to check for an incoming message and MPI_Wait to block until a non-blocking operation is complete.

With MPI-3, there are now non-blocking collective communications, a somewhat more advanced topic.

File: intro/basics/F77/example1_3.f

39       if(myid .eq. master) then               ! the following is serial
40         integral_sum = my_int
41         do proc=1,p-1
42           call MPI_Recv( 
43      &       my_int, 1, MPI_REAL, 
44      &       MPI_ANY_SOURCE,     ! message source
45      &       MPI_ANY_TAG,         ! message tag
46      &       comm, status, ierr)     ! status identifies source, tag
47           integral_sum = integral_sum + my_int
48         enddo
49         write(*,*)'The Integral =', integral_sum   ! sum of my_int
50       else
51         call MPI_Isend(  
52      &     my_int, 1, MPI_REAL,   ! buffer, size, datatype
53      &     master, tag,           ! destination and tag
54      &     comm, request, ierr)   ! get handle for MPI_Wait to check status
55         call other_work(myid)     ! because of Isend, gets here immediately
56         call MPI_Wait(request, status, ierr)  ! block until Isend is done
57       endif
58 
59       call MPI_Finalize(ierr)                  ! let MPI finish up ...

File: intro/basics/C/example1_3.c

50       if(myid == master) {
51         integral_sum = my_int;
52         for (proc=1;proc<p;proc++) {
53           MPI_Recv(  
54                 &my_int, 1, MPI_FLOAT, 
55                 MPI_ANY_SOURCE,      /* message source */
56                 MPI_ANY_TAG,         /* message tag */
57                 comm, &status);       /* status identifies source, tag */
58           integral_sum += my_int;
59         }
60         printf("The Integral =%f\n",integral_sum); /* sum of my_int */
61       }
62       else {
63         MPI_Isend(&my_int, 1, MPI_FLOAT, master, tag, 
64                       comm, &request);      /* send my_int to master */
65         other_work(myid);
66         MPI_Wait(&request, &status);   /* block until Isend is done */

File: intro/basics/C++/example1_3.cc

35   mpi::request request;
36   p = world.size();
37   myid = world.rank();
38   h = (b-a)/n/p;    /* length of increment */
39   ai = a + myid*n*h;  /* lower limit of integration for partition myid */
40 
41   my_int = integral(ai, h, n, cos_wrapper);
42 
43   if (myid == master) {
44     result = my_int;
45     for (i=1; i<p; i++) {
46       world.recv(mpi::any_source, tag, myint);
47       result += myint;
48     }
49     std::cout << "The result = "
50 	      << std::setiosflags(std::ios::fixed)
51 	      << std::setprecision(4) << result << std::endl;
52   } else {
53     request = world.isend(0, tag, my_int);
54     other_work(myid);
55     request.wait();

File: intro/basics/python/example1_3.py

21     if myid == master:
22         integral_sum = my_integral
23         for i in range(1, npartitions):
24             integral_sum += comm.recv(source=MPI.ANY_SOURCE,
25                                       tag=MPI.ANY_TAG)
26         print "The integral sum =", integral_sum
27     else:
28         request = comm.isend(my_integral, dest=master, tag=tag)
29         other_work(myid)
30         request.Wait()
31 
32 
33 def other_work(myid):
34     print "more work on process", myid

Practice 3: Integrate with Checking

Modify example1_3 so that

  1. each process also sends it's id.
  2. Make an array to keep track of each process's status.
  3. Have master print out the status of each process.

Collective communication

File: intro/basics/F77/example1_4.f

16       implicit none
17       integer n, p, i, j, proc, ierr, master, myid, tag, comm
18       real h, a, b, integral, pi, ai, my_int, integral_sum, buf(50)
19       include "mpif.h"  ! brings in pre-defined MPI constants, ...
20       data master/0/    ! processor 0 collects integral sums from other processors
21 
22       comm = MPI_COMM_WORLD      
23       call MPI_Init(ierr)                        ! starts MPI
24       call MPI_Comm_rank(comm, myid, ierr)      ! get current proc ID
25       call MPI_Comm_size(comm, p, ierr)         ! get number of procs
26 
27       pi = acos(-1.0)   !  = 3.14159...
28       a = 0.0           ! lower limit of integration
29       b = pi*1./2.      ! upper limit of integration
30       n = 500           ! number of intervals in (b-a)/p
31       h = (b-a)/n/p     ! length of increment
32 
33       ai = a + myid*n*h ! lower limit of integration for partition myid
34       my_int = integral(ai, h, n) 
35 
36       write(*,"('Process ',i2,' has the partial sum of',f10.6)")
37      &          myid,my_int
38 
39       call MPI_Gather( 
40      &     my_int, 1, MPI_REAL, 
41      &     buf, 1, MPI_REAL, 
42      &     master, 
43      &     comm, ierr)
44 
45       if(myid .eq. master) then
46         integral_sum = 0.0
47         do i=1,p
48           integral_sum = integral_sum + buf(i)
49         enddo
50         print *,'The Integral =',integral_sum
51       endif

File: intro/basics/C/example1_4.c

34       pi = acos(-1.0);  /* = 3.14159... */
35       a = 0.;           /* lower limit of integration */
36       b = pi*1./2.;     /* upper limit of integration */
37       n = 500;          /* number of increment within each process */
38       h = (b-a)/n/p;    /* length of increment */
39 
40       ai = a + myid*n*h;  /* lower limit of integration for partition myid */
41       my_int = integral(ai, h, n);
42       printf("Process %d has the partial result of %f\n", myid, my_int);
43       
44       MPI_Gather( 
45             &my_int, 1, MPI_FLOAT, 
46             buf, 1, MPI_FLOAT, 
47             master, comm); 
48 
49       if(myid == master) {
50         integral_sum = 0.0;
51         for (i=0; i<p; i++) {
52 	  printf("On Master, Process %d has the partial result of %f\n",
53 		 i, buf[i]);
54       
55           integral_sum += buf[i];
56         }
57         printf("The Integral =%f\n",integral_sum);
58       }

File: intro/basics/C++/example1_4.cc

28   int myid;
29   float my_int;
30   float all_ints[50];
31   mpi::environment env;
32   mpi::communicator world;
33   p = world.size();
34   myid = world.rank();
35   
36   h = (b-a)/n/p;    /* length of increment */
37   ai = a + myid*n*h;  /* lower limit of integration for partition myid */
38 
39   my_int = integral(ai, h, n, cos_wrapper);
40   gather(world, my_int, all_ints, master);
41 
42   if (myid == master) {
43     result = 0;
44     for (i=0; i<p; i++) {
45       result += all_ints[i];
46     }
47     std::cout << "The result = "
48 	      << std::setiosflags(std::ios::fixed)
49 	      << std::setprecision(4) << result << std::endl;
50   } 
51 }

File: intro/basics/python/example1_4.py

 4 def main():
 5     master = 0
 6     comm = MPI.COMM_WORLD
 7     myid = comm.Get_rank()
 8     npartitions = comm.Get_size()
 9 
10     lower_limit = 0.0
11     upper_limit = pi * 0.5  
12     tag = 123
13     partition_nsteps = 500
14     step = (upper_limit - lower_limit) / float(partition_nsteps * npartitions)
15 
16     start = lower_limit + myid * partition_nsteps * step
17     my_integral = integral(start, step, partition_nsteps)
18     print "Process {0:2d} has the partial sum of {1:10.6f}".format(
19         myid, my_integral)
20 
21     integrals = comm.gather(my_integral, root=master)
22 
23     if myid == master:
24         integral_sum = 0
25         for part_integral in integrals:
26             integral_sum += part_integral
27         print "The integral sum =", integral_sum

Collective Computation

File: intro/basics/F77/example1_5.f

16       implicit none
17       integer n, p, i, j, proc, ierr, master, myid, tag, comm
18       real h, a, b, integral, pi, ai, my_int, integral_sum
19       real tick, tock
20       include "mpif.h"  ! brings in pre-defined MPI constants, ...
21       integer status(MPI_STATUS_SIZE)  ! size defined in mpif.h
22       data master/0/    ! processor 0 collects integral sums from other processors
23 
24       comm = MPI_COMM_WORLD      
25       call MPI_Init(ierr)                        ! starts MPI
26       call MPI_Comm_rank(comm, myid, ierr)      ! get current proc ID
27       call MPI_Comm_size(comm, p, ierr)         ! get number of procs
28 
29       pi = acos(-1.0)   !  = 3.14159...
30       a = 0.0           ! lower limit of integration
31       b = pi*1./2.      ! upper limit of integration
32       tag = 123         ! set the tag to identify this particular job
33 
34       if(myid .eq. master) then
35         print *,'The requested number of processors =',p
36         print *,'enter number of increments within each process'
37         read(*,*)n
38       endif
39 c**   Broadcast "n" to all processes
40       tick = MPI_Wtime()
41       call MPI_Bcast(         ! Broadcast "n" to all procs
42      &    n, 1, MPI_INTEGER, 
43      &     master, comm, ierr)
44       tock = MPI_Wtime()
45       print *, 'The time to broadcast on', p, 'is', tock - tick
46 
47       h = (b-a)/n/p     ! length of increment
48       ai = a + myid*n*h ! lower limit of integration for partition myid
49       my_int = integral(ai, h, n) 
50 
51       write(*,"('Process ',i2,' has the partial sum of',f10.6)")
52      &          myid,my_int
53 
54       call MPI_Reduce(              ! a collective reduction operation
55      &      my_int, integral_sum, 1, MPI_REAL,  
56      &      MPI_SUM,  
57      &      master,  
58      &      comm, ierr)
59 
60       if(myid .eq. master) then
61         print *,'The Integral =',integral_sum

File: intro/basics/C/example1_5.c

36   /* get n on master */
37   if(myid == master) {
38     printf("The requested number of processors is %d\n",p);
39     printf("Enter number of increments within each process\n");
40     (void) fgets(line, sizeof(line), stdin);
41     (void) sscanf(line, "%d", &n);
42   }
43   /* Broadcast "n" to all processes */
44   MPI_Bcast(&n, 1, MPI_INT, master, comm);
45 
46   pi = acos(-1.0);  /* = 3.14159... */
47   a = 0.;           /* lower limit of integration */
48   b = pi*1./2.;     /* upper limit of integration */
49   h = (b-a)/n/p;    /* length of increment */
50 
51   ai = a + myid*n*h;  /* lower limit of integration for partition myid */
52   my_int = integral(ai, h, n);   /* 0<=myid<=p-1 */
53 
54   printf("Process %d has the partial result of %f\n", myid,my_int);
55   
56   MPI_Reduce(
57 	     &my_int, &integral_sum, 1, MPI_FLOAT, 
58 	     MPI_SUM, 
59 	     master, comm);
60 
61   if(myid == 0) {
62     printf("The result =%f\n",integral_sum);
63   }
64   MPI_Finalize();                       /* let MPI finish up ... */

File: intro/basics/C++/example1_5.cc

31   /* get n on master */
32   if(myid == master) {
33     std::cout << "The requested number of processors is "
34 	      << p << std::endl;
35     std::cout << "Enter number of increments within each process: ";
36     std::cin >> n;
37   }
38   /* Broadcast "n" to all processes */
39   broadcast(world, n, 0);
40 
41   pi = acos(-1.0);  /* = 3.14159... */
42   a = 0.;           /* lower limit of integration */
43   b = pi*1./2.;     /* upper limit of integration */
44   h = (b-a)/n/p;    /* length of increment */
45   ai = a + myid*n*h;  /* lower limit of integration for partition myid */
46 
47   my_int = integral(ai, h, n, cos_wrapper);
48   reduce(world, my_int, result, std::plus<float>(), 0);
49 
50   if (myid == master) {
51     std::cout << "The result = "
52 	      << std::setiosflags(std::ios::fixed)
53 	      << std::setprecision(8) << result << std::endl;
54   } 

File: intro/basics/python/example1_5.py

 4 def main():
 5     master = 0
 6     comm = MPI.COMM_WORLD
 7     myid = comm.Get_rank()
 8     npartitions = comm.Get_size()
 9 
10     partition_nsteps = 500
11     if myid == master:
12         print "The requested number of processors =", npartitions
13         # stdout is not flushed until new line, so to see message need
14         # extra \n character
15         partition_nsteps = int(raw_input(
16             'Enter the number of increments within each process\n'))
17 
18     partition_nsteps = comm.bcast(partition_nsteps, root=master)
19 
20     lower_limit = 0.0
21     upper_limit = pi * 0.5  
22     tag = 123
23     step = (upper_limit - lower_limit) / float(partition_nsteps * npartitions)
24 
25     start = lower_limit + myid * partition_nsteps * step
26     my_integral = integral(start, step, partition_nsteps)
27     print "Process {0:2d} has the partial sum of {1:10.6f}".format(
28         myid, my_integral)
29 
30     integral_sum = comm.reduce(my_integral, op=MPI.SUM, root=master)
31 
32     if myid == master:
33         print "The integral sum =", integral_sum

Speedup Ratio and Parallel Efficiency

Speedup $S$ is the ratio of $T_1$, the time for one process, to $T_N$, the time for $N$ processes. When a fraction $f$ of the code cannot be parallelized, then

$ S = \frac{T_1}{T_N} < \frac{T_1}{(f + \frac{1 - f}{N})T_1} < \frac{1}{f}$

as

$ N \rightarrow \infty $

So if 90% of the code is parallelizable ($f = 0.1$), then at best you can get a 10x speedup.
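
For example, with $f = 0.1$ and $N = 4$ processes, $S < 1 / (0.1 + 0.9/4) \approx 3.1$, already well short of the ideal speedup of 4; as $N \rightarrow \infty$ the speedup only approaches $1/f = 10$.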

Parallel Efficiency is $\frac{S}{N}$, and is ideally 1.

How Reduce Works

+---------+  +----------+  +----------+      +-----------+
|    P0   |  |   P1     |  |     P2   |      |     P3    |
|    0    |  |   1      |  |     2    |      |     3     |
+---------+  +--+-------+  +---------++      +-----------+
          |     |                    |       |            
          |     |                    |       |            
         +v-----v+                  +v-------v+           
         |  P0   |                  |   P2    |           
         |  1    |                  |   5     |           
         |       |                  |         |           
         +-------+--------+  +------+---------+           
                          |  |                            
                          |  |                            
                          |  |                            
                      +---v--v---+                        
                      |   P0     |                        
                      |   6      |                        
                      |          |                        
                      +----------+                       

Each implementation has its own method for collective communication, and it's not always the fastest. Collective communications traditionally are blocking, though MPI-3 now provides non-blocking versions.

Regardless, the main point is: collective communication is expensive, use it sparingly.

Collective Communications Summary

SendBuff (before)                               RecvBuff (after)

P0       P1*      P2       P3       Operation       P0       P1*      P2       P3
         b                          MPI_Bcast       b        b        b        b
a        b        c        d        MPI_Gather               a,b,c,d
a        b        c        d        MPI_Allgather   a,b,c,d  a,b,c,d  a,b,c,d  a,b,c,d
         a,b,c,d                    MPI_Scatter     a        b        c        d
a,b,c,d  e,f,g,h  i,j,k,l  m,n,o,p  MPI_Alltoall    a,e,i,m  b,f,j,n  c,g,k,o  d,h,l,p

(Pn = Process n; the starred process, P1, is the root; the left half shows each process's send buffer before the call, the right half its receive buffer afterwards.)
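
Most of these have already appeared in the examples (MPI_Bcast, MPI_Gather, MPI_Reduce). Here is a hedged sketch (not from the course files) of MPI_Scatter, the mirror image of the MPI_Gather used in example1_4; the chunk size of 2 is an arbitrary choice:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char* argv[]) {
    int p, myid, master = 0;
    float chunk[2];             /* each process receives 2 values */
    float *all = NULL;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &p);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    if (myid == master) {       /* only the root needs the full send buffer */
        all = malloc(2 * p * sizeof(float));
        for (int i = 0; i < 2 * p; i++) all[i] = (float)i;
    }

    MPI_Scatter(all, 2, MPI_FLOAT,     /* send: 2 floats per process, from root  */
                chunk, 2, MPI_FLOAT,   /* receive buffer on every process        */
                master, MPI_COMM_WORLD);

    printf("Process %d got %.1f and %.1f\n", myid, chunk[0], chunk[1]);

    if (myid == master) free(all);
    MPI_Finalize();
    return 0;
}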

Timing with Wtime

MPI_Wtime() returns a precise value of the time (in seconds) on a particular node.

Call it before and after a block of code and use the difference to get timing information for that block on a particular worker.

MPI_Wtick() returns the resolution of the timer.
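
A hedged sketch (not from the course files) of the usual pattern, timing a dummy loop on each process:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char* argv[]) {
    int myid;
    double tick, tock, x = 0.0;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    tick = MPI_Wtime();
    for (long i = 0; i < 10000000; i++)   /* stand-in for real work */
        x += 1.0 / (i + 1.0);
    tock = MPI_Wtime();

    printf("Process %d: work took %f s (timer resolution %g s)\n",
           myid, tock - tick, MPI_Wtick());

    MPI_Finalize();
    return 0;
}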

MPI_Probe and MPI_Get_count

MPI_Probe - Checks to see if a message is coming. If used with MPI_ANY_SOURCE, the actual source can be identified from the status.

MPI_Get_count - Returns the number of elements in the incoming message (useful for dynamic allocation).
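
A hedged sketch (not from the course files): the receiver doesn't know the message length ahead of time, so it probes, asks MPI_Get_count for the size, allocates, and then receives. Run with at least 2 processes:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char* argv[]) {
    int myid, tag = 42;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    if (myid == 1) {
        float data[5] = {1, 2, 3, 4, 5};      /* length known only to the sender */
        MPI_Send(data, 5, MPI_FLOAT, 0, tag, MPI_COMM_WORLD);
    } else if (myid == 0) {
        MPI_Status status;
        int count;
        MPI_Probe(MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status);  /* wait for a message */
        MPI_Get_count(&status, MPI_FLOAT, &count);                /* how many floats?   */
        float *buf = malloc(count * sizeof(float));
        MPI_Recv(buf, count, MPI_FLOAT, status.MPI_SOURCE, tag,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("Received %d floats from process %d\n", count, status.MPI_SOURCE);
        free(buf);
    }

    MPI_Finalize();
    return 0;
}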

Custom Groups and Communicators

Sometimes you want to break your problem into multiple domains (think a 2D grid partitioned vertically and horizontally).

00000 11111
00000 11111
00000 11111
00000 11111
00000 11111

22222 33333
22222 33333
22222 33333
22222 33333
22222 33333

Then you can make new groups for vertical communication ((0, 2) and (1, 3)) and others for horizontal communication ((0, 1) and (2, 3)). For each group you'd make a corresponding communicator. Within these communicators, the processes get communicator-specific IDs of 0 and 1 respectively.
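
A hedged sketch (not from the course files) of how this could be set up with MPI_Comm_split for the 2x2 layout above; the color/key choices are one possible convention:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char* argv[]) {
    int myid, row_id, col_id;
    MPI_Comm row_comm, col_comm;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    MPI_Comm_split(MPI_COMM_WORLD, myid / 2, myid, &row_comm);  /* rows: (0,1) (2,3)    */
    MPI_Comm_split(MPI_COMM_WORLD, myid % 2, myid, &col_comm);  /* columns: (0,2) (1,3) */

    MPI_Comm_rank(row_comm, &row_id);   /* rank within the row communicator    */
    MPI_Comm_rank(col_comm, &col_id);   /* rank within the column communicator */
    printf("World rank %d is rank %d in its row and rank %d in its column\n",
           myid, row_id, col_id);

    MPI_Comm_free(&row_comm);
    MPI_Comm_free(&col_comm);
    MPI_Finalize();
    return 0;
}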

From there you can organize your code to be communicator independent. A communicator will be an argument, and from it you'll get the number of processes working on that task and the ID of the current process within that communicator's group.

From there you can build more complex geometries that may or may not be reflected in the networking (the process with the next id might be physically near you in the cluster or on the other side of the system).

The details of this are beyond this class, but here's a decent reference.

Debugging

Use TotalView.

It makes life easy.

module load totalview
mpirun -np 4 -tv your_app_compiled_for_debugging

Derived Data Types

This is an advanced topic, but if you want to send data of compound types (structs), or contiguous or non-contiguous data of the same type, then you can define your own MPI types.

For more information, see: http://static.msi.umn.edu/tutorial/scicomp/general/MPI/mpi_data.html
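
A hedged sketch (not from the course files) using MPI_Type_contiguous, the simplest derived type: three floats are packaged as one "point" element. Run with at least 2 processes:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char* argv[]) {
    int myid, tag = 11;
    float point[3] = {1.0f, 2.0f, 3.0f};
    MPI_Datatype point_type;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    MPI_Type_contiguous(3, MPI_FLOAT, &point_type);  /* 3 floats -> one "point" */
    MPI_Type_commit(&point_type);                    /* must commit before use  */

    if (myid == 0) {
        MPI_Send(point, 1, point_type, 1, tag, MPI_COMM_WORLD);
    } else if (myid == 1) {
        MPI_Recv(point, 1, point_type, 0, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("Process 1 received (%.1f, %.1f, %.1f)\n", point[0], point[1], point[2]);
    }

    MPI_Type_free(&point_type);
    MPI_Finalize();
    return 0;
}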

Reference:

A cheat sheet

Practice Problem

Matrix Multiplication

  1. Make random A and B on Master
  2. Broadcast A (B on C, Python) to all n processors
  3. Break up B (A on C, Python) in to n chunks, sending one chunk to each processor
  4. Start a clock
  5. Do local multiplication
  6. Calculate how long the local calculation took
  7. send local results to master
  8. send local timing to master
  9. report on timing

Next step

Communicate only the relevant parts of A to all processes.

Bonus Problem

Repeat Practice Problem, but

  1. do only vector multiplication, a row of A by a column of B
  2. after each multiplication, send the result asynchronously (send it and move on to the next column).
  3. before you send the next column, make sure the last communication succeeded.
  4. on the master node, you're receiving each column's result instead of sending it

Bonus Bonus Problems

Check out LLNL's practice problems for different ways to cut up numerical methods so they can be parallelized.