Both processes and threads are independent sequences of execution. The typical difference is that threads (of the same process) run in a shared memory space, while processes run in separate memory spaces.
MPI runs many programs (processes) and makes it easy to set up communication among them.
mpirun starts all the programs that should run: typically you provide it your program and it runs it n times.
Here I show how mpirun just runs other programs for you.
! echo '-ls-'; ls -lart; echo
! echo '-date-'; date; echo
! echo '-mpirun-'; mpirun -n 1 ls -lart : -n 1 date; echo
-ls-
total 1312
drwxr-sr-x 15 yannpaul scv 32768 May 15 11:14 ..
drwxr-sr-x 2 yannpaul scv 512 May 15 12:57 .ipynb_checkpoints
drwxr-sr-x 5 yannpaul scv 512 May 20 12:35 RISE
drwxr-sr-x 5 yannpaul scv 512 May 29 15:30 intro
-rw-r--r-- 1 yannpaul scv 59 May 29 15:31 .gitignore
-rw-r--r-- 1 yannpaul scv 229 Jun 1 10:51 Makefile
-rw-r--r-- 1 yannpaul scv 334420 Jun 1 10:52 slides.html
-rw-r--r-- 1 yannpaul scv 1049 Jun 1 10:57 conda_requirements.txt
-rw-r--r-- 1 yannpaul scv 7 Jun 1 10:58 requirements.txt
-rw-r--r-- 1 yannpaul scv 661 Jun 1 11:33 README
drwxr-sr-x 2 yannpaul scv 512 Jun 1 11:53 legacy
drwxr-sr-x 8 yannpaul scv 32768 Jun 3 16:53 .git
drwxr-sr-x 2 yannpaul scv 512 Sep 18 11:57 __pycache__
drwxr-sr-x 14 yannpaul scv 32768 Sep 18 12:07 present_env
-rw-r--r-- 1 yannpaul scv 2809 Sep 18 15:13 examples.py
-rw-r--r-- 1 yannpaul scv 2481 Sep 18 15:13 examples.pyc
-rw-r--r-- 1 yannpaul scv 328 Sep 18 15:32 #html_template.tpl#
-rw-r--r-- 1 yannpaul scv 6145 Sep 18 16:10 Untitled.ipynb
-rw-r--r-- 1 yannpaul scv 448 Sep 21 09:52 example.tpl
-rw-r--r-- 1 yannpaul scv 394862 Sep 21 09:53 introduction_to_mpi.html
-rw-r--r-- 1 yannpaul scv 217826 Sep 21 09:54 introduction_to_mpi.ipynb
drwxr-sr-x 9 yannpaul scv 32768 Sep 21 09:54 .

-date-
Mon Sep 21 09:56:37 EDT 2015

-mpirun-
Mon Sep 21 09:56:37 EDT 2015
total 1312
drwxr-sr-x 15 yannpaul scv 32768 May 15 11:14 ..
drwxr-sr-x 2 yannpaul scv 512 May 15 12:57 .ipynb_checkpoints
drwxr-sr-x 5 yannpaul scv 512 May 20 12:35 RISE
drwxr-sr-x 5 yannpaul scv 512 May 29 15:30 intro
-rw-r--r-- 1 yannpaul scv 59 May 29 15:31 .gitignore
-rw-r--r-- 1 yannpaul scv 229 Jun 1 10:51 Makefile
-rw-r--r-- 1 yannpaul scv 334420 Jun 1 10:52 slides.html
-rw-r--r-- 1 yannpaul scv 1049 Jun 1 10:57 conda_requirements.txt
-rw-r--r-- 1 yannpaul scv 7 Jun 1 10:58 requirements.txt
-rw-r--r-- 1 yannpaul scv 661 Jun 1 11:33 README
drwxr-sr-x 2 yannpaul scv 512 Jun 1 11:53 legacy
drwxr-sr-x 8 yannpaul scv 32768 Jun 3 16:53 .git
drwxr-sr-x 2 yannpaul scv 512 Sep 18 11:57 __pycache__
drwxr-sr-x 14 yannpaul scv 32768 Sep 18 12:07 present_env
-rw-r--r-- 1 yannpaul scv 2809 Sep 18 15:13 examples.py
-rw-r--r-- 1 yannpaul scv 2481 Sep 18 15:13 examples.pyc
-rw-r--r-- 1 yannpaul scv 328 Sep 18 15:32 #html_template.tpl#
-rw-r--r-- 1 yannpaul scv 6145 Sep 18 16:10 Untitled.ipynb
-rw-r--r-- 1 yannpaul scv 448 Sep 21 09:52 example.tpl
-rw-r--r-- 1 yannpaul scv 394862 Sep 21 09:53 introduction_to_mpi.html
-rw-r--r-- 1 yannpaul scv 217826 Sep 21 09:54 introduction_to_mpi.ipynb
drwxr-sr-x 9 yannpaul scv 32768 Sep 21 09:54 .
This is a silly example, because the programs being run don't make use of mpirun's communication protocols. Today we'll learn how to interact with mpirun to get information about the other processes. When using mpirun you don't have to worry about where your processes are executed; mpirun places them on one or more machines depending on how it is configured. You incorporate the MPI library into your code, allowing it to interact with mpirun and with other instances of your application.
Basically, the same answers as to "Why parallelize my code?"
Specific to MPI:
One specification, many implementations:
C:
#include "mpi.h"
C++:
#include "boost/mpi/environment.hpp"
#include "boost/mpi/communicator.hpp"
namespace mpi = boost::mpi;
Fortran:
include "mpif.h"
Python:
from mpi4py import MPI
MPI_Init
-- called before any other MPI library routine; initializes communication with mpirun.
MPI_Finalize
-- called at the end of the MPI part of the code; signals to mpirun that this process is all done.
Typically they are the first and last things called in an application.
Note: Python (mpi4py) takes care of these calls for you -- MPI_Init() runs when you import the library and MPI_Finalize() when your program ends.
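A minimal C sketch (not one of the course example files) showing where these two calls sit:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);      /* must come before any other MPI call */

    /* ... the MPI part of your code goes here ... */
    printf("MPI is up and running\n");

    MPI_Finalize();              /* tells mpirun this process is all done */
    return 0;
}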
Communication between processes involves:
Point-to-point Communication
Collective Communication - several forms of organized, group communications
C Type | MPI Name |
---|---|
int | MPI_INT |
unsigned int | MPI_UNSIGNED |
float | MPI_FLOAT |
double | MPI_DOUBLE |
char | MPI_CHAR |
Fortran Type | MPI Name |
---|---|
INTEGER | MPI_INTEGER |
REAL | MPI_REAL |
DOUBLE PRECISION | MPI_DOUBLE_PRECISION |
COMPLEX | MPI_COMPLEX |
CHARACTER | MPI_CHARACTER |
Python (mpi4py) inspects your data and handles this information for you.
MPI_PACKED, MPI_BYTE -- fine-grained control of data transfer
User-defined types, e.g. structures.
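These MPI names are passed as the datatype argument of the communication calls covered below. A small hedged sketch, using MPI_Bcast (introduced later in these notes), of how a C type pairs with its MPI name:

#include <mpi.h>

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);

    int    ival = 42;       /* C "int"    pairs with MPI_INT    */
    double dval = 3.14;     /* C "double" pairs with MPI_DOUBLE */

    /* copy rank 0's values to every process; the MPI name tells the
       library how to interpret the buffer it is handed */
    MPI_Bcast(&ival, 1, MPI_INT,    0, MPI_COMM_WORLD);
    MPI_Bcast(&dval, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    MPI_Finalize();
    return 0;
}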
How do we compute this integral in parallel?
$\int\limits_a^b \cos(x)\, dx$
Well, we write it as the sum of the integrals over parts of the domain ($p$ partitions):
$\sum\limits_{i=0}^{p-1}\int\limits_{a_i}^{a_{i+1}} \cos(x)\, dx$
Each integral can be evaluated numerically using the midpoint rule ($n$ points per partition):
$\sum\limits_{i=0}^{p-1}\sum\limits_{j=0}^{n-1} \cos(a_{ij}) \cdot h$
where $h$ is the step size and $a_{ij}$ is the midpoint position:
$h = (b - a) / (n \cdot p)$
$a_{ij} = a + i \cdot n \cdot h + (j + 0.5) \cdot h$
File: intro/basics/F77/example1.f
13 implicit none
14 integer n, p, i, j
15 real h, integral_sum, a, b, integral, pi, ai
16
17 pi = acos(-1.0) ! = 3.14159...
18 a = 0.0 ! lower limit of integration
19 b = pi*1./2. ! upper limit of integration
20 p = 4 ! number of processes (partitions)
21 n = 500 ! number of increment within each process
22 h = (b-a)/n/p ! length of increment
23
24 integral_sum = 0.0 ! stores answer to the integral
25 do i=0,p-1 ! sum of integrals over all processes
26 ai = a + i*n*h ! lower limit of integration for partition i
27 integral_sum = integral_sum + integral(ai,h,n)
28 enddo
29
30 print *,'The integral_sum =', integral_sum
31
32 end
33
34 real function integral(ai, h, n)
35 implicit none
36 integer n, j
37 real h, ai, aij
38
39 integral = 0.0 ! initialize integral
40 do j=0,n-1 ! sum integrals
41 aij = ai +(j+0.5)*h ! abscissa mid-point
42 integral = integral + cos(aij)*h
43 enddo
44
45 return
File: intro/basics/C/example1.c
20 int n, p, i, j, ierr;
21 float h, integral_sum, a, b, pi, my_int, ai;
22
23 pi = acos(-1.0); /* = 3.14159... */
24 a = 0.; /* lower limit of integration */
25 b = pi*1./2.; /* upper limit of integration */
26 p = 4; /* number of processes (partitions) */
27 n = 500; /* number of increment within each process */
28 h = (b-a)/n/p; /* length of increment */
29
30 integral_sum = 0.0;
31
32 /* sum of integrals over all processes */
33 for (i=0; i<p; i++) {
34 ai = a + i*n*h; /* lower limit of integration for partition i */
35 integral_sum += integral(ai,h,n);
36 }
37
38 printf("The integral sum =%f\n",integral_sum);
39 }
40
41 float integral(float ai, float h, int n) {
42 int j;
43 float aij, integ;
44
45 integ = 0.0; /* initialize */
46 for (j=0;j<n;j++) { /* sum integrals */
47 aij = ai + (j+0.5)*h; /* mid-point */
48 integ += cos(aij)*h;
49 }
50 return integ;
File: intro/basics/C++/example1.cc
16 int n, p, i;
17 float h, result, a, b, pi, ai;
18
19 pi = acos(-1.0); /* = 3.14159... */
20 a = 0.; /* lower limit of integration */
21 b = pi*1./2.; /* upper limit of integration */
22 p = 4;
23 n = 500; /* number of increment within each process */
24 h = (b-a)/n/p; /* length of increment */
25
26 result = 0.0;
27 for (i=0; i<p; i++) {
28 ai = a + i*n*h; /* lower limit of integration for partition myid */
29 result += integral(ai, h, n, cos_wrapper);
30 }
31
32 std::cout << "The result = "
33 << std::setiosflags(std::ios::fixed)
34 << std::setprecision(4) << result << std::endl;
35 }
File: intro/basics/python/example1.py
1 from math import cos, pi
2
3 def main():
4 lower_limit = 0.0
5 upper_limit = pi * 0.5
6 npartitions = 4
7 partition_nsteps = 500
8 step = (upper_limit - lower_limit) / float(partition_nsteps * npartitions)
9
10 integral_sum = 0.0
11 for i in range(npartitions):
12 start = lower_limit + i * partition_nsteps * step
13 integral_sum += integral(start, step, partition_nsteps)
14
15 print "The integral sum =", integral_sum
16
17
18 def integral(start, step, nsteps):
19 total = 0
20 for i in range(nsteps):
21 pos = start + (i + 0.5) * step
22 total += cos(pos) * step
23 return total
24
25 if __name__ == "__main__":
This is an example of Single Program Multiple Data (SPMD) parallelization. The single program is the integral function. The multiple data are the limits of each partition.
To parallelize this application we'll use:
MPI_Init, MPI_Finalize -- start things up and then stop them
MPI_Comm_rank -- what is the ID (called rank) of the current process
MPI_Comm_size -- how many processes are running
MPI_Send, MPI_Recv -- send and receive a buffer of information
MPI uses communicators to organize who is getting what. Each communicator is associated with a Group of processes.
MPI_COMM_WORLD is a communicator (an id or an object, depending on the language) that represents all the processes. It's there by default.
Later we'll discuss making your own Groups and their corresponding Communicators.
File: intro/basics/F77/example1_1.f
15 implicit none
16 integer n, p, i, j, proc, ierr, master, myid, tag, comm
17 real h, a, b, integral, pi, ai, my_int, integral_sum
18 include "mpif.h" ! brings in pre-defined MPI constants, ...
19 integer status(MPI_STATUS_SIZE) ! size defined in mpif.h
20 data master/0/ ! processor 0 collects integral sums from other processors
21
22 comm = MPI_COMM_WORLD
23 call MPI_Init(ierr) ! starts MPI
24 call MPI_Comm_rank(comm, myid, ierr) ! get current proc ID
25 call MPI_Comm_size(comm, p, ierr) ! get number of procs
File: intro/basics/C/example1_1.c
1 #include <mpi.h>
2 #include <math.h>
3 #include <stdio.h>
4
5 float integral(float ai, float h, int n);
6
7 void main(int argc, char* argv[]){
8 /*###########################################################################
9 #
10 # This is an MPI example on parallel integration to demonstrate the use of:
11 #
12 # * MPI_Init, MPI_Comm_rank, MPI_Comm_size, MPI_Finalize
13 # * MPI_Recv, MPI_Send
14 #
15 # Dr. Kadin Tseng
16 # Scientific Computing and Visualization
17 # Boston University
18 # 1998
19 #
20 # Updated 2015 by Yann Tambouret
21 #
22 ###########################################################################*/
23
24 int n, p, myid, tag, proc, ierr;
25 float h, integral_sum, a, b, ai, pi, my_int;
26 int master = 0; /* processor performing total sum */
27 MPI_Comm comm;
28 MPI_Status status;
29
30 comm = MPI_COMM_WORLD;
31 ierr = MPI_Init(&argc,&argv); /* starts MPI */
32 MPI_Comm_rank(comm, &myid); /* get current process id */
33 MPI_Comm_size(comm, &p); /* get number of processes */
File: intro/basics/C++/example1_1.cc
3 #include "boost/mpi/environment.hpp"
4 #include "boost/mpi/communicator.hpp"
5 #include <iostream>
6 #include <iomanip>
7 #include "integral.h"
8 #include <math.h>
9
10 namespace mpi = boost::mpi;
11
12 float cos_wrapper(float pos) {
13 // wrap cos because it's double precision
14 return cos(pos);
15 }
16
17 int main() {
18
19 int n, p, i;
20 float h, result, a, b, pi, ai, myint;
21
22 pi = acos(-1.0); /* = 3.14159... */
23 a = 0.; /* lower limit of integration */
24 b = pi*1./2.; /* upper limit of integration */
25 n = 500; /* number of increment within each process */
26
27 int myid;
28 float my_int;
29 int tag=123, master=0;
30 mpi::environment env;
31 mpi::communicator world;
32 p = world.size();
33 myid = world.rank();
File: intro/basics/python/example1_1.py
1 from mpi4py import MPI
2 from math import cos, pi
3
4 def main():
5 master = 0
6 comm = MPI.COMM_WORLD
7 myid = comm.Get_rank()
8 npartitions = comm.Get_size()
File: intro/basics/F77/example1_1.f
27 pi = acos(-1.0) ! = 3.14159...
28 a = 0.0 ! lower limit of integration
29 b = pi/2. ! upper limit of integration
30 n = 500 ! number of increment within each process
31 tag = 123 ! set the tag to identify this particular job
32 h = (b-a)/n/p ! length of increment
33
34 ai = a + myid*n*h ! lower limit of integration for partition myid
35 my_int = integral(ai, h, n)
36 write(*,"('Process ',i2,' has the partial sum of',f10.6)")
File: intro/basics/C/example1_1.c
35 pi = acos(-1.0); /* = 3.14159... */
36 a = 0.; /* lower limit of integration */
37 b = pi*1./2.; /* upper limit of integration */
38 n = 500; /* number of increment within each process */
39 tag = 123; /* set the tag to identify this particular job */
40 h = (b-a)/n/p; /* length of increment */
41
42 ai = a + myid*n*h; /* lower limit of integration for partition myid */
43 my_int = integral(ai, h, n); /* 0<=myid<=p-1 */
File: intro/basics/C++/example1_1.cc
37 my_int = integral(ai, h, n, cos_wrapper);
File: intro/basics/python/example1_1.py
10 lower_limit = 0.0
11 upper_limit = pi * 0.5
12 tag = 123
13 partition_nsteps = 500
14 step = (upper_limit - lower_limit) / float(partition_nsteps * npartitions)
15
16 start = lower_limit + myid * partition_nsteps * step
17 my_integral = integral(start, step, partition_nsteps)
18 print "Process {0:2d} has the partial sum of {1:10.6f}".format(
19 myid, my_integral)
File: intro/basics/F77/example1_1.f
39 call MPI_Send(
40 & my_int, 1, MPI_REAL, ! buffer, size, datatype
41 & master, ! where to send message
42 & tag, ! message tag
File: intro/basics/C/example1_1.c
47 MPI_Send(
48 &my_int, 1, MPI_FLOAT,
49 master, /* message destination */
50 tag, /* message tag */
51 comm);
File: intro/basics/C++/example1_1.cc
39 world.send(0, tag, my_int);
File: intro/basics/python/example1_1.py
21 comm.send(my_integral, dest=master, tag=tag)
File: intro/basics/F77/example1_1.f
45 if(myid .eq. master) then ! do following only on master ...
46 integral_sum = 0.0 ! initialize integral_sum to zero
47 do proc=0,p-1 ! loop on processors to collect local sum
48 call MPI_Recv(
49 & my_int, 1, MPI_REAL,
50 & proc, ! message source
51 & tag, ! message tag
52 & comm, status, ierr) ! status reports source, tag
53 integral_sum = integral_sum + my_int ! sum my_int from processors
54 enddo
55 print *,'The integral =',integral_sum
56 endif
File: intro/basics/C/example1_1.c
53 if(myid == master) { /* Receives serialized */
54 integral_sum = 0.0;
55 for (proc=0;proc<p;proc++) {
56 MPI_Recv(
57 &my_int, 1, MPI_FLOAT,
58 proc, /* message source */
59 tag, /* message tag */
60 comm, &status); /* status reports source, tag */
61 integral_sum += my_int;
62 }
63 printf("The Integral =%f\n",integral_sum); /* sum of my_int */
64 }
File: intro/basics/C++/example1_1.cc
40 if (myid == master) {
41 result = 0;
42 for (i=0; i<p; i++) {
43 world.recv(i, tag, myint);
44 result += myint;
45 }
46 std::cout << "The result = "
47 << std::setiosflags(std::ios::fixed)
48 << std::setprecision(4) << result << std::endl;
49 }
File: intro/basics/python/example1_1.py
23 if myid == master:
24 integral_sum = 0.0
25 for i in range(npartitions):
26 integral_sum += comm.recv(source=i, tag=tag)
27 print "The integral sum =", integral_sum
What can master be?
Hint: What does master represent, and then what value can it be?
Hint 1: Go through the code as if your ID is 0.
Hint 2: What needs to happen for an MPI_Send to be successful?
You can send messages to yourself. When you send data, a temporary buffer is normally used under the hood; since that isn't strictly necessary when sending to yourself, it has traditionally been up to the MPI vendor whether to buffer self-messages. Historically this made self-messaging flaky, but it is now part of the MPI standard and should be supported universally.
!mpif77 -o intro/basics/F77/example1_1.x intro/basics/F77/example1_1.f
!mpif77 --show
gfortran -I/usr/local/IT/mpi/openmpi-1.6.4/gcc-4.4/include -pthread -Wl,-rpath -Wl,/usr/local/IT/mpi/openmpi-1.6.4/gcc-4.4/lib -L/usr/local/IT/mpi/openmpi-1.6.4/gcc-4.4/lib -lmpi_f77 -lmpi -ldl -lm -lnuma -Wl,--export-dynamic -lrt -lnsl -lutil -lm -ldl
language | MPI compiler |
---|---|
C | mpicc |
Fortran 77 | mpif77 |
Fortran 90 | mpif90 |
C++ | mpicxx, mpic++, mpiCC |
mpirun -- the launcher
-np 4 -- replace '4' with the number of processes you want. MPI will take care of where they run; there's an automatic way of placing them on different nodes, and there are plenty of options for controlling where they end up.
intro/basics/F77/example1_1.x -- your application
!mpirun -np 4 intro/basics/F77/example1_1.x
Process 0 has the partial sum of 0.382684
Process 1 has the partial sum of 0.324423
Process 2 has the partial sum of 0.216773
Process 3 has the partial sum of 0.076120
The integral = 1.0000001
Python:
mpirun -np 4 python intro/basics/python/example1_1.py
The standard python on SCC is missing mpi4py, so you need to load a module to access this library, for example:
module load anaconda/2.2.0
C++:
You need the properly compiled boost library, which on SCC is:
module load boost/1.58.0
Use MPI_Comm_rank and MPI_Comm_size to figure out the processor ID and the number of processors, then print out a hello message:
Hello world from 0 of 4
Hello world from 1 of 4
Hello world from 2 of 4
Hello world from 3 of 4
You will need:
MPI_Comm_rank to get the ID number of the current process
MPI_Comm_size to get the number of processes
File: intro/basics/F77/example1_2.f
15 implicit none
16 integer n, p, i, j, proc, ierr, master, myid, tag, comm
17 real h, a, b, integral, pi, ai, my_int, integral_sum
18 include "mpif.h" ! brings in pre-defined MPI constants, ...
19 integer status(MPI_STATUS_SIZE) ! size defined in mpif.h
20 data master/0/ ! processor 0 collects integral sums from other processors
21
22 comm = MPI_COMM_WORLD
23 call MPI_Init(ierr) ! starts MPI
24 call MPI_Comm_rank(comm, myid, ierr) ! get current proc ID
25 call MPI_Comm_size(comm, p, ierr) ! get number of procs
26
27 pi = acos(-1.0) ! = 3.14159...
28 a = 0.0 ! lower limit of integration
29 b = pi/2. ! upper limit of integration
30 n = 500 ! number of increment within each process
31 tag = 123 ! set the tag to identify this particular job
32 h = (b-a)/n/p ! length of increment
33
34 ai = a + myid*n*h ! lower limit of integration for partition myid
35 my_int = integral(ai, h, n)
36 write(*,"('Process ',i2,' has the partial sum of',f10.6)")
37 & myid,my_int
38
39 if(myid .eq. master) then ! do following only on master ...
40 integral_sum = my_int !starts with value on master
41 do proc=1,p-1 ! loop on procs to collect local sums, exclude master
42 call MPI_Recv(
43 & my_int, 1, MPI_REAL,
44 & proc, ! message source
45 & tag, ! message tag
46 & comm, status, ierr) ! status reports source, tag
47 integral_sum = integral_sum + my_int ! sum my_int from processors
48 enddo
49 print *,'The integral =',integral_sum
50 else
51 call MPI_Send(
52 & my_int, 1, MPI_REAL, ! buffer, size, datatype
53 & master, ! where to send message
54 & tag, ! message tag
55 & comm, ierr)
56 endif
57
58 call MPI_Finalize(ierr) ! MPI finish up ...
File: intro/basics/C/example1_2.c
30 comm = MPI_COMM_WORLD;
31 ierr = MPI_Init(&argc,&argv); /* starts MPI */
32 MPI_Comm_rank(comm, &myid); /* get current process id */
33 MPI_Comm_size(comm, &p); /* get number of processes */
34
35 pi = acos(-1.0); /* = 3.14159... */
36 a = 0.; /* lower limit of integration */
37 b = pi*1./2.; /* upper limit of integration */
38 n = 500; /* number of increment within each process */
39 tag = 123; /* set the tag to identify this particular job */
40 h = (b-a)/n/p; /* length of increment */
41
42 ai = a + myid*n*h; /* lower limit of integration for partition myid */
43 my_int = integral(ai, h, n); /* 0<=myid<=p-1 */
44
45 printf("Process %d has the partial integral of %f\n", myid,my_int);
46
47 if(myid == master) { /* Receives serialized */
48 integral_sum = my_int;
49 for (proc=1;proc<p;proc++) {
50 MPI_Recv(&my_int, 1, MPI_FLOAT,
51 proc, /* message source */
52 tag, /* message tag */
53 comm,
54 &status); /* status reports source, tag */
55 integral_sum += my_int;
56 }
57 printf("The Integral =%f\n",integral_sum); /* sum of my_int */
58 } else {
59 MPI_Send(&my_int, 1, MPI_FLOAT,
60 master, /* message destination */
61 tag, /* message tag */
62 comm);
63 }
64
65 MPI_Finalize(); /* let MPI finish up ... */
File: intro/basics/C++/example1_2.cc
22 pi = acos(-1.0); /* = 3.14159... */
23 a = 0.; /* lower limit of integration */
24 b = pi*1./2.; /* upper limit of integration */
25 n = 500; /* number of increment within each process */
26
27 int myid;
28 float my_int;
29 mpi::environment env;
30 mpi::communicator world;
31 p = world.size();
32 myid = world.rank();
33 h = (b-a)/n/p; /* length of increment */
34 ai = a + myid*n*h; /* lower limit of integration for partition myid */
35
36 my_int = integral(ai, h, n, cos_wrapper);
37
38 if (myid == master) {
39 result = my_int;
40 for (i=1; i<p; i++) {
41 world.recv(i, tag, myint);
42 result += myint;
43 }
44 std::cout << "The result = "
45 << std::setiosflags(std::ios::fixed)
46 << std::setprecision(4) << result << std::endl;
47 } else {
48 world.send(0, tag, my_int);
49 }
File: intro/basics/python/example1_2.py
4 def main():
5 master = 0
6 comm = MPI.COMM_WORLD
7 myid = comm.Get_rank()
8 npartitions = comm.Get_size()
9
10 lower_limit = 0.0
11 upper_limit = pi * 0.5
12 tag = 123
13 partition_nsteps = 500
14 step = (upper_limit - lower_limit) / float(partition_nsteps * npartitions)
15
16 start = lower_limit + myid * partition_nsteps * step
17 my_integral = integral(start, step, partition_nsteps)
18 print "Process {0:2d} has the partial sum of {1:10.6f}".format(
19 myid, my_integral)
20
21 if myid == master:
22 integral_sum = my_integral
23 for i in range(1, npartitions):
24 integral_sum += comm.recv(source=i, tag=tag)
25 print "The integral sum =", integral_sum
26 else:
27 comm.send(my_integral, dest=master, tag=tag)
Hint: In what order are the results collected?
Asynchronous or non-blocking communication is possible. Normally when you send or receive, the call waits until the operation is complete. You can instead use alternative methods that return right away, so you can continue to work while the data is sent or received behind the scenes.
Blocking Method | Non-blocking Method |
---|---|
MPI_Send | MPI_Isend |
MPI_Recv | MPI_Irecv |
You should be careful not to reuse the buffers involved in these non-blocking communications while the operations are ongoing.
You can use MPI_Probe to check whether a message is pending and MPI_Wait to block until a communication is complete.
With MPI-3 there are now non-blocking collective communications as well, a somewhat more advanced topic.
File: intro/basics/F77/example1_3.f
39 if(myid .eq. master) then ! the following is serial
40 integral_sum = my_int
41 do proc=1,p-1
42 call MPI_Recv(
43 & my_int, 1, MPI_REAL,
44 & MPI_ANY_SOURCE, ! message source
45 & MPI_ANY_TAG, ! message tag
46 & comm, status, ierr) ! status identifies source, tag
47 integral_sum = integral_sum + my_int
48 enddo
49 write(*,*)'The Integral =', integral_sum ! sum of my_int
50 else
51 call MPI_Isend(
52 & my_int, 1, MPI_REAL, ! buffer, size, datatype
53 & master, tag, ! destination and tag
54 & comm, request, ierr) ! get handle for MPI_Wait to check status
55 call other_work(myid) ! because of Isend, gets here immediately
56 call MPI_Wait(request, status, ierr) ! block until Isend is done
57 endif
58
59 call MPI_Finalize(ierr) ! let MPI finish up ...
File: intro/basics/C/example1_3.c
50 if(myid == master) {
51 integral_sum = my_int;
52 for (proc=1;proc<p;proc++) {
53 MPI_Recv(
54 &my_int, 1, MPI_FLOAT,
55 MPI_ANY_SOURCE, /* message source */
56 MPI_ANY_TAG, /* message tag */
57 comm, &status); /* status identifies source, tag */
58 integral_sum += my_int;
59 }
60 printf("The Integral =%f\n",integral_sum); /* sum of my_int */
61 }
62 else {
63 MPI_Isend(&my_int, 1, MPI_FLOAT, master, tag,
64 comm, &request); /* send my_int to master */
65 other_work(myid);
66 MPI_Wait(&request, &status); /* block until Isend is done */
File: intro/basics/C++/example1_3.cc
35 mpi::request request;
36 p = world.size();
37 myid = world.rank();
38 h = (b-a)/n/p; /* length of increment */
39 ai = a + myid*n*h; /* lower limit of integration for partition myid */
40
41 my_int = integral(ai, h, n, cos_wrapper);
42
43 if (myid == master) {
44 result = my_int;
45 for (i=1; i<p; i++) {
46 world.recv(mpi::any_source, tag, myint);
47 result += myint;
48 }
49 std::cout << "The result = "
50 << std::setiosflags(std::ios::fixed)
51 << std::setprecision(4) << result << std::endl;
52 } else {
53 request = world.isend(0, tag, my_int);
54 other_work(myid);
55 request.wait();
File: intro/basics/python/example1_3.py
21 if myid == master:
22 integral_sum = my_integral
23 for i in range(1, npartitions):
24 integral_sum += comm.recv(source=MPI.ANY_SOURCE,
25 tag=MPI.ANY_TAG)
26 print "The integral sum =", integral_sum
27 else:
28 request = comm.isend(my_integral, dest=master, tag=tag)
29 other_work(myid)
30 request.Wait()
31
32
33 def other_work(myid):
34 print "more work on process", myid
Modify example1_3 so that
File: intro/basics/F77/example1_4.f
16 implicit none
17 integer n, p, i, j, proc, ierr, master, myid, tag, comm
18 real h, a, b, integral, pi, ai, my_int, integral_sum, buf(50)
19 include "mpif.h" ! brings in pre-defined MPI constants, ...
20 data master/0/ ! processor 0 collects integral sums from other processors
21
22 comm = MPI_COMM_WORLD
23 call MPI_Init(ierr) ! starts MPI
24 call MPI_Comm_rank(comm, myid, ierr) ! get current proc ID
25 call MPI_Comm_size(comm, p, ierr) ! get number of procs
26
27 pi = acos(-1.0) ! = 3.14159...
28 a = 0.0 ! lower limit of integration
29 b = pi*1./2. ! upper limit of integration
30 n = 500 ! number of intervals in (b-a)/p
31 h = (b-a)/n/p ! length of increment
32
33 ai = a + myid*n*h ! lower limit of integration for partition myid
34 my_int = integral(ai, h, n)
35
36 write(*,"('Process ',i2,' has the partial sum of',f10.6)")
37 & myid,my_int
38
39 call MPI_Gather(
40 & my_int, 1, MPI_REAL,
41 & buf, 1, MPI_REAL,
42 & master,
43 & comm, ierr)
44
45 if(myid .eq. master) then
46 integral_sum = 0.0
47 do i=1,p
48 integral_sum = integral_sum + buf(i)
49 enddo
50 print *,'The Integral =',integral_sum
51 endif
File: intro/basics/C/example1_4.c
34 pi = acos(-1.0); /* = 3.14159... */
35 a = 0.; /* lower limit of integration */
36 b = pi*1./2.; /* upper limit of integration */
37 n = 500; /* number of increment within each process */
38 h = (b-a)/n/p; /* length of increment */
39
40 ai = a + myid*n*h; /* lower limit of integration for partition myid */
41 my_int = integral(ai, h, n);
42 printf("Process %d has the partial result of %f\n", myid, my_int);
43
44 MPI_Gather(
45 &my_int, 1, MPI_FLOAT,
46 buf, 1, MPI_FLOAT,
47 master, comm);
48
49 if(myid == master) {
50 integral_sum = 0.0;
51 for (i=0; i<p; i++) {
52 printf("On Master, Process %d has the partial result of %f\n",
53 i, buf[i]);
54
55 integral_sum += buf[i];
56 }
57 printf("The Integral =%f\n",integral_sum);
58 }
File: intro/basics/C++/example1_4.cc
28 int myid;
29 float my_int;
30 float all_ints[50];
31 mpi::environment env;
32 mpi::communicator world;
33 p = world.size();
34 myid = world.rank();
35
36 h = (b-a)/n/p; /* length of increment */
37 ai = a + myid*n*h; /* lower limit of integration for partition myid */
38
39 my_int = integral(ai, h, n, cos_wrapper);
40 gather(world, my_int, all_ints, master);
41
42 if (myid == master) {
43 result = 0;
44 for (i=0; i<p; i++) {
45 result += all_ints[i];
46 }
47 std::cout << "The result = "
48 << std::setiosflags(std::ios::fixed)
49 << std::setprecision(4) << result << std::endl;
50 }
51 }
File: intro/basics/python/example1_4.py
4 def main():
5 master = 0
6 comm = MPI.COMM_WORLD
7 myid = comm.Get_rank()
8 npartitions = comm.Get_size()
9
10 lower_limit = 0.0
11 upper_limit = pi * 0.5
12 tag = 123
13 partition_nsteps = 500
14 step = (upper_limit - lower_limit) / float(partition_nsteps * npartitions)
15
16 start = lower_limit + myid * partition_nsteps * step
17 my_integral = integral(start, step, partition_nsteps)
18 print "Process {0:2d} has the partial sum of {1:10.6f}".format(
19 myid, my_integral)
20
21 integrals = comm.gather(my_integral, root=master)
22
23 if myid == master:
24 integral_sum = 0
25 for part_integral in integrals:
26 integral_sum += part_integral
27 print "The integral sum =", integral_sum
File: intro/basics/F77/example1_5.f
16 implicit none
17 integer n, p, i, j, proc, ierr, master, myid, tag, comm
18 real h, a, b, integral, pi, ai, my_int, integral_sum
19 real tick, tock
20 include "mpif.h" ! brings in pre-defined MPI constants, ...
21 integer status(MPI_STATUS_SIZE) ! size defined in mpif.h
22 data master/0/ ! processor 0 collects integral sums from other processors
23
24 comm = MPI_COMM_WORLD
25 call MPI_Init(ierr) ! starts MPI
26 call MPI_Comm_rank(comm, myid, ierr) ! get current proc ID
27 call MPI_Comm_size(comm, p, ierr) ! get number of procs
28
29 pi = acos(-1.0) ! = 3.14159...
30 a = 0.0 ! lower limit of integration
31 b = pi*1./2. ! upper limit of integration
32 tag = 123 ! set the tag to identify this particular job
33
34 if(myid .eq. master) then
35 print *,'The requested number of processors =',p
36 print *,'enter number of increments within each process'
37 read(*,*)n
38 endif
39 c** Broadcast "n" to all processes
40 tick = MPI_Wtime()
41 call MPI_Bcast( ! Broadcast "n" to all procs
42 & n, 1, MPI_INTEGER,
43 & master, comm, ierr)
44 tock = MPI_Wtime()
45 print *, 'The time to broadcast on', p, 'is', tock - tick
46
47 h = (b-a)/n/p ! length of increment
48 ai = a + myid*n*h ! lower limit of integration for partition myid
49 my_int = integral(ai, h, n)
50
51 write(*,"('Process ',i2,' has the partial sum of',f10.6)")
52 & myid,my_int
53
54 call MPI_Reduce( ! a collective reduction operation
55 & my_int, integral_sum, 1, MPI_REAL,
56 & MPI_SUM,
57 & master,
58 & comm, ierr)
59
60 if(myid .eq. master) then
61 print *,'The Integral =',integral_sum
File: intro/basics/C/example1_5.c
36 /* get n on master */
37 if(myid == master) {
38 printf("The requested number of processors is %d\n",p);
39 printf("Enter number of increments within each process\n");
40 (void) fgets(line, sizeof(line), stdin);
41 (void) sscanf(line, "%d", &n);
42 }
43 /* Broadcast "n" to all processes */
44 MPI_Bcast(&n, 1, MPI_INT, master, comm);
45
46 pi = acos(-1.0); /* = 3.14159... */
47 a = 0.; /* lower limit of integration */
48 b = pi*1./2.; /* upper limit of integration */
49 h = (b-a)/n/p; /* length of increment */
50
51 ai = a + myid*n*h; /* lower limit of integration for partition myid */
52 my_int = integral(ai, h, n); /* 0<=myid<=p-1 */
53
54 printf("Process %d has the partial result of %f\n", myid,my_int);
55
56 MPI_Reduce(
57 &my_int, &integral_sum, 1, MPI_FLOAT,
58 MPI_SUM,
59 master, comm);
60
61 if(myid == 0) {
62 printf("The result =%f\n",integral_sum);
63 }
64 MPI_Finalize(); /* let MPI finish up ... */
File: intro/basics/C++/example1_5.cc
31 /* get n on master */
32 if(myid == master) {
33 std::cout << "The requested number of processors is "
34 << p << std::endl;
35 std::cout << "Enter number of increments within each process: ";
36 std::cin >> n;
37 }
38 /* Broadcast "n" to all processes */
39 broadcast(world, n, 0);
40
41 pi = acos(-1.0); /* = 3.14159... */
42 a = 0.; /* lower limit of integration */
43 b = pi*1./2.; /* upper limit of integration */
44 h = (b-a)/n/p; /* length of increment */
45 ai = a + myid*n*h; /* lower limit of integration for partition myid */
46
47 my_int = integral(ai, h, n, cos_wrapper);
48 reduce(world, my_int, result, std::plus<float>(), 0);
49
50 if (myid == master) {
51 std::cout << "The result = "
52 << std::setiosflags(std::ios::fixed)
53 << std::setprecision(8) << result << std::endl;
54 }
File: intro/basics/python/example1_5.py
4 def main():
5 master = 0
6 comm = MPI.COMM_WORLD
7 myid = comm.Get_rank()
8 npartitions = comm.Get_size()
9
10 partition_nsteps = 500
11 if myid == master:
12 print "The requested number of processors =", npartitions
13 # stdout is not flushed until new line, so to see message need
14 # extra \n character
15 partition_nsteps = int(raw_input(
16 'Enter the number of increaments within each process\n'))
17
18 partition_nsteps = comm.bcast(partition_nsteps, root=master)
19
20 lower_limit = 0.0
21 upper_limit = pi * 0.5
22 tag = 123
23 step = (upper_limit - lower_limit) / float(partition_nsteps * npartitions)
24
25 start = lower_limit + myid * partition_nsteps * step
26 my_integral = integral(start, step, partition_nsteps)
27 print "Process {0:2d} has the partial sum of {1:10.6f}".format(
28 myid, my_integral)
29
30 integral_sum = comm.reduce(my_integral, op=MPI.SUM, root=master)
31
32 if myid == master:
33 print "The integral sum =", integral_sum
Speedup $S$ is the ratio of $T_1$, the time for one process, to $T_N$, the time for $N$ processes. If a fraction $f$ of the code cannot be parallelized (Amdahl's law), then
$S = \frac{T_1}{T_N} \le \frac{1}{f + \frac{1 - f}{N}} < \frac{1}{f}$
with the bound approaching $\frac{1}{f}$ as $N \rightarrow \infty$.
So if 90% of the code is parallelizable, then at best you can get 10x speed up.
Parallel Efficiency is $\frac{S}{N}$, and is ideally 1.
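As a quick worked check of that 90% figure (my own numbers, not from the original notes): with $f = 0.1$ and $N = 4$,

$S = \frac{1}{0.1 + \frac{0.9}{4}} = \frac{1}{0.325} \approx 3.1$

and the ceiling of $\frac{1}{f} = 10$ is only approached as $N \rightarrow \infty$.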
+---------+ +----------+ +----------+ +-----------+
| P0 | | P1 | | P2 | | P3 |
| 0 | | 1 | | 2 | | 3 |
+---------+ +--+-------+ +---------++ +-----------+
| | | |
| | | |
+v-----v+ +v-------v+
| P0 | | P2 |
| 1 | | 5 |
| | | |
+-------+--------+ +------+---------+
| |
| |
| |
+---v--v---+
| P0 |
| 6 |
| |
+----------+
Each implementation has its own method for collective communication, and it's not always the fastest. Collective communications are traditionally blocking, though MPI-3 now provides non-blocking versions.
Regardless, the main point is: collective communication is expensive, so use it sparingly.
Process 0 | Process 1* | Process 2 | Process 3 | Operation | Process 0 | Process 1* | Process 2 | Process 3 |
---|---|---|---|---|---|---|---|---|
 | b | | | MPI_Bcast | b | b | b | b |
a | b | c | d | MPI_Gather | | a,b,c,d | | |
a | b | c | d | MPI_Allgather | a,b,c,d | a,b,c,d | a,b,c,d | a,b,c,d |
 | a,b,c,d | | | MPI_Scatter | a | b | c | d |
a,b,c,d | e,f,g,h | i,j,k,l | m,n,o,p | MPI_Alltoall | a,e,i,m | b,f,j,n | c,g,k,o | d,h,l,p |
SendBuff | SendBuff | SendBuff | SendBuff | | RecvBuff | RecvBuff | RecvBuff | RecvBuff |

(The asterisk marks the root process for the rooted operations.)
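MPI_Scatter and MPI_Alltoall don't appear in the course examples above, so here is a hedged C sketch of a scatter: the root hands one float to each process (the root rank and buffer size are arbitrary choices for illustration). Run it with mpirun -np 4 or similar.

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    int myid, p, i, root = 1;   /* process 1 acts as the root, as in the table */
    float sendbuf[64];          /* only meaningful on the root; assumes p <= 64 */
    float myval;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &p);

    if (myid == root)           /* root fills one value per process */
        for (i = 0; i < p; i++) sendbuf[i] = (float)i;

    /* every process receives exactly one float from the root's send buffer */
    MPI_Scatter(sendbuf, 1, MPI_FLOAT, &myval, 1, MPI_FLOAT, root, MPI_COMM_WORLD);
    printf("Process %d received %f\n", myid, myval);

    MPI_Finalize();
    return 0;
}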
MPI_Wtime() returns a precise value of the wall-clock time (in seconds) on a particular node. Call it before and after a block of code and use the difference to get timing information for a particular worker.
MPI_Wtick() returns the resolution of the timer.
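A minimal sketch of this timing pattern (the busy loop is just a stand-in for real work):

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    int myid;
    volatile long i;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    double tick = MPI_Wtime();            /* timestamp before the block */
    for (i = 0; i < 50000000; i++) ;      /* stand-in for the work being timed */
    double tock = MPI_Wtime();            /* timestamp after the block */

    printf("Process %d: block took %f s (timer resolution %g s)\n",
           myid, tock - tick, MPI_Wtick());

    MPI_Finalize();
    return 0;
}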
MPI_Probe - checks to see if a message is coming. If used with MPI_ANY_SOURCE, the source can then be identified from status.
MPI_Get_count - returns the number of elements coming in a message (useful for dynamic allocation).
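A hedged sketch of the probe-then-allocate pattern (the ranks, tag, and message length here are made up for illustration; run with at least two processes):

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
    int myid, tag = 7;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    if (myid == 1) {
        float data[5] = {1, 2, 3, 4, 5};   /* rank 0 does not know this length */
        MPI_Send(data, 5, MPI_FLOAT, 0, tag, MPI_COMM_WORLD);
    } else if (myid == 0) {
        MPI_Status status;
        int count;

        /* wait until some message arrives, without receiving it yet */
        MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        MPI_Get_count(&status, MPI_FLOAT, &count);      /* how many floats? */

        float *buf = malloc(count * sizeof(float));     /* allocate just enough */
        MPI_Recv(buf, count, MPI_FLOAT, status.MPI_SOURCE, status.MPI_TAG,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("Received %d floats from rank %d\n", count, status.MPI_SOURCE);
        free(buf);
    }

    MPI_Finalize();
    return 0;
}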
Sometimes you want to break your problem into multiple domains (think a 2D grid partitioned vertically and horizontally).
00000 11111
00000 11111
00000 11111
00000 11111
00000 11111
22222 33333
22222 33333
22222 33333
22222 33333
22222 33333
Then you can make new groups for vertical communication ((0, 2) and (1, 3)) and others for horizontal communication ((0, 1) and (2, 3)). Then for each group you'd make a corresponding communicator. Within these communicators, the processes get communicator-specific IDs of 0 and 1 respectively.
From there you can organize your code to be communicator-independent: a communicator will be an argument, and from it you'll get the number of processes working on that task and the ID of the current process within that communicator's group.
From there you can build more complex geometries that may or may not be reflected in the networking (the process with the next id might be physically near you in the cluster or on the other side of the system).
The details of this are beyond this class, but here's a decent reference.
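The class doesn't go into the details, but as a minimal sketch of the 2x2 layout above using MPI_Comm_split (the row/column naming is mine; run with 4 processes):

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    int myid;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    /* 2x2 grid of processes:  0 1
                               2 3  */
    int row = myid / 2;        /* 0 for ranks 0,1 ; 1 for ranks 2,3 */
    int col = myid % 2;        /* 0 for ranks 0,2 ; 1 for ranks 1,3 */

    MPI_Comm row_comm, col_comm;
    MPI_Comm_split(MPI_COMM_WORLD, row, myid, &row_comm);  /* horizontal groups (0,1) and (2,3) */
    MPI_Comm_split(MPI_COMM_WORLD, col, myid, &col_comm);  /* vertical groups (0,2) and (1,3)   */

    int row_rank, col_rank;
    MPI_Comm_rank(row_comm, &row_rank);    /* 0 or 1 within the row    */
    MPI_Comm_rank(col_comm, &col_rank);    /* 0 or 1 within the column */
    printf("World rank %d -> row rank %d, column rank %d\n", myid, row_rank, col_rank);

    MPI_Comm_free(&row_comm);
    MPI_Comm_free(&col_comm);
    MPI_Finalize();
    return 0;
}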
module load totalview
mpirun -np 4 -tv your_app_compiled_for_debugging
This is an advanced topic, but if you want to send data of compound data types (structs), or to send contiguous or non-contiguous data of the same type, you can define your own MPI types.
For more information, see: http://static.msi.umn.edu/tutorial/scicomp/general/MPI/mpi_data.html
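For a taste, here is a hedged sketch (not from the course material) of the simplest case, a contiguous derived type that bundles three floats into one unit; run with at least two processes:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    int myid;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    /* define and commit a new type: a block of 3 contiguous floats */
    MPI_Datatype triple;
    MPI_Type_contiguous(3, MPI_FLOAT, &triple);
    MPI_Type_commit(&triple);

    float point[3] = {1.0f, 2.0f, 3.0f};
    if (myid == 0) {
        MPI_Send(point, 1, triple, 1, 0, MPI_COMM_WORLD);     /* send one "triple" */
    } else if (myid == 1) {
        MPI_Recv(point, 1, triple, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("Rank 1 received %.1f %.1f %.1f\n", point[0], point[1], point[2]);
    }

    MPI_Type_free(&triple);
    MPI_Finalize();
    return 0;
}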
Matrix Multiplication
Communicate only the relevant parts of A to all processes.
Repeat Practice Problem, but
Check out LLNL's practice problems for different ways to cut up numerical methods so they can be parallelized.