Secondary NameNode in Hadoop 2


This is a frequent asked question:

In hadoop 2, Secondary Name Node can be implemented in two ways:

1. With HA (High Availability Cluster): if you are setting up HA cluster then you may not need to use Secondary namenode because standby namenode keep its state synchronized with the Active namenode.

The HDFS NameNode High Availability feature enables you to run redundant NameNodes in the same cluster in an Active/Passive configuration with a hot standby.Both NameNode require the same type of hardware configuration.In HA hadoop cluster Active NameNode reads and write metadata information in Separate JournalNode.

In the event of failover, standby NameNode will ensure that its namespace is completely updated according to edit logs before it is changes to active state. So there is no need of Secondary NameNode in this Cluster Setup.

2. Without HA: you can have a hadoop setup without standby node. Then the secondary NameNode will act as you already mentioned in Hadoop 1.x

 

Source: https://stackoverflow.com/questions/37830777/use-of-secondary-namenode-in-hadoop-in-2-x

Adding a mount point to HDFS


Before proceeding:

This procedure considers that you don’t have any current useful data on HDFS. All the data will be lost after adding mount points with this method.

This procedure should be applied to every datanode in the cluster. No intervention in the master node is needed if the framework is configured properly.

#checking available block devices:
[ec2-user@ip-10-0-15-76 media]$ lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
nvme2n1 259:4 0 2.5T 0 disk
nvme1n1 259:3 0 2.5T 0 disk /media/ebs0
nvme4n1 259:6 0 2.5T 0 disk
nvme0n1 259:0 0 2G 0 disk
├─nvme0n1p1 259:1 0 2G 0 part /
└─nvme0n1p128 259:2 0 1M 0 part
nvme3n1 259:5 0 2.5T 0 disk

#checking formatted filesystem:
[ec2-user@ip-10-0-15-76 media]$ sudo file -s /dev/nvme2n1
/dev/nvme2n1: data

(this filesystem is not formatted)

#formatting to ext4:
[ec2-user@ip-10-0-15-76 media]$ sudo mkfs -t ext4 /dev/nvme2n1
mke2fs 1.42.12 (29-Aug-2014)
Creating filesystem with 655360000 4k blocks and 163840000 inodes
Filesystem UUID: 6d9c997f-d47b-4529-85c8-e56e8ef47a1d
Superblock backups stored on blocks:
32768, 98304, 163840, 229376, 294912, 819200, 884736, 1605632, 2654208,
4096000, 7962624, 11239424, 20480000, 23887872, 71663616, 78675968,
102400000, 214990848, 512000000, 550731776, 644972544

Allocating group tables: done
Writing inode tables: done
Creating journal (32768 blocks): done
Writing superblocks and filesystem accounting information: done

#mounting
[ec2-user@ip-10-0-15-76 media]$ sudo mkdir /media/ebs1
[ec2-user@ip-10-0-15-76 media]$ sudo mount /dev/nvme2n1 /media/ebs1
[ec2-user@ip-10-0-15-76 media]$ lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
nvme2n1 259:4 0 2.5T 0 disk /media/ebs1
nvme1n1 259:3 0 2.5T 0 disk /media/ebs0
nvme4n1 259:6 0 2.5T 0 disk
nvme0n1 259:0 0 2G 0 disk
├─nvme0n1p1 259:1 0 2G 0 part /
└─nvme0n1p128 259:2 0 1M 0 part
nvme3n1 259:5 0 2.5T 0 disk

#final mount result
[ec2-user@ip-10-0-60-46 ~]$ lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
nvme2n1 259:4 0 2.5T 0 disk /media/ebs1
nvme1n1 259:3 0 2.5T 0 disk /media/ebs0
nvme4n1 259:6 0 2.5T 0 disk /media/ebs3
nvme0n1 259:0 0 2G 0 disk
├─nvme0n1p1 259:1 0 2G 0 part /
└─nvme0n1p128 259:2 0 1M 0 part
nvme3n1 259:5 0 2.5T 0 disk /media/ebs2

#checking mount points in hdfs-site.xml
[ec2-user@ip-10-0-60-46 media]$ cat /opt/hadoop-2.7.3/etc/hadoop/hdfs-site.xml |grep -A1 dfs.datanode.data.dir
<name>dfs.datanode.data.dir</name>
<value>/media/ebs0/hadoop/datanodes,/media/ebs1/hadoop/datanodes,/media/ebs2/hadoop/datanodes,/media/ebs3/hadoop/datanodes</value>

# create defined directory structure on mount point (for each mount point):
sudo mkdir -p /media/ebs1/hadoop/datanodes

# modify owner to the user that will start DFS (for each mount point):
sudo chown -R ec2-user:ec2-user /media/ebs1/hadoop/datanodes

#format namenode:
hadoop namenode -format

# stop/start DFS:
/opt/hadoop-2.7.3/sbin/stop-dfs.sh
/opt/hadoop-2.7.3/sbin/start-dfs.sh

# check service start status
tail -f /var/log/hadoop/hadoop-ec2-user-datanode-ip-10-0-15-76.log

 

**some ENV variables I usually use on these environments:

export HADOOP_SSH_OPTS="-i /home/ec2-user/.ssh/mykey -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"
export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.151.x86_64/jre

s3:// vs s3n:// vs s3a:// vs EMRFS


s3://

Apache Hadoop implementation of a block-based filesystem backed by S3. Apache Hadoop has deprecated use of this filesystem as of May 2016.

s3n://

A native filesystem for reading and writing regular files on S3. S3N allows Hadoop to access files on S3 that were written with other tools, and conversely, other tools can access files written to S3N using Hadoop. S3N is stable and widely used, but it is not being updated with any new features. S3N requires a suitable version of the jets3t JAR on the classpath.

  • Uses jets3t

s3a://

Hadoop’s successor to the S3N filesystem. S3A uses Amazon’s libraries to interact with S3. S3A supports accessing files larger than 5 GB, and it provides performance enhancements and other improvements. For Apache Hadoop, S3A is the successor to S3N and is backward compatible with S3N. Using Apache Hadoop, all objects accessible from s3n:// URLs should also be accessible from S3A by replacing the URL scheme.

  • Uses AWS SDK.
  • Amazon EMR does not currently support use of the Apache Hadoop S3A file system.

EMRFS:

On Amazon EMR, both the s3:// and s3n:// URIs are associated with the EMR filesystem and are functionally interchangeable in the context of Amazon EMR. For consistency sake, however, it is recommended to use the s3:// URI in the context of Amazon EMR.

EMRFS can be used by invoking the prefix s3n:// or s3:// or s3a:// depending on the client application implementation.

Source: https://aws.amazon.com/premiumsupport/knowledge-center/emr-file-system-s3/

S3 and Parallel Processing – DirectFileOutputCommitter


The problem:

While a Hadoop Job is writing output, it will write to a temporary directory:
Task1 –> /unique/temp/directory/task1/file.tmp
Task2 –> /unique/temp/directory/task2/file.tmp

When the tasks finish the execution, will move (commit) the temporary file to a final location.

This schema makes possible the support speculative execution feature on Hadoop.

Moving the task output to its final destination (commit), involves a Rename operation. This rename operation, on a normal filesystem is just a change of pointer in the FS metadata.

Now, as S3 is not a filesystem, rename operations are more costly: it will involve a copy (Put) + Delete operation.

The solution:

In Mapreduce (this behavior can be different for other applications), to avoid these expensive operations, we can change the mapred-site.xml file, “mapred.output.committer.class” property to “org.apache.hadoop.mapred.DirectFileOutputCommitter”, so the the task output directly to it’s final destination.

<property>
  <name>mapred.output.committer.class</name>
  <value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value>
</property>

For this and other useful parallel processing S3 considerations, please have a look here:

http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot-errors-io.html#emr-troubleshoot-errors-io-1
https://aws.amazon.com/blogs/aws/amazon-s3-performance-tips-tricks-seattle-hiring-event/
http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html

Running Spark with oozie


Oozie 4.2 now supports spark-action.

Example job.properties file (configuration tested on EMR 4.2.0):

nameNode=hdfs://172.31.25.17:8020 
jobTracker=172.31.25.17:8032 
master=local[*] 
queueName=default 
examplesRoot=examples 
oozie.use.system.libpath=true 
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/spark

(Use the master node internal IP instead of localhost in the nameNode and jobTracker)

Validate oozie workflow xml file:

oozie validate workflow.xml

Example workflow.xml file:

<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkFileCopy'>
 <start to='spark-node' />
<action name='spark-node'>
 <spark xmlns="uri:oozie:spark-action:0.1">
 <job-tracker></job-tracker>
 <name-node></name-node>
 <prepare>
 <delete path="/user/${wf:user()}//output-data/spark"/>
 </prepare>
 <master></master>
 <name>Spark-FileCopy</name>
 <class>org.apache.oozie.example.SparkFileCopy</class>
 <jar>/user/${wf:user()}//apps/spark/lib/oozie-examples.jar</jar>
 <arg>/user/${wf:user()}//input-data/text/data.txt</arg>
 <arg>/user/${wf:user()}//output-data/spark</arg>
 </spark>
 <ok to="end" />
 <error to="fail" />
 </action>
<kill name="fail">
 <message>Workflow failed, error
 message[${wf:errorMessage(wf:lastErrorNode())}]
 </message>
 </kill>
 <end name='end' />
 </workflow-app>

Create the defined structure in HDFS and copy the proper files:

hadoop fs -ls /user/hadoop/examples/apps/spark/
Found 3 items
drwxr-xr-x - hadoop hadoop 0 2015-12-18 08:13 /user/hadoop/examples/apps/spark/lib
-rw-r--r-- 1 hadoop hadoop 1920 2015-12-18 08:08 /user/hadoop/examples/apps/spark/workflow.xml

hadoop fs -put workflow.xml /user/hadoop/examples/apps/spark/

hadoop fs -put /usr/share/doc/oozie-4.2.0/examples/apps/spark/lib/oozie-examples.jar /user/hadoop/examples/apps/spark/lib

hadoop fs -mkdir -p /user/hadoop/examples/input-data/text

hadoop fs -mkdir -p /user/hadoop/examples/output-data/spark

hadoop fs -put /usr/share/doc/oozie-4.2.0/examples/input-data/text/data.txt /user/hadoop/examples/input-data/text/

 

Run your oozie Job:

oozie job --oozie http://localhost:11000/oozie -config ./job.properties -run

Check oozie job:

oozie job -info 0000004-151203092421374-oozie-oozi-W

Check available sharelib:

$ oozie admin -shareliblist -oozie http://localhost:11000/oozie
[Available ShareLib] 
oozie 
hive 
distcp 
hcatalog 
sqoop 
mapreduce-streaming 
spark 
hive2 
pig

 

 

References:

https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html

 

Hadoop: Output Commiter Notes


OutputCommitter describes the commit of task output for a MapReduce job.

The MapReduce framework relies on the OutputCommitter of the job to:

  • Set up the job during initialization; for example, create the temporary output directory for the job. Job setup is done by a separate task when the job is in PREP state and after initializing tasks. Once the setup task completes, the job is moved to the RUNNING state.
  • Clean up the job after job completion; for example, remove the temporary output directory. Job cleanup is done by a separate task at the end of the job. The job is declared SUCCEEDED, FAILED, or KILLED after the cleanup task completes.
  • Set up the task temporary output. Task setup is done as part of the same task, during task initialization.
  • Check whether a task needs a commit. This prevents unnecessary commit procedures.
  • Commit the task output. After the task is done, the task commits its output, if required.
  • Discard the task commit. If the task is failed or killed, the output is cleaned up. If the task could not clean up (in an exception block), a separate task is launched with the same attempt ID to do the cleanup.

FileOutputCommitter is the default OutputCommitter. Job setup/cleanup tasks occupy map or reduce containers, whichever is available on the NodeManager. The JobCleanup task, TaskCleanup tasks, and JobSetup task have the highest priority, in that order.

Task Side-Effect Files

In some applications, component tasks need to create or write to side files, which differ from the actual job output files.

In such cases, two instances of the same Mapper or Reducer could be running simultaneously (for example, speculative tasks), trying to open or write to the same file on the file system. You must pick unique names per task-attempt (using the attemptid, say attempt_200709221812_0001_m_000000_0), not just per task.

To avoid these issues, the MapReduce framework, when the OutputCommitter is FileOutputCommitter, maintains a special ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} subdirectory accessible via ${mapreduce.task.output.dir} for each task attempt on the FileSystem where the output of the task attempt is stored.

On successful completion of the task attempt, the files in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) are promoted to ${mapreduce.output.fileoutputformat.outputdir}/. The framework discards the subdirectory of unsuccessful task attempts. This process is completely transparent to the application.

You can use this feature by creating any required side files during execution of a task in ${mapreduce.task.output.dir} via FileOutputFormat.getWorkOutputPath(). The framework promotes them similarly for successful task attempts. This eliminates the need to pick unique paths per task attempt.

Note: The value of ${mapreduce.task.output.dir} during execution of a particular task attempt is actually ${mapreduce.output.filetoutputformat.outputdir}/temporary/${taskid}; this value is set by the MapReduce framework. To use this feature, create files in the path returned by FileOutputFormat.getWorkOutputPath() from the MapReduce task.

The entire discussion holds true for maps of jobs with reducer=NONE (that is, 0 reduces) because output of the map, in that case, goes directly to HDFS.

References: http://www.cloudera.com/content/www/en-us/documentation/other/tutorial/CDH5/Hadoop-Tutorial/ht_job_output.html

yarn: execute a script on all the nodes in the cluster


This is more Linux script related, but, sometimes we have a Hadoop (YARN) cluster running and we need to run a post install script or activity that executes on all the nodes in the cluster:

for i in `yarn node --list | cut -f 1 -d ':' | grep "ip"`; do ssh -i your-key.pem hadoop@$i 'hadoop fs -copyToLocal s3://mybucket/myscript.sh | chmod +x /home/hadoop/myscript.sh | /home/hadoop/myscript.sh' ; done

Note: we will need the your-key.pem file in the master node.

HDFS: changing the replication factor


The replication factor is a property that can be set in the HDFS configuration file that will allow you to adjust the global replication factor for the entire cluster. For each block stored in HDFS, there will be n – 1 duplicated blocks distributed across the cluster.

File conf/hdfs-site.xml is used to configure HDFS. Changing the dfs.replication property in hdfs-site.xml will change the default replication for all files placed in HDFS.

<name>dfs.replication<name> 
<value>3<value>

You can also change the replication factor on a per-file basis using the Hadoop FS shell.

To set the replication factor to 1 to all the files in a directory, you will need:

hadoop fs -setrep -w 1 -R /my/dir