EBS Storage Performance Notes – Instance throughput vs Volume throughput


I wanted to write a few lines of guidance on this topic, since it is a recurring question when configuring storage, not only in the cloud but also on bare-metal servers.

What is throughput on a volume?

Throughput is the amount of data transferred to or from a storage device per unit of time (typically per second).

The throughput consumed on a volume is calculated using this formula:

IOPS (I/O operations per second) x BS (block size) = Throughput

For example, if we are writing at 1200 ops/sec and the write size is around 125 KB, the total throughput is about 150 MB/s.
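The arithmetic above can be checked with a minimal sketch. The function name is hypothetical and it assumes decimal units (1 MB = 1000 KB):

```python
def throughput_mb_per_s(iops: float, block_size_kb: float) -> float:
    """Throughput = IOPS x block size, in MB/s (assumes 1 MB = 1000 KB)."""
    return iops * block_size_kb / 1000.0


# Figures from the example: 1200 write ops/sec at roughly 125 KB per write.
print(throughput_mb_per_s(1200, 125))
```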

Why is this important?

This is important because we have to be aware of the Maximum Total Throughput Capacity for a specific volume vs the Maximum Total Instance Throughput.

If your instance type (or server) can drive 1250 MiB/s of throughput (e.g. m4.16xlarge) and your EBS volume's maximum throughput is 500 MiB/s (e.g. st1), not only will you hit a bottleneck when writing to that volume, but throttling might also occur (e.g. with EBS on cloud services).

How do I find the maximum throughput for EC2 instances and EBS volumes?

Here is the documentation about the maximum instance throughput for every instance type on EC2: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-optimized.html

And here is the documentation about the maximum throughput per EBS volume type: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-volume-types.html

How do I solve the problem?

If the instance/server has more throughput capacity than the volume, add volumes or split the storage capacity across more volumes, so that the load/throughput is distributed across them.
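As a rough sizing aid, the sketch below estimates how many volumes are needed before the instance limit, rather than the volume limit, becomes the ceiling. It assumes the workload spreads evenly across all volumes, which real jobs only approximate, and the function names are hypothetical:

```python
import math


def effective_throughput(instance_limit: float, volume_limit: float, volumes: int) -> float:
    """Aggregate throughput is capped by whichever is lower:
    the instance limit or the sum of the per-volume limits."""
    return min(instance_limit, volume_limit * volumes)


def volumes_to_saturate(instance_limit: float, volume_limit: float) -> int:
    """Smallest number of volumes whose combined limit reaches the instance limit."""
    return math.ceil(instance_limit / volume_limit)


# Figures quoted earlier: 1250 MiB/s instance limit, 500 MiB/s per volume.
print(effective_throughput(1250, 500, 1))
print(volumes_to_saturate(1250, 500))
```

With a single volume the volume is the bottleneck. Once the combined volume limit exceeds the instance limit, adding more volumes no longer helps throughput.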

As an example, here are some metrics with different volume configurations:

1 x 3000GB – 9000IOPS volume:

GP2-1x3000GB-9000IOPS

3 x 1000GB – 3000IOPS volume:

GP2-3x1000GB-3000IOPS

Look at some of the metrics. Both runs use the same instance type (m4.10xlarge, 500 MB/s throughput), the same volume type (gp2, 160 MB/s throughput per volume) and the same job:

  • Using 1 volume, write/read latency is around 20-25 ms/op. This is high compared to the 3 x 1000GB configuration.
  • Using 1 volume, the average queue length is 25. The queue depth is the number of pending I/O requests from your application to your volume. For maximum consistency, a Provisioned IOPS volume must maintain an average queue depth (rounded to the nearest whole number) of one for every 500 provisioned IOPS in a minute. In this scenario, 9000/500 = 18, so a queue length of 18 or higher is needed to reach 9000 IOPS.
  • Burst Balance is 100%, which is OK, but if this balance drops to zero (which will happen if the volume's capacity keeps being exceeded), all requests will be throttled and you’ll start seeing I/O errors.
  • In both scenarios, the average write size is fairly large (around 125 KiB/op), which will typically cause the volume to hit the throughput limit before hitting the IOPS limit.
  • Using 1 volume, the write rate is around 1200 ops/sec. With a write size of around 125 KB, this consumes about 150 MB/s (IOPS x BS = Throughput).
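Two of the checks in the list above can be written down as a small sketch: the queue depth rule of thumb (one per 500 provisioned IOPS) and which limit a volume reaches first for a given I/O size. The function names are hypothetical and decimal units (1 MB = 1000 KB) are assumed:

```python
def required_queue_depth(provisioned_iops: int, iops_per_slot: int = 500) -> int:
    """Rule of thumb from the text: one outstanding I/O per 500 provisioned IOPS."""
    return round(provisioned_iops / iops_per_slot)


def iops_at_throughput_cap(throughput_limit_mb_s: float, io_size_kb: float) -> float:
    """Operations per second at which the throughput limit is reached."""
    return throughput_limit_mb_s * 1000.0 / io_size_kb


def first_limit_hit(iops_limit: float, throughput_limit_mb_s: float, io_size_kb: float) -> str:
    """Report which limit the volume reaches first for this I/O size."""
    if iops_at_throughput_cap(throughput_limit_mb_s, io_size_kb) < iops_limit:
        return "throughput"
    return "IOPS"


# Single-volume scenario: 9000 IOPS, 160 MB/s limit, ~125 KB writes.
print(required_queue_depth(9000))
print(iops_at_throughput_cap(160, 125))
print(first_limit_hit(9000, 160, 125))
```

With large writes the throughput cap is reached at a rate far below the IOPS limit, which is in line with the roughly 1200 ops/sec observed on the single volume.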

Secondary NameNode in Hadoop 2


This is a frequently asked question:

In Hadoop 2, there are two setups to consider for the Secondary NameNode:

1. With HA (High Availability cluster): if you are setting up an HA cluster, you do not need a Secondary NameNode, because the Standby NameNode keeps its state synchronized with the Active NameNode.

The HDFS NameNode High Availability feature enables you to run redundant NameNodes in the same cluster in an Active/Passive configuration with a hot standby. Both NameNodes require the same type of hardware configuration. In an HA Hadoop cluster, the Active NameNode writes its metadata edits to a separate group of JournalNodes, and the Standby NameNode reads them from there.

In the event of a failover, the Standby NameNode ensures that its namespace is completely up to date with the edit logs before it changes to the active state. So there is no need for a Secondary NameNode in this cluster setup.

2. Without HA: you can have a Hadoop setup without a standby node. In that case, the Secondary NameNode acts as it does in Hadoop 1.x.

 

Source: https://stackoverflow.com/questions/37830777/use-of-secondary-namenode-in-hadoop-in-2-x

s3:// vs s3n:// vs s3a:// vs EMRFS


s3://

Apache Hadoop implementation of a block-based filesystem backed by S3. Apache Hadoop has deprecated use of this filesystem as of May 2016.

s3n://

A native filesystem for reading and writing regular files on S3. S3N allows Hadoop to access files on S3 that were written with other tools, and conversely, other tools can access files written to S3N using Hadoop. S3N is stable and widely used, but it is not being updated with any new features. S3N requires a suitable version of the jets3t JAR on the classpath.

  • Uses jets3t

s3a://

Hadoop’s successor to the S3N filesystem. S3A uses Amazon’s libraries to interact with S3. S3A supports accessing files larger than 5 GB, and it provides performance enhancements and other improvements. For Apache Hadoop, S3A is the successor to S3N and is backward compatible with S3N. Using Apache Hadoop, all objects accessible from s3n:// URLs should also be accessible from S3A by replacing the URL scheme.

  • Uses AWS SDK.
  • Amazon EMR does not currently support use of the Apache Hadoop S3A file system.
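For Apache Hadoop outside EMR, the scheme swap described above amounts to a prefix replacement. A minimal sketch, with a hypothetical helper name and bucket:

```python
def to_s3a(url: str) -> str:
    """Rewrite an s3n:// URL to s3a://, leaving any other URL untouched."""
    prefix = "s3n://"
    if url.startswith(prefix):
        return "s3a://" + url[len(prefix):]
    return url


# Hypothetical bucket and key, for illustration only.
print(to_s3a("s3n://mybucket/hive/csv/"))
```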

EMRFS:

On Amazon EMR, both the s3:// and s3n:// URIs are associated with the EMR filesystem and are functionally interchangeable in the context of Amazon EMR. For consistency's sake, however, it is recommended to use the s3:// URI in the context of Amazon EMR.

EMRFS can be used by invoking the prefix s3n:// or s3:// or s3a:// depending on the client application implementation.

Source: https://aws.amazon.com/premiumsupport/knowledge-center/emr-file-system-s3/

Copy Data with Hive and Spark


These are two examples of how to copy data from one S3 location to another S3 location. The same operation can be done from S3 to HDFS and vice versa.

I’m assuming that you are able to launch the Hive client or the spark-shell client.

Hive:

Using the MapReduce engine or the Tez engine:

set hive.execution.engine=mr; 

or

set hive.execution.engine=tez;

CREATE EXTERNAL TABLE source_table(a_col string, b_col string, c_col string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://mybucket/hive/csv/';

CREATE TABLE destination_table(a_col string, b_col string, c_col string) LOCATION 's3://mybucket/output-hive/csv_1/';

INSERT OVERWRITE TABLE destination_table SELECT * FROM source_table;

Spark:

sc.textFile("s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2013-48/segments/1387346051826/warc/").saveAsTextFile("s3://mybucket/spark/bigfiles")

 

If you want to copy data to HDFS, you can also explore s3-dist-cp:

S3DistCp:

s3-dist-cp --src s3://mybucket/hive/csv/ --dest=hdfs:///output-hive/csv_10/

 

HBase and Zookeeper debugging


I came across some scenarios where an application (e.g. MapReduce) communicating with HBase through YARN could silently fail with a timeout like the following:

2017-01-30 19:42:03,657 DEBUG [main] org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation: locateRegionInMeta parentTable=hbase:meta, metaLocation=, attempt=9 of 35 failed; retrying after sleep of 10095 because: Failed after attempts=36, exceptions:
Mon Jan 30 19:42:03 UTC 2017, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=68463: row 'test2,#cmrNo acctNo,99999999999999' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=ip-172-31-3-246.us-west-2.compute.internal,16000,1485539268192, seqNum=0

The root cause of this behavior wasn’t a misconfiguration on the server or networking side, but a library missing from the classpath.

When there is a ZooKeeper issue, the underlying exceptions may not be visible, depending on the retry parameters.

In this case, I added/modified the following parameters in the MapReduce Java application to get more visibility into the communication layer between ZooKeeper and HBase:

conf.set("hbase.client.retries.number", Integer.toString(1));
conf.set("zookeeper.session.timeout", Integer.toString(60000));
conf.set("zookeeper.recovery.retry", Integer.toString(1));


After this, the following exception was visible:

Exception: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
 at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:157)
 at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:65)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: com.google.protobuf.ServiceException: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge

 

Tuning these parameters will cause the application to exit quickly when there is a problem with the cluster. This can be desirable in a production environment.

Reducing the parameters to more conservative values could yield better recovery times. Setting zookeeper.recovery.retry to 0 will still result in up to two connection attempts to every ZooKeeper server in the quorum, and will cause an application failure in under a minute should ZooKeeper connectivity be lost during execution.

 

As an additional note, if you are receiving timeouts because the application is trying to contact localhost instead of the quorum server, you can set the explicit parameters:

// HBase through MR on YARN is trying to connect to localhost instead of the quorum.
conf.set("hbase.zookeeper.quorum", "172.31.3.246");
conf.set("hbase.zookeeper.property.clientPort", "2181");

 

I’ve added a couple of examples of MapReduce applications for HBase here: https://github.com/hvivani/bigdata/tree/master/hbase

 

Some additional notes on this behavior: https://discuss.pivotal.io/hc/en-us/articles/200933006-Hbase-application-hangs-indefinitely-connecting-to-zookeeper

 

Checking Yarn child execution environment


Never go out without this:

$ sudo -u yarn jps
27343 YarnChild
4156 NodeManager
27292 Jps

$ sudo strings -f /proc/27343/environ
/proc/27343/environ: STDERR_LOGFILE_ENV=/var/log/hadoop-yarn/containers/application_1485807340469_0019/container_1485807340469_0019_01_000003/stderr
/proc/27343/environ: SHELL=/bin/bash
/proc/27343/environ: TERM=linux
/proc/27343/environ: HADOOP_HOME=/usr/lib/hadoop
/proc/27343/environ: YARN_PID_DIR=/var/run/hadoop-yarn
/proc/27343/environ: NM_HOST=ip-172-31-5-156.us-west-2.compute.internal
/proc/27343/environ: HADOOP_PREFIX=/usr/lib/hadoop
/proc/27343/environ: YARN_OPTS= -XX:OnOutOfMemoryError='kill -9 %p' -XX:OnOutOfMemoryError='kill -9 %p' -server  -Dhadoop.log.dir=/var/log/hadoop-yarn -Dyarn.log.dir=/var/log/hadoop-yarn -Dhadoop.log.file=yarn-yarn-nodemanager-ip-172-31-5-156.log -Dyarn.log.file=yarn-yarn-nodemanager-ip-172-31-5-156.log -Dyarn.home.dir=/usr/lib/hadoop-yarn -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.root.logger=INFO,DRFA -Dyarn.root.logger=INFO,DRFA -Dsun.net.inetaddr.ttl=30 -Djava.library.path=:/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native
/proc/27343/environ: NM_AUX_SERVICE_mapreduce_shuffle=AAA0+gAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=
/proc/27343/environ: YARN_NICENESS=0
/proc/27343/environ: NM_HTTP_PORT=8042
/proc/27343/environ: LOCAL_DIRS=/mnt/yarn/usercache/hadoop/appcache/application_1485807340469_0019,/mnt1/yarn/usercache/hadoop/appcache/application_1485807340469_0019
/proc/27343/environ: USER=hadoop
/proc/27343/environ: JAVA_LIBRARY_PATH=:/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native
/proc/27343/environ: LD_LIBRARY_PATH=/mnt/yarn/usercache/hadoop/appcache/application_1485807340469_0019/container_1485807340469_0019_01_000003:/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
/proc/27343/environ: JSVC_HOME=/usr/lib/bigtop-utils
/proc/27343/environ: HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec
/proc/27343/environ: HADOOP_TOKEN_FILE_LOCATION=/mnt/yarn/usercache/hadoop/appcache/application_1485807340469_0019/container_1485807340469_0019_01_000003/container_tokens
/proc/27343/environ: SVC_USER=yarn
/proc/27343/environ: LOG_DIRS=/var/log/hadoop-yarn/containers/application_1485807340469_0019/container_1485807340469_0019_01_000003
/proc/27343/environ: MALLOC_ARENA_MAX=4
/proc/27343/environ: HADOOP_JOB_HISTORYSERVER_HEAPSIZE=2396
/proc/27343/environ: YARN_ROOT_LOGGER=INFO,DRFA
/proc/27343/environ: NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
/proc/27343/environ: PATH=/usr/local/sbin:/usr/local/bin:/usr/bin:/usr/sbin:/sbin:/bin
/proc/27343/environ: CONF_DIR=/etc/hadoop/conf
/proc/27343/environ: YARN_IDENT_STRING=yarn
/proc/27343/environ: HADOOP_HDFS_HOME=/usr/lib/hadoop-hdfs
/proc/27343/environ: DAEMON_FLAGS=nodemanager
/proc/27343/environ: HADOOP_CLIENT_OPTS=
/proc/27343/environ: PWD=/mnt/yarn/usercache/hadoop/appcache/application_1485807340469_0019/container_1485807340469_0019_01_000003
/proc/27343/environ: HADOOP_COMMON_HOME=/usr/lib/hadoop
/proc/27343/environ: HADOOP_YARN_HOME=/usr/lib/hadoop-yarn
/proc/27343/environ: JAVA_HOME=/usr/lib/jvm/java-openjdk
/proc/27343/environ: HADOOP_CLASSPATH=/mnt/yarn/usercache/hadoop/appcache/application_1485807340469_0019/container_1485807340469_0019_01_000003:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:/mnt/yarn/usercache/hadoop/appcache/application_1485807340469_0019/container_1485807340469_0019_01_000003/*:/mnt/yarn/usercache/hadoop/appcache/application_1485807340469_0019/container_1485807340469_0019_01_000001:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:/mnt/yarn/usercache/hadoop/appcache/application_1485807340469_0019/container_1485807340469_0019_01_000001/*:/usr/lib/hbase/*:/usr/lib/hbase/lib/*:/etc/tez/conf:/usr/lib/tez/*:/usr/lib/tez/lib/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*
/proc/27343/environ: HADOOP_CONF_DIR=/etc/hadoop/conf
/proc/27343/environ: DAEMON=hadoop-yarn-nodemanager
/proc/27343/environ: STDOUT_LOGFILE_ENV=/var/log/hadoop-yarn/containers/application_1485807340469_0019/container_1485807340469_0019_01_000003/stdout
/proc/27343/environ: LANG=en_US.UTF-8
/proc/27343/environ: SLEEP_TIME=10
/proc/27343/environ: XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
/proc/27343/environ: HADOOP_OPTS= -server -XX:OnOutOfMemoryError='kill -9 %p' -Dhadoop.log.dir=/usr/lib/hadoop/logs -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str= -Dhadoop.root.logger=INFO,console -Djava.library.path=:/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -server -XX:OnOutOfMemoryError='kill -9 %p' -Dhadoop.log.dir=/usr/lib/hadoop/logs -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str= -Dhadoop.root.logger=INFO,console -Djava.library.path=:/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/usr/lib/hadoop/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true
/proc/27343/environ: PIDFILE=/var/run/hadoop-yarn/yarn-yarn-nodemanager.pid
/proc/27343/environ: YARN_LOG_DIR=/var/log/hadoop-yarn
/proc/27343/environ: DESC=Hadoop nodemanager
/proc/27343/environ: EXEC_PATH=/usr/lib/hadoop-yarn/sbin/yarn-daemon.sh
/proc/27343/environ: SHLVL=5
/proc/27343/environ: HOME=/home/
/proc/27343/environ: JVM_PID=27333
/proc/27343/environ: YARN_CONF_DIR=/etc/hadoop/conf
/proc/27343/environ: YARN_LOGFILE=yarn-yarn-nodemanager-ip-172-31-5-156.log
/proc/27343/environ: YARN_NODEMANAGER_HEAPSIZE=2048
/proc/27343/environ: UPSTART_INSTANCE=
/proc/27343/environ: HADOOP_MAPRED_HOME=/usr/lib/hadoop-mapreduce
/proc/27343/environ: LOGNAME=hadoop
/proc/27343/environ: NM_PORT=8041
/proc/27343/environ: HADOOP_HOME_WARN_SUPPRESS=true
/proc/27343/environ: CLASSPATH=/mnt/yarn/usercache/hadoop/appcache/application_1485807340469_0019/container_1485807340469_0019_01_000003:/etc/hadoop/conf:/usr/lib/hadoop/*:/usr/lib/hadoop/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/lib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/lib/hadoop-mapreduce/share/hadoop/mapreduce/*:/usr/lib/hadoop-mapreduce/share/hadoop/mapreduce/lib/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/lib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/share/aws/emr/cloudwatch-sink/lib/*:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:/mnt/yarn/usercache/hadoop/appcache/application_1485807340469_0019/container_1485807340469_0019_01_000003/*
/proc/27343/environ: CONTAINER_ID=container_1485807340469_0019_01_000003
/proc/27343/environ: YARN_PROXYSERVER_HEAPSIZE=2396
/proc/27343/environ: HADOOP_ROOT_LOGGER=DEBUG,console
/proc/27343/environ: WORKING_DIR=/var/lib/hadoop-yarn
/proc/27343/environ: UPSTART_JOB=hadoop-yarn-nodemanager
/proc/27343/environ: HADOOP_NAMENODE_HEAPSIZE=1740
/proc/27343/environ: HADOOP_DATANODE_HEAPSIZE=757
/proc/27343/environ: YARN_RESOURCEMANAGER_HEAPSIZE=2396
/proc/27343/environ: BASH_FUNC_run_prestart()=() {  su -s /bin/bash $SVC_USER -c "cd $WORKING_DIR && $EXEC_PATH --config '$CONF_DIR' start $DAEMON_FLAGS"
/proc/27343/environ: _=/usr/lib/jvm/java-openjdk/bin/java
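When the environment needs to be inspected from a script rather than by eye, the same file can be parsed directly. This is a minimal sketch that relies on /proc/<pid>/environ being a list of NUL-separated KEY=VALUE entries. The function names are hypothetical, and reading another user's process usually requires root, as with the sudo above:

```python
def parse_environ(raw: bytes) -> dict:
    """Split the NUL-separated KEY=VALUE entries of a /proc/<pid>/environ dump."""
    env = {}
    for entry in raw.split(b"\0"):
        if not entry:
            continue  # skip the empty piece after the trailing NUL
        key, sep, value = entry.decode("utf-8", errors="replace").partition("=")
        if sep:  # ignore malformed entries without '='
            env[key] = value
    return env


def read_environ(pid: int) -> dict:
    """Read and parse the environment of a running process (Linux only)."""
    with open(f"/proc/{pid}/environ", "rb") as fh:
        return parse_environ(fh.read())


# Small sample built from values shown in the listing above.
sample = b"USER=hadoop\0NM_PORT=8041\0HADOOP_CONF_DIR=/etc/hadoop/conf\0"
env = parse_environ(sample)
print(env["HADOOP_CONF_DIR"])
```

On a node, read_environ(27343) would return the same variables as the strings output, keyed by name, which makes it easy to compare CLASSPATH or HADOOP_CLASSPATH between containers.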

Creating Bigtop patches


To contribute to the Bigtop project, we need to submit a patch.

We should follow this process for managing our proposed contributions:

  1. Create a Jira ticket with the description of the problem. (Note: the ticket should be Minor priority for most things, and only Major if it is fixing a bug that prevents something from working as expected; also the component will be “deployment” for new charms or bundles, as well as for changes to the puppet manifests.)
  2. Create a branch in your Bigtop fork (here, /hvivani/bigtop) named BIGTOP-XXXX, where XXXX is the Jira ticket number, e.g. BIGTOP-2417.
  3. Commit the change with a commit message of the form BIGTOP-XXXX: message, where message must match the title of your Jira ticket. Then push it to the branch.
  4. Open a pull request from your branch to the upstream Apache Bigtop repository, e.g. PR 138. Again, ensure the PR title exactly matches the title of your Jira ticket prefixed by the BIGTOP-XXXX ticket number.
  5. Refresh your Jira ticket. You should see your GitHub PR linked to the ticket. You should also see a comment by ASF GitHub Bot with information about the PR which includes a link to the PR’s patch file, which is the PR URL with .patch appended to it. For example, BIGTOP-2417 contains a link to https://github.com/apache/bigtop/pull/138.patch.
  6. Once that is done, click the Submit Patch button in the ticket to inform committers that this ticket has a Patch Available.
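The naming conventions in the steps above are easy to get slightly wrong, so here is a small sketch that builds the strings. The helper names and the ticket title are hypothetical. Only the BIGTOP-XXXX pattern and the .patch suffix come from the process described:

```python
import re

TICKET_RE = re.compile(r"^BIGTOP-\d+$")


def check_ticket(ticket: str) -> str:
    """Return the ticket id if it looks like BIGTOP-<number>, else raise."""
    if not TICKET_RE.match(ticket):
        raise ValueError(f"not a Bigtop ticket id: {ticket!r}")
    return ticket


def commit_message(ticket: str, jira_title: str) -> str:
    """Commit message and PR title: ticket id, colon, exact Jira title."""
    return f"{check_ticket(ticket)}: {jira_title}"


def patch_url(pr_url: str) -> str:
    """The patch file is the PR URL with .patch appended."""
    return pr_url.rstrip("/") + ".patch"


# "Example ticket title" is a placeholder, not the real title of BIGTOP-2417.
print(commit_message("BIGTOP-2417", "Example ticket title"))
print(patch_url("https://github.com/apache/bigtop/pull/138"))
```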

FileInputFormat vs. CombineFileInputFormat


When you put a file into HDFS, it is split into blocks of 128 MB (the default value for HDFS on EMR). Consider a file big enough to consume 10 blocks. When you read that file from HDFS as input for a MapReduce job, the same blocks are usually mapped, one by one, to splits. In this case, the file is divided into 10 splits (which means 10 map tasks) for processing. By default, the block size and the split size are equal, but the sizes depend on the configuration settings for the InputSplit class.
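The block-to-split mapping can be approximated with a short calculation. This is a simplified sketch for non-empty files that assumes the split size equals the block size. The real FileInputFormat also applies a small tolerance to the last split, which is ignored here, and the names are hypothetical:

```python
BLOCK_SIZE = 128 * 1024 * 1024  # 128 MB, the HDFS default on EMR mentioned above


def num_splits(file_size: int, split_size: int = BLOCK_SIZE) -> int:
    """Approximate split count for one non-empty file: ceil(file_size / split_size)."""
    return -(-file_size // split_size)  # integer ceiling division


# A file that fills exactly 10 blocks yields 10 splits, hence 10 map tasks.
print(num_splits(10 * BLOCK_SIZE))
```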

From a Java programming perspective, the class that holds the responsibility of this conversion is called an InputFormat, which is the main entry point into reading data from HDFS. From the blocks of the files, it creates a list of InputSplits. For each split, one mapper is created. Then each InputSplit is divided into records by using the RecordReader class. Each record represents a key-value pair.

FileInputFormat vs. CombineFileInputFormat

Before a MapReduce job is run, you can specify the InputFormat class to be used. The implementation of FileInputFormat requires you to create an instance of the RecordReader, and as mentioned previously, the RecordReader creates the key-value pairs for the mappers.

FileInputFormat is an abstract class that is the basis for a majority of the implementations of InputFormat. It contains the location of the input files and an implementation of how splits must be produced from these files. How the splits are converted into key-value pairs is defined in the subclasses. Some examples of its subclasses are TextInputFormat, KeyValueTextInputFormat, and CombineFileInputFormat.

Hadoop works more efficiently with large files (files that occupy more than 1 block). FileInputFormat converts each large file into splits, and each split is created in a way that contains part of a single file. As mentioned, one mapper is generated for each split.

FileInputFormatLargeFile

However, when the input files are smaller than the default block size, many splits (and therefore many mappers) are created. This arrangement makes the job inefficient. The figure below shows how too many mappers are created when FileInputFormat is used for many small files.

FileInputFormatManySmallFiles

To avoid this situation, CombineFileInputFormat was introduced. This InputFormat works well with small files, because it packs many of them into one split, so there are fewer mappers and each mapper has more data to process. The figure below shows how CombineFileInputFormat treats the small files so that fewer mappers are created.

CombineFileInputFormatSmallFiles
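To get a feel for the difference in mapper counts, here is a simplified simulation. It is not the Hadoop implementation: the real CombineFileInputFormat also groups files by node and rack locality, while this sketch only packs files greedily up to a maximum split size. All names are hypothetical:

```python
MB = 1024 * 1024


def splits_per_file(file_sizes, split_size):
    """FileInputFormat-style count: every file gets at least one split of its own."""
    return sum(max(1, -(-size // split_size)) for size in file_sizes)


def combined_splits(file_sizes, max_split_size):
    """Greedy packing: add files to the current split until the next one would overflow it."""
    splits, current, current_size = [], [], 0
    for size in file_sizes:
        if current and current_size + size > max_split_size:
            splits.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        splits.append(current)
    return splits


small_files = [1 * MB] * 300  # 300 files of 1 MB each
print(splits_per_file(small_files, 128 * MB))       # one mapper per small file
print(len(combined_splits(small_files, 128 * MB)))  # far fewer mappers after packing
```

In this toy case, 300 one-megabyte files need 300 mappers with one split per file, but only 3 once they are packed into splits of up to 128 MB.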