Week 11: MapReduce

High-level overview

MapReduce is a framework for distributed computation that helps scale up parallel programs. Writing code for parallel or distributed processing manually can quickly become tedious due to the need to manage communication between machines, synchronization, etc. In response to that, the MapReduce framework defines 3 fundamental steps (that can be iterated) to write parallel programs:

  1. map: in this step, a function is applied to a list of inputs and outputs a list of (key, value) pairs.

  2. shuffle (also referred to as the partition or grouping step): the (key, value) tuples generated in the map step are grouped based on their key field.

  3. reduce: finally, a function is applied to a series of (key, [value_1, value_2, ..., value_N]) pairs generated by the shuffle step, and outputs another list of (key, value) pairs.

Since MapReduce is a framework for distributed computing, the reader should keep in mind that the map and reduce steps can happen concurrently on different machines within a compute network. The shuffle step that groups data per key ensures that (key, value) pairs with the same key will be collected and processed on the same machine in the next step.
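
The three steps can be sketched on a single machine in plain Python. This is only an illustration of the data flow, not how any real backend is implemented; the helper run_mapreduce and the purchase data are hypothetical names introduced for this sketch.

```python
from collections import defaultdict

def run_mapreduce(inputs, mapper, reducer):
    """Simulate one map -> shuffle -> reduce round on a single machine."""
    # map: apply the mapper to every input and collect all (key, value) pairs
    mapped = [pair for item in inputs for pair in mapper(item)]
    # shuffle: group the values by key
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    # reduce: apply the reducer to each (key, [values]) group
    return [pair for key, values in groups.items() for pair in reducer(key, values)]

# Illustrative job: total amount spent per customer.
def mapper(record):
    customer, amount = record
    yield customer, amount

def reducer(customer, amounts):
    yield customer, sum(amounts)

purchases = [("ann", 5), ("bob", 7), ("ann", 3)]
result = run_mapreduce(purchases, mapper, reducer)
print(result)  # [('ann', 8), ('bob', 7)]
```

In a real deployment the map and reduce calls would be spread over several machines; only the grouping guarantee of the shuffle step stays the same.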

Distributed frameworks

Though the MapReduce model is abstract, it is most often used in the context of a larger distributed computation framework; the standard example is Apache Hadoop. Often, the Apache Spark compute engine is run on top of a Hadoop installation.

Iterating MapReduce steps

The output of the reduce step is itself a list of (key, value) pairs, allowing the programmer to "compose" multiple steps of MapReduce. In that case, the (key, value) pairs produced by the reduce function are passed on to the map function of the next step, whose output is in turn grouped by key.

Because there are multiple backends implementing MapReduce, we will use the mrjob library to write MapReduce programs without having to worry about setting up the backend. To run the examples below, make sure to install the library first:

$ pip install --user mrjob

Example: Word Count

As a first example, suppose we have a text file consisting of multiple lines and we wish to find the count of each word appearing in that file. We will use the MapReduce framework to do that, as follows:

  1. The map step will split each input line into a list of words, and output (word, 1) for each word found;

  2. The shuffle step will group all (word, 1) pairs by word, and output a list of (word, [1, 1, ..., 1]) pairs;

  3. Finally, the reduce step will sum over the number of 1s for each word in the output of the shuffle step.

Note that the number of 1s in a (word, [1, 1, ..., 1]) pair indicates the number of appearances of word. The code follows below:

from mrjob.job import MRJob
import re

WORD_REGEX = re.compile(r"[\w]+")

class MRWordCount(MRJob):
    def mapper(self, _, line):
        for word in WORD_REGEX.findall(line):
            yield word.lower(), 1

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == "__main__":
    MRWordCount().run()

Method declarations

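Both the mapper and reducer functions have signatures that look like the following:

def mapper(self, key, value): ...
def reducer(self, key, values): ...

In short, both mapper and reducer operate on key-value data, and this is reflected in their declarations: mapper receives one (key, value) pair at a time, while reducer receives a key together with an iterator over all the values grouped under that key. In the word count example, the key argument of mapper is replaced by the placeholder _ because mapper processes lines directly from an input file, and those lines carry no meaningful key.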
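
As a rough illustration of these signatures, the sketch below writes the two functions as standalone generators (without self) and calls them by hand. It assumes that the key is None for raw text input and that the values arrive as a one-shot iterator, which matches how mrjob behaves by default as far as we are aware.

```python
def mapper(key, line):
    # For raw text input there is no meaningful key, so `key` is None here.
    assert key is None
    for word in line.split():
        yield word, 1

def reducer(key, values):
    # `values` iterates over every value emitted for `key`;
    # being an iterator, it can be consumed only once.
    yield key, sum(values)

mapped = list(mapper(None, "a b a"))
reduced = list(reducer("a", iter([1, 1])))
print(mapped)   # [('a', 1), ('b', 1), ('a', 1)]
print(reduced)  # [('a', 2)]
```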

Running mrjob examples

If the above script is in a file called mr_word_count.py and your input file is called data.txt, it suffices to write

$ python mr_word_count.py data.txt

assuming that data.txt is in the same folder as your script. The run() method will take care of splitting the file line-by-line and "feeding" each line to the mapper function.

Let us take a step-by-step look at how this example works. Suppose that the input file contains the following lines:

hello ORIE students
welcome to ORIE

The mapper function will process the file line-by-line. Processing the first line will lead to the following intermediate outputs:

(hello, 1), (orie, 1), (students, 1)

while processing the second line will output:

(welcome, 1), (to, 1), (orie, 1)

Now comes the shuffle step: if we have two tuples of the form (word, 1), we merge them into a single tuple of the form (word, [1, 1]), and so on. Therefore, we obtain

(hello, [1]), (orie, [1, 1]), (students, [1]), (welcome, [1]), (to, [1]).

Note that the number of 1s appearing in the list for each word is exactly the number of appearances of that word.

Finally, the reducer sums over the list of 1s for each distinct word, leading to the following output:

(hello, 1), (orie, 2), (students, 1), (welcome, 1), (to, 1)
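
The same trace can be reproduced without any MapReduce backend. The following plain-Python sketch mimics the three stages on the two example lines; it is meant for checking the intermediate values by hand, not as a substitute for the mrjob program.

```python
import re
from collections import defaultdict

WORD_REGEX = re.compile(r"[\w]+")
lines = ["hello ORIE students", "welcome to ORIE"]

# map: one (word, 1) pair per word occurrence
mapped = [(w.lower(), 1) for line in lines for w in WORD_REGEX.findall(line)]

# shuffle: collect the 1s belonging to each word
grouped = defaultdict(list)
for word, one in mapped:
    grouped[word].append(one)

# reduce: sum the 1s of each word
counts = {word: sum(ones) for word, ones in grouped.items()}

print(dict(grouped))  # {'hello': [1], 'orie': [1, 1], 'students': [1], ...}
print(counts)         # {'hello': 1, 'orie': 2, 'students': 1, ...}
```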

Tip

Note that we make each word lowercase by using word.lower() in the output of the mapper function. This is because grouping is case-sensitive, i.e., the strings "orie" and "ORIE" would be treated as different keys by MapReduce.

More details on MapReduce

There are several things to keep in mind when running MapReduce jobs:

  1. Even though all our examples use local data, data can be stored in a distributed fashion. This means that:

    • multiple machines may be processing a local file by a given name;
    • even if input is a single file, each line could be handled by a different process! (depends on implementation)
  2. Grouping outputs by key always happens to ensure consistency:

    • data that look like (key, value) are grouped by key after each call;
    • there is no way to prevent this grouping from happening.
  3. The output of mapper and reducer functions must be in the form (key, value). In addition:

    • between each call, key and value are serialized (using the JSON format);
    • some data types, such as set, cannot be used without "hacks".
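
The effect of JSON serialization can be observed with the standard json module. The sketch below uses json directly as a stand-in for what happens between calls; the sorted-list workaround for sets is one possible "hack", not the only one.

```python
import json

pair = (("A", "B"), 3)                   # a tuple key and an int value
decoded = json.loads(json.dumps(pair))
print(decoded)                           # [['A', 'B'], 3]: tuples come back as lists

try:
    json.dumps({"A", "B"})               # sets have no JSON representation
    set_ok = True
except TypeError:
    set_ok = False
print(set_ok)                            # False

# Possible workaround: emit a sorted list, rebuild the set on the receiving side.
encoded = json.dumps(sorted({"B", "A"}))
restored = set(json.loads(encoded))
print(restored == {"A", "B"})            # True
```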

Chaining multiple steps

MapReduce jobs are composable, meaning that the map-shuffle-reduce pipeline can be applied to a series of inputs multiple times to perform complex tasks. The mrjob library allows us to chain multiple steps, as long as each step:

  • implements at least one of mapper, reducer, or combiner
  • follows the (key, value) output format

Here is an example, using multiple steps of MapReduce to find the word of maximum frequency in a file.

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_REGEX = re.compile(r"[\w]+")

class MRMaxFreq(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(mapper=self.mapper_post,
                   reducer=self.reducer_post)
        ]

    def mapper(self, _, line):
        for word in WORD_REGEX.findall(line):
            yield word.lower(), 1

    def reducer(self, word, counts):
        yield word, sum(counts)


    # keys: None, values: (word, word_count)
    def mapper_post(self, word, word_count):
        yield None, (word, word_count)

    # pick the (word, word_count) pair with the largest word_count
    def reducer_post(self, _, word_count_pairs):
        yield max(word_count_pairs, key=lambda p: p[1])

if __name__ == "__main__":
    MRMaxFreq().run()

There are several moving parts here, so let us parse them one-by-one.

  1. First, note that we are overriding the steps() method to inform mrjob that our program consists of multiple steps:

    • the first step uses the same mapper and reducer functions that we used for the simple word count program;
    • the second step uses mapper_post as its mapper function and reducer_post as its reducer function.

    The above means that the output of reducer becomes the input to mapper_post.

  2. The mapper_post function is very simple: it converts key-value pairs as follows:

    (word, word_count) -> (None, (word, word_count))
    

    This is for good reason: without a step like that, we would not be able to find the word with maximum frequency since any function we write would process (word, word_count) pairs one at a time. By mapping everything to the key None, we ensure that reducer_post will process all (word, word_count) pairs at once, since the shuffle step of MapReduce will collect them all into a list corresponding to the "placeholder" key of None.

  3. Finally, reducer_post uses max() to select the (word, word_count) pair with the largest word_count and outputs it. The key parameter of max() makes the comparison use the second element of each pair, which contains the count of each word.

Again, let us see how our program would work given the example input file below:

$ cat data.txt
hello ORIE students
welcome to ORIE

Step 1: the first MRStep would give us identical output to that produced by our simple word count program:

(hello, 1), (orie, 2), (students, 1), (welcome, 1), (to, 1)

Step 2: The mapper_post function processes the above series of key-value pairs and outputs the following:

(None, (hello, 1)), (None, (orie, 2)), (None, (students, 1)), (None, (welcome, 1)), (None, (to, 1))

Step 3: MapReduce groups the above outputs by key. Since all tuples have None as their key, the result is a single key-value pair like the one below:

(None, [(hello, 1), (orie, 2), (students, 1), (welcome, 1), (to, 1)])

Step 4: The reducer_post function processes the above pair, selecting the (word, word_count) pair with the largest word_count, and finally outputs

(orie, 2)

which indicates that orie is the word with maximal frequency 2.
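
The two rounds can also be traced in plain Python. In this sketch, shuffle is a hypothetical helper that stands in for the grouping performed by MapReduce; words are split on whitespace for brevity.

```python
from collections import defaultdict

def shuffle(pairs):
    """Group a list of (key, value) pairs by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

lines = ["hello ORIE students", "welcome to ORIE"]

# Round 1: word count
mapped = [(w.lower(), 1) for line in lines for w in line.split()]
counts = [(word, sum(ones)) for word, ones in shuffle(mapped).items()]

# Round 2: send every pair to the placeholder key None, then take the maximum
mapped_post = [(None, (word, n)) for word, n in counts]
grouped_post = shuffle(mapped_post)
best = max(grouped_post[None], key=lambda p: p[1])
print(best)  # ('orie', 2)
```

If several words tie for the highest count, max() returns the first one it encounters, so the result then depends on the order in which pairs arrive.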

Tip

In this example, since the mapper_post function does something trivial to the output of reducer, we can incorporate that change directly into the reducer function:

def reducer(self, word, counts):
    yield None, (word, sum(counts))

In that case, we can omit the mapper_post function entirely and rewrite the steps() method as follows:

def steps(self):
    return [
        MRStep(mapper=self.mapper, reducer=self.reducer),
        MRStep(reducer=self.reducer_post)
    ]

Other available methods

The mrjob library supports a few other methods that complement the mapper, combiner and reducer functions, such as:

  • {mapper,combiner,reducer}_init(): these can be used to e.g., initialize helper data structures;

  • {mapper,combiner,reducer}_final(): used for post-processing after the "main" function is run.

We now show an example of using mapper_init() and mapper_final() to slightly optimize the word count example. Recall that the simple word count program had a somewhat inefficient mapper function; it would output (word, 1) immediately for each word encountered, which means that we could have something like the following:

hi hello hi hello hi -> (hi, 1), (hello, 1), (hi, 1), (hello, 1), (hi, 1)

If possible, we would like to keep track of the partial sums of each word encountered in our compute node to output fewer key-value pairs, such as:

hi hello hi hello hi -> (hi, 3), (hello, 2)

Below, we use mapper_init() to initialize a dictionary holding word counts for each word encountered so far. The mapper function simply updates the key-value pairs in that dictionary, and in mapper_final(), which runs after all inputs have been processed, we output all the key-value pairs from that dictionary.

At first glance, it seems like this is all we need to do to count the number of words. However:

  • MapReduce runs in multiple instances / processes! Therefore, a separate dictionary will be created for each process;
  • Because of this, we still need a reducer step to combine the partial counts from each process.

The complete code follows below:

from collections import defaultdict
from mrjob.job import MRJob
import re

word_regex = re.compile(r"[\w]+")

class MRWordCount(MRJob):

    def mapper_init(self):
        self.words_dict = defaultdict(int)

    def mapper(self, _, line):
        for word in word_regex.findall(line):
            self.words_dict[word.lower()] += 1

    def mapper_final(self):
        for word, val in self.words_dict.items():
            yield word, val

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == "__main__":
    MRWordCount().run()

Our mapper function in the above example only populates the dictionary and does not output any key-value pairs; the latter is taken care of in the mapper_final function, which emits all key-value pairs from the dictionary. Skipping mapper_final is not possible, as otherwise reducer would not receive any inputs.
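
To see why the reducer is still required, the sketch below imitates two mapper processes, each with its own dictionary. The function mapper_process and the way the input is split are assumptions made for illustration; a real backend decides the split itself.

```python
from collections import Counter

def mapper_process(lines):
    """Hypothetical stand-in for one mapper process with its own dictionary."""
    partial = Counter()
    for line in lines:
        partial.update(w.lower() for w in line.split())
    return list(partial.items())   # what mapper_final would emit

# Pretend the input was split between two processes.
out_1 = mapper_process(["hi hello hi"])
out_2 = mapper_process(["hello hi"])
print(out_1)  # [('hi', 2), ('hello', 1)]
print(out_2)  # [('hello', 1), ('hi', 1)]

# The reducer still has to add up the partial counts of each word.
totals = Counter()
for word, n in out_1 + out_2:
    totals[word] += n
print(dict(totals))  # {'hi': 3, 'hello': 2}
```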

The defaultdict class

We use defaultdict, which is a convenience wrapper around dictionaries providing a default value for keys that do not exist yet. For example:

defaultdict(int)    # keys have default value 0
defaultdict(list)   # keys have default value []

This allows us to e.g., increment keys as in self.words_dict[word.lower()] += 1 without having to check whether word.lower() previously existed as a key in the dictionary.
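
For comparison, here is a small sketch of the same counting loop written with a regular dictionary and with defaultdict; the word list is made up for the example.

```python
from collections import defaultdict

words = ["hi", "hello", "hi"]

plain = {}
for w in words:
    if w not in plain:      # explicit existence check
        plain[w] = 0
    plain[w] += 1

auto = defaultdict(int)
for w in words:
    auto[w] += 1            # missing keys start at int() == 0

print(plain)                # {'hi': 2, 'hello': 1}
print(plain == dict(auto))  # True
```

One thing to keep in mind is that merely reading a missing key from a defaultdict inserts it with the default value.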

Example: Finding mutual friends

Suppose that you have a textual representation of a graph in the following form:

A -> B C E
B -> A C F
C -> A B
...

In each line, the first element is a node of the graph, with the elements after the arrow being the nodes it is connected to. We assume that this graph is undirected, meaning that if A is connected to B then B is also connected to A.

Now, suppose that this graph was representing a social network, where nodes are users and being connected means that you are "friends" or "followers". A common piece of functionality (implemented, e.g., in Facebook) is to report a list of mutual friends between a pair of already existing friends.

For example, in the following graph, (A, B) have C as a mutual friend:

A -> B C E
B -> A C D
C -> A B
D -> B E
E -> A D

Indeed, A is connected to B, C and E and B is connected to A, C, and D.

We can use MapReduce to solve the problem of finding mutual friends between each pair of friends in the original graph:

  1. Initially, for each pair of friends (U, V) described in each line of the input file, we output a key-value pair with (U, V) as its key and the list of friends of U as its value. For example:

    # mapper(A -> B C E)
    ((A, B), [B, C, E]), ((A, C), [B, C, E]), ((A, E), [B, C, E])
    

    The intuition behind this step is as follows:

    • since the graph is undirected, every pair (U, V) of friends will appear twice as a key;
    • the first time, we will have ((U, V), [list of friends of U]);
    • the second time, we will have ((V, U), [list of friends of V]).

    Note that if we could intersect the list of friends of U with the list of friends of V, this would give us precisely the list of their mutual friends. However, if we maintain the output format as-is, the shuffle step of MapReduce will treat (U, V) and (V, U) as different keys!

  2. Given the aforementioned problem, an important part of this step is to make sure that we output (U, V) as the key both times, so that the shuffle step of MapReduce groups these two lists together as below:

    ((U, V), [[list of friends of U], [list of friends of V]])
    

    This can be accomplished by making sure that the keys in the output of the mapper function are sorted lexicographically. In our example input, we would obtain:

    # mapper(A -> B C E)
    ((A, B), [B, C, E]), ((A, C), [B, C, E]), ((A, E), [B, C, E])
    # mapper(B -> A C D)
    ((A, B), [A, C, D]), ((B, C), [A, C, D]), ((B, D), [A, C, D])
    ...
    

    After grouping the above by key in the shuffle step, one of the intermediate outputs would be

    ((A, B), [[B, C, E], [A, C, D]])
    

    where we can simply intersect the two lists to obtain the list of mutual friends of A and B.

  3. The final step, which is the reducer step, simply performs the list intersection.

The following program contains the complete implementation in Python:

from mrjob.job import MRJob

class MRFriends(MRJob):
    def mapper(self, _, line):
        # parse line: A -> B C D E
        f_from, to_str = line.split("->")
        f_from         = f_from.strip()         # remove surrounding whitespace
        friends_to     = to_str.strip().split() # split on whitespace
        # output: ((U, V), [F1, F2, ...])
        for f_to in friends_to:
            # sort the key lexicographically without overwriting f_from,
            # which is still needed for the remaining friends on this line
            pair = tuple(sorted((f_from, f_to)))
            yield pair, friends_to

    def reducer(self, pair, friend_lists):
        # friend_lists is an iterator (it cannot be indexed), so turn each
        # friend list it yields into a set before intersecting
        friend_sets = [set(friends) for friends in friend_lists]
        common = set.intersection(*friend_sets)
        yield pair, sorted(common)

if __name__ == "__main__":
    MRFriends().run()
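
To sanity-check the approach without a MapReduce backend, the following plain-Python sketch applies the same mapper logic to the five-node example graph and groups the results by hand. It assumes, as in the text, that the graph is undirected, so every key receives exactly two friend lists.

```python
from collections import defaultdict

graph_lines = [
    "A -> B C E",
    "B -> A C D",
    "C -> A B",
    "D -> B E",
    "E -> A D",
]

def mapper(line):
    node, rest = line.split("->")
    node, friends = node.strip(), rest.split()
    for friend in friends:
        # key sorted lexicographically so (U, V) and (V, U) coincide
        yield tuple(sorted((node, friend))), friends

# shuffle: group the friend lists by pair
grouped = defaultdict(list)
for line in graph_lines:
    for pair, friends in mapper(line):
        grouped[pair].append(friends)

# reduce: intersect the two friend lists of each pair
mutual = {pair: sorted(set(a) & set(b)) for pair, (a, b) in grouped.items()}
print(mutual[("A", "B")])  # ['C']
print(mutual[("B", "C")])  # ['A']
```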

Note

Note that the above program only works for finding mutual friends of A, B when A and B are already friends in the existing graph.

To find mutual friends of (A, B) when A and B are allowed to not be connected, one way would be to output ((X, Y), [list of friends of X]) for each pair (X, Y), regardless of whether X, Y are connected. For this approach to work, one may need to know the nodes in the graph in advance, or build it up via multiple steps of MapReduce.
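
A minimal sketch of this idea, assuming the full node set is known in advance, is given below in plain Python. The adjacency dictionary and the all_nodes list are hypothetical inputs made up for the example; the grouping is again done by hand rather than by a MapReduce backend.

```python
from collections import defaultdict

adjacency = {
    "A": ["B", "C", "E"],
    "B": ["A", "C", "D"],
    "C": ["A", "B"],
    "D": ["B", "E"],
    "E": ["A", "D"],
}
all_nodes = sorted(adjacency)   # assumed to be known in advance

def mapper(node, friends):
    # emit the friend list of `node` once for every other node in the graph
    for other in all_nodes:
        if other != node:
            yield tuple(sorted((node, other))), friends

# shuffle: every pair of nodes receives exactly two friend lists
grouped = defaultdict(list)
for node, friends in adjacency.items():
    for pair, friend_list in mapper(node, friends):
        grouped[pair].append(friend_list)

# reduce: intersect the two lists of each pair
mutual = {pair: sorted(set(a) & set(b)) for pair, (a, b) in grouped.items()}
print(mutual[("C", "D")])  # ['B'], even though C and D are not connected
```

Note that this emits one key for every pair of nodes, so the amount of intermediate data grows quadratically with the size of the graph.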