using BenchmarkTools
using Distributed
using MPI
![Multithreading vs. multiprocessing](./Graphics/multithreading_multiprocessing.png)
An implementation of distributed-memory parallel computing is provided by the Distributed module, which is part of the standard library shipped with Julia.
Most modern computers possess more than one CPU, and several computers can be combined together in a cluster. Harnessing the power of these multiple CPUs allows many computations to be completed more quickly. There are two major factors that influence performance: the speed of the CPUs themselves, and the speed of their access to memory. In a cluster, it's fairly obvious that a given CPU will have fastest access to the RAM within the same computer (node).
Julia provides a multiprocessing environment based on message passing to allow programs to run on multiple processes in separate memory domains at once.
Julia’s main implementation of message passing for distributed-memory systems is contained in the Distributed module. Its approach is different from other frameworks like MPI in that communication is generally “one-sided”, meaning that the programmer needs to explicitly manage only one process in a two-process operation.
Master-worker paradigm:
- One master process coordinates a set of worker processes (that eventually perform computations).
- Programmer only controls the master directly. The workers are "instructed" (one-sided communication).
Julia can be started with a given number p of local workers with:
julia -p 4
The Distributed module is automatically loaded if the -p flag is used. But we can also dynamically add processes in a running Julia session:
using Distributed
println(nprocs())
addprocs(4) # add 4 workers
println(nprocs()) # total number of processes
println(nworkers()) # only worker processes
# rmprocs(workers()) # remove worker processes
Note what happens here: there is one master process which can create additional worker processes, and as we shall see, it can also distribute work to these workers.
For running on a cluster, we instead need to provide the --machine-file option and the name of a file containing a list of machines that are accessible via password-less ssh. Support for running on clusters with various schedulers (including SLURM) can be found in the ClusterManagers.jl package.
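For example, here is a minimal sketch of adding workers through SLURM from within a Julia session using ClusterManagers.jl; the keyword arguments shown (which are forwarded to srun) are purely illustrative and site-specific:
using Distributed, ClusterManagers
# Hypothetical example: request 4 workers via the SLURM scheduler.
# partition and t (time limit) are forwarded to srun and depend on the cluster.
addprocs(SlurmManager(4), partition="debug", t="00:05:00")
@everywhere println("Worker $(myid()) is running on $(gethostname())")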
myid()
@sync pmap(i -> println("I'm worker $(myid()), working on i=$i"), 1:10)
@sync @distributed for i in 1:10
    println("I'm worker $(myid()), working on i=$i")
end
pmap should be preferred when each operation is not very fast, while @distributed for is better when each operation is very fast.
a = [i for i=1:10]
@sync @distributed for i in 1:10
    println("working on i=$i")
    a[i] = i^2
end
a
Nothing changed in a! a is made available to each worker, but essentially only for reading: a whole copy of a is sent to the memory space of each worker. Each worker has its own memory domain, for safety.
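If we do want the workers to write into a common array on a single machine, a SharedArray (see the summary at the end of this section) can be used instead. A minimal sketch, assuming all workers run on the same node:
using Distributed
@everywhere using SharedArrays
b = SharedArray{Int}(10)   # backed by shared memory, writable by all local workers
@sync @distributed for i in 1:10
    b[i] = i^2             # these writes are visible to the master after the loop
end
b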
printsquare(i) = println("working on i=$i: its square is $(i^2)")
@sync @distributed for i in 1:10
    printsquare(i) # error: printsquare is not defined on the workers
end
For this to work, we need to make functions and packages available on all processes with @everywhere:
@everywhere printsquare(i) = println("working on i=$i: its square is $(i^2)")
@sync @distributed for i in 1:10
    printsquare(i)
end
Warning: 1 worker ≠ 1 CPU core or thread: if we ask for 50 workers and our machine only has 4 cores, we will indeed get 50 workers, but several of them will share the same CPU core (while keeping separate memory domains). This will slow us down compared to asking for only 4 workers.
Each process has a unique identifier accessible via the myid() function (the master has myid() == 1).
The @spawn and @spawnat macros can be used to transfer work to a process; the resulting Future can then be brought back to the master process with the fetch function. @spawn selects the process automatically, while @spawnat lets you choose.
# execute myid() and rand() on process 2
r = @spawnat 2 (myid(), rand())
# fetch the result
fetch(r)
One use case could be to manually distribute expensive function calls between processes, but there are higher-level and simpler constructs than @spawn / @spawnat:
- the @distributed macro for for-loops, which can be used with a reduction operator to gather the work performed by the independent tasks;
- the pmap function, which maps an array or range to a given function.
To illustrate the difference between these approaches we revisit the sum_sqrt function from the Multithreading notebook. To use pmap we need to modify our function to accept a range, so we will use this modified version. Note that to make any function available to all processes it needs to be decorated with the @everywhere macro:
@everywhere function sqrt_sum_range(A, r)
    s = zero(eltype(A))
    for i in r
        @inbounds s += sqrt(A[i])
    end
    return s
end
A = rand(100_000)
batch = Int(length(A) / 100)
# distributed (+)
@distributed (+) for r in [(1:batch) .+ offset for offset in 0:batch:length(A)-1]
    sqrt_sum_range(A, r)
end
# pmap
sum(pmap(r -> sqrt_sum_range(A, r), [(1:batch) .+ offset for offset in 0:batch:length(A)-1]))
# @spawnat
futures = Array{Future}(undef, nworkers())
@time begin
    for (i, id) in enumerate(workers())
        batch = floor(Int, length(A) / nworkers())
        remainder = length(A) % nworkers()
        if (i - 1) < remainder
            start = 1 + (i - 1) * (batch + 1)
            stop = start + batch
        else
            start = 1 + (i - 1) * batch + remainder
            stop = start + batch - 1
        end
        futures[i] = @spawnat id sqrt_sum_range(A, start:stop) # spawn on worker id, not on the master
    end
    p = sum(fetch.(futures))
end
The @spawnat version looks cumbersome for this particular case, as the algorithm requires the explicit partitioning of the array, which is common in MPI, for instance. The @distributed (+) parallel for loop and the pmap mapping are much simpler, but which one is preferable for a given use case?
- @distributed is appropriate for reductions. It does not load-balance and simply divides the work evenly between processes. It is best in cases where each loop iteration is cheap.
- pmap can handle reductions as well as other algorithms. It performs load-balancing, and since dynamic scheduling introduces some overhead it's best to use pmap for computationally heavy tasks.
- In the case of @spawnat, because the futures are not immediately using CPU resources, it opens the possibility of using asynchronous and uneven workloads (see the sketch below).
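As an illustration of the last point, here is a minimal sketch with a hypothetical, deliberately uneven slow_task workload: the tasks are launched asynchronously and the master only blocks when the results are fetched.
@everywhere slow_task(n) = (sleep(n); n) # hypothetical workload with uneven run times
futures = [@spawnat :any slow_task(rand(1:3)) for _ in 1:2*nworkers()]
# the master is free to do other work here while the workers compute ...
results = fetch.(futures) # block only when the results are actually needed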
Just like with multithreading, multiprocessing with Distributed comes with an overhead because of sending messages and moving data between processes.
# Finally, it should be emphasized that a common use case of pmap involves heavy computations inside functions defined in imported packages. For example, computing the singular value decomposition of many matrices:
@everywhere using LinearAlgebra
x=[rand(100,100) for i in 1:10]
@btime map(LinearAlgebra.svd, x);
@btime pmap(LinearAlgebra.svd, x);
DistributedArrays: Another way to approach parallelization over multiple machines is through the DistributedArrays.jl package. A DArray (distributed array) is distributed across a set of workers. Each worker can read and write its local portion of the array, and each worker has read-only access to the portions of the array held by other workers.
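A minimal sketch of the idea, assuming DistributedArrays.jl is installed (distribute, dzeros and localindices are part of its exported API):
using Distributed
@everywhere using DistributedArrays
dA = dzeros(100)           # a distributed array of zeros, split across the workers
dB = distribute(rand(100)) # distribute an existing local array
sum(dB)                    # reductions such as sum work directly on distributed arrays
# inspect which indices are stored locally on the first worker
remotecall_fetch(() -> localindices(dB), workers()[1])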
Summary:
- @distributed is good for reductions and fast inner loops with limited data transfer.
- pmap is good for expensive inner loops that return a value.
- SharedArrays can be an easier drop-in replacement for threading-like behaviors on a single machine.
MPI
MPI.jl is a Julia interface to the Message Passing Interface (MPI), which has been the standard workhorse of parallel computing for decades. Like Distributed, MPI belongs to the distributed-memory paradigm.
The idea behind MPI is that:
- Tasks have a rank and are numbered 0, 1, 2, 3, …
- Each task manages its own memory
- Each task can run multiple threads
- Tasks communicate and share data by sending messages.
- Many higher-level functions exist to distribute information to other tasks and gather information from other tasks.
- All tasks typically run the entire program, so we have to be careful to avoid having every task do the same work.
MPI.jl provides Julia bindings for the Message Passing Interface (MPI) standard. This is what a hello-world MPI program looks like in Julia (shown after the list of fundamental functions below):
Fundamental MPI functions
- MPI.Init() and MPI.Finalize() (the latter isn't necessary in Julia)
- MPI.COMM_WORLD: the default communicator, includes all MPI ranks
- MPI.Comm_rank(comm): the unique rank of the process calling this function (MPI rank ids start at 0!)
- MPI.Comm_size(comm): the total number of ranks in the given communicator
using MPI
MPI.Init()
comm = MPI.COMM_WORLD # MPI.COMM_WORLD is the communicator - a group of processes that can talk to each other
rank = MPI.Comm_rank(comm) # Comm_rank returns the individual rank (0, 1, 2, …) for each task that calls it
size = MPI.Comm_size(comm) # Comm_size returns the total number of ranks.
println("Hello from process $(rank) out of $(size)")
MPI.Barrier(comm)
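Such a program is not run from the REPL: it is saved to a file and launched with an MPI launcher. MPI.jl ships a convenience wrapper, mpiexecjl (installed once with MPI.install_mpiexecjl()), so a script like the one above can be launched with, for example (the file name here is just illustrative):
mpiexecjl -n 4 julia hello_mpi.jl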
The MPI.jl package contains a lot of functionality, but in principle one can get away with only point-to-point communication (MPI.send() and MPI.recv()). However, collective communication can sometimes require less effort (a small example follows after the send/recv code below). In any case, it is good to have a mental model of the different communication patterns in MPI.
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
if rank != 0
    # All ranks other than 0 should send a message
    local message = "Hello World, I'm rank $rank"
    MPI.send(message, comm, dest=0, tag=0)
else
    # Rank 0 will receive each message and print them
    for sender in 1:(size-1)
        message = MPI.recv(comm, source=sender, tag=0)
        println(message)
    end
end
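For comparison, here is a minimal sketch of a collective operation, assuming the keyword-style API of recent MPI.jl versions: every rank contributes its rank number and rank 0 receives the sum.
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
# every rank participates; the reduced value is returned on the root rank only
total = MPI.Reduce(rank, +, comm; root=0)
if rank == 0
    println("Sum of all ranks: $total")
end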
Summary:
- MPI is a standard workhorse of parallel computing.
- All communication is handled explicitly - not behind the scenes as in Distributed.
- Programming with MPI requires a different mental model, since each parallel rank is executing the same program and the programmer needs to distribute the work by hand.
![Distributed computing meme](./Graphics/meme_distributed_computing.png)
References:
- https://enccs.github.io/Julia-for-HPC/distributed
- https://enccs.github.io/Julia-for-HPC/MPI
- https://enccs.github.io/Julia-for-HPC/cluster
- https://github.com/hlrs-tasc/julia-on-hpc-systems
- https://docs.julialang.org/en/v1/manual/distributed-computing
- https://hiteshmishra708.medium.com/multiprocessing-in-python-c6735fa70f3f
- https://sites.google.com/site/cis115textbook/distributed-computing
- http://cecileane.github.io/computingtools/pages/notes1209.html
- https://github.com/JuliaParallel/MPI.jl
- https://memegenerator.net/instance/68378447