Apache Hadoop MapReduce

Categories: BigData


Updated: 2016-04-28

Introduction

This is a discussion of the Hadoop MapReduce component of the Apache Hadoop “big data” project (version 2.7).

MapReduce is a design pattern for performing parallel processing of large sets of records. Hadoop MapReduce is an implementation of the MapReduce design pattern; it is a framework which coordinates HDFS and YARN to analyse a large file (a sequence of records) by spawning N processes in a YARN cluster, each processing 1/Nth of the overall input. When the input file is in HDFS, the processes are allocated near the data they read - ie code is brought to the data, rather than data being brought to the code; this can provide very large performance improvements for IO-bound processing. To avoid confusion between the generic MapReduce pattern and the Hadoop MapReduce framework, I will refer to the latter (implementation) as HMapReduce throughout this article.

Both the MapReduce pattern and the HMapReduce implementation are now considered somewhat dated/obsolete. Spark generally performs better when executing the same logic while also providing a nicer API for programmers, and various other tools are considered better still for specific purposes. Nevertheless it is useful to understand MapReduce, as:

  • it is a good introduction to the concepts of distributed processing while being simpler to understand than many other tools (eg Spark);
  • there is still a lot of documentation and reference materials that refer to MapReduce;
  • there are still many tools that generate MapReduce code as their “back end”;
  • there is still a significant amount of MapReduce code out there in production environments.

The MapReduce Design Pattern

The MapReduce design pattern consists of a pair of functions which are applied to a large set of input records:

  • the map function is applied by the framework (in parallel, potentially across multiple nodes) to each record, selects just the ones it wants, and labels each with a category
  • the framework then collects all records for each category together onto a single node (but different categories can be processed on different nodes)
  • the reduce function is then applied by the framework to each category (potentially in parallel for different categories, but serially for all records within a category)

The primary constraints are:

  • that selecting and categorising a record doesn’t require information from other records, ie is efficiently distributable.
  • that the reduce stage may well need to take into account other records of the same “category” but can ignore records from different categories

The author of a MapReduce program just needs to write a suitable map function, and a suitable reduce function. The map function has a simple signature: it takes a single record as input, and returns zero or more output records each with an associated category. The reduce method has an equally simple signature: it takes a list of records (all belonging to the same category) and returns zero or more output records. All details related to distributed processing are then basically “configuration details” for the framework.
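
To make those signatures concrete, here is a minimal Java-style sketch of the two functions at the pattern level. The interface and type names (MapFunction, ReduceFunction, Pair) are invented purely for illustration; the real Hadoop API is shown later in this article.

  import java.util.List;

  // Pattern-level sketch only; these names are not part of any real library.
  interface MapFunction<RECORD, CATEGORY, VALUE> {
      // one input record in, zero or more (category, value) pairs out
      List<Pair<CATEGORY, VALUE>> map(RECORD record);
  }

  interface ReduceFunction<CATEGORY, VALUE, RESULT> {
      // all values belonging to one category in, zero or more output records out
      List<RESULT> reduce(CATEGORY category, Iterable<VALUE> values);
  }

  class Pair<K, V> {
      final K key;
      final V value;
      Pair(K key, V value) { this.key = key; this.value = value; }
  }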

A more mathematical way of viewing MapReduce is that the map step converts a set of entities to a map where the keys are the different “categories” of data, and the value is the list of entities in that category. The reduce step is then applied to each key (in parallel), converting each list-of-entities into a (usually smaller) list of values of a different type.

The “categorisation” process can also be called partitioning of the input data.

Not every problem fits the MapReduce constraints, but many do - and can therefore be efficiently executed on a cluster of nodes. In particular, with careful planning the code can be “brought to the data” rather than the reverse, reducing network traffic and thus mitigating one of the major bottlenecks in processing large amounts of data.

The Wikipedia article on MapReduce covers the algorithm and its implications in much more detail.

Note that functions named map(…) and reduce(…) have been available in various programming languages for a long time (starting, as far as I can tell, in Lisp), particularly in those describing themselves as “functional” languages. The MapReduce design pattern is somewhat related; the inventors of the pattern clearly named their pattern after these operations. Nevertheless the analogy is strained; the functional map and reduce operations do not perform categorization/partitioning of input, nor do they perform sort/shuffle operations. It is best to ignore this association when learning the MapReduce design pattern and HMapReduce library.

The Hadoop Implementation of MapReduce

HMapReduce provides a Java API and runtime for writing tasks to process data in MapReduce style. It also provides code to schedule these tasks for execution concurrently on multiple servers via a YARN cluster. Exactly which hosts in the cluster execute the job is determined by the HMapReduce framework.

In HMapReduce terminology, a running HMapReduce program is called a job, and the child processes it starts (via YARN) for mappers and reducers are called tasks.

Within each mapper or reducer task, the HMapReduce framework obtains the input data and passes it to the user-provided map-function or reduce-function. This isolates user code from the details of how the data is stored. The simplest source of data for map tasks is files stored on a Hadoop Distributed File System (HDFS). In this case, HMapReduce will first consult HDFS to find the physical location of the various datablocks making up the file to be processed, and then provide hints to YARN in order to have each task run near to or directly on the node that holds most/all of the data that it reads.

Many “big data” databases provide native support for MapReduce jobs; when such tools are requested to execute a MapReduce task they pass to HMapReduce information about the file(s) which make up the tables that the HMapReduce job will process.

A MapReduce task processes its input one record at a time, but HDFS has no concept of records; it breaks files into fixed-size chunks and distributes them across data-nodes. Each of the mapper tasks started by HMapReduce is given as input parameters a byte-range of the overall file that it should process; it starts at the first record which begins after the start of the byte-range, ie it might have to skip over some initial bytes until it detects a “start of record”. It also completely processes any record that starts within its byte-range, even if that record’s data continues beyond the assigned byte-range. When the input file is in HDFS, HMapReduce tries to start each process on a node which will minimise the amount of network traffic needed to read the assigned byte-range. Ideally, a single HDFS node holds a copy of the entire byte-range and the logic can also be run on that node, avoiding network traffic completely. However if this isn’t possible then the process started by HMapReduce still runs: the code simply issues an initial seek followed by consecutive reads against the filesystem, and such reads can be satisfied by HDFS regardless of where the data is; it is just more efficient (less network IO) when the reads are for file offsets that happen to be stored locally.
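
As a rough illustration of what a mapper task does with its assigned byte-range, here is a conceptual Java sketch using the HDFS client API. It is not the real HMapReduce reader code (the real record readers handle compression, character encodings and position tracking much more carefully), and the position arithmetic below assumes single-byte characters and linefeed-terminated records.

  import java.io.BufferedReader;
  import java.io.InputStreamReader;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class SplitReaderSketch {
      // Process all records that *start* within [splitStart, splitEnd) of the given file.
      public static void readSplit(Configuration conf, Path file,
                                   long splitStart, long splitEnd) throws Exception {
          FileSystem fs = file.getFileSystem(conf);
          try (FSDataInputStream in = fs.open(file)) {
              in.seek(splitStart);                               // jump to the assigned byte-range
              BufferedReader reader = new BufferedReader(new InputStreamReader(in));
              long pos = splitStart;
              String line;
              if (splitStart != 0) {
                  line = reader.readLine();                      // skip the partial record we landed in
                  if (line != null) pos += line.length() + 1;    // rough byte count (ASCII + '\n')
              }
              while (pos < splitEnd && (line = reader.readLine()) != null) {
                  pos += line.length() + 1;
                  process(line);                                 // hand the record to the map-function
              }
              // note: the last record read may continue past splitEnd - that is intentional
          }
      }

      private static void process(String line) { /* invoke the map-function here */ }
  }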


Non-Hadoop Implementations of MapReduce

The Apache Hadoop MapReduce implementation is (as far as I know) the only one which is “standalone” in the sense that a programmer compiles code using the framework into an executable application. However there are many databases and similar tools which “support” MapReduce by allowing users to write map and reduce functions and then pass them to the tool for execution. The map function is then executed (potentially multiple instances in parallel) to process each relevant record, where the executing tool is responsible for providing that stream of input data.

Actually, HMapReduce could be considered in the same category as the other tools supporting mapreduce; it is just that the source of data which the framework passes to the mapper is a file in an HDFS filesystem rather than records from a database or similar.

Failure Handling

A single HMapReduce job may involve executing hundreds of individual processes (mappers and reducers) across hundreds of different physical servers. It is therefore possible that some of these processes fail to execute correctly (eg server crashes, disk fails, network problems cause timeouts). An HMapReduce job tracks all the tasks needed to complete the overall work, and when one does not complete correctly then it simply restarts that failed task. This allows efficient execution of complex jobs even in a not-entirely-reliable environment.

Any tool which uses HMapReduce as its “back end” for executing logic naturally inherits this robustness.

Other MapReduce implementations can implement the same kind of retry logic if desired; the mapreduce design pattern means that execution of the map and reduce code is entirely controlled by the enclosing framework, which makes such retries straightforward to add.

HMapReduce as a Job Executor

The HMapReduce library is used as a helper by many higher-level tools - ie some “database-like” tools (such as Hive or Pig) execute queries by dynamically generating HMapReduce programs and executing them on a YARN cluster.

However HMapReduce wasn’t really designed for this purpose, resulting in some inefficiencies when used in this way. The Tez project was created explicitly to allow such tools to execute tasks within a YARN cluster without forcing their logic into the MapReduce design pattern. Migrating such tools to Tez has been in progress for many years, and most tools now support HMapReduce or Tez as backend options. During this period, Spark was invented and some tools offer Spark as a back-end option too. Big-data technologies are in a period of rapid change!

One of the major problems with HMapReduce is that the API has been specifically designed around just the following cycle:

  • read records (usually from HDFS file, but other options can be supported)
  • start multiple processes, each of which applies the map-function to a subset of the input, writes to local filesystem, and exits
  • start multiple reduce-processes, each of which fetches relevant input from all mappers, writes to HDFS, and exits

Unfortunately, many interesting algorithms require multiple map or reduce phases, and HMapReduce just isn’t designed for that. The solution is therefore to submit multiple HMapReduce jobs, each of which takes the HDFS output of the previous pass as its input. It works, but is complex for the higher-level tool to manage and has performance issues due to unnecessary disk-io. I intend to write an article on Tez which explains these issues in more depth, and how Tez resolves them; check back later!

The HMapReduce Application Master Process

When processing a large number of input records, the map-function should be invoked from multiple processes on multiple servers to increase throughput. The reducer-function should also (where possible) be invoked in parallel. Something needs to request containers from YARN to run these processes in, start the relevant processes, restart any which fail, ensure reducers are started only after all mappers have completed, etc.

This synchronization is done via a single process called an Application Master (abbreviated as AM). An HMapReduce Application Master program is built by writing a map-function, a reduce-function, and a “main method” which calls into the HMapReduce API, and compiling this all to a single executable jar-file. To start this process, it must be submitted to a YARN resource manager. YARN will find an available server and start a single instance of this application master (AM) in a YARN “container”. The AM first connects to HDFS and queries information about the input-file to be processed - in particular, how big it is and which servers actually host different parts of the file. The AM then connects to YARN and requests more resources - empty containers with specific amounts of RAM and CPU, on specific hosts. It then passes the map and reduce code to YARN for execution within those “containers”; the processes connect back to their AM to report their status as they run.
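
For reference, a minimal “main method” of this kind might look roughly like the sketch below. It uses the standard Hadoop 2.x Job API; for brevity it plugs in the base Mapper and Reducer classes (which simply pass records through unchanged) where a real job would use its own subclasses, such as the examples later in this article.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class MyJobDriver {
      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          Job job = Job.getInstance(conf, "my-job");
          job.setJarByClass(MyJobDriver.class);        // the jar containing the map/reduce code
          job.setMapperClass(Mapper.class);            // identity mapper (placeholder)
          job.setReducerClass(Reducer.class);          // identity reducer (placeholder)
          job.setOutputKeyClass(LongWritable.class);   // key type emitted by the identity mapper
          job.setOutputValueClass(Text.class);
          job.setNumReduceTasks(2);                    // a developer decision
          FileInputFormat.addInputPath(job, new Path(args[0]));    // input file in HDFS
          FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory in HDFS
          // submits the job to YARN (which starts the Application Master) and waits for it
          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
  }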

Because the AM monitors the status of processes it commands YARN to start, it can restart processes if they fail, and knows when it can start tasks which require input from earlier phases.

The decision on how many reducers to use is made by the original developer of the AM. The decision on how many mappers to use is made indirectly: the developer specifies the “split size”, ie how many bytes of the input file each mapper should handle, and the actual number of mappers started by the AM process then depends upon the size of the input file. Normally the “split size” is equal to the size of an HDFS block in order to maximise the benefits of bringing the code to the data (ie minimise network traffic).
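
Continuing the hypothetical driver sketch above, both knobs can be set via the Job API; the 128 MB figure below is just an example that assumes a matching HDFS block size.

  // Force a split size of 128 MB (typically equal to the HDFS block size)
  // and use four reducers; the number of mappers then follows from the file size.
  FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024);
  FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);
  job.setNumReduceTasks(4);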

HMapReduce in Non-Java Languages

It is possible to write map and reduce functions in non-java languages, eg Python - or even C. In this case, the external code to execute is included as a “resource” of the Application Master (AM) program. After the AM is started, it starts the external code as a separate process (eg starts a Python interpreter and passes it the map-function python script). Each record to be processed is then written to the STDIN of the external process, and the external process writes output records to its STDOUT.

Mappers in More Detail

The mapreduce framework must provide its map-function with a sequence of (key, value) pairs. Often the key is irrelevant for the mapper, in which case:

  • any “field” of the input records will do, or alternatively
  • the “key” can be simply set to the byte-offset of the input record within the input file.

The “value” is often a list of properties.

As an example, a suitable input is a textfile containing a series of linefeed-terminated lines with each line containing a comma-separated sequence of values. The “key” may be the first field on the line or (as described above) the line’s offset within the file. The “value” passed to the map-function is then the rest of the line ie the sequence of comma-separated values.

Another suitable input is a Hadoop SequenceFile, ie a file containing a sequence of serialized key/value pairs. Other more complex formats (eg the ORCFile format used by Hive) also work as Mapper inputs.

The MapReduce implementation invokes the map-function once for each input record, passing the key and value. The map function then returns zero or more (key,value) pairs.

One of the simplest things a map-function can do is filter its input, ie look at each record to determine if it is wanted; if so return the record else return nothing. This is similar to applying an SQL where-clause. If the entire set of fields for each wanted input record is not needed, then the map-function might choose to return a new object with a subset of the input object’s fields (to reduce later disk io and network traffic).

Of course a mapper can choose to perform more complex processing on its input records, eg break each one up into multiple output records.
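
As an illustration of the filtering/projection case, here is a hedged sketch of a mapper for comma-separated input of the hypothetical form “customerId,country,amount”; the field layout and the “DE” filter value are invented for the example.

  import java.io.IOException;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  // Keeps only rows for country "DE" and outputs (country, amount), dropping the other fields.
  public class CsvFilterMapper extends Mapper<LongWritable, Text, Text, Text> {
      @Override
      protected void map(LongWritable offset, Text line, Context context)
              throws IOException, InterruptedException {
          String[] fields = line.toString().split(",");
          if (fields.length < 3) {
              return;                      // malformed line: output nothing
          }
          String country = fields[1].trim();
          if (!"DE".equals(country)) {
              return;                      // filtered out, like an SQL where-clause
          }
          // project just the field we need; country becomes the key (category)
          context.write(new Text(country), new Text(fields[2].trim()));
      }
  }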

Grouping

The previous paragraph states that a mapper may “return an output record”; actually it must return a (key, value) pair. The value may be anything - it is passed through to the reducer step unaltered. The key is used for sorting and grouping as described next.

Output of a mapping phase (ie multiple map processes being applied in parallel to the same input) is passed to a set of N reducer processes. Each reducer is given a subset of the records generated by the mappers, grouped by the “key” that the mappers assign to the records. In other words, all records with keys A,B,C are all sent to one reducer, all records with D,E,F to another, etc. A reducer might have to handle several (or many) different key values, but knows that if it sees one record with a specific key then it (and it alone) will see all the records with that key. It is sometimes useful to think of the key as a category or groupid which the map-function assigns to each record it outputs. Assigning a category or groupid is also called partitioning the data.

Giving two records different keys/categories indicates that they can be treated independently during the reduce phase, ie the reduce logic applied to one of the records does not need to care about the existence of the other record. If this is not the case, then the map-function must give both records the same key/category. There is one exception - when a single reducer is being used anyway; see later.

In an ideal world, each category would be handled by a different reducer instance. In practice, if the number of records in each category is fairly small then it may be more efficient for each reducer process to handle multiple categories - in fact this is fairly common. Nevertheless, a reducer-function should treat the set of records in one category as independent from records in any other category. When the number of reducer processes is less than the number of categories generated during the mapping phase then HMapReduce tries to give each reducer process a fair share of the keys. However if the mappers generate a million records with 99% of them having the same key then all those records must by definition go to the same reducer - ie the reduce step is basically single-threaded and HMapReduce cannot do anything to help. Algorithms which generate lots of keys (assign records to lots of independent categories) are more parallelizable.

The HMapReduce framework code that invokes the map-function (ie provides its inputs) also handles the map-function outputs. It knows how many reducers it must pass its output to, and keeps an in-memory sorted collection for each target reducer. Each record returned by the map-function is stored in the collection for the relevant reducer, sorted by the output record’s key (ie when a reducer process handles multiple categories, then the records are grouped by category). If an in-memory collection gets too large it is written to disk and a new in-memory one created. The result is therefore that for each reducer a set of output files are generated in the local filesystem. The contents of each file is individually internally sorted by key (thus grouped by category).

After doing its work, the mapper task (ie process) terminates, leaving these generated files on the local filesystem. As each reducer process is started (later, by the application master) it contacts the YARN hosts on which mapper tasks ran, and fetches the output files intended for it. Each reducer task therefore ends up with a set of files coming from various mappers which all contain the keys assigned to that reducer. Each file is internally sorted by key. This collecting of files from various mapper hosts by each reducer task is sometimes called “the shuffle”.

Each reducer task then needs to merge the (potentially many) files from multiple mappers into a single grouped/sorted sequence it can feed to the reducer-function. Given a set of individually-sorted files, a merge sort can be executed to very efficiently produce a single fully-sorted set of records. All input files are opened at once, and the first record of each is inspected. The record with the smallest key from any of the files is the first record to process. Simply repeat. If there are too many files to open concurrently, then batches of files are merge-sorted together and then those outputs are merge-sorted together. The number of files handled grows exponentially with each pass, so very few passes are needed to merge even enormous numbers of input files into a single input stream sorted by key/category.
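
The merge itself is a standard algorithm. Below is a conceptual in-memory sketch (the real implementation streams records from files on disk rather than holding lists in memory): each sorted “file” contributes its current head element to a priority queue, and the smallest head is repeatedly removed.

  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.List;
  import java.util.PriorityQueue;

  public class MergeSortSketch {
      private static class Head {
          final String key;
          final Iterator<String> rest;
          Head(String key, Iterator<String> rest) { this.key = key; this.rest = rest; }
      }

      // Merge several individually-sorted "files" (lists of keys) into one sorted list.
      public static List<String> merge(List<List<String>> sortedFiles) {
          PriorityQueue<Head> heads = new PriorityQueue<>((a, b) -> a.key.compareTo(b.key));
          for (List<String> file : sortedFiles) {
              Iterator<String> it = file.iterator();
              if (it.hasNext()) heads.add(new Head(it.next(), it));
          }
          List<String> merged = new ArrayList<>();
          while (!heads.isEmpty()) {
              Head h = heads.poll();                 // smallest current key across all files
              merged.add(h.key);
              if (h.rest.hasNext()) heads.add(new Head(h.rest.next(), h.rest));
          }
          return merged;
      }
  }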

Sorting

Above we have addressed the case where the key is a kind of category or enum, with each category having potentially many records.

However it is sometimes also useful to use MapReduce to sort inputs, eg sort the lines in a file.

One solution is simply to force the use of a single reducer, and to use the value being sorted on as the key. The map phase will sort the records in memory - though it will periodically flush the sorted data to disk, ie generate multiple independently-sorted files. We have already discussed how the reducer can then use merge-sort on these files to generate a completely merged output.

Another solution is to use a compound (category, sortby) pair as the key of each record, and then specify a custom “partitioner” function that uses only the category part to determine which reducer to send the data to. The mapper framework will then generate files in which records are grouped by category but sorted by (category, sortby). Each reducer can then generate a separate output-file for each category, where the records within each category-file are sorted by the desired criteria - both grouping and sorting together.
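
A custom partitioner of that kind might look roughly like this; the compound key is assumed (purely for illustration) to be a Text value of the form “category<TAB>sortby”.

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Partitioner;

  // Only the category part decides which reducer receives the record, so records stay
  // grouped by category while HMapReduce sorts them by the full (category, sortby) key.
  public class CategoryPartitioner extends Partitioner<Text, Text> {
      @Override
      public int getPartition(Text key, Text value, int numReducers) {
          String category = key.toString().split("\t", 2)[0];
          // "& Integer.MAX_VALUE" keeps the hash non-negative before the modulo
          return (category.hashCode() & Integer.MAX_VALUE) % numReducers;
      }
  }

It would be registered on the job with job.setPartitionerClass(CategoryPartitioner.class).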

Reducers in More Detail

Once the fragmentary result-files have been collected from the various hosts on which map-functions were executed, and merged together into a single ordered stream, an HMapReduce reducer task is ready to invoke the reducer-function once for each category of records in its input.

The reducer function receives a (category, iterator) pair as input, and uses the iterator to fetch each record labelled with that category. It outputs zero or more (key, value) pairs. The reducer task then passes each output record to its configured “writer” component - the most widely-used one just writes the records to HDFS. The HDFS output path is specified when the job is first submitted. However as there may be multiple reducer-tasks running concurrently, and having multiple processes appending simultaneously to the same HDFS file is not a good idea, each reducer instead writes to a different file whose name is formed by appending a reducer-id to the base output name. Given N reducers, there are therefore usually N output files in HDFS, each containing the results for one or more categories of records.
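
A hedged sketch of such a reducer-function is shown below; it pairs with the hypothetical CsvFilterMapper from earlier and simply counts the records in each category.

  import java.io.IOException;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  // For each country (the category) it counts the records and writes one output line.
  public class CountPerCategoryReducer extends Reducer<Text, Text, Text, Text> {
      @Override
      protected void reduce(Text category, Iterable<Text> values, Context context)
              throws IOException, InterruptedException {
          long count = 0;
          for (Text ignored : values) {   // the iterator yields every record in this category
              count++;
          }
          // zero or more outputs are allowed; here exactly one per category
          context.write(category, new Text(Long.toString(count)));
      }
  }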

An HMapReduce job can be configured to write output in various forms, eg the ORCFile writer may be used so that each reducer writes its records to an HDFS file in ORCFile (columnar database) format.

Combiners

Sometimes a map-function produces “redundant” data, ie multiple records for a specific category where the next reducer-function only requires one. In this case, passing all the data to the reducer would work (the reducer can just pick out the one it needs), but it would be more efficient to trim the records down somewhat during the mapping phase. The HMapReduce framework allows a Combiner function to be specified to do this work. A combiner cannot change the map-function’s output format - it should be transparent to the reducer whether the combiner was run or not.

Example 1: the goal is to see whether, for each value of column X, there is a record where column Y is null. There may be multiple records where X=1 and Y is null, in which case the map-function will output a “matched” record for each one. However the reduce-function doesn’t care how many records match for a specific X, just whether there is at least one. So for each X the combiner can throw away all but one record, and save network traffic and time.

Example 2: given records with some category C and a numerical value X, we want to find the minimum X for each category. A mapper cannot perform a complete “min” test, as it doesn’t know what records are being processed in other mapper instances. However the combiner can look at arbitrary batches of records and output only the record which has the minimum X within that batch. Exactly how many records get sent to the reducer then depends on the number of “batches”, which is decided by the framework rather than the application - but it cannot do any harm.
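
A combiner for example 2 might look like the sketch below, assuming the mapper emits the numeric value X as an IntWritable. In Hadoop a combiner is written as a Reducer subclass and registered with job.setCombinerClass(...); the class and type choices here are illustrative.

  import java.io.IOException;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  // For whatever batch of (category, x) pairs it is handed, forward only the minimum x
  // per category. Because its output has the same shape as the mapper's output, the
  // reducer works identically whether or not the combiner ran.
  public class MinCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text category, Iterable<IntWritable> values, Context context)
              throws IOException, InterruptedException {
          int min = Integer.MAX_VALUE;
          for (IntWritable v : values) {
              min = Math.min(min, v.get());
          }
          context.write(category, new IntWritable(min));
      }
  }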

A combiner is also called a “mini-reducer”, as it has a signature somewhat like a reducer-function. However it only sees subsets of the overall dataset at a time, unlike the “real” reducer which is guaranteed to see the whole result-set for a specific category.

Combiners are only useful for some kinds of functions; in others the whole dataset for each category really does need to be passed from the mapper task to the corresponding reducer task.

Limitations of MapReduce

MapReduce cannot be used to process a continuous stream of “real time” input; the API for a reducer program promises to provide it with all relevant input for a group. This is something that cannot be done for a “continuous real time” feed of data. In short, the whole model is just not appropriate for such usage.

As mentioned earlier, many algorithms require chaining map/reduce functions together. And as described in Grouping and Sorting, output from a Map process is a series of files. The disk IO that occurs between each step in a chain of map/reduce steps is a significant performance-limiting factor when implementing such complex algorithms via MapReduce. Representing such algorithms as a sequence of map/reduce steps is also often difficult.

Newer, more abstract APIs such as Spark’s RDDs don’t make the same assumptions as MapReduce and can therefore avoid some disk-io. However the fact that the APIs are more abstract means that it is much harder to understand what is actually going on within such a program at runtime. MapReduce is much easier to understand and therefore predict.

HMapReduce and Functional Map/Reduce

In “functional” programming, there are two commonly-used functions called map and reduce. The map function takes a list and a transformation-function as parameters, and produces a new list containing the result of applying the transformation-function to each element in the original list, ie a list (a,b,c,..) becomes (f(a), f(b), f(c), ..). The reduce function takes a list and returns a single value, eg sum/min/max.

The map and reduce phases of Google’s original concept, and of Hadoop’s reimplementation, are not identical to the above definitions, but fairly closely related. In HMapReduce, the map phase does apply a function to each element of a list; the result may be to ignore the record, or return (category, modified-record). Common modifications include leaving out fields of the original record that are not relevant for the query being performed. The map phase primarily performs categorization which is not part of the functional map function, but filtering and transforming records certainly is. The reduce phase may indeed act like a functional-reduce, eg when performing a “count(*)” query then a list of results is converted to an integer, or finding min/max within a category. However the reduce phase may also produce a list of records rather than a single value, which is not what a traditional reduce function does.
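
For comparison, the functional operations look like this in plain Java 8 streams; note that the grouping step (the closest analogue to HMapReduce’s categorisation) is a separate operation, not part of map or reduce. The values used are purely illustrative.

  import java.util.Arrays;
  import java.util.List;
  import java.util.Map;
  import java.util.stream.Collectors;

  public class FunctionalAnalogy {
      public static void main(String[] args) {
          List<String> words = Arrays.asList("ace", "two", "ten", "king");

          // functional map: transform every element
          List<Integer> lengths =
                  words.stream().map(String::length).collect(Collectors.toList());

          // functional reduce: collapse a list to a single value
          int totalLength = lengths.stream().reduce(0, Integer::sum);

          // the part HMapReduce adds: grouping elements by a computed category
          Map<Integer, List<String>> byLength =
                  words.stream().collect(Collectors.groupingBy(String::length));

          System.out.println(totalLength + " " + byLength);
      }
  }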

HMapReduce Daemon Processes

HMapReduce relies on two long-running services.

One is the MapReduce JobHistoryServer, which keeps information about completed jobs available after their Application Masters have exited, and which also cleans up temporary files in the case where a job terminates uncleanly. HMapReduce can actually run without this daemon process, but garbage gets left behind.

The other is the “shuffle handler” which runs on every YARN worker node in order to allow reducer tasks to fetch output generated by their preceding mapper tasks (files on the local filesystem) after those mapper tasks have ended. This isn’t actually a standalone daemon; instead the YARN nodemanager daemon (running on each YARN worker node) has a configuration item for “aux-services” which is a list of Java classes. These classes are then run as part of the YARN nodemanager daemon; the primary usecase for this feature is to run a shuffle-handler class that listens for HTTP requests coming in from reducers.

A MapReduce Example

Filtering and Grouping a Pack of Cards

In this section I’ll walk through a simple MapReduce example. And no, not the “word count” example that can be found a thousand times on various sites - although the principles are the same.

This example shows how to filter and group a pack of playing cards with MapReduce. Feel free to actually play along if you have a pack of cards handy. I’m talking about the standard poker pack - Ace, 2..10, Jack, Queen, King in four suits (hearts, diamonds, clubs, spades). The goal is to take a shuffled pack as input, discard all cards except the numerical ones (2..10) and then to group them by suit, so all the hearts cards are together, etc. Later we’ll adapt the algorithm to also sort the cards.

First shuffle the pack of cards.

Next we need to “load the data into a single file in HDFS”. We’ll assume that a single HDFS block holds just 15 cards, so deal the first 15 cards into a pile (one HDFS block) and then the next 15 cards into a new pile (the next HDFS block) etc. In this example, we’ll assume HDFS replication for the file is set to 1, ie the individual piles don’t get replicated to other nodes in the HDFS cluster.

Now we are ready to start our Application Master process. Imagine submitting a request to the YARN resource-manager which includes a jarfile and the following configuration parameters:

  • input-file = name of HDFS file containing our card data
  • split-size = 15 (the “block size” of our HDFS input file, for optimum efficiency)
  • num-reducers = 2

We will later see how the mapper generates output records with four categories (the four suits), and thus at most four reducers can do useful work. However as there are few records in each category, it makes sense to use even fewer reducers. Actually, one reducer would normally be appropriate here but is not so interesting for our example.

Imagine the YARN resource-manager doing the following:

  • placing the request to execute this ApplicationMaster on a work-queue;
  • allocating resources on one of the YARN worker nodes for it;
  • writing the Application Master jarfile to a temporary directory in HDFS;
  • sending the request to the YARN worker node (via the nodemanager daemon).

Imagine the YARN node-manager then doing the following:

  • setting up a linux cgroup for the task (optional);
  • creating a working directory for this task;
  • fetching the ApplicationMaster jarfile from HDFS into the working directory;
  • executing the java jvm (pre-installed on the local system) with the ApplicationMaster jarfile and various other jarfiles on the classpath, and various useful information in environment variables.

The Java process running the Application Master code then:

  • connects to the YARN resource-manager (using info in its environment-variables) and starts sending “heartbeat” messages;
  • queries HDFS for information about the input file specified in the job configuration - in particular, how many blocks it consists of (4 in this example) and which hosts the blocks are stored on;
  • computes the number of map-tasks needed by dividing the file-size by the split-size. As we have specified split-size = block-size (which is usual), the answer for this example is 4;
  • sends a request to the YARN resource-manager asking for a container on one of the hosts holding split#1 of the input file; the resource-manager eventually responds with a “ticket” for a specific YARN nodemanager, ie a specific host;
  • sends a request to the above nodemanager, containing the Application Master jarfile (again), the name of the file to read (again), and a byte-range to process;
  • repeats the two above steps for each of the mappers;
  • waits for the mappers to report success (if not, it allocates a new container and tries again up to a configurable limit before giving up)
  • performs similar processing for the two reducer tasks;
  • and finally notifies the resource-manager that it has successfully completed then exits.

Within each of the mapper containers, the started application opens the specified input file in HDFS, and asks the configured “reader class” to find the start of the first record following the start of the byte-range allocated to this mapper task. In the case of an input-file in simple comma-separated-values form that simply means looking for the next linefeed. The application then repeatedly uses the reader to read the next line and pass it to the map-function. All this code is embedded in the single application jarfile - like an extra “mode”.

Now it’s time to play with the cards again. To emulate the behaviour of a “mapper task”, choose a pile and take the first card from it. This is the first input to the map-function.

The initial goal of our program is filtering: discarding all cards except those with value 2..10.

If the card is not a desired one, then this is easy: just discard it and “return” from the map-function, at which point the mapper task will “read the next record” (pick the next card from the pack).

If the card is a desired one, then the map-function must “output” a (key, value) pair. As explained earlier, the “key” is actually a category which is used to “partition the data” - ie to select which reducer instance to send the data to. As we want to group cards by suit, we’ll output the suit of the card as the “key”. The card itself is the value. However we need to write the output to a file-per-reducer, and ensure that within this file cards are still grouped by their suit (so the reducer can do a merge-sort). So the cards need to be buffered in memory for a while before being output. To emulate this, you’ll need some nimble fingers! In our example, reducer#1 will be responsible for hearts and spades, reducer#2 for diamonds and clubs. As each card is processed (picked from the pile), build up the cards in your hands in two groups: hearts-and-spades and diamonds-and-clubs. And as you add a card to the existing ones, insert it next to the existing cards of the same suit. When you reach 4 cards in any group (our “buffer size”) then place these on the table - ie “write a file to the local filesystem”. After the input pile is empty, put down the remaining groups in your hand as extra “files”. The result should be several files for each reducer, with each file internally grouped by suit.

This is of course applied to each pile - in parallel!

What we have so far achieved is filtering, categorization (partitioning) and a partial grouping of the input. Not a bad start!
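
For readers who prefer code over cards, here is a hedged sketch of what the card map-function might look like in the Hadoop Java API, assuming a made-up encoding of one card per input line such as “7H” (7 of hearts) or “QS” (queen of spades).

  import java.io.IOException;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  // Keeps only values 2..10 and outputs (suit, card); the suit is the category.
  public class CardMapper extends Mapper<LongWritable, Text, Text, Text> {
      @Override
      protected void map(LongWritable offset, Text line, Context context)
              throws IOException, InterruptedException {
          String card = line.toString().trim();
          if (card.length() < 2) {
              return;
          }
          String value = card.substring(0, card.length() - 1);   // "7" or "10"
          String suit = card.substring(card.length() - 1);        // "H", "D", "C" or "S"
          try {
              int v = Integer.parseInt(value);
              if (v >= 2 && v <= 10) {
                  context.write(new Text(suit), new Text(card));
              }
          } catch (NumberFormatException e) {
              // ace and picture cards are not numeric: discard them
          }
      }
  }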

Now that all mapper tasks have finished, it is time for the Application Master to start the reducers (as described above).

The first reducer is responsible for hearts-and-spades, so (role-playing as reducer task #1) fetch the relevant “files” from each mapper, ie the piles which contain hearts-and-spades, by just moving them in front of you. As described earlier, this is actually done by making an HTTP request to the “ShuffleHandler auxiliary service” running on each YARN nodemanager node which ran a mapper for this job. Now perform a merge-sort of all the small piles (files) into a single one. Each pile is already sorted, so it is trivial to end up with a single pile with all the hearts together and all the spades together. Now the reducer-function gets invoked; as specified in the signature of the function, it is passed a key (the suit of the top card on the merged pile) and an iterator - an object which will read a card from the top of the pile when called. The iterator signals end-of-data when the key (suit) changes.

In our example, the reducer doesn’t do much - it just writes each record to its output. We’re using the reducer here just to trigger the elegant group-and-shuffle logic of HMapReduce. However if we wished, we could do things like find the highest card for any suit; not particularly useful if the entire pack is being processed but more interesting if we had removed a handful of cards at the start.

The result is two files in HDFS - one for each reducer - whose contents are cards in range 2..10 grouped by suit. Expressed as rough SQL-like pseudocode, the result is something like:

select * from pack where face_value in ("2".."10") group by suit

If we had specified 4 reducers, there would have been 4 files. If we had specified just one reducer, there would have been one file with everything nicely merged together - though that wouldn’t scale for larger datasets!

Sorting Data

What if we wish to have the cards in our output fully sorted rather than just grouped by suit? In this case, the “key” emitted by the map-function should have two parts: the suit and the value. A custom partitioner-class must then be provided to extract the category (the suit) from the key, so that all cards with the same suit go to the same reducer. However HMapReduce itself will ensure that the sorting done in the mapper (gathering cards in sorted batches before writing them as files) applies to the whole key, meaning that the mapper-generated files are fully sorted. And the merge-sort done by the reducer will also preserve the full ordering of the key.

In the example case, there was a very clear and obvious criterion on which to partition/group the data: the suit. However what if we need to sort a large set of input records with no obvious categories, eg a big sequence of words? The general approach is the same: split the values into N groups where N is some value suitable for the size of the input dataset and the number of computers in the cluster. Choosing N=100 and allocating words to partitions via hash(word)%100 would work, but might not guarantee that equal numbers of records were passed to each reducer. Choosing N=26 and partitioning by the first letter of the word doesn’t work well either; words starting with Q are far less common than those starting with E. One option is first to sample the input, choosing a few hundred or thousand random items from the input, and then picking ranges of values for the partitioning which would spread that sample roughly evenly between categories. Hopefully the full dataset would then also be spread roughly evenly across the categories/partitions. The resulting criteria for partitioning into 12 reducers might be something like A-B, B-CG, CH-D, E, F, G, H, I, J-L, M-P, Q-V, W-Z.
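
A conceptual sketch of this sampling approach is shown below; the sampling rate and sizes are invented for illustration (Hadoop ships comparable machinery in its InputSampler/TotalOrderPartitioner classes).

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;
  import java.util.Random;

  public class SamplingSketch {
      // Pick N-1 boundary values from a random sample so that the N resulting key-ranges
      // should receive roughly equal numbers of records.
      public static List<String> chooseBoundaries(List<String> input, int numReducers) {
          List<String> sample = new ArrayList<>();
          Random rnd = new Random(42);
          for (String record : input) {              // keep roughly 1 in 100 records
              if (rnd.nextInt(100) == 0) sample.add(record);
          }
          List<String> boundaries = new ArrayList<>();
          if (sample.isEmpty()) return boundaries;   // degenerate case: no sample, no boundaries
          Collections.sort(sample);
          for (int i = 1; i < numReducers; i++) {
              boundaries.add(sample.get(i * sample.size() / numReducers));
          }
          return boundaries;                         // reducer for a key = index of its range
      }
  }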

Notes on Data Partitioning

The concept of partitioning is common when dealing with big data. Sometimes data is sharded across nodes for storage, in other cases it is sharded across nodes for processing. And when sharding, data is grouped so specific subsets of data are allocated to the same node. In particular, the Kafka message broker uses partitioning to distribute data across multiple nodes while ensuring subsets of data for which ordering must be preserved are allocated to the same broker node.

The “map” phase of HMapReduce and the following “shuffle” are doing exactly this: partitioning the data, in exactly the way that the Kafka client library partitions data. A data partition is then processed in a single HMapReduce reduce node using a single thread, in the same way that a Kafka topic partition is processed in a single thread. The result is that a dataset consisting of many logical partitions can be processed single-threaded where necessary and parallel where possible.

Further Reading

This article describes how MapReduce relates to other “big data” tools.
