MapReduce: Compression and Input Splits

This is a topic that always raises doubts:

When compressed data will be processed by MapReduce, it is important to check whether the compression format supports splitting. If it does not, the job may run with far fewer map tasks than expected.

Suppose an uncompressed 1 GB file is stored in HDFS. With an HDFS block size of 64 MB, the file is stored as 16 blocks, and a MapReduce job using it as input creates 16 input splits, each processed independently by a separate map task.

Now suppose the file is gzip-compressed and its compressed size is 1 GB. As before, HDFS stores the file as 16 blocks. But creating a split for each block will not work, since it is impossible to start reading at an arbitrary point in the gzip stream, and therefore impossible for a map task to read its split independently of the others.

In this case, MapReduce will not try to split the gzipped file, since it knows that the input is gzip-compressed (by looking at the filename extension) and that gzip does not support splitting.

In this scenario a single map task processes all 16 HDFS blocks, most of which will not be local to that task, so it also pays a data locality cost.

The job will not parallelize as expected: it is less granular, and so may take longer to run.
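The arithmetic behind the two scenarios can be sketched in a few lines of shell. This is only an illustration: it assumes the split size equals the HDFS block size, and num_map_tasks is a made-up helper, not a Hadoop command.

```shell
# Sketch: expected number of map tasks for one input file.
# Assumes split size == HDFS block size; num_map_tasks is a hypothetical helper.
num_map_tasks() {
    file_mb=$1; block_mb=$2; splittable=$3
    if [ "$splittable" = "yes" ]; then
        # Ceiling division: one split (and one map task) per block.
        echo $(( (file_mb + block_mb - 1) / block_mb ))
    else
        # Non-splittable codec (e.g. gzip): the whole file is a single split.
        echo 1
    fi
}

echo "uncompressed 1 GB file: $(num_map_tasks 1024 64 yes) map task(s)"
echo "gzip 1 GB file:         $(num_map_tasks 1024 64 no) map task(s)"
```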

The gzip format uses DEFLATE to store the compressed data, and DEFLATE stores data as a series of compressed blocks. The problem is that the start of each block is not distinguished in any way that would allow a reader positioned at an arbitrary point in the stream to advance to the beginning of the next block, thereby synchronizing itself with the stream. For this reason, gzip does not support splitting.
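The effect can be reproduced locally with the gzip command-line tool. The sketch below is an illustration rather than what Hadoop does internally: it compresses a generated file, then tries to decompress it starting 4096 bytes into the stream, roughly what a map task handed a later split would have to do. The file names and the offset are arbitrary choices.

```shell
# Sketch: a gzip stream decodes from byte 0, but not from the middle.
workdir=$(mktemp -d)
seq 1 200000 > "$workdir/data.txt"
gzip -c "$workdir/data.txt" > "$workdir/data.txt.gz"

# Reading from the start works and yields every line back.
lines_from_start=$(gzip -dc "$workdir/data.txt.gz" | wc -l | tr -d ' ')
echo "lines decoded from offset 0: $lines_from_start"

# Skip the first 4096 bytes to simulate starting at a later split.
if tail -c +4097 "$workdir/data.txt.gz" | gzip -dc > /dev/null 2>&1; then
    mid_stream="decodable"
else
    mid_stream="not decodable"
fi
echo "decoding from offset 4096: $mid_stream"

rm -rf "$workdir"
```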

Here we have a summary of compression formats:

[Table image: Hadoop splittable compression formats]

(a) DEFLATE is a compression algorithm whose standard implementation is zlib. There is no commonly available command-line tool for producing files in DEFLATE format, as gzip is normally used. (Note that the gzip file format is DEFLATE with extra headers and a footer.) The .deflate filename extension is a Hadoop convention.

Source: Hadoop: The Definitive Guide.


skb rides the rocket

[21068723.434629] xen_netfront: xennet: skb rides the rocket: 19 slots

The "skb rides the rocket" bug often affects Hadoop clusters.

Each time I face it, I go back to an excellent blog post from Brendan Gregg.


yarn: change configuration and restart node manager on a live cluster

This procedure changes the YARN configuration on a live cluster, propagates the change to all the nodes and restarts the YARN NodeManager.

Both commands list all the nodes in the cluster and then filter out the DNS names in order to run a remote command over SSH. You can customize the sed filter to suit your own needs. The one used here keeps DNS names in the Elastic MapReduce format, that is, names beginning with "ip".

1. Upload the private key (.pem) file you use to access the master node to the master node itself, since the commands below run from there. Restrict the key's permissions to 600 or stricter (e.g. chmod 600 MyKeyName.pem).

2. Edit ~/conf/yarn-site.xml and use a command like this to propagate the change across the cluster.

yarn node -list|sed -n "s/^\(ip[^:]*\):.*/\1/p" | xargs -t -I{} -P10 scp -o StrictHostKeyChecking=no -i ~/MyKeyName.pem ~/conf/yarn-site.xml hadoop@{}:/home/hadoop/conf/

3. This command will restart the YARN NodeManager on all the nodes.

yarn node -list|sed -n "s/^\(ip[^:]*\):.*/\1/p" | xargs -t -I{} -P10 ssh -o StrictHostKeyChecking=no -i ~/MyKeyName.pem hadoop@{} "yarn nodemanager stop"
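Before running either command against a live cluster, it can help to check what the sed filter extracts and to print the remote commands instead of executing them. The sketch below does that on made-up input: the sample lines are simplified and only shaped like `yarn node -list` output (the real output has more columns), and the hostnames and ports are hypothetical.

```shell
# Made-up, simplified lines shaped like `yarn node -list` output.
# Hostnames and ports are hypothetical.
sample_nodes='Total Nodes:2
ip-10-0-0-11.ec2.internal:9103 RUNNING ip-10-0-0-11.ec2.internal:9035 0
ip-10-0-0-12.ec2.internal:9103 RUNNING ip-10-0-0-12.ec2.internal:9035 0'

# Same sed expression as in the procedure: keep the host part before the first colon.
hosts=$(printf '%s\n' "$sample_nodes" | sed -n "s/^\(ip[^:]*\):.*/\1/p")
echo "$hosts"

# Dry run: print the ssh command for each host instead of executing it.
echo "$hosts" | xargs -I{} echo ssh -i ~/MyKeyName.pem hadoop@{} "yarn nodemanager stop"
```

On a real cluster, replacing the sample with the output of `yarn node -list` gives the same dry run against the actual node list.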


Hadoop 1 vs Hadoop 2 – How many slots do I have per node ?

This is a topic that always raises discussion…

In Hadoop 1, the number of tasks launched per node was specified via the settings mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum.

These settings are ignored on Hadoop 2.

In Hadoop 2 with YARN, we can determine how many concurrent tasks are launched per node by dividing the resources allocated to YARN by the resources allocated to each MapReduce task, and taking the minimum of the two types of resources (memory and CPU).

This approach is an improvement over that of Hadoop 1, because the administrator no longer has to bundle CPU and memory into a Hadoop-specific concept of a “slot”.

The number of tasks that will be spawned per node:

    yarn.nodemanager.resource.memory-mb / mapreduce.[map|reduce].memory.mb
    yarn.nodemanager.resource.cpu-vcores / mapreduce.[map|reduce].cpu.vcores

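The resulting value can be set in the ‘mapreduce.job.maps‘ property in the ‘mapred-site.xml‘ file (note that this property is a per-job hint for the number of map tasks, not a hard per-node limit).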

Of course, YARN is more dynamic than that, and each job can have unique resource requirements — so in a multitenant cluster with different types of jobs running, the calculation isn’t as straightforward.
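As a worked example of the two divisions above, here is a minimal sketch for map tasks. The four values are example numbers chosen for illustration, not recommendations; substitute the ones from your own yarn-site.xml and mapred-site.xml.

```shell
# Example values only (assumptions, not recommendations).
nm_memory_mb=8192      # yarn.nodemanager.resource.memory-mb
nm_vcores=8            # yarn.nodemanager.resource.cpu-vcores
map_memory_mb=1536     # mapreduce.map.memory.mb
map_vcores=1           # mapreduce.map.cpu.vcores

# Integer division: how many tasks fit by each resource.
by_memory=$(( nm_memory_mb / map_memory_mb ))
by_vcores=$(( nm_vcores / map_vcores ))

# The scarcer resource limits how many tasks fit on the node.
if [ "$by_memory" -lt "$by_vcores" ]; then
    map_tasks_per_node=$by_memory
else
    map_tasks_per_node=$by_vcores
fi
echo "by memory: $by_memory, by vcores: $by_vcores"
echo "concurrent map tasks per node: $map_tasks_per_node"
```

With these numbers memory is the limiting resource. The same calculation applies to reduce tasks using the mapreduce.reduce.* values.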


Hadoop useful commands

– Copy fromLocal/ToLocal from/to S3:

$ bin/hadoop fs -copyToLocal s3://my-bucket/myfile.rb /home/hadoop/myfile.rb
$ bin/hadoop fs -copyFromLocal job5.avro s3://my-bucket/input

– Merge all the files from one folder into one single file:

$ hadoop jar ~/lib/emr-s3distcp-1.0.jar --src s3://my-bucket/my-folder/ --dest s3://my-bucket/logs/all-the-files-merged.log --groupBy '.*(*)' --outputCodec none

– Create directory on HDFS:

$ bin/hadoop fs -mkdir -p /user/ubuntu

– List HDFS directory:

$ bin/hadoop fs -ls /

– Put a file in HDFS:

$ bin/hadoop dfs -put localfile.txt /user/hadoop/hadoopfile

– Check HDFS filesystem utilization:

$ bin/hadoop dfsadmin -report

– Cat of file on HDFS:

$ bin/hadoop dfs -cat /user/ubuntu/RESULTS/part-00000


Hadoop: HDFS find / recover corrupt blocks

1) Search for corrupt files:

A command like ‘hadoop fsck /’ will show the status of the filesystem and any corrupt files. This command will ignore lines with nothing but dots and lines talking about replication:

hadoop fsck / | egrep -v '^\.+$' | grep -v eplica

2) Determine the corrupt blocks:

hadoop fsck /path/to/corrupt/file -locations -blocks -files

(Use that output to determine where blocks might live. If the file is larger than your block size it might have multiple blocks.)

3) Try to copy the files to S3 with s3distcp or s3cmd. If that fails, you have the option to run:

hadoop fsck / -move

which will move what is left of the corrupt files into /lost+found on HDFS.

4) Delete the file:

hadoop fs -rm /path/to/file/with/permanently/missing/blocks

Check file system state again with step 1.

A more drastic command is:

hadoop fsck / -delete

that will search and delete all corrupted files.

Hadoop should not use corrupt blocks again unless the replication factor is low and it does not have enough healthy replicas.


HDFS: Cluster to cluster copy with distcp

This is the format of the distcp command for copying from HDFS to HDFS, with both the source and destination clusters on Amazon AWS:

hadoop distcp "hdfs://ec2-54-86-202-252.compute-1.amazonaws.comec2-2:9000/tmp/test.txt" "hdfs://ec2-54-86-229-249.compute-1.amazonaws.comec2-2:9000/tmp/test1.txt"



HDFS Architecture

The design of the HDFS file system is based on the Google File System (GFS).

– It can store very large amounts of data (terabytes or petabytes).

– It is designed to store data across a large number of machines.

– It implements data replication to cope with malfunction or loss of machines in the cluster.

– To work better with MapReduce, HDFS allows data to be read and processed locally.


Input files are split into fixed-size blocks (64 MB by default), which are stored in a distributed fashion across a Hadoop cluster. A file can consist of several blocks, stored on different DataNodes (individual machines in the cluster) chosen at random. As a result, accessing a file usually requires accessing multiple DataNodes, which means that HDFS supports file sizes far larger than the disk capacity of a single machine.

The NameNode stores all the file system metadata for the cluster. This means that HDFS implements a master/slave architecture. A single NameNode (the primary server) manages the file system namespace and regulates client access to files. Having a single master in the cluster greatly simplifies the architecture, but its weakness is that it is a single point of failure. The NameNode serves as the sole arbiter and repository for all HDFS metadata.

Because the amount of metadata per file is relatively small (it only tracks file names, permissions and the location of each block), the NameNode keeps all the metadata in main memory, which allows fast random access. As a result, a NameNode with 4 GB of RAM can support a large number of files and directories.

Several DataNodes serve the same file, which means a file can remain available if one of those machines is lost. HDFS replicates each block across a number of machines (three by default).

Each DataNode periodically sends a heartbeat to the NameNode. The NameNode marks DataNodes that have not sent a heartbeat for 10 minutes (the default) as dead and stops sending I/O requests to them. It then starts re-replicating the data that the dead node held, in order to maintain the replication factor (3 by default).
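The dead-node rule can be sketched as a simple timeout check. This is a toy model, not NameNode code: datanode_state is a made-up helper, timestamps are plain seconds, and the 10-minute threshold is the default mentioned above.

```shell
# Toy model of the heartbeat timeout; datanode_state is a hypothetical helper.
dead_after_s=$(( 10 * 60 ))   # 10 minutes, the default mentioned above

datanode_state() {
    last_heartbeat_s=$1; now_s=$2
    # A node is considered dead once its last heartbeat is older than the timeout.
    if [ $(( now_s - last_heartbeat_s )) -gt "$dead_after_s" ]; then
        echo dead
    else
        echo live
    fi
}

echo "last heartbeat 5 min ago:  $(datanode_state 0 300)"
echo "last heartbeat 11 min ago: $(datanode_state 0 660)"
```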

A replication factor of 3 means that the data must be stored on 3 nodes at all times.
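To make the cost of replication concrete, here is a small sketch of the arithmetic for a single file. The 1 GB file size is an example value; the block size and replication factor are the defaults mentioned in this post.

```shell
# Example: storage footprint of one file under HDFS replication.
file_mb=1024       # example file size (1 GB)
block_mb=64        # HDFS block size
replication=3      # replication factor

blocks=$(( (file_mb + block_mb - 1) / block_mb ))   # ceiling division
block_replicas=$(( blocks * replication ))          # block copies spread over DataNodes
raw_mb=$(( file_mb * replication ))                 # raw disk space consumed

echo "blocks: $blocks"
echo "block replicas across the cluster: $block_replicas"
echo "raw storage used: ${raw_mb} MB"
```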