S3 and Parallel Processing – DirectFileOutputCommitter

The problem:

While a Hadoop Job is writing output, it will write to a temporary directory:
Task1 –> /unique/temp/directory/task1/file.tmp
Task2 –> /unique/temp/directory/task2/file.tmp

When a task finishes, it moves (commits) its temporary file to the final location.

This scheme is what makes it possible for Hadoop to support speculative execution.

Moving the task output to its final destination (the commit) involves a rename operation. On a normal filesystem, a rename is just a pointer change in the filesystem metadata.

S3, however, is an object store rather than a filesystem, so a rename is more costly: it involves a copy (PUT) followed by a delete.
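To make the cost difference concrete, here is a minimal sketch that emulates a rename on an object store using a plain Python dictionary as the bucket. The function name and the keys are hypothetical and only illustrate the copy-then-delete pattern; no real S3 API is called.

```python
def object_store_rename(bucket: dict, src_key: str, dst_key: str) -> int:
    """Emulate a rename on an object store; returns the number of bytes copied."""
    data = bucket[src_key]
    bucket[dst_key] = data   # copy: the whole object is written again
    del bucket[src_key]      # delete: the temporary object is removed
    return len(data)


# Hypothetical task output of 1 KB sitting in a temporary location.
bucket = {"unique/temp/directory/task1/file.tmp": b"x" * 1024}
copied = object_store_rename(
    bucket, "unique/temp/directory/task1/file.tmp", "final/file"
)
```

The point of the sketch is that the amount of work grows with the size of the data being committed, whereas a metadata-only rename does not.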

The solution:

In MapReduce (the behavior may differ for other applications), these expensive operations can be avoided by setting the “mapred.output.committer.class” property in mapred-site.xml to “org.apache.hadoop.mapred.DirectFileOutputCommitter”, so that each task writes its output directly to its final destination.


For this and other useful parallel processing S3 considerations, please have a look here:


Copy Data with Hive and Spark

These are two examples of how to copy data from one S3 location to another S3 location. The same operation can be done from S3 to HDFS and vice versa.

I’m assuming that you are able to launch the Hive client or the spark-shell client.


Using the MapReduce engine or the Tez engine (set one of the two):

set hive.execution.engine=mr;

set hive.execution.engine=tez;

CREATE EXTERNAL TABLE source_table(a_col string, b_col string, c_col string)
LOCATION 's3://mybucket/hive/csv/';

CREATE TABLE destination_table(a_col string, b_col string, c_col string) LOCATION 's3://mybucket/output-hive/csv_1/';

INSERT OVERWRITE TABLE destination_table SELECT * FROM source_table;




If you want to copy data to HDFS, you can also explore s3-dist-cp:


s3-dist-cp --src s3://mybucket/hive/csv/ --dest=hdfs:///output-hive/csv_10/


HBase and Zookeeper debugging

I came across some scenarios where an application (e.g., MapReduce) communicating with HBase through YARN could silently fail with a timeout like the following:

2017-01-30 19:42:03,657 DEBUG [main] org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation: locateRegionInMeta parentTable=hbase:meta, metaLocation=, attempt=9 of 35 failed; retrying after sleep of 10095 because: Failed after attempts=36, exceptions:
Mon Jan 30 19:42:03 UTC 2017, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=68463: row 'test2,#cmrNo acctNo,99999999999999' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=ip-172-31-3-246.us-west-2.compute.internal,16000,1485539268192, seqNum=0

The root cause of this behavior was not a misconfiguration on the server or networking side, but a library missing from the classpath.

When there is a ZooKeeper issue, the underlying exceptions may not be visible, depending on the retry parameters.

In this case, I added/modified the following parameters in the MapReduce Java application, which gave more visibility into the communication layer between ZooKeeper and HBase:

conf.set("hbase.client.retries.number", Integer.toString(1));
conf.set("zookeeper.session.timeout", Integer.toString(60000));
conf.set("zookeeper.recovery.retry", Integer.toString(1));

After this, the following exception was visible:

Exception: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
 at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:157)
 at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:65)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: com.google.protobuf.ServiceException: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge


Tuning these parameters causes the application to exit quickly when there is a problem with the cluster. This can be desirable in a production environment.

Reducing the parameters to more conservative values could yield better recovery times. Setting zookeeper.recovery.retry to 0 will still result in up to two connection attempts to every ZooKeeper server in the quorum, and will cause an application failure in under a minute should ZooKeeper connectivity be lost during execution.


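As an additional note, if you are receiving timeouts because the application is trying to contact localhost instead of the quorum servers, you can set the quorum explicitly in the job configuration (the relevant HBase client properties are hbase.zookeeper.quorum and hbase.zookeeper.property.clientPort):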

// HBase through MR on Yarn is trying to connect to localhost instead of quorum.


I’ve added a couple of examples of Mapreduce applications for HBase here: https://github.com/hvivani/bigdata/tree/master/hbase


Some additional notes on this behavior: https://discuss.pivotal.io/hc/en-us/articles/200933006-Hbase-application-hangs-indefinitely-connecting-to-zookeeper


FileInputFormat vs. CombineFileInputFormat

When you put a file into HDFS, it is divided into blocks of 128 MB (the default value for HDFS on EMR). Consider a file big enough to consume 10 blocks. When you read that file from HDFS as input for a MapReduce job, the same blocks are usually mapped, one by one, to splits. In this case, the file is divided into 10 splits (which means 10 map tasks) for processing. By default, the block size and the split size are equal, but the sizes depend on the configuration settings for the InputSplit class.
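The block-to-split mapping can be sketched in a few lines. This mirrors the split loop as I understand it from Hadoop's FileInputFormat, including a 10% tolerance on the last split; the constant, the default minimum and maximum sizes, and the function names are assumptions for illustration rather than an exact copy of the implementation.

```python
MB = 1024 * 1024
SPLIT_SLOP = 1.1  # assumed tolerance: the last split may be up to 10% larger


def compute_split_size(block_size, min_size=1, max_size=2**63 - 1):
    """Split size is the block size, bounded by the configured min and max."""
    return max(min_size, min(max_size, block_size))


def split_lengths(file_size, split_size):
    """Return the length in bytes of every split created for one file."""
    lengths = []
    remaining = file_size
    while remaining / split_size > SPLIT_SLOP:
        lengths.append(split_size)
        remaining -= split_size
    if remaining > 0:
        lengths.append(remaining)  # the tail becomes the final split
    return lengths


# A file that occupies exactly 10 blocks of 128 MB.
splits = split_lengths(10 * 128 * MB, compute_split_size(128 * MB))
print(len(splits), "splits, so", len(splits), "map tasks")
```

With these assumptions, a file only slightly larger than one block (for example 135 MB) still ends up in a single split, because the tail is within the tolerance.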

From a Java programming perspective, the class that holds the responsibility of this conversion is called an InputFormat, which is the main entry point into reading data from HDFS. From the blocks of the files, it creates a list of InputSplits. For each split, one mapper is created. Then each InputSplit is divided into records by using the RecordReader class. Each record represents a key-value pair.

Before a MapReduce job is run, you can specify the InputFormat class to be used. The implementation of FileInputFormat requires you to create an instance of the RecordReader, and as mentioned previously, the RecordReader creates the key-value pairs for the mappers.

FileInputFormat is an abstract class that is the basis for a majority of the implementations of InputFormat. It contains the location of the input files and an implementation of how splits must be produced from these files. How the splits are converted into key-value pairs is defined in the subclasses. Some examples of its subclasses are TextInputFormat, KeyValueTextInputFormat, and CombineFileInputFormat.

Hadoop works more efficiently with large files (files that occupy more than 1 block). FileInputFormat converts each large file into splits, and each split is created in a way that contains part of a single file. As mentioned, one mapper is generated for each split.


However, when the input files are smaller than the default block size, many splits (and therefore, many mappers) are created. This arrangement makes the job inefficient. This Figure shows how too many mappers are created when FileInputFormat is used for many small files.


To avoid this situation, CombineFileInputFormat is introduced. This InputFormat works well with small files, because it packs many of them into one split so there are fewer mappers, and each mapper has more data to process. This Figure shows how CombineFileInputFormat treats the small files so that fewer mappers are created.
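The effect of packing small files can be illustrated with a simplified greedy sketch. It only groups files by size up to a maximum split size; as far as I know, the real CombineFileInputFormat also takes node and rack locality into account, which is ignored here. The function name and the file sizes are hypothetical.

```python
MB = 1024 * 1024


def combine_into_splits(file_sizes, max_split_size):
    """Greedily pack whole files into splits no larger than max_split_size."""
    splits, current, current_size = [], [], 0
    for size in file_sizes:
        # Close the current split when the next file would overflow it.
        if current and current_size + size > max_split_size:
            splits.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        splits.append(current)
    return splits


small_files = [1 * MB] * 300           # 300 files of 1 MB each
per_file_splits = len(small_files)     # one split (and one mapper) per file
combined = combine_into_splits(small_files, 128 * MB)
print(per_file_splits, "mappers without combining,", len(combined), "with combining")
```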


Hadoop: Output Committer Notes

OutputCommitter describes the commit of task output for a MapReduce job.

The MapReduce framework relies on the OutputCommitter of the job to:

  • Set up the job during initialization; for example, create the temporary output directory for the job. Job setup is done by a separate task when the job is in PREP state and after initializing tasks. Once the setup task completes, the job is moved to the RUNNING state.
  • Clean up the job after job completion; for example, remove the temporary output directory. Job cleanup is done by a separate task at the end of the job. The job is declared SUCCEEDED, FAILED, or KILLED after the cleanup task completes.
  • Set up the task temporary output. Task setup is done as part of the same task, during task initialization.
  • Check whether a task needs a commit. This prevents unnecessary commit procedures.
  • Commit the task output. After the task is done, the task commits its output, if required.
  • Discard the task commit. If the task is failed or killed, the output is cleaned up. If the task could not clean up (in an exception block), a separate task is launched with the same attempt ID to do the cleanup.

FileOutputCommitter is the default OutputCommitter. Job setup/cleanup tasks occupy map or reduce containers, whichever is available on the NodeManager. The JobCleanup task, TaskCleanup tasks, and JobSetup task have the highest priority, in that order.

Task Side-Effect Files

In some applications, component tasks need to create or write to side files, which differ from the actual job output files.

In such cases, two instances of the same Mapper or Reducer could be running simultaneously (for example, speculative tasks), trying to open or write to the same file on the file system. You must pick unique names per task-attempt (using the attemptid, say attempt_200709221812_0001_m_000000_0), not just per task.

To avoid these issues, the MapReduce framework, when the OutputCommitter is FileOutputCommitter, maintains a special ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} subdirectory accessible via ${mapreduce.task.output.dir} for each task attempt on the FileSystem where the output of the task attempt is stored.

On successful completion of the task attempt, the files in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) are promoted to ${mapreduce.output.fileoutputformat.outputdir}/. The framework discards the subdirectory of unsuccessful task attempts. This process is completely transparent to the application.

You can use this feature by creating any required side files during execution of a task in ${mapreduce.task.output.dir} via FileOutputFormat.getWorkOutputPath(). The framework promotes them similarly for successful task attempts. This eliminates the need to pick unique paths per task attempt.

Note: The value of ${mapreduce.task.output.dir} during execution of a particular task attempt is actually ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}; this value is set by the MapReduce framework. To use this feature, create files in the path returned by FileOutputFormat.getWorkOutputPath() from the MapReduce task.

The entire discussion holds true for maps of jobs with reducer=NONE (that is, 0 reduces) because output of the map, in that case, goes directly to HDFS.
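The setup, commit, and abort steps described in this section can be sketched on a local filesystem. This is a toy model, not Hadoop's FileOutputCommitter: the class, its method names, and the attempt IDs are hypothetical, and it only shows how per-attempt directories under _temporary let two attempts of the same task write a file with the same name without colliding.

```python
import os
import shutil
import tempfile


class LocalOutputCommitter:
    """Toy committer: attempts write under _temporary/ and are renamed on commit."""

    def __init__(self, output_dir):
        self.output_dir = output_dir
        self.temp_root = os.path.join(output_dir, "_temporary")

    def _attempt_dir(self, attempt_id):
        return os.path.join(self.temp_root, "_" + attempt_id)

    def setup_job(self):
        os.makedirs(self.temp_root, exist_ok=True)

    def setup_task(self, attempt_id):
        path = self._attempt_dir(attempt_id)
        os.makedirs(path, exist_ok=True)
        return path  # the task writes all of its files here

    def needs_task_commit(self, attempt_id):
        path = self._attempt_dir(attempt_id)
        return os.path.isdir(path) and bool(os.listdir(path))

    def commit_task(self, attempt_id):
        path = self._attempt_dir(attempt_id)
        for name in os.listdir(path):
            # Promote each file to the final output directory.
            os.rename(os.path.join(path, name), os.path.join(self.output_dir, name))
        os.rmdir(path)

    def abort_task(self, attempt_id):
        shutil.rmtree(self._attempt_dir(attempt_id), ignore_errors=True)

    def cleanup_job(self):
        shutil.rmtree(self.temp_root, ignore_errors=True)


out = tempfile.mkdtemp()
committer = LocalOutputCommitter(out)
committer.setup_job()
# Two attempts of the same task, as with speculative execution; only one wins.
for attempt, wins in [("attempt_0001_m_000000_0", True),
                      ("attempt_0001_m_000000_1", False)]:
    work_dir = committer.setup_task(attempt)
    with open(os.path.join(work_dir, "part-00000"), "w") as f:
        f.write(attempt)
    if wins and committer.needs_task_commit(attempt):
        committer.commit_task(attempt)
    else:
        committer.abort_task(attempt)
committer.cleanup_job()
final_files = sorted(os.listdir(out))
```

After the run, the output directory contains a single part-00000 file, written by the attempt that was committed.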

References: http://www.cloudera.com/content/www/en-us/documentation/other/tutorial/CDH5/Hadoop-Tutorial/ht_job_output.html

YARN / Map Reduce memory settings

On Hadoop 1, we used to use mapred.child.java.opts to set the Java Heap size for the task tracker child processes.

With YARN, that parameter has been deprecated in favor of:

  • mapreduce.map.java.opts – This parameter is passed to the JVM for mappers.
  • mapreduce.reduce.java.opts – This parameter is passed to the JVM for reducers.

The key thing to understand is that if both mapred.child.java.opts and mapreduce.{map|reduce}.java.opts are specified, the settings in mapred.child.java.opts will be ignored.

For example, these properties can be set as follows:

mapreduce.map.java.opts = -Xmx1280m
mapreduce.reduce.java.opts = -Xmx2304m

A common error that the application shows when it tries to run beyond those limits is, for example:

.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.OutOfMemoryError: Java heap space

Now, assuming the error occurs in the reduce phase, we can increase the value of mapreduce.reduce.java.opts to:

mapreduce.reduce.java.opts = -Xmx3584m

However, we will face another error if we do not increase mapreduce.reduce.memory.mb accordingly:

mapreduce.reduce.memory.mb = 4096

A typical error message when Container memory available is lower than maximum Java Heap size is:

containerID=container_1420691746319_0001_01_000151] is running beyond physical memory limits. Current usage: 2.6 GB of 2.5 GB physical memory used; 4.8 GB of 12.5 GB virtual memory used. Killing container.

The general rule of thumb to use is that -Xmx (mapreduce.reduce.java.opts) should be 80% of the size of the equivalent mapreduce.reduce.memory.mb value to account for stack and PermGen space.
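As a quick illustration of this rule of thumb, the following sketch derives a -Xmx value from a container size. The function name and the default of 80 percent are just the rule stated above turned into code; it is a starting point under that assumption, not a tuned recommendation.

```python
def heap_opts_for_container(container_mb: int, heap_percent: int = 80) -> str:
    """Return a -Xmx setting that leaves headroom for non-heap memory."""
    heap_mb = container_mb * heap_percent // 100  # integer MB, rounded down
    return f"-Xmx{heap_mb}m"


# Example: a reducer container of 4096 MB (mapreduce.reduce.memory.mb).
reduce_container_mb = 4096
reduce_java_opts = heap_opts_for_container(reduce_container_mb)
print("mapreduce.reduce.memory.mb =", reduce_container_mb)
print("mapreduce.reduce.java.opts =", reduce_java_opts)
```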

In summary:

Mapper and Reducer Settings:

  • mapreduce.map.memory.mb – This defines the total memory of the container in which each mapper's JVM runs; the heap set through mapreduce.map.java.opts must fit inside it. This value is in megabytes (MB).
  • mapreduce.reduce.memory.mb – This defines the total memory of the container in which each reducer's JVM runs; the heap set through mapreduce.reduce.java.opts must fit inside it. This value is in megabytes (MB).



As we should know by now, in YARN memory allocation everything is a container. This includes the following components:

  • Application Master – It controls the execution of the application (such as re-running tasks when they fail).
  • Mappers – Process all of the input files and grab the information out of them necessary to perform the job.
  • Reducers – These containers take information from the mappers and distill it into the final result of the job (which goes into the output files.)

Global Settings

These two properties control container memory allocation:

  • yarn.scheduler.minimum-allocation-mb – This is the smallest allowed value for a container. Any request for a container smaller than this value is raised to it. This value is in megabytes (MB).
  • yarn.scheduler.maximum-allocation-mb – This is the largest allowed value for a container. A request for a container larger than this value cannot be granted; YARN rejects it instead of allocating more than the maximum. This value is in megabytes (MB).
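A rough sketch of how a memory request relates to these two limits is shown below. It rests on two assumptions that are my understanding of YARN rather than something stated in this post: requests are rounded up to a multiple of the minimum allocation (the exact step depends on the scheduler in use), and a request above the maximum is treated as an error rather than silently reduced. The function name is hypothetical.

```python
import math


def normalize_request(requested_mb: int, min_mb: int, max_mb: int) -> int:
    """Return the container size that would be allocated for a request."""
    if requested_mb > max_mb:
        # Assumption: oversized requests are rejected, not shrunk.
        raise ValueError(
            f"requested {requested_mb} MB exceeds the maximum of {max_mb} MB"
        )
    # Assumption: round up to the next multiple of the minimum allocation.
    steps = max(1, math.ceil(requested_mb / min_mb))
    return min(steps * min_mb, max_mb)


print(normalize_request(100, 1024, 8192))    # small request raised to the minimum
print(normalize_request(1500, 1024, 8192))   # rounded up to the next multiple
```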


Hadoop 1 vs Hadoop 2

There are a lot of articles about this, but, I just needed a good summary of concepts:

Hadoop 1:

A master process called the JobTracker is the central scheduler for all MapReduce jobs in the cluster.

Nodes have a TaskTracker process that manages tasks on the individual nodes. The TaskTrackers communicate with and are controlled by the JobTracker. Similar to most resource managers, the JobTracker has two pluggable scheduler modules, Capacity and Fair.

The JobTracker is responsible for managing the TaskTrackers on worker server nodes, tracking resource consumption and availability, scheduling individual job tasks, tracking progress, and providing fault tolerance for tasks.

The TaskTracker is directed by the JobTracker and is responsible for launch and tear down of jobs and provides task status information to the JobTracker.

The TaskTrackers also communicate through heartbeats to the JobTracker: If the JobTracker does not receive a heartbeat from a TaskTracker, it assumes it has failed and takes appropriate action (e.g., restarts jobs).

Hadoop 2 (YARN):


In YARN, the JobTracker and TaskTracker daemons are replaced by two different daemons: the ResourceManager (cluster-wide) and the NodeManager (one per node).

Also there are new components: an ApplicationMaster and application Containers.

The ResourceManager is a pure scheduler. Its sole purpose is to manage available resources among multiple applications on the cluster. As with version 1, both Fair and Capacity scheduling options are available.

The ApplicationMaster is responsible for negotiating resource Containers from the ResourceManager and tracking their progress. Job submissions themselves are accepted by the ResourceManager.

ApplicationMasters are specific to and written for each type of application. For example, YARN includes a distributed Shell framework that runs a shell script on multiple nodes on the cluster. The ResourceManager also provides the service for restarting the ApplicationMaster Container on failure.

ApplicationMasters request and manage Containers, which grant rights to an application to use a specific amount of resources (memory, CPU, etc.) on a specific host.

The ApplicationMaster, once given resources by the ResourceManager, contacts the NodeManager to start individual tasks. For example, using the MapReduce framework, these tasks would be mapper and reducer processes.

The NodeManager is the per-machine framework agent that is responsible for Containers, monitoring their resource usage (CPU, memory, disk, network), and reporting back to the ResourceManager.

In the previous figure, two ApplicationMasters are running within the cluster, one of which has three Containers (the red client) and one that has one Container (the blue client). Note that the ApplicationMasters run on cluster nodes and not as part of the ResourceManager, thus reducing the pressure on a central scheduler. Also, because ApplicationMasters have dynamic control of Containers, cluster utilization can be improved.


– Hadoop 2 has the concept of containers, while Hadoop 1 has slots. Containers are generic and can run any type of task, whereas a slot can run only a map task or only a reduce task.

– The same MR program can be executed on Hadoop 1 and 2 without modification, but it needs to be compiled against the appropriate set of jar files.

– Hadoop 1 only allows programs to be written in MR, while Hadoop 2, with its resource management framework (YARN), allows programs to be written in multiple distributed computing models, for example MR, Spark, Hama, Giraph, MPI and HBase coprocessors, among others.

MapReduce: Compression and Input Splits

This is something that always raises doubts:

When considering compressed data that will be processed by MapReduce, it is important to check whether the compression format supports splitting. If it does not, the number of map tasks may not be what you expect.

Let’s suppose an uncompressed file of 1 GB is stored in HDFS. With an HDFS block size of 64 MB, the file will be stored as 16 blocks, and a MapReduce job using this file as input will create 16 input splits, each processed independently as input to a separate map task.

Now if the file is a gzip-compressed file whose compressed size is 1 GB: As before, HDFS will store the file as 16 blocks. But, creating a split for each block will not work since it is impossible to start reading at an arbitrary point in the gzip stream, and therefore impossible for a map task to read its split independently of the others.

In this case, MapReduce will not try to split the gzipped file, since it knows that the input is gzip-compressed (by looking at the filename extension) and that gzip does not support splitting.

In this scenario, a single map task will process all 16 HDFS blocks, most of which will not be local to it (so there is an additional data locality cost).

This job will not parallelize as expected: it will be less granular, and so may take longer to run.
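The difference in parallelism can be estimated with a small sketch. The extension table is a deliberately partial assumption (gzip as not splittable, bzip2 as splittable, anything else treated as uncompressed), the function name is hypothetical, and the block-count arithmetic ignores any tolerance the framework applies to the last split.

```python
import math
import os

MB = 1024 * 1024
GB = 1024 * MB

# Assumed, partial mapping used only for this sketch.
SPLITTABLE_BY_EXTENSION = {".gz": False, ".bz2": True}


def expected_map_tasks(filename: str, file_size: int, block_size: int) -> int:
    """Estimate the number of map tasks created for a single input file."""
    extension = os.path.splitext(filename)[1]
    if not SPLITTABLE_BY_EXTENSION.get(extension, True):
        return 1  # the whole file goes to one mapper
    return math.ceil(file_size / block_size)


print(expected_map_tasks("data.txt", 1 * GB, 64 * MB))     # uncompressed input
print(expected_map_tasks("data.txt.gz", 1 * GB, 64 * MB))  # gzip-compressed input
```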

The gzip format uses DEFLATE to store the compressed data, and DEFLATE stores data as a series of compressed blocks. The problem is that the start of each block is not distinguished in any way that would allow a reader positioned at an arbitrary point in the stream to advance to the beginning of the next block, thereby synchronizing itself with the stream. For this reason, gzip does not support splitting.

Here we have a summary of compression formats:

[Table: summary of compression formats and whether they support splitting]

(a) DEFLATE is a compression algorithm whose standard implementation is zlib. There is no commonly available command-line tool for producing files in DEFLATE format, as gzip is normally used. (Note that the gzip file format is DEFLATE with extra headers and a footer.) The .deflate filename extension is a Hadoop convention.

Source: Hadoop: The Definitive Guide.