# Weeks 9 - 10: Parallel processing in Python

Python offers support for two common models for concurrent computation:

1. Threading: Programs running in parallel in a shared Python environment.
2. Multiprocessing: Programs running in parallel in separate Python environments.

Threading and multiprocessing allow us to "distribute" the work of a program across multiple threads and/or processes by splitting it into a collection of subtasks that can be executed in parallel. Some examples follow:

• Parallel array sum: suppose we want to compute the sum of the elements of a very large array. We can "split" they array into non-overlapping blocks and have a separate thread compute the partial sum of each block. Finally, we output the sum of the partial sums.

• Parallel matrix-vector multiplication: to compute the product $$x \mapsto Ax$$ efficiently, we can parallelize it by computing $$A_{i,:} x$$ (where $$A_{i,:}$$ is the $$i$$-th row of $$A$$) concurrently in separate threads or processes.

We begin our discussion on threads with a somewhat contrived example: we wish to increment a counter $$N$$ 5 times, and perform each increment in a separate thread.

import threading
import time

N = 0

def worker():
global N
M = N
print(M)
N = M+1
time.sleep(1)

for i in range(5):
w.start()


The above example demonstrates the standard work flow of the threading module:

1. A "master" process spawns a number of threads, using the threading.Thread object and its target parameter to specify what each thread should do.
2. Threads spawned by the same process have shared memory, and can access and modify the same resources; in the above example, they all try to increment the same variable N.
3. The master process does not "pause" while the threads are active.
4. Important: threads typically execute out-of-order. For example, the first thread created need not be the first one to complete its work.

The second point above implies that threading code can lead to race conditions!

• Multiple threads try to access and modify the same resource concurrently.
• Special care needed to ensure output is deterministic.

### Race conditions

To better demonstrate how a race condition can affect the output of our program, consider the simplest case where we only have two threads and let us focus on the following part of our code:

def worker():
global N
M = N
N = M + 1


Here, the order of execution matters! For example, consider the following sequence of events:

1. Thread 1 reaches the M = N statement; creates a local variable M = 0, since N = 0 initially.
2. Thread 2 reaches the M = N statement, does the same thing as Thread 1. Since Python variables are local by default, this M is different from the M that Thread 1 sees.
3. Thread 1 executes N = M + 1, setting N = 1.
4. Thread 2 executes N = M + 1. This also sets N = 1.

After both threads terminate, we have N = 1 instead of N = 2, which would be the "expected" behavior.

### Synchronization

To prevent race conditions, we have a number of so-called synchronization mechanisms at our disposal. The simplest one is called a lock! At a high-level, a lock is an object that can be "held" by a single thread at a time, so we can use them to designate "safe" regions just like below:

from threading import Thread, Lock

N = 0
lock = Lock()

def worker():
global N
lock.acquire()      # enter safe region
M = N
N = M + 1
lock.release()      # leave safe region


In the above code, whenever a thread enters the safe region, it tries to acquire the lock. If it cannot, it means that another thread is currently in the safe region and it must wait until the lock is released, preventing any unwanted changes. The lock is released upon exiting the safe region, enabling other threads to enter it.

Locks as context managers

Enclosing critical regions between lock.acquire() and lock.release() statements is somewhat cumbersome and prone to human error (e.g., one could forget to add the .release() statement). Luckily, we can use locks as context managers using Python's with statement, just like below:

with lock:
do_something()


The above snippet of code is equivalent to writing

lock.acquire()
do_something()
lock.release()


For that reason, we will use the context manager syntax for locks in our examples from now on.

### Waiting for threads to stop

In addition to the Thread.start() method, upon which a thread begins to run, we have the Thread.join() method, which waits until a thread has finished. For example:

from threading import Thread, Lock

N = 0
lock = Lock()

def worker():
global N
lock.acquire()      # enter safe region
M = N
N = M + 1
lock.release()      # leave safe region

for i in range(5):
t.start()

# wait until they finish
t.join()

print(f"Value of N is: {N}")


Note that we have to keep track of the threads created in a list in order to be able to wait for them to finish in the future.

Question

What would happen if we rewrote the final few lines of the above program as below?

for i in range(5):
t.start()
t.join()


Since the join() function waits / "blocks" until a thread has finished its execution, the above code would be equivalent to having no paralellization at all.

So far we have used the target parameter to designate the code to be executed by each thread, but the target functions did not accept any parameters. When the target function expects a set of arguments, we can pass them using the args parameter of the Thread object as shown below:

from threading import Thread

def worker(arg1, arg2):
print(arg1 + arg2)

for i in range(5):
t.start()


A more realistic example of passing arguments will follow later, as we try to speed up a sorting algorithm using threads.

### Other synchronization mechanisms

Locks are not the only options for synchronization between threads. In threading we have access to several synchronization primitives:

1. Mutex: a fancy name for a lock; designates a "safe" region that only one thread can enter at a time.

2. Semaphore: a "counting" version of the lock that allows multiple threads to enter a region. A semaphore can be treated as a counter that works as follows:

3. before a thread enters a safe region, it tries to decrement the value of the semaphore by 1.
4. when a thread leaves a safe region, it increments the value of the semaphore by 1.
5. the value of the semaphore cannot drop below 0; if a semaphore has value 0 and a thread tries to decrement it, it will "block" until it is able to do so without violating this requirement.

6. Barrier: A barrier indicates that a thread must block until all other threads spawned by the same process reach the barrier.

Note that the above synchronization mechanisms are not thread-specific and are also available and necessary in a multiprocessing environment.

### The producer-consumer problem

In the producer-consumer problem, we have the following setup: one or more threads generate data, and a disjoint collection of threads "consume" them and process them. The order at which data are processed must satisfy a FIFO order; the first piece of data generated must be the first to get processed, and so on. In particular, we can assume that we have a FIFO data structure such as a queue shared between the different threads / processes.

For the producer-consumer framework with a single producer, a semaphore is necessary for synchronization so as to prevent any threads from attempting to consume data when none are available. This would not be possible with a lock (at least not with additional modifications):

from collections import deque

queue = deque()

def producer():
global queue
while True:
new_data = generate_data()
queue.insert(new_data)
sem.release()               # increment semaphore value by 1

def consumer():
global queue
sem.acquire()                   # attempt to decrement value by 1
consume(queue.popleft())        # if successful, get next item from queue


Combining synchronization mechanisms

In the above example, the following two operations will likely be executed concurrently:

# CONSUMERS
queue.popleft()
# PRODUCER
queue.insert()


Depending on the implementation of the queue, the above operations may not be thread-safe, i.e., running them concurrently might produce inconsistent results or even mess up the internals of the queue data structure. To guard against this, we could introduce an additional lock in our code:

queue = deque()

def producer():
global queue
while True:
new_data = generate_data()
with lock:
queue.insert(new_data)
sem.release()

def consumer():
global queue
sem.acquire()
with lock:
consume(queue.popleft())


In practice, many Python libraries offer thread-safe implementations for us.

### A more involved example: multi-threaded sorting

Suppose we have $$T$$ available threads and want to sort an array of numbers. In addition, suppose we have an implementation of sort(A) that sorts an array in-place with running time $$O(n \log n)$$, where $$n$$ is the size of the input array.

In what follows, we will parallelize our sorting algorithm so that its running time is

$O\left(n \log T\right) + O\left(\frac{n}{T} \log(n / T) \right).$

For $$T = \Theta(\log n)$$, the above yields an asymptotically faster running time.

The idea is straightforward: we will split our input array $$A$$ into disjoint blocks $$A_1, \dots, A_T$$, each of size $$\frac{n}{T}$$ (assuming $$n$$ is divisible by $$T$$ for simplicity). Then:

1. We will call sort on each block $$A_i$$ in a separate thread. Since sorting is done in place, this will result in a collection of $$T$$ sorted arrays.

2. We will use an $$O(n \log T)$$ algorithm for merging $$T$$ sorted arrays in the parent process.

Even though we are using threading, the above algorithm has no race conditions, since each thread handles a disjoint block of the input array $$A$$. The complete algorithm follows below:

from math import ceil
from heapq import merge

def sort(A, start, end):
A[start:end] = sorted(A[start:end])

def merge(array_of_arrays):
return list(heapq.merge(*arrays))

def sort_parallel(A, T):
bsize = ceil(len(A) / T)
for i in range(T):
start, end = bsize * i, min(bsize * (i + 1), len(A))
t = Thread(target=sort, args=(A, start, end))
t.start()

# wait until all threads have finished

return merge([
A[i*bsize:min((i+1)*bsize, len(A))] for i in range(T)])


### More questions

Question

In the above examples, we used global N to edit the variable N in different threads. However, parallel_sort passes the array A as argument. Can we do the same with N (i.e., can we change the above code snippet to look like below)?

from threading import Lock, Thread

def worker(N):
with lock:
M = N
N = M + 1

N = 0
for i in range(5):
t.start()


The answer is no, since variables that hold numbers are passed by value in Python, and the proposed approach will just result in each thread creating a local copy of N (in other words, this would not work even if we didn't have threads).

On the other hand, arrays and other objects are passed by reference, which is why parallel_sort worked as is.

Threading in Python is limited and not really intended for CPU-intensive tasks. The reason is by design; Python has something called the Global Interpreter Lock, which means that bytecode running in a single Python environment (as is the case with threading) cannot run in parallel (it can still run out of order, but not concurrently).

The threading library is still very useful for IO-bound tasks (e.g., downloading several files in parallel). However, for CPU-bound tasks, the multiprocessing library presented below is the recommended approach to achieve parallelism.

## Multiprocessing

Multiprocessing in Python is possible via the homonymous library, which allows a "master" or base process to spawn several other Python processes, each of them effectively running in a separate Python environment. For reference, let us see how we would rewrite the parallel increment example from the previous section in multiprocessing:

import multiprocessing as mp

def worker(N, lock):
with lock:
N.value += 1        # modify using the value field

procs = []
N     = mp.Value('i', 0)    # 'i': integer type
lock  = mp.Lock()

# create 5 processes
for i in range(5):
p = mp.Process(target=worker, args=(N, lock))
p.start()
procs.append(p)

# wait for all processes to stop
for p in procs: p.join()


Key differences:

• memory is no longer shared by default, so we can't use global N as we did before

• instead, we use "special" constructs for sharing objects b/w processes:

1. Value for a shared scalar value

2. Array for a shared array

3. Queue for a shared queue that multiple processes can read/write to

• synchronization mechanisms are still necessary when editing shared memory

While synchronization mechanisms from the multiprocessing library are similar (e.g., Lock, Semaphore, etc.), they have to be passed as arguments to the worker function explicitly.

### Multiprocessing examples

#### Populating an array in parallel

Goal: suppose we have nproc processes available, and wish to generate a random array with elements in the range [0, 1000] and length N in parallel.

The idea is to let each process populate a separate "block" of the array. To do so, we will create a shared memory array (multiprocessing.Array). Here are some examples of defining such a shared array:

from multiprocessing import Array

N = 100
integer_array = Array('i', N)
double_array  = Array('d', N)


Note

Shared memory arrays such as the above are statically typed (i.e., their type must be known in advance) since they get mapped to low-level structures to allow sharing between processes.

The above arrays can be indexed just like Python lists. In addition, all their elements are initially set to 0. With this at hand, we can write our program:

from math import ceil
from multiprocessing import Process, Array

def worker(A, start, end):
A[start:end] = np.random.randint(0, 1000, size=(end-start,))

if __name__ == "__main__":
array = Array('i', 250)         # example: N = 250
nproc = 16
bsize = ceil(len(array) / nproc)
procs = [None] * nproc

for i in range(nproc):
# split array into chunks
start, end = i * bsize, min((i+1) * bsize, len(array))
p = Process(target=worker, args=(array, start, end))
p.start()
procs[i] = p

# wait until they all end
for p in procs: p.join()


The above snippet uses bsize to determine the size of the blocks of array handled by each process. Every different block is populated using calls to np.random.randint with appropriate size.

Common gotchas

Some important things to keep in mind when writing multiprocessing code:

1. Shared memory objects still need to be passed to the worker function as parameters.

2. Make sure to use the if __name__ == "__main__" ... construct to ensure that only the first process executes the "main" part of the code!

Tip

In Python 3, ceil() returns an integer; in Python 2, it would return a float. If you are using Python 2 (which is not recommended), you should cast it to an integer:

bsize = int(ceil(len(array) / nproc))


#### Parallel array sum

Goal: We want to compute the sum of an array in parallel.

As in the previous example, we will split the array in separate blocks or "chunks" and assign each chunk to a different process. Each process will compute the partial sum of its assigned chunk, and the parent process will sum over the partial sums.

However, there is a nontrivial obstacle to overcome. How will the children processes communicate their results back to the parent process?

• We could add a return statement at the end of the worker function, but that would not help.
• Instead, we will use a multiprocessing.Queue to communicate return values.

The Queue class

A multiprocessing.Queue is a race-condition-free implementation of a FIFO queue, and can be used to communicate values back and forth between multiple processes. No synchronization mechanisms are necessary when using its get or put methods.

The program follows below:

import numpy as np
from multiprocessing import Process, Queue

def worker(A, start, end, queue):
queue.put(sum(A[start:end]))

A     = np.random.randint(0, 1000, size=(250,))
nproc = 16
bsize = (len(A) // nproc) + 1
procs = [None] * nproc
queue = Queue()
for i in range(nproc):
# figure out subarray limits
start, end = i * bsize, min((i + 1) * bsize, len(A))
# start a process
p = Process(target=worker, args=(A, start, end, queue))
p.start()
procs[i] = p

Asum = sum([queue.get() for i in range(nproc)])
for p in procs: p.join()


Getting elements from the Queue

Note the unintuitive order in which we are get()-ing the return values above:

Asum = sum([queue.get() for i in range(nproc)])
for p in procs: p.join()


One would expect that we would first join() the processes and get the return values from the Queue later. However, the correct pattern is the reverse, which we use here.

An explanation of why this is the correct pattern can be found in this StackOverflow thread.

Question

Why not use a multiprocessing.Array for the array A?

Since we are not changing the values of A but rather only summing over them, there is no need for A to be shared; it is sufficient for each process to receive its own copy of it.

Number of processes

To get the best out of parallelism, it is important to use the correct number of processes. The "ideal" situation is when all processes can run completely in parallel by assigning exactly one to each processor.

To determine this dynamically, you can use the os library:

from os import cpu_count

nprocs = cpu_count()
# parallel code using nprocs processes in total


### The dining philosophers problem

So far, our examples either had no race conditions or suffered from a simple race condition: many threads or processes are trying to edit a value in shared memory at the same time, causing unpredictable results. However, there are (more rare) instances of race conditions that lead to deadlocks.

Dining philosophers problem: Suppose that 5 philosophers are sitting on a circular table with just 5 chopsticks, and the following hold:

• philosophers alternate between thinking and eating

• 1 chopstick is placed between each pair of adjacent philosophers

• to eat, a philosopher must pick up both their right and left chopsticks

Note

The above is just a pedagogical instantiation of a more abstract model:

• each philosopher is a process

• "thinking" means that the process is doing some CPU-bound task

• "eating" is some resource-bound task

• "chopsticks" are resources shared between multiple processes

The example is due to Edsger Dijkstra (1965).

The dining philosophers problem presents a twofold set of challanges:

1. Since philosophers are competing for chopsticks at the same time, we need to introduce synchronization mechanisms to ensure consistent results (i.e., two adjacent philosophers should not be eating at the same time).

2. Even with synchronization, the system can reach a state where no progress can be made! This is a so-called deadlock.

Consider the following proposed solution to the dining philosophers problem:

1. A philosopher will think() until left chopstick becomes available
2. When the left chopstick becomes available, they pick it up and wait for right chopstick
3. When the right chopstick becomes available, they pick it up and eat()
4. Put the right chopstick down and then the left chopstick down (order does not matter)
5. Repeat from the beginning

Problem: if all philosophers pick up their left chopstick simultaneously, no progress can be made! Philosopher 1 is waiting for the chopstick on their right, which is held by Philosopher 2. Philosopher 2 is waiting for a chopstick held by Philosopher 3, and so on.

To avoid the resulting deadlock, E. Dijkstra proposed imposing an ordering on the chopsticks

1. Let $$i \in \{0, \dots, 4\}$$ be the index of the philosophers (e.g., clockwise).

2. The chopstick on the left of philosopher $$i$$ is given index $$i$$, and the one on the right is given index $$(i + 1) \; \mathrm{mod} \; 5$$.

3. Waiting is done in order. In particular, philosopher $$i$$:

• waits for chopstick $$i$$ first if $$i < (i + 1) \; \mathrm{mod} \; 5$$
• otherwise, waits for the other chopstick first

Why is this helpful? It is immediate that the previous scenario that was causing the deadlock can no longer happen. In particular, if 4 philosophers pick the chopsticks on their left side at the same time, the 5th one will not be able to pick up either!

A solution using threading follows below (the multiprocessing version of this will be similar, subject to some bookkeeping):

from random import randint

chopsticks = [Lock() for _ in range(5)]   # 5 chopsticks

def eat(i):
first, last = min(i, (i+1) % 5), max(i, (i+1) % 5)
while True:
print(("Philosopher %d waits for first chopstick" % i))
with chopsticks[first]:
print(("Philosopher %d waits for second chopstick" % i))
with chopsticks[last]:
sleep(randint(0, 2))   # "EAT" for some time



Note

The dining philosophers problem is simple to resolve due to the dependence structure obeyed by the processes and the shared resources. However, modern operating systems have to solve similar problems in vastly different scales (with hundreds or thousands of shared resources).

In those cases, "hand-made" solutions may be inadequate or simply too tedious to derive, calling for advanced synchronization mechanisms. Monitors are an example of such a mechanism.

### Process Pools

Finally, multiprocessing introduces an object called the process Pool, which automates certain parts of process management (e.g., starting and stopping, as well as obtaining return values, etc.).

Process pools offer several methods - some commonly used ones are summarized below:

• Pool.map(f, iterable): apply the function f to each element in iterable

• Pool.apply(f, args): call the function f with arguments args:

• Pool.starmap(f, iterable): this is similar to map, but with a subtle difference; suppose that f has the following signature:

def f(arg1, arg2):
return arg1 + arg2


Running ProcessPool.map(f, [(1, 2), (3, 4), (5, 6)]) will fail, because it will try to compute:

[ f((1,2)), f((3, 4)), f((5, 6)) ]


In other words, f will expect two arguments but only receive a single tuple as an argument. Instead, starmap will compute

[ f(*(1, 2)), f(*(3, 4)), f(*(5, 6)) ]


which will apply, e.g., f(1, 2) as expected.

Tip

All of the above methods are blocking, which means that the parent process stops or "blocks" until the subprocesses complete their assigned tasks. The multiprocessing library offers non-blocking versions, which can be distinguished by the _async() suffix in their name.

The official documentation has more details on how these work in practice.

#### Process Pool examples

##### Example 1: Parallel matrix sum

We wish to compute the sum over all the elements of a 2D matrix A. One way to do so is to make each process compute the sum over a separate row of A, and add these partial sums up in the parent process.

They key to write this using a ProcessPool is to recall that a 2D numpy.array object is iterated row-by-row by default:

import numpy as np

A = np.array([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]

for row in A:
print(row)
# will print:
# [1, 2, 3]
# [4, 5, 6]
# [7, 8, 9]


Given this, we can use the Pool.map method to accomplish our task:

import os
import numpy as np
from multiprocessing import Pool

nproc = os.cpu_count()
# create a random array
A = np.random.randint(0, 1000, size=(1000, 100))

with Pool(processes=nproc) as pool:
# note: chunksize is optional
parsum = sum(pool.map(sum, A, chunksize=(np.shape(A)[0] // nproc)))

print(parsum, np.sum(A))   # the two sums will be the same

##### Example 2: Parallel merging of sorted lists

Recall the following task: given sorted lists $$L_1, \dots, L_K$$, we wish to merge them into a big list containing all the elements of $$L_1, \dots, L_K$$ in sorted order.

The natural divide-and-conquer algorithm to accomplish this in time $$O(N \log K)$$, where $$N$$ is the total number of elements, is the following:

1. Write a function single_merge(A, B) that merges sorted A and B in time $$O(\texttt{len}(A) + \texttt{len}(B))$$.
2. Pair the lists up: form $$(L_1, L_2), (L_3, L_4), \dots, (L_{K-1}, L_K)$$
3. Merge each pair using simple_merge.
4. Repeat steps 1-3 until a single list remains.

Note: in Step 2, we can always assume that $$K$$ is a multiple of $$2$$ by adding the empty list into our collection without altering the result, if necessary.

Now, we explore how to parallelize this algorithm.

Step 1: if Ls is a list of sorted lists with K lists in total, we can use

zip(Ls[:(K // 2)], Ls[(K // 2):])


to create an iterable over pairs $$(L_1, L_{K/2+1}), \dots, (L_{i}, L_{K/2 + i})$$. Since zip "loses" elements if its two arguments don't have the same length, we pad an empty list to make sure K is even, if necessary.

Step 2: We now apply the single_merge function to each pair in the result of zip using Pool.starmap. The reason we have to use starmap was explained at the beginning of our discussion: single_merge expects two arguments, yet the output of zip is a list of tuples.

Step 3: Repeat Steps 1-2 until Step 2 results in a single list. The total number of times we will have to do this is $$O(\log_2 K)$$ times, since each time we are reducing the number of sorted lists to merge by half.

The Python code below contains the full example:

from os import cpu_count
from heapq import merge
from multiprocessing import Pool

def single_merge(list_a, list_b):
return list(merge(list_a, list_b))

def merge(Ls):
while True:
k = len(Ls)
if k == 1:          # eventually, we will end up in this case
return Ls[0]
if k % 2 == 1:
Ls.append([])   # make sure number of lists divisible by 2
k += 1
# parallelize merging step
with Pool(processes=cpu_count()) as pool:
Ls = pool.starmap(single_merge, zip(Ls[(k // 2):], Ls[:(k // 2)]))