Writing Parallel Python Code
Python is one of the most popular programming languages in use today.
When working with a cluster computer, it is natural to ask how to take
advantage of all of these nodes and cores in order to speed up
computation as much as possible. On a laptop, one common approach is to
use the Pool class in the Python multiprocessing library in order to
distribute computation to other cores on the machine. While this
approach certainly works on a cluster too, it does not allow you to take
full advantage of the available computing power. Each job is limited to
a single node and all the cores that are currently available on it. For
ARGO this places a hard upper limit of 64 cores per job, although a
practical limit of 20-25 cores is more appropriate if you want to get
your job scheduled in a reasonable amount of time.
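As a point of reference, the single-node approach described above might look like the following minimal sketch. The file name pool_demo.py, the helper functions and the chunk sizes are illustrative assumptions and are not part of the ARGO setup; the only library calls used are multiprocessing.Pool and its map() method.

```python
# pool_demo.py -- single-node parallelism with multiprocessing.Pool
import math
from multiprocessing import Pool


def is_prime(n):
    """Return True if n is prime (simple trial division)."""
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    return all(n % i != 0 for i in range(3, int(math.sqrt(n)) + 1, 2))


def count_primes(bounds):
    """Count the primes in the half-open range [low, high)."""
    low, high = bounds
    return sum(1 for n in range(low, high) if is_prime(n))


if __name__ == "__main__":
    # Four equal chunks covering [0, 40000); sizes chosen only for illustration.
    chunks = [(i, i + 10_000) for i in range(0, 40_000, 10_000)]
    # Pool() defaults to os.cpu_count() worker processes, all on this one node.
    with Pool() as pool:
        counts = pool.map(count_primes, chunks)
    print(sum(counts))
```

However many chunks are created, every worker process in this sketch lives on the node where the script was started, which is the limitation discussed above.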
The mpi4py library has a Pool-like class that is very similar to the
one in the multiprocessing library. This wiki page describes how to use
mpi4py to set up an environment and Python code that take advantage
of a much larger number of cores.
Installing mpi4py in a Python Virtual Environment
When installing Python modules, we recommend using a Python Virtual Environment. When working on a particular project you may want to install a number of different packages. We recommend creating one VE for each project and installing everything that you need into it.
For the purposes of this demonstration, let's create a virtual
environment called MPIpool and install mpi4py into it.
    [jdoe@ARGO-1 ~]$ module load python/3.7
    [jdoe@ARGO-1 ~]$ python -m virtualenv ~/MPIpool
    [jdoe@ARGO-1 ~]$ source ~/MPIpool/bin/activate
    (MPIpool) [jdoe@ARGO-1 ~]$ pip install mpi4py
    Collecting mpi4py
      Using cached https://files.pythonhosted.org/packages/04/f5/a615603ce4ab7f40b65dba63759455e3da610d9a155d4d4cece1d8fd6706/mpi4py-3.0.2.tar.gz
    Installing collected packages: mpi4py
      Running setup.py install for mpi4py ... done
    Successfully installed mpi4py-3.0.2
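Before moving on, it can be worth confirming that the active environment really contains the package. The short script below is one possible check; the file name check_env.py and the helper is_installed() are hypothetical names, and the script relies only on importlib and sys from the standard library.

```python
# check_env.py -- confirm that a package is importable from this environment
import importlib.util
import sys


def is_installed(module_name):
    """Return True if module_name can be found by the current interpreter."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # sys.prefix points inside ~/MPIpool when the virtual environment is active.
    print("Interpreter prefix:", sys.prefix)
    print("mpi4py installed:", is_installed("mpi4py"))
```

If the prefix does not point into ~/MPIpool, the environment was probably not activated in the current shell.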
Using MPIPoolExecutor in a Python Program
Here we have a sample Python program that calculates prime numbers. It
uses the MPIPoolExecutor class to farm out calculations to "workers".
The workers can be running on any node and core in the cluster. There
must always be one "manager" that is responsible for farming out the
work, and collecting the results when finished.
    # MPIpool.py
    from mpi4py.futures import MPIPoolExecutor
    import math
    import textwrap

    def calc_primes(range_tuple):
        """Calculate all the prime numbers in the given range."""
        low, high = range_tuple
        if low <= 2 < high:
            primes = [2]
        else:
            primes = []

        start = max(3, low)     # Don't start below 3
        if start % 2 == 0:      # Make sure start is odd, i.e. skip evens
            start += 1

        for num in range(start, high, 2):   # increment by 2's, i.e. skip evens
            if all(num % i != 0 for i in range(3, int(math.sqrt(num)) + 1, 2)):
                primes.append(num)
        return primes

    def determine_subranges(fullrange, num_subranges):
        """
        Break fullrange up into smaller sets of ranges that cover all
        the same numbers.
        """
        subranges = []
        # Width of each subrange, based on the span of the full range
        inc = (fullrange[1] - fullrange[0]) // num_subranges
        for i in range(fullrange[0], fullrange[1], inc):
            subranges.append( (i, min(i+inc, fullrange[1])) )
        return( subranges )

    if __name__ == '__main__':
        fullrange = (0, 100000000)
        num_subranges = 1000
        subranges = determine_subranges(fullrange, num_subranges)

        executor = MPIPoolExecutor()
        prime_sets = executor.map(calc_primes, subranges)
        executor.shutdown()     # wait for the workers to finish

        # flatten the list of lists
        primes = [p for plist in prime_sets for p in plist]
        print(textwrap.fill(str(primes),80))
The main work is done in the calc_primes() function, which is what the
workers run. It calculates all the prime numbers within a range defined
by range_tuple, a tuple that contains two values: the lower bound
(included) and the upper bound (excluded) of the range.
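To make the range convention concrete, the sketch below repeats calc_primes() as a standalone function and calls it on a few small ranges. Writing the two list literals as [2] and [] is an assumption about the original listing. The lower bound is included and the upper bound is excluded, so adjacent subranges can share an endpoint without producing duplicates.

```python
import math


def calc_primes(range_tuple):
    """Return the primes in the half-open range [low, high)."""
    low, high = range_tuple
    primes = [2] if low <= 2 < high else []
    start = max(3, low)            # odd candidates start at 3
    if start % 2 == 0:             # move up to the next odd number
        start += 1
    for num in range(start, high, 2):
        if all(num % i != 0 for i in range(3, int(math.sqrt(num)) + 1, 2)):
            primes.append(num)
    return primes


print(calc_primes((0, 20)))    # [2, 3, 5, 7, 11, 13, 17, 19]
print(calc_primes((20, 30)))   # [23, 29]
# Splitting at 11 puts 11 in the second piece only, so nothing is repeated.
print(calc_primes((0, 11)) + calc_primes((11, 20)) == calc_primes((0, 20)))  # True
```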
The rest of the code runs on the "manager". It calls the
determine_subranges() function to define the different pieces of work
to send to the workers. The
MPIPoolExecutor.map() function actually
handles all the complexity of coordinating communications with workers,
farming out the different tasks, and then collecting the results.
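The division of labour can be illustrated without MPI. In the sketch below, determine_subranges() is rewritten with explicit low and high bounds (an assumption about the intent of the listing), multiples_of_seven() is a hypothetical cheap stand-in for calc_primes(), and Python's built-in map() stands in for MPIPoolExecutor.map(). Both kinds of map() yield results in the order of their inputs by default, so the flattening step works the same way.

```python
def determine_subranges(fullrange, num_subranges):
    """Split the half-open range fullrange into consecutive (low, high) pieces."""
    low, high = fullrange
    inc = max(1, (high - low) // num_subranges)   # width of each piece
    return [(i, min(i + inc, high)) for i in range(low, high, inc)]


def multiples_of_seven(range_tuple):
    """Hypothetical stand-in for calc_primes(): a cheap per-range task."""
    low, high = range_tuple
    return [n for n in range(low, high) if n % 7 == 0]


subranges = determine_subranges((0, 50), 5)
print(subranges)   # [(0, 10), (10, 20), (20, 30), (30, 40), (40, 50)]

# Built-in map() runs serially here; the executor's map() would hand each
# tuple to a worker but still yield the results in input order.
result_sets = map(multiples_of_seven, subranges)
flat = [n for part in result_sets for n in part]
print(flat)        # [0, 7, 14, 21, 28, 35, 42, 49]
```

When the span does not divide evenly, this version produces one extra, shorter piece at the end: determine_subranges((0, 53), 5) returns six tuples, the last being (50, 53).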
The mpi4py documentation suggests that when using
MPIPoolExecutor, your code should use the
if __name__ == '__main__': code construct at the bottom of your main
file in order to prevent workers from spawning more workers.
Submitting the Program to Slurm
Here we provide a Slurm script for running such a job.
    #!/bin/sh

    ## Give your job a name to distinguish it from other jobs you run.
    #SBATCH --job-name=MPIpool

    ## General partitions: all-HiPri, bigmem-HiPri   --   (12 hour limit)
    ##                     all-LoPri, bigmem-LoPri, gpuq  (5 days limit)
    ## Restricted: CDS_q, CS_q, STATS_q, HH_q, GA_q, ES_q, COS_q  (10 day limit)
    #SBATCH --partition=all-HiPri

    ## Separate output and error messages into 2 files.
    ## NOTE: %u=userID, %x=jobName, %N=nodeID, %j=jobID, %A=arrayID, %a=arrayTaskID
    #SBATCH --output=/scratch/%u/%x-%N-%j.out   # Output file
    #SBATCH --error=/scratch/%u/%x-%N-%j.err    # Error file

    ## Slurm can send you updates via email
    #SBATCH --mail-type=BEGIN,END,FAIL          # ALL,NONE,BEGIN,END,FAIL,REQUEUE,..
    #SBATCH --mail-user=<GMUnetID>@gmu.edu      # Put your GMU email address here

    ## Specify how much memory your job needs. (2G is the default)
    #SBATCH --mem=1G         # Total memory needed per task (units: K,M,G,T)

    ## Specify how much time your job needs. (default: see partition above)
    #SBATCH --time=0-02:00   # Total time needed for job: Days-Hours:Minutes

    #SBATCH --ntasks=51      # 50 workers, 1 manager

    ## Load the relevant modules needed for the job
    module load python/3.7
    source ~/MPIpool/bin/activate

    ## Run your program or script
    mpirun -np $SLURM_NTASKS python -m mpi4py.futures MPIpool.py
Be sure to replace <GMUnetID>@gmu.edu (including the < and >) with your
own email address.
Note that we set --ntasks=51 in order to allocate 1 manager and 50
workers. There must always be exactly 1 manager and at least 1 worker.
Note that we use the $SLURM_NTASKS environment variable in the call to
mpirun to make sure that the number of cores used equals the number
allocated by Slurm. Because mpi4py is based on the MPI libraries, we
need to load one of the MPI modules. Here I have chosen OpenMPI. The
mpirun program must be used to properly launch an MPI program, and this
program is no exception.
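Inside the Python program, the same environment variable could also be used to choose a number of subranges that suits the allocation. The sketch below is only an illustration under stated assumptions: the helper names, the fallback value of 2 tasks and the factor of 20 pieces per worker are all hypothetical, and it assumes that SLURM_NTASKS is visible to the manager process.

```python
import os


def worker_count(environ=os.environ):
    """Number of workers: all allocated tasks minus the one manager."""
    # Hypothetical fallback of 2 tasks (1 manager + 1 worker) outside Slurm.
    ntasks = int(environ.get("SLURM_NTASKS", "2"))
    if ntasks < 2:
        raise ValueError("need at least 2 tasks: 1 manager and 1 worker")
    return ntasks - 1


def suggested_subranges(workers, pieces_per_worker=20):
    """Give every worker several pieces so that faster workers can take more."""
    return workers * pieces_per_worker


workers = worker_count({"SLURM_NTASKS": "51"})
print(workers, suggested_subranges(workers))   # 50 1000
```

With --ntasks=51 this reproduces the 1000 subranges used in MPIpool.py, which works out to 20 pieces for each of the 50 workers.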
The runtime for this program using 50 workers is about 1 minute. That is significantly faster than the 45 minutes needed to run the program using a single core. Of course there is a point of diminishing returns (and even an added cost) in adding more and more workers. It is good to experiment with different numbers to see how many workers are optimal. The maximum number of cores that a user can request is currently 300. This may change in the future.
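The timings quoted above can be turned into a speedup and a parallel efficiency with a little arithmetic. Only the 45-minute and 1-minute figures come from the text, and both are approximate; the function names below are illustrative.

```python
def speedup(serial_minutes, parallel_minutes):
    """How many times faster the parallel run is than the serial run."""
    return serial_minutes / parallel_minutes


def efficiency(serial_minutes, parallel_minutes, workers):
    """Fraction of the ideal (linear) speedup that was actually achieved."""
    return speedup(serial_minutes, parallel_minutes) / workers


s = speedup(45, 1)
e = efficiency(45, 1, 50)
print(f"speedup ~ {s:.0f}x, efficiency ~ {e:.0%}")   # speedup ~ 45x, efficiency ~ 90%
```

Repeating the calculation for several worker counts is one way to see where the diminishing returns begin.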
This is an example of an algorithm that is "embarrassingly parallel". It is very easy to divide it up into smaller pieces and pass them out. Many algorithms are not so easy to parallelize in this way. MPI is a very mature library, and it has the tools to handle problems that are much more complex than this. It is the de facto standard for doing large scale parallelization, and if that is your goal you can benefit from learning more about it. Those interested in a more "Pythonic" library may want to look into Dask.