Hadoop

Apache Hadoop is an open-source software framework written in Java for the distributed processing of large data sets using the MapReduce programming model.

The idea - MapReduce

You have very large input files. Those files can be split, and each chunk can be processed separately. Each map run takes one part of the input and maps it into key-value pairs. In the shuffle step, the results from the map runs are grouped by key and assigned to a reduce process. Finally, the reduce step does something with the key-value pairs it got. If the values for a key were already partially reduced (for example by a combiner on the map side), those partial results are fed into a reduce run again. The map and reduce steps can be distributed across many nodes (computers), and a distributed file system helps to send the data to them and get the results back.
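The three steps can be tried out without any Hadoop installation. The following is only a plain-Java simulation that runs sequentially in one JVM; the class and method names are made up for this sketch and are not part of the Hadoop API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of map, shuffle and reduce; no Hadoop classes involved. */
final class MapReduceSketch {

    /** Map step: turn one chunk of the input into (word, 1) pairs. */
    static List<Map.Entry<String, Integer>> map(String chunk) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : chunk.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    /** Shuffle step: group all values by their key. */
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    /** Reduce step: collapse all values of one key into a single result. */
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    /** Runs map, shuffle and reduce over all chunks, one after the other. */
    static Map<String, Integer> run(List<String> chunks) {
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String chunk : chunks) {
            mapped.addAll(map(chunk));
        }
        Map<String, Integer> result = new TreeMap<>();
        shuffle(mapped).forEach((word, ones) -> result.put(word, reduce(ones)));
        return result;
    }

    public static void main(String[] args) {
        // Two "chunks" stand in for two splits of a large input file.
        System.out.println(run(Arrays.asList("to be or", "not to be")));
        // prints {be=2, not=1, or=1, to=2}
    }
}
```

In a real cluster the calls to map and reduce would run on different nodes, and the shuffle would move data over the network.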

Setting up Hadoop and getting started

Read here how to set up Hadoop.

The standalone mode

This mode does not really use the whole distribution machinery, but it is easy to set up and easy to debug. Once your job works here, it is easy to switch to another mode. Just set up a Java project in Eclipse, add some jars from the Hadoop installation, and copy the Hadoop example from below into a class. Run the main method and it should start right away. The example below expects two parameters: the input folder and the output folder, which must not exist yet.

Pseudo-Distributed Operation

This mode already uses the Hadoop Distributed Filesystem (HDFS), but everything runs on one computer, so nothing is distributed yet. Within the Hadoop installation folder you will find an etc folder that holds the configuration files. Required changes:

Download the Hadoop binary, extract the tar, put the folder somewhere, and add its bin and sbin folders to your path (for convenience). Also set JAVA_HOME to your Java installation. Test that it works:

# hadoop version

Some minimum configuration files

etc/hadoop/core-site.xml:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

etc/hadoop/hdfs-site.xml:

<configuration>

    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

etc/hadoop/mapred-site.xml:

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

etc/hadoop/yarn-site.xml:

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>

Ready to initialise the system

bin/hdfs namenode -format

Start the HDFS daemons with sbin/start-dfs.sh, then check that the web interface for the Hadoop NameNode works

http://localhost:50070/

Start YARN

sbin/start-yarn.sh

Check if the ResourceManager works

http://localhost:8088/

Create folders on the HDFS

bin/hdfs dfs -mkdir /user/
bin/hdfs dfs -mkdir /user/yourusername

Now copy the input files of your problem from your local filesystem to the HDFS

bin/hdfs dfs -put /home/john/problems2besolved/MyHadoopInputFiles/ input

Run ls on it to check that the files are there (see http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/FileSystemShell.html for the other file system shell commands)

bin/hdfs dfs -ls input

Once you have a jar containing a Hadoop program you can also start it on the command line:

hadoop jar MyWordCountJar.jar de.tgunkel.hadooptest.WordCount input/ output

If it dies right away, it might not be able to find JAVA_HOME. Try setting it manually in etc/hadoop/hadoop-env.sh

When you're done, stop the daemons with:

sbin/stop-yarn.sh
sbin/stop-dfs.sh

Hadoop Cluster Setup

In our setup we will have one Hadoop master (we use the virtual machine hadoop1 for it) and several slaves.

Let's start by creating the virtual machines, or go buy some computers :-)

vserver hadoop1 build -m debootstrap --hostname hadoop1.localdomain.tgunkel.de --netdev heimnetz --interface 192.168.178.221 --context 225 -- -d jessie
vserver hadoop2 build -m debootstrap --hostname hadoop2.localdomain.tgunkel.de --netdev heimnetz --interface 192.168.178.222 --context 226 -- -d jessie
vserver hadoop3 build -m debootstrap --hostname hadoop3.localdomain.tgunkel.de --netdev heimnetz --interface 192.168.178.223 --context 227 -- -d jessie

Install Java on all of them, e.g. on Ubuntu

apt-get install oracle-java8-installer

Go to the master computer

mkdir /usr/local/hadoop/
cd    /usr/local/hadoop/
wget http://.../apache/hadoop/common/hadoop-2.X.0/hadoop-2.X.0.tar.gz
tar xzf hadoop-2.X.0.tar.gz
export JAVA_HOME="/usr/lib/jvm/java-8-oracle/jre/"
export PATH="/usr/local/hadoop/hadoop-2.7.0/bin:/usr/local/hadoop/hadoop-2.7.0/sbin:$PATH"
adduser --uid 1027 --disabled-password hadoop
chown -R hadoop:hadoop /usr/local/hadoop
su - hadoop
ssh-keygen

In the file core-site.xml, provide the name of the master node (fs.default.name is the older, deprecated name of fs.defaultFS)

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://hadoop1.localdomain.tgunkel.de:9000/</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
</configuration>

Now create a folder for the HDFS filesystem

mkdir /usr/local/hadoop/myHDFS_FilesystemFolder
mkdir /usr/local/hadoop/myHDFS_FilesystemFolder/name
mkdir /usr/local/hadoop/myHDFS_FilesystemFolder/name/data
chown -R hadoop:hadoop /usr/local/hadoop/myHDFS_FilesystemFolder

In hdfs-site.xml, provide the folders and the number of nodes each file should be replicated to

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>dfs.data.dir</name>
    <value>/usr/local/hadoop/myHDFS_FilesystemFolder/name/data</value>
    <final>true</final>
  </property>

  <property>
    <name>dfs.name.dir</name>
    <value>/usr/local/hadoop/myHDFS_FilesystemFolder/name</value>
    <final>true</final>
  </property>

  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
</configuration>

Do not use RAID for the data disks of the slaves. Instead, format each disk you want to use with plain ext3 (recommended) and list them all in hdfs-site.xml, each with a folder or its / folder. This is faster and more reliable than RAID (because Hadoop re-replicates data automatically if a disk fails).

 <property>
  <name>dfs.datanode.data.dir</name>
  <value>/disk1/hdfs/data,/disk2/hdfs/data</value>
 </property>

In the file mapred-site.xml also provide the name of the master

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>hadoop1.localdomain.tgunkel.de:9001</value>
  </property>
</configuration>

Now go to all slave nodes and do this (use a uid which is free on all nodes):

adduser --uid 1027 --disabled-password hadoop
mkdir /usr/local/hadoop/
mkdir /usr/local/hadoop/myHDFS_FilesystemFolder
mkdir /usr/local/hadoop/myHDFS_FilesystemFolder/name
mkdir /usr/local/hadoop/myHDFS_FilesystemFolder/name/data
chown -R hadoop:hadoop /usr/local/hadoop
su - hadoop
ssh-keygen

The master must be able to log in to itself and to all slaves via ssh using its key. So on the master, in the .ssh folder of the hadoop user, do

cat id_rsa.pub >> authorized_keys

and copy the authorized_keys file into the .ssh folder of the hadoop user on all slaves.

Try to log in from the master to all slaves.

So far the Hadoop installation on the master also fits the slaves, so just copy it over

scp -r /usr/local/hadoop/hadoop-2.7.0/ hadoop2:/usr/local/hadoop/
scp -r /usr/local/hadoop/hadoop-2.7.0/ hadoop3:/usr/local/hadoop/

Now go to the master. The configuration file masters contains only the master itself

hadoop1.localdomain.tgunkel.de

FIXME: Does it sometimes also make sense to add the master as a slave as well?

Now go to the master as the hadoop user and format the HDFS filesystem

bin/hdfs namenode -format

The connections between your nodes might differ. Some nodes might be in the same rack within a data center, others might be in totally different data centers. You should provide a Hadoop topology script and reference it via

net.topology.script.file.name

The result of the script should be of the form /datacenter/rack. If two nodes get the same value, Hadoop assumes they are in the same data center or even the same rack.
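The logic of such a topology script is small. Hadoop calls the script with one or more node addresses as arguments and reads one path per address from its output. The following Java sketch shows only that logic: the address prefixes and rack names are made up for illustration, and because the property expects an executable you would still need a small wrapper script that starts the class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of the lookup a topology script performs; all mappings are hypothetical. */
final class TopologySketch {

    // Hypothetical mapping from address prefix to /datacenter/rack; adjust to your network.
    private static final Map<String, String> RACKS = new LinkedHashMap<>();

    static {
        RACKS.put("192.168.178.", "/dc1/rack1");
        RACKS.put("10.0.1.", "/dc2/rack1");
    }

    /** Returns the rack path for one address, or /default-rack if the address is unknown. */
    static String resolve(String address) {
        for (Map.Entry<String, String> entry : RACKS.entrySet()) {
            if (address.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return "/default-rack";
    }

    public static void main(String[] args) {
        // One output line per address that was passed in.
        for (String address : args) {
            System.out.println(resolve(address));
        }
    }
}
```

With this mapping, hadoop2 and hadoop3 from the setup above would both end up in /dc1/rack1.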

Start the filesystem

sbin/start-dfs.sh

The first start may ask you to confirm the ssh key fingerprints of all nodes.

The status page should now work

http://192.168.178.221:50070

At first, none of my data nodes could connect to the master

INFO org.apache.hadoop.ipc.Server: IPC Server handler 5 on 9000, call org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.registerDatanode from 192.168.178.222:52275 Call#1 Retry#0 org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException: Datanode denied communication with namenode because hostname cannot be resolved (ip=192.168.178.222, hostname=192.168.178.222): DatanodeRegistration(192.168.178.222:50010,

The problem was that the reverse DNS of some nodes returned the same value by mistake. While I was tracking this down, this command helped to check which NameNode the configuration points to

./hdfs getconf -namenodes

Temporary fix: add this to hdfs-site.xml on the master

  <property>
    <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
    <value>false</value>
  </property>

You can also connect a node manually

./hadoop-daemon.sh start datanode

Now start Hadoop

sbin/start-yarn.sh

Watch out: in the default configuration there is only one reducer (mapreduce.job.reduces defaults to 1), which might be a performance issue for you. So reconfigure that!
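Why the number of reducers matters becomes clearer when you look at how keys are assigned to them. The sketch below uses the same formula as Hadoop's default HashPartitioner, but on plain Java strings. Hadoop hashes the Writable key (for example Text), whose hash code differs from String.hashCode(), so the concrete partition numbers here are for illustration only.

```java
/** Sketch of hash partitioning: which reducer gets which key. */
final class PartitionSketch {

    /** Non-negative hash of the key, modulo the number of reduce tasks. */
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] words = {"hadoop", "map", "reduce", "shuffle"};
        for (int reducers : new int[] {1, 4}) {
            for (String word : words) {
                // With one reducer every key lands in partition 0.
                System.out.println(reducers + " reducer(s): " + word
                        + " -> partition " + partitionFor(word, reducers));
            }
        }
    }
}
```

With the old API used in the example below, the number of reducers can be set per job with conf.setNumReduceTasks(n).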

Simple Hadoop example

A very typical example for understanding Hadoop is the task of counting how often every distinct word appears in a very (very very) large text file. A traditional approach would be to read through the file word by word, put every word you read into a HashMap-like structure and count. With Hadoop this problem can be distributed across many nodes working in parallel.

In order to count how often every distinct word appears in the text file, we split the problem into a map and a reduce part. In the map step we get a row of text, split it into its words, and return each word as a key with the number 1 as its value. In the reduce step we take all those keys and sum up the values for each key. Partial sums for the same key (word), for example from the combiner, may be fed into a new reduce run.

package de.tgunkel.hadooptest;
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCount {
        /* expects two parameters, the input folder and the output folder (not allowed to exist already)
         *
         */

        public static void main(String[] args) throws Exception {
                JobConf conf = new JobConf(WordCount.class);
                conf.setJobName("wordcount");

                conf.setOutputKeyClass(Text.class);
                conf.setOutputValueClass(IntWritable.class);

                conf.setMapperClass(MyMapClass.class);
                conf.setCombinerClass(MyReducer.class);
                conf.setReducerClass(MyReducer.class);

                conf.setInputFormat(TextInputFormat.class);
                conf.setOutputFormat(TextOutputFormat.class);

                FileInputFormat.setInputPaths(conf,  new Path(args[0]));
                FileOutputFormat.setOutputPath(conf, new Path(args[1]));

                JobClient.runJob(conf);
        }

        // Map
        public static class MyMapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
                // 1 as a Hadoop IntWritable
                private final static IntWritable ONE = new IntWritable(1);

                private Text word = new Text();

                /* @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
                 * key is an offset where the data comes from (not used)
                 * value is one row of the input text file
                 * output is of the form word -> 1
                 */

                public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
                        String line = value.toString();
                        StringTokenizer tokenizer = new StringTokenizer(line);
                        while (tokenizer.hasMoreTokens()) {
                                word.set(tokenizer.nextToken());
                                // here we can also suppress values; as an example we ignore the word "badword"
                                if(!"badword".equals(word.toString())) {
                                        // here we return as a result a single word from one line as a key and as a value the number 1
                                        // the number 1 will be replaced in the reducer with the number of occurrences of the word
                                        output.collect(word, ONE);     
                                }                              
                        }
                }
        }

        // Reduce
        public static class MyReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

                /*
                 * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
                 *
                 * Key: a single word
                 * Value: the number of occurrences for the key word so far. On the first calls to this method we expect there only to be lists of 1s for every occurrence of the word
                 *        later on this method might also be called to reduce previous results again. In this case there is word -> number counted so far
                 */

                public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
                        System.err.println("New reduce call started for "+key);
                        int sum = 0;
                        while (values.hasNext()) {
                                int x=values.next().get();
                                if(x>1) {
                                        System.err.println("Reduce again for key "+key.toString()+" -> "+x);
                                }
                                sum +=x;
                        }
                        // output is key -> number
                        output.collect(key, new IntWritable(sum));
                }
        }
}

Hadoop with Maven

<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hadoopbook</groupId>
  <artifactId>hadoop-book-mr-dev</artifactId>
  <version>4.0</version>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.5.1</hadoop.version>
  </properties>
  <dependencies>
    <!-- Hadoop main client artifact -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- Unit test artifacts -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.mrunit</groupId>
      <artifactId>mrunit</artifactId>
      <version>1.1.0</version>
      <classifier>hadoop2</classifier>
      <scope>test</scope>
    </dependency>
    <!-- Hadoop test artifact for running mini clusters -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-minicluster</artifactId>
      <version>${hadoop.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <finalName>hadoop-examples</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.5</version>
        <configuration>
          <outputDirectory>${basedir}</outputDirectory>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

Chaining Hadoop Tasks

Instead of writing a few complex MapReduce jobs, try to create more, simpler MapReduce jobs. One benefit of this is reusability.

You might even split each Mapper into an input format parser, a field selector and a filter. Mappers can be chained with a ChainMapper followed by a ChainReducer.
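The split into parser, field selector and filter can be sketched in plain Java before involving any Hadoop classes. The record format name:value and all names below are assumptions made for this sketch; in a real job each stage would be its own Mapper added to a ChainMapper.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Three small, reusable stages instead of one big mapper (plain Java, no Hadoop). */
final class ChainSketch {

    /** Stage 1, input format parser: split a "name:value" record into its fields. */
    static String[] parse(String line) {
        return line.split(":");
    }

    /** Stage 2, field selector: keep only the numeric value field. */
    static int selectValue(String[] fields) {
        return Integer.parseInt(fields[1]);
    }

    /** Stage 3, filter: only values of at least 10 are relevant (arbitrary threshold). */
    static boolean isRelevant(int value) {
        return value >= 10;
    }

    /** Chains the three stages; malformed records are dropped after parsing. */
    static List<Integer> run(List<String> lines) {
        return lines.stream()
                .map(ChainSketch::parse)
                .filter(fields -> fields.length == 2)
                .map(ChainSketch::selectValue)
                .filter(ChainSketch::isRelevant)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a:42", "b:7", "broken", "c:100")));
        // prints [42, 100]
    }
}
```

Each stage can be tested and reused on its own, which is the benefit the chaining approach is after.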

In order to control those jobs you can start them one by one and wait for the previous one to finish

JobClient.runJob(conf1);
// runJob() blocks until the job has finished, so no explicit wait is needed here
JobClient.runJob(conf2);

Or you use the

org.apache.hadoop.mapreduce.jobcontrol.JobControl
org.apache.hadoop.mapred.jobcontrol.JobControl

or Apache Oozie.

Unit Tests for MapReduce

With MRUnit you can write JUnit tests for Hadoop MapReduce solutions.

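import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.*;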

public class MyTestClass
{
  @Test
  public void myTest() throws IOException, InterruptedException
  {
   Text testValue = new Text("123456789abcdef:42");
   new MapDriver<LongWritable, Text, Text, IntWritable>()
     .withMapper(new MyMapperWeJustTest())
     .withInput(new LongWritable(0), testValue)
     .withOutput(new Text("ExpectedKey"), new IntWritable(expectedValue))
     .runTest();
  }
}

If there should be no result (a test for irrelevant or bad input values), just omit the .withOutput call

Tests for the Reducer work the same way

new ReduceDriver<Text, IntWritable, Text, IntWritable>()
  .withReducer(new MyReducerWeJustTest())
  .withInput(new Text("AKey"), Arrays.asList(new IntWritable(aValue), new IntWritable(anotherValue)))
  .withOutput(new Text("AKey"), new IntWritable(expectedValue))
  .runTest();

Debugging and logging

Use a counter to count how often something happened

enum MyCounter
{
 problem_a
}

...

System.err.println("Problem occurred!");

context.getCounter(MyCounter.problem_a).increment(1);

See also

mapreduce.map.log.level
mapreduce.reduce.log.level

Set Java options

mapred.child.java.opts

Keep a failed task's files for later inspection

mapreduce.task.files.preserve.failedtasks

Performance

Mappers should get enough data to run for some time; only a few seconds is probably too short. There should be more than one reducer, and each should run for about 5 minutes.
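How much data a mapper gets depends on how the input is split. The following is a simplified model of how FileInputFormat derives splits for one splittable file, written from memory rather than copied from Hadoop. The 128 MB block size and the 10% slack for the last split are assumptions of this sketch, so check them against your version.

```java
/** Rough model of input split calculation for a single splittable file. */
final class SplitSketch {

    // Assumed slack: the last split may be up to 10% larger than the split size.
    private static final double SPLIT_SLOP = 1.1;

    /** The split size is the block size, clamped between a minimum and a maximum. */
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /** Number of splits, and therefore map tasks, for one file. */
    static int countSplits(long fileLength, long splitSize) {
        int splits = 0;
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining > 0) {
            splits++; // the rest becomes the last split
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(128 * mb, 1, Long.MAX_VALUE);
        System.out.println(countSplits(1024 * mb, splitSize)); // prints 8
        System.out.println(countSplits(130 * mb, splitSize));  // prints 1
    }
}
```

Many small files lead to many short-running mappers, because each file gets at least one split of its own.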

A combiner may improve performance because it reduces the amount of data exchanged between mappers and reducers.

job.setCombinerClass(MyCombiner.class);

Sometimes you can even use your Reducer class for this without any change. This works when the reduce function is commutative and associative, as with sums or maxima.
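Whether a reducer can double as a combiner can be checked with a small experiment: reducing partial results must give the same answer as reducing everything at once. The plain-Java sketch below (names are illustrative only) shows that this holds for a sum but not for a mean.

```java
import java.util.Arrays;
import java.util.List;

/** Checks whether a reduce function is safe to use as a combiner. */
final class CombinerSketch {

    static int sum(List<Integer> values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    static double mean(List<Double> values) {
        double total = 0;
        for (double value : values) {
            total += value;
        }
        return total / values.size();
    }

    public static void main(String[] args) {
        // Sum: combining per node first does not change the result.
        int direct = sum(Arrays.asList(1, 1, 1, 1, 1));
        int combined = sum(Arrays.asList(sum(Arrays.asList(1, 1, 1)), sum(Arrays.asList(1, 1))));
        System.out.println(direct + " == " + combined); // prints 5 == 5

        // Mean: the mean of per-node means is not the overall mean.
        double trueMean = mean(Arrays.asList(1.0, 2.0, 3.0, 10.0));
        double meanOfMeans = mean(Arrays.asList(
                mean(Arrays.asList(1.0, 2.0, 3.0)), mean(Arrays.asList(10.0))));
        System.out.println(trueMean + " != " + meanOfMeans); // prints 4.0 != 6.0
    }
}
```

For a mean you would let the combiner emit partial sums and counts and do the division only in the reducer.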

Enable map output compression (mapreduce.map.output.compress)

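If you implement Writable for a key type, also implement a RawComparator so that keys can be compared in their serialized form without being deserialized first.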
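The idea behind a RawComparator is to compare two records directly in the byte buffer they were serialized into. The sketch below mimics the parameter list of RawComparator.compare(byte[], int, int, byte[], int, int) for 4-byte big-endian integers, but it is plain Java and does not implement the Hadoop interface.

```java
import java.nio.ByteBuffer;

/** Compares serialized ints in place, without creating key objects. */
final class RawCompareSketch {

    /** Serializes an int as 4 big-endian bytes. */
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** Compares two ints that live at the given offsets of (possibly shared) buffers. */
    static int compareRaw(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
        int v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
        return Integer.compare(v1, v2);
    }

    public static void main(String[] args) {
        // Two records stored back to back in one buffer, as during a sort.
        byte[] buffer = ByteBuffer.allocate(8).putInt(3).putInt(10).array();
        System.out.println(compareRaw(buffer, 0, 4, buffer, 4, 4)); // prints -1
    }
}
```

During the sort phase this comparison runs very often, so avoiding object creation per comparison adds up.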

Check the shuffle options.

Enable HPROF profiler

hadoop jar hadoop-examples.jar ... -D mapreduce.task.profile=true ...

Select which IDs of mappers and reducers are profiled (you don't need to profile them all). The default is 0-2

mapreduce.task.profile.maps
mapreduce.task.profile.reduces
mapreduce.task.profile.params
mapred.child.java.opts

Hadoop Distributed Filesystem (HDFS)

http://wiki.apache.org/hadoop/DiskSetup

The master provides a status page for your HDFS

http://hadoop1.localdomain.tgunkel.de:50070/

Format the HDFS like you format a normal filesystem

hdfs namenode -format

Create a folder

hdfs dfs -mkdir /folder/foo

Set a limit on how much space may be used

hdfs dfsadmin -setSpaceQuota 1t /folder/in/your/hdfs
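Keep in mind that the space quota counts every replica of a block, so the amount of file data that fits below the quota shrinks with the replication factor. A small worked calculation, assuming that 1t is read with binary prefixes (2^40 bytes):

```java
/** Worked arithmetic: how much file data fits below an HDFS space quota. */
final class QuotaSketch {

    /** Every replica counts against the quota, so divide by the replication factor. */
    static long usableBytes(long spaceQuotaBytes, int replication) {
        return spaceQuotaBytes / replication;
    }

    public static void main(String[] args) {
        long oneTebibyte = 1L << 40; // "1t", assuming binary prefixes
        long gibibyte = 1L << 30;
        // Replication 2 as in the cluster hdfs-site.xml of this document.
        System.out.println(usableBytes(oneTebibyte, 2) / gibibyte + " GiB"); // prints 512 GiB
        // Replication 3 for comparison.
        System.out.println(usableBytes(oneTebibyte, 3) / gibibyte + " GiB"); // prints 341 GiB
    }
}
```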

Copy files from a computer into the HDFS

hdfs dfs -put /home/john/problems2besolved/MyHadoopInputFiles/ /folder/in/your/hdfs
hdfs dfs -ls /folder

AVRO

Avro is a language-neutral data serialization system.

Parquet

Parquet is a columnar storage format to efficiently store nested data.

Flume

Flume allows you to use streams as an input for Hadoop instead of fixed data such as files.

Sqoop

Apache Sqoop allows you to use structured data stores (e.g. SQL database) as an input for Hadoop instead of e.g. files.

PIG

Pig allows you to specify MapReduce jobs in its Pig Latin language, which is supposed to be easier to use than hand-written MapReduce jobs.

HIVE

Apache Hive converts SQL queries into a series of MapReduce jobs for execution.

Crunch

Apache Crunch is a higher-level API for writing MapReduce pipelines with richer data transformations and multistage pipelines.

ZooKeeper

Apache ZooKeeper is Hadoop's distributed coordination service.

Ambari

Ambari provides an intuitive, easy-to-use Hadoop management web UI backed by its RESTful APIs.