MapReduce – Shuffling and Sorting:
Map Phase
The output produced by a map task is not written directly to disk; it is first written to memory, taking advantage of buffered writes. Each map task has a circular memory buffer of 100 MB by default (the size can be tuned by changing the mapreduce.task.io.sort.mb property).
When the contents of the buffer reach a certain threshold size (mapreduce.map.sort.spill.percent, which has the default value 0.80, or 80%), a background thread will start to spill the contents to disk.
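As a concrete illustration, both of these knobs can be set per job from the driver code. The sketch below assumes the standard Hadoop 2+ property names quoted above; the class name and the chosen values (256 MB buffer, 90% spill threshold) are purely illustrative, not recommendations.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MapSideBufferTuning {
    public static Job buildJob() throws IOException {
        Configuration conf = new Configuration();
        // Grow the in-memory sort buffer from the 100 MB default to 256 MB.
        conf.setInt("mapreduce.task.io.sort.mb", 256);
        // Start the background spill when the buffer is 90% full instead of 80%.
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.90f);
        return Job.getInstance(conf, "map-side-buffer-tuning");
    }
}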
Map outputs will continue to be written to the buffer while the spill takes place, but if the buffer fills up during this time, the map will block until the spill is complete.
Spills are written in round-robin fashion to the directories specified by the mapreduce.cluster.local.dir property, in a job-specific subdirectory. Before the map output is written to disk, the background thread divides the data into partitions corresponding to the reducers the data will ultimately be sent to, as discussed earlier.
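To make the partitioning step concrete, the sketch below mirrors the logic of Hadoop's default hash partitioning for a word-count-style job (Text keys, IntWritable values); the class name is illustrative, and a real job would only need a custom Partitioner if the default behaviour were not wanted.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Mask the sign bit so the result is non-negative, then take the
        // modulus: every record with the same key lands in the same partition
        // and is therefore fetched by the same reducer.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}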
Within each partition, the background thread performs an in-memory sort by key, and if a combiner function has been specified, it is run on the output of the sort. Running the combiner makes the map output more compact, so there is less data to write to local disk and to transfer to the reducer.
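For example, in a word-count-style job the summing reducer logic can be reused as the combiner. The sketch below is a minimal version of such a combiner (the class name is illustrative); it would be registered on the job with job.setCombinerClass(IntSumCombiner.class), so the reducer receives partial sums instead of individual counts.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();       // build a partial sum on the map side
        }
        result.set(sum);
        context.write(key, result);   // one record per key instead of many
    }
}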
A new spill file is created each time the memory buffer reaches the spill threshold, so by the time the map task has written its last output record there may be several spill files. Before the task completes, the spill files are merged into a single partitioned and sorted output file. The configuration property mapreduce.task.io.sort.factor controls the maximum number of streams to merge at once; the default is 10.
If there are at least 3 spill files (set by the mapreduce.map.combine.minspills property), the combiner is run again before the output file is written.
Recall that combiners can be run repeatedly over the input without affecting the final result. If there are only one or two spills, the potential reduction in map output size is not worth the overhead of invoking the combiner again, so it is not run for this map output.
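A small, plain-Java illustration of why repeated combining is safe for a summing combiner but would not be for, say, an averaging one (the numbers are made up purely for the demonstration):

import java.util.List;

public class CombinerSafetyDemo {
    static int sum(List<Integer> xs) {
        return xs.stream().mapToInt(Integer::intValue).sum();
    }

    static double mean(List<Double> xs) {
        return xs.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
    }

    public static void main(String[] args) {
        // Summing twice (once over partial groups, once over the partial sums)
        // gives the same answer as summing once, so a sum combiner is safe.
        int oneShot = sum(List.of(1, 2, 3, 100));                                // 106
        int combined = sum(List.of(sum(List.of(1, 2, 3)), sum(List.of(100))));   // 106
        System.out.println(oneShot == combined);                                 // true

        // Averaging partial averages gives the wrong answer, so a mean-computing
        // reducer must not be reused as a combiner.
        double oneShotMean = mean(List.of(1.0, 2.0, 3.0, 100.0));                // 26.5
        double badMean = mean(List.of(mean(List.of(1.0, 2.0, 3.0)),
                                      mean(List.of(100.0))));                    // 51.0
        System.out.println(oneShotMean == badMean);                              // false
    }
}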
It is often a good idea to compress the map output as it is written to disk, because doing so saves disk space and reduces the amount of data to be transferred to the reducers.
By default, the output is not compressed, but it is easy to enable this by setting mapreduce.map.output.compress to true. The compression library to use is specified by mapreduce.map.output.compress.codec.
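A sketch of enabling this from the driver, assuming the Snappy codec is available on the cluster (any CompressionCodec implementation could be substituted; the class name and job name are illustrative):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;

public class MapOutputCompression {
    public static Job buildJob() throws IOException {
        Configuration conf = new Configuration();
        // Compress map output before it is spilled to local disk and shuffled.
        conf.setBoolean("mapreduce.map.output.compress", true);
        // Snappy trades some compression ratio for very fast compression and
        // decompression, which suits short-lived intermediate data.
        conf.setClass("mapreduce.map.output.compress.codec",
                      SnappyCodec.class, CompressionCodec.class);
        return Job.getInstance(conf, "map-output-compression");
    }
}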
Reduce Phase
Now let's look at the reduce side. The map output is the input to the reduce tasks. The map tasks may complete at different times, so the reduce task starts copying their outputs as soon as each one finishes. This is known as the copy phase of the reduce task.
The reduce task has a small number of copier threads so that it can fetch map outputs in parallel. The default is five threads, but this can be changed by setting the mapreduce.reduce.shuffle.parallelcopies property.
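For instance, a job that has to pull output from a very large number of map tasks might raise this value per job; the sketch below uses the property name quoted above, and the value 10 is illustrative only.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CopyPhaseTuning {
    public static Job buildJob() throws IOException {
        Configuration conf = new Configuration();
        // Use 10 copier threads per reduce task instead of the default 5.
        conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
        return Job.getInstance(conf, "copy-phase-tuning");
    }
}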
How do reducers know which machines to fetch map output from?
As map tasks complete successfully, they notify their application master using the heartbeat mechanism. Therefore, for any given job, the application master knows the mapping between map outputs and hosts. A thread in the reducer periodically asks the master for map output hosts until it has retrieved them all.
If a map output is small enough, it is copied into the reduce task JVM's memory (the buffer's size is controlled by mapreduce.reduce.shuffle.input.buffer.percent, which specifies the proportion of the heap to use for this purpose); otherwise, it is copied to disk.
When the in-memory buffer reaches a threshold size (controlled by mapreduce.reduce.shuffle.merge.percent) or reaches a threshold number of map outputs (mapreduce.reduce.merge.inmem.threshold), it is merged and spilled to disk. If a combiner is specified, it will be run during the merge to reduce the amount of data written to disk.
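A sketch of adjusting these reduce-side thresholds per job, using the property names quoted above (the values shown happen to mirror the usual defaults and are only illustrative):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReduceShuffleTuning {
    public static Job buildJob() throws IOException {
        Configuration conf = new Configuration();
        // Fraction of the reduce task's heap used to buffer incoming map outputs.
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
        // Merge and spill once the in-memory buffer is 66% full...
        conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
        // ...or once 1000 map outputs have accumulated, whichever comes first.
        conf.setInt("mapreduce.reduce.merge.inmem.threshold", 1000);
        return Job.getInstance(conf, "reduce-shuffle-tuning");
    }
}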
As the copies are stored on disk, a background thread merges them into larger, sorted files. This saves some time merging later. Note that any map outputs that were compressed (by the map task) must be decompressed in memory to perform a merge on them.
When all the map outputs have been copied, the reduce task moves into the sort phase (which should properly be called the merge phase, as the sorting was carried out on the map side), which merges the map outputs, maintaining their sort ordering. This is done in rounds.
For example, if there were 50 map outputs and the merge factor was 10 (the default, controlled by the mapreduce.task.io.sort.factor property, just like in the map’s merge), there would be five rounds. Each round would merge 10 files into 1, so at the end there would be 5 intermediate files.
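The small helper below just reproduces the simplified arithmetic of this example (every round merges exactly merge-factor spill files into one); it is not meant to model Hadoop's actual merge planning, which is more subtle.

public class MergeRoundsSketch {
    // Count rounds under the simplified model above: each round consumes
    // `factor` spill files and produces one intermediate file.
    static int rounds(int files, int factor) {
        int merged = 0;               // intermediate files produced so far
        while (files >= factor) {     // still enough spill files for a full round
            files -= factor;          // consume `factor` spill files
            merged++;                 // ...producing one intermediate file
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(rounds(50, 10));   // prints 5, matching the example above
    }
}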
Rather than have a final round that merges these five files into a single sorted file, the merge saves a trip to disk by directly feeding the reduce function in what is the last phase: the reduce phase.
This final merge can come from a mixture of in-memory and on-disk segments. During the reduce phase, the reduce function is invoked for each key in the sorted output. The output of this phase is written directly to the output filesystem, typically HDFS.