Parallel computing and GPU programming with Julia¶

Part II: Multi-processing and distributed computing¶

Alexis Montoison

In [ ]:
using BenchmarkTools
using Distributed
using MPI
No description has been provided for this image

An implementation of distributed memory parallel computing is provided by module Distributed as part of the standard library shipped with Julia.

Most modern computers possess more than one CPU, and several computers can be combined together in a cluster. Harnessing the power of these multiple CPUs allows many computations to be completed more quickly. There are two major factors that influence performance: the speed of the CPUs themselves, and the speed of their access to memory. In a cluster, it's fairly obvious that a given CPU will have fastest access to the RAM within the same computer (node).

Julia provides a multiprocessing environment based on message passing to allow programs to run on multiple processes in separate memory domains at once.

Julia’s main implementation of message passing for distributed-memory systems is contained in the Distributed module. Its approach is different from other frameworks like MPI in that communication is generally “one-sided”, meaning that the programmer needs to explicitly manage only one process in a two-process operation.

No description has been provided for this image

Master-worker paradigm:

  • One master process coordinates a set of worker processes (that eventually perform computations).
  • Programmer only controls the master directly. The workers are "instructed" (one-sided communication).

Julia can be started with a given number of p local workers with:

julia -p 4

The Distributed module is automatically loaded if the -p flag is used. But we can also dynamically add processes in a running Julia session:

In [ ]:
using Distributed

println(nprocs())
addprocs(4)         # add 4 workers
println(nprocs())   # total number of processes
println(nworkers()) # only worker processes
# rmprocs(workers())  # remove worker processes

Note what happens here: there is one master process which can create additional worker processes, and as we shall see, it can also distribute work to these workers. For running on a cluster, we instead need to provide the --machine-file option and the name of a file containing a list of machines that are accessible via password-less ssh. Support for running on clusters with various schedulers (including SLURM) can be found in the ClusterManagers.jl package.

In [ ]:
myid()
In [ ]:
@sync pmap(i -> println("I'm worker $(myid()), working on i=$i"), 1:10)
In [ ]:
@sync @distributed for i in 1:10
  println("I'm worker $(myid()), working on i=$i")
end

pmap should be preferred when each operation is not very fast. @distributed for is better when each operation is very fast.

In [ ]:
a = [i for i=1:10]
@sync @distributed for i in 1:10
  println("working on i=$i")
  a[i] = i^2
end
a

nothing changed in a!! a is made available to each worker, but it’s basically for “reading” because a whole copy of a is sent to the memory space of each worker. Each worker has its own memory domain, for safety.

In [ ]:
printsquare(i) = println("working on i=$i: its square it $(i^2)")
@sync @distributed for i in 1:10
  printsquare(i) # error
end

For this to work, we need to export functions and packages @everywhere:

In [ ]:
@everywhere printsquare(i) = println("working on i=$i: its square it $(i^2)")
@sync @distributed for i in 1:10
  printsquare(i)
end

Warning: 1 worker ≠ 1 CPU core or thread: if we ask for 50 processors and our machine only has 4, we will see that we have 50 workers, but several workers will be sharing the same CPU (but different memory domains). It will slow us down compared to asking for 4 workers only.

Each process has a unique identifier accessible via the myid() function (master has myid() == 1). The @spawn and @spawnat macros can be used to transfer work to a process, and then return the Future result to the master process using the fetch function. @spawn selects the process automatically while @spawnat lets you choose.

In [ ]:
# execute myid() and rand() on process 2
r = @spawnat 2 (myid(), rand())

# fetch the result
fetch(r)

One use case could be to manually distribute expensive function calls between processes, but there are higher-level and simpler constructs than @spawn / @spawnat:

  • the @distributed macro for for loops. Can be used with a reduction operator to gather work performed by the independent tasks.
  • the pmap function which maps an array or range to a given function.

To illustrate the difference between these approaches we revisit the sum_sqrt function from the Multithreading notebook. To use pmap we need to modify our function to accept a range so we will use this modified version. Note that to make any function available to all processes it needs to be decorated with the @everywhere macro:

In [ ]:
@everywhere function sqrt_sum_range(A, r)
    s = zero(eltype(A))
    for i in r
        @inbounds s += sqrt(A[i])
    end
    return s
end
In [ ]:
A = rand(100_000)
batch = Int(length(A) / 100)
In [ ]:
# distributed (+)
@distributed (+) for r in [(1:batch) .+ offset for offset in 0:batch:length(A)-1]
    sqrt_sum_range(A, r)
end
In [ ]:
# pmap
sum(pmap(r -> sqrt_sum_range(A, r), [(1:batch) .+ offset for offset in 0:batch:length(A)-1]))
In [ ]:
# @spawnat
futures = Array{Future}(undef, nworkers())

@time begin
    for (i, id) in enumerate(workers())
        batch = floor(Int, length(A) / nworkers())
        remainder = length(A) % nworkers()
        if (i-1) < remainder
            start = 1 + (i - 1) * (batch + 1)
            stop = start + batch
        else
            start = 1 + (i - 1) * batch + remainder
            stop = start + batch - 1
        end
        futures[i] = @spawnat myid() sqrt_sum_range(A, start:stop)
    end
    p = sum(fetch.(futures))
end

The @spawnat version looks cumbersome for this case particular case as the algorithm required the explicit partitioning of the array which is common in MPI, for instance. The @distributed (+) parallel for loop and the pmap mapping are much simpler, but which one is preferable for a given use case?

  • @distributed is appropriate for reductions. It does not load-balance and simply divides the work evenly between processes. It is best in cases where each loop iteration is cheap.
  • pmap can handle reductions as well as other algorithms. It performs load-balancing and since dynamic scheduling introduces some overhead it’s best to use pmap for computationally heavy tasks.
  • In the case of @spawnat, because the futures are not immediately using CPU resources, it opens the possibility of using asynchronous and uneven workloads.

Just like with multithreading, multiprocessing with Distributed comes with an overhead because of sending messages and moving data between processes.

In [ ]:
# Finally, it should be emphasized that a common use case of pmap involves heavy computations inside functions defined in imported packages. For example, computing the singular value decomposition of many matrices:
@everywhere using LinearAlgebra
x=[rand(100,100) for i in 1:10]
@btime map(LinearAlgebra.svd, x);
@btime pmap(LinearAlgebra.svd, x);

DistributedArrays: Another way to approach parallelization over multiple machines is through DistributedArrays.jl package. A DistributedArrays is distributed across a set of workers. Each worker can read and write from its local portion of the array and each worker has read-only access to the portions of the array held by other workers.

Summary:

  • @distributed is good for reductions and fast inner loops with limited data transfer.
  • pmap is good for expensive inner loops that return a value.
  • SharedArrays can be an easier drop-in replacement for threading-like behaviors on a single machine.

MPI¶

MPI.jl is a Julia interface to the Message Passing Interface, which has been the standard workhorse of parallel computing for decades. Like Distributed, MPI belongs to the distributed-memory paradigm.

The idea behind MPI is that:

  • Tasks have a rank and are numbered 0, 1, 2, 3, …
  • Each task manages its own memory
  • Each task can run multiple threads
  • Tasks communicate and share data by sending messages.
  • Many higher-level functions exist to distribute information to other tasks and gather information from other tasks.
  • All tasks typically run the entire code and we have to be careful to avoid that all tasks do the same thing.
No description has been provided for this image

MPI.jl provides Julia bindings for the Message Passing Interface (MPI) standard. This is how a hello world MPI program looks like in Julia:

Fundamental MPI functions¶

  • MPI.Init() and MPI.Finalize() (the latter isn't necessary in Julia)

  • MPI.COMM_WORLD: default communicator, includes all MPI ranks

  • MPI.Comm_rank(comm): unique rank of the process calling this function (MPI rank ids start at 0!)

  • MPI.Comm_size(comm): total number of ranks in the given communicator

In [ ]:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD  # MPI.COMM_WORLD is the communicator - a group of processes that can talk to each other
rank = MPI.Comm_rank(comm)  # Comm_rank returns the individual rank (0, 1, 2, …) for each task that calls it
size = MPI.Comm_size(comm)  # Comm_size returns the total number of ranks.
println("Hello from process $(rank) out of $(size)")
MPI.Barrier(comm)

The MPI.jl package contains a lot of functionality, but in principle one can get away with only point-to-point communication (MPI.send() and MPI.recv()). However, collective communication can sometimes require less effort. In any case, it is good to have a mental model of different communication patterns in MPI.

In [ ]:
using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)

if rank != 0
    # All ranks other than 0 should send a message
    local message = "Hello World, I'm rank $rank"
    MPI.send(message, comm, dest=0, tag=0)
else
    # Rank 0 will receive each message and print them
    for sender in 1:(size-1)
        message = MPI.recv(comm, source=sender, tag=0)
        println(message)
    end
end

Summary:

  • MPI is a standard work-horse of parallel computing.
  • All communication is handled explicitly - not behind the scenes as in Distributed.
  • Programming with MPI requires a different mental model since each parallel rank is executing the same program and the programmer needs to distribute the work by hand.
No description has been provided for this image

References:¶

  • https://enccs.github.io/Julia-for-HPC/distributed
  • https://enccs.github.io/Julia-for-HPC/MPI
  • https://enccs.github.io/Julia-for-HPC/cluster
  • https://github.com/hlrs-tasc/julia-on-hpc-systems
  • https://docs.julialang.org/en/v1/manual/distributed-computing
  • https://hiteshmishra708.medium.com/multiprocessing-in-python-c6735fa70f3f
  • https://sites.google.com/site/cis115textbook/distributed-computing
  • http://cecileane.github.io/computingtools/pages/notes1209.html
  • https://github.com/JuliaParallel/MPI.jl
  • https://memegenerator.net/instance/68378447