Wednesday, February 24, 2016

How To Install Cassandra and Run a Single-Node Cluster on Ubuntu 14.04


Cassandra, or Apache Cassandra, is a highly scalable open source NoSQL database system, achieving great performance on multi-node setups.
In this tutorial, you’ll learn how to install and use it to run a single-node cluster on Ubuntu 14.04.


To complete this tutorial, you will need the following:

Step 1 — Installing the Oracle Java Virtual Machine

Cassandra requires that the Oracle Java SE Runtime Environment (JRE) be installed. So, in this step, you'll install and verify that it's the default JRE.
To make the Oracle JRE package available, you'll have to add a Personal Package Archive (PPA) using this command:

$ sudo add-apt-repository ppa:webupd8team/java

Update the package database:

$ sudo apt-get update

Then install the Oracle JRE. Installing this particular package not only installs it but also makes it the default JRE. When prompted, accept the license agreement: 

$ sudo apt-get install oracle-java8-set-default

After installing it, verify that it's now the default JRE:

$ java -version

You should see output similar to the following:
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
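
If you would rather check the version from a script than by eye, the following is a minimal sketch. The function name java_major_version is made up for this example; it only parses the first line of the output shown above and assumes the usual version "x.y.z" format.

```shell
#!/bin/sh
# Hypothetical helper: extract the major Java version from the first line
# of `java -version` output.
# Handles the legacy "1.8.0_60" scheme (major = 8) and the newer
# "11.0.2" scheme (major = 11).
java_major_version() {
    # $1: a line such as: java version "1.8.0_60"
    version=$(printf '%s\n' "$1" | sed -n 's/.*version "\([^"]*\)".*/\1/p')
    case "$version" in
        1.*) printf '%s\n' "$version" | cut -d. -f2 ;;
        *)   printf '%s\n' "$version" | cut -d. -f1 ;;
    esac
}

# Usage (java -version prints to stderr, hence the 2>&1):
#   java_major_version "$(java -version 2>&1 | head -n 1)"
```

For the output above, the helper would report major version 8.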

Step 2 — Installing Cassandra

We'll install Cassandra using packages from the official Apache Software Foundation repositories, so start by adding the repo so that the packages are available to your system. Note that Cassandra 2.2.2 is the latest version at the time of this publication. Change the 22x to match the latest version. For example, use 23x if Cassandra 2.3 is the latest version:

$ echo "deb 22x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list

Then add the repo's source:

$ echo "deb-src 22x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list

To avoid package signature warnings during package updates, we need to add three public keys from the Apache Software Foundation associated with the package repositories.
Add the first one using this pair of commands, which must be run one after the other:

 $   gpg --keyserver --recv-keys F758CE318D77295D
 $   gpg --export --armor F758CE318D77295D | sudo apt-key add -

Then add the second key:

 $   gpg --keyserver --recv-keys 2B5C1B00
 $   gpg --export --armor 2B5C1B00 | sudo apt-key add -

Then add the third:

  $  gpg --keyserver --recv-keys 0353B12C
  $  gpg --export --armor 0353B12C | sudo apt-key add -

Update the package database once again:

  $  sudo apt-get update

Finally, install Cassandra:


$ sudo apt-get install cassandra

If it is not running, the following output will be displayed:

* could not access pidfile for Cassandra
This is a well-known issue with the latest versions of Cassandra on Ubuntu. We'll try a few fixes. First, start by editing its init script. The parameter we're going to modify is on line 60 of that script, so open it using:

$ sudo nano +60 /etc/init.d/cassandra

That line should read:  
Change it to:  
Close and save the file, then reboot the server using either of these commands:

$ sudo reboot
$ sudo shutdown -r now

After logging back in, Cassandra should now be running. Verify:

$ sudo service cassandra status

If you are successful, you will see: 

* Cassandra is running

Step 4 — Connecting to the Cluster

 If you were able to successfully start Cassandra, check the status of the cluster:

$sudo nodetool status

In the output, UN means it's Up and Normal:

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address  Load       Tokens  Owns  Host ID                               Rack
UN           142.02 KB  256     ?     2053956d-7461-41e6-8dd2-0af59436f736  rack1

Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless

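
To check the node state from a script, you can count the rows whose first column is UN. This is only a sketch: count_up_normal is a hypothetical helper name, and it assumes the status rows keep the layout shown in the output above.

```shell
#!/bin/sh
# Hypothetical helper: count the nodes reported as Up/Normal (UN)
# in `nodetool status` output read from stdin.
count_up_normal() {
    awk '$1 == "UN" { n++ } END { print n + 0 }'
}

# Usage:
#   sudo nodetool status | count_up_normal
```

On a healthy single-node cluster the count should be 1.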
Then connect to it using its interactive command line interface cqlsh.
You will see it connect:
Connected to Test Cluster at
[cqlsh 5.0.1 | Cassandra 2.2.2 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh>
Type exit to quit:


Congratulations! You now have a single-node Cassandra cluster running on Ubuntu 14.04.


Hadoop Installation : ssh-keygen -t rsa -P ""

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java6-installer
$ sudo addgroup hadoop
$ sudo adduser --ingroup hadoop hduser
$ sudo apt-get install openssh-server
$ su - hduser
$ ssh-keygen -t rsa -P ""
$ cat $HOME/.ssh/ >> $HOME/.ssh/authorized_keys
$ wget
$ cd /home/hduser
$ tar xzf hadoop-1.1.2.tar.gz
$ mv hadoop-1.1.2 hadoop
# Set Hadoop-related environment variables
export HADOOP_PREFIX=/home/hduser/hadoop
The next one points to the Java home directory. We need to make sure that it is pointing to Oracle Java
# Set JAVA_HOME (we will also configure JAVA_HOME directly for Hadoop later on)
export JAVA_HOME=/usr/lib/jvm/java-6-oracle
The last one updates the PATH to include the Hadoop bin/ directory
# Add Hadoop bin/ directory to PATH
export PATH=$PATH:$HADOOP_PREFIX/bin
$ mkdir /home/hduser/tmp

A base for other temporary directories.
The name of the default file system. A URI whose
scheme and authority determine the FileSystem implementation. The
uri’s scheme determines the config property (fs.SCHEME.impl) naming
the FileSystem implementation class. The uri’s authority is used to
determine the host, port, etc. for a filesystem.


The host and port that the MapReduce job tracker runs
at. If “local”, then jobs are run in-process as a single map
and reduce task.


Default block replication.
The actual number of replications can be specified when the file is created.
The default is used if replication is not specified in create time.
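
The descriptions above match the standard Hadoop 1.x properties hadoop.tmp.dir, fs.default.name, mapred.job.tracker and dfs.replication, but the values used in this post did not survive. The sketch below shows one way such files could be written for a single-node setup. The helper names, the ports 54310 and 54311, and the replication factor of 1 are assumptions made for the example; only /home/hduser/tmp comes from the steps above.

```shell
#!/bin/sh
# Hypothetical helper: write a Hadoop site file.
# $1 is the file path, followed by name/value pairs.
write_site_file() {
    file=$1
    shift
    {
        echo '<?xml version="1.0"?>'
        echo '<configuration>'
        while [ "$#" -ge 2 ]; do
            printf '  <property>\n    <name>%s</name>\n    <value>%s</value>\n  </property>\n' "$1" "$2"
            shift 2
        done
        echo '</configuration>'
    } > "$file"
}

# Hypothetical helper: write the three site files into directory $1.
# ASSUMPTION: localhost:54310 / localhost:54311 and replication 1 are
# example values for a single node, not values taken from this post.
write_hadoop_conf() {
    conf_dir=$1
    write_site_file "$conf_dir/core-site.xml" \
        hadoop.tmp.dir /home/hduser/tmp \
        fs.default.name hdfs://localhost:54310
    write_site_file "$conf_dir/mapred-site.xml" \
        mapred.job.tracker localhost:54311
    write_site_file "$conf_dir/hdfs-site.xml" \
        dfs.replication 1
}

# Usage (the conf directory depends on where Hadoop was unpacked):
#   write_hadoop_conf /home/hduser/hadoop/conf
```

Review the generated files before formatting the NameNode, since the right host names and ports depend on your machine.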

$ hadoop namenode -format
$ jps
$ hadoop jar hadoop-examples-1.1.2.jar pi 3 10

Sunday, February 14, 2016

ssh localhost

Someone could be eavesdropping on you right now (man-in-the-middle attack)!
It is also possible that a host key has just been changed.
The fingerprint for the ECDSA key sent by the remote host is
Please contact your system administrator.
Add correct host key in /home/hadoop/.ssh/known_hosts to get rid of this message.
Offending ECDSA key in /home/hadoop/.ssh/known_hosts:1
  remove with: ssh-keygen -f "/home/hadoop/.ssh/known_hosts" -R localhost
ECDSA host key for localhost has changed and you have requested strict checking.
Host key verification failed.

Saturday, February 13, 2016

How to setup password-less ssh to the slaves?

For setting up Hadoop on a cluster of machines, the master should be able to do a password-less ssh to start the daemons on all the slaves.

Classic MR - master starts TaskTracker and the DataNode on all the slaves.

MRv2 (next generation MR) - master starts NodeManager and the DataNode on all the slaves.

Here are the steps to set up password-less ssh. Ensure that port 22 is open on all the slaves (`telnet slave-hostname 22` should connect).

1) Install openssh-client on the master

sudo apt-get install openssh-client
2) Install openssh-server on all the slaves
sudo apt-get install openssh-server
3) Generate the ssh key
ssh-keygen -t rsa -P "" -f ~/.ssh/id_rsa
4) Copy the key to all the slaves (replace username appropriately as the user starting the Hadoop daemons). You will be prompted for the password.
ssh-copy-id -i $HOME/.ssh/ username@slave-hostname
5) If the master also acts as a slave (`ssh localhost` should work without a password)
cat $HOME/.ssh/ >> $HOME/.ssh/authorized_keys

If hdfs/mapreduce are run as different users then the steps (3,4 and 5) have to be repeated for all the users.

How to test ?

1) Run `ssh user@slave-hostname`. It should get connected without prompting for a password.
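
The test above can also be scripted so that it fails instead of waiting at a password prompt. This is a minimal sketch: check_passwordless_ssh and the SSH_BIN override are hypothetical names for this example, while BatchMode and ConnectTimeout are standard OpenSSH client options.

```shell
#!/bin/sh
# Hypothetical helper: succeed only if key-based login to $1 works.
# BatchMode=yes makes ssh fail rather than prompt for a password.
# SSH_BIN is a made-up override so the function can be exercised
# without a real server; it defaults to ssh.
check_passwordless_ssh() {
    # $1: target such as user@slave-hostname
    "${SSH_BIN:-ssh}" -o BatchMode=yes -o ConnectTimeout=5 "$1" true
}

# Usage:
#   for host in slave1 slave2; do
#       check_passwordless_ssh "username@$host" || echo "$host still asks for a password"
#   done
```

The host names slave1 and slave2 in the usage comment are placeholders for your own slaves.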

Adding a new SSH key to the ssh-agent

When generating an SSH key, you'll need to add your newly created (or existing) SSH key to the ssh-agent.
Before adding a new SSH key to the ssh-agent, you should have:
Tip: If you used an existing SSH key rather than generating a new SSH key, you'll need to replace id_rsa in the command below with the name of your existing private key file.
  1. Ensure ssh-agent is enabled:
    # start the ssh-agent in the background
    eval "$(ssh-agent -s)"
    Agent pid 59566
  2. Add your SSH key to the ssh-agent:
    ssh-add ~/.ssh/id_rsa

Generating a new SSH key

After you've checked for existing SSH keys, you can generate a new SSH key to use for authentication.
Before generating a new SSH key, you should have checked for existing SSH keys.
  1. In the command line, paste the text below, substituting in your GitHub email address.
    ssh-keygen -t rsa -b 4096 -C ""
    # Creates a new ssh key, using the provided email as a label
    Generating public/private rsa key pair.
  2. When you're prompted to "Enter a file in which to save the key," press Enter. This accepts the default file location.
    Enter a file in which to save the key (/Users/you/.ssh/id_rsa): [Press enter]
  3. At the prompt, type a secure passphrase. For more information, see "Working with SSH key passphrases".
    Enter passphrase (empty for no passphrase): [Type a passphrase]
    Enter same passphrase again: [Type passphrase again]
  4. In the command line, copy the alphanumeric key fingerprint you see:
    The key fingerprint is:
    If you're using OpenSSH 6.8 or newer, the key fingerprint is:
  5. Add the SSH key fingerprint you've generated to the ssh-agent and your GitHub account. For more information, see "Adding a new SSH key to the ssh-agent" and "Adding a new SSH key to your GitHub account".

Switching remote URLs from SSH to HTTPS

  1. Open Terminal (for Mac and Linux users) or the command prompt (for Windows users).
  2. Change the current working directory to your local project.
  3. List your existing remotes in order to get the name of the remote you want to change.
    git remote -v
    origin (fetch)
    origin (push)
  4. Change your remote's URL from SSH to HTTPS with the git remote set-url command.
    git remote set-url origin
  5. Verify that the remote URL has changed.
    git remote -v
    # Verify new remote URL
    origin (fetch)
    origin (push)
The next time you git fetch, git pull, or git push to the remote repository, you'll be asked for your GitHub username and password.

Switching remote URLs from HTTPS to SSH

  1. Open Terminal (for Mac and Linux users) or the command prompt (for Windows users).
  2. Change the current working directory to your local project.
  3. List your existing remotes in order to get the name of the remote you want to change.
    git remote -v
    origin (fetch)
    origin (push)
  4. Change your remote's URL from HTTPS to SSH with the git remote set-url command.
    git remote set-url origin
  5. Verify that the remote URL has changed.
    git remote -v
    # Verify new remote URL
    origin (fetch)
    origin (push)
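
The two URL forms differ only in their prefix and in the separator after the host name, so the conversion can be sketched with sed. The function names below are made up for this example, and USERNAME/REPOSITORY are placeholders.

```shell
#!/bin/sh
# Hypothetical helpers: convert a remote URL between HTTPS and SSH form.
#   https://HOST/OWNER/REPO.git  <->  git@HOST:OWNER/REPO.git
https_to_ssh_url() {
    printf '%s\n' "$1" | sed -n 's#^https://\([^/]*\)/\(.*\)$#git@\1:\2#p'
}

ssh_to_https_url() {
    printf '%s\n' "$1" | sed -n 's#^git@\([^:]*\):\(.*\)$#https://\1/\2#p'
}

# Usage:
#   git remote set-url origin "$(https_to_ssh_url "$(git remote get-url origin)")"
```

Both helpers print nothing when the input is not in the expected form, so check for empty output before passing the result to git remote set-url.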


You may encounter these errors when trying to change a remote.

No such remote '[name]'

This error means that the remote you tried to change doesn't exist:
git remote set-url sofake
fatal: No such remote 'sofake'
Check that you've correctly typed the remote name.

Checking for existing SSH keys

Before you generate an SSH key, you can check to see if you have any existing SSH keys.
  1. Open the command line and enter:
    ls -al ~/.ssh
    # Lists the files in your .ssh directory, if they exist
  2. Check the directory listing to see if you already have a public SSH key.
By default, the filenames of the public keys are one of the following:
If you see an existing public and private key pair listed (for example and id_rsa) that you would like to use to connect to GitHub, you can add your SSH key to the ssh-agent.
If you don't have an existing public and private key pair, or don't wish to use any that are available to connect to GitHub, then generate a new SSH key.
Tip: If you receive an error that ~/.ssh doesn't exist, don't worry! We'll create it when we generate a new SSH key.
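
As a rough sketch of the same check in script form, the helper below lists any public key files in a directory. The name list_public_keys is hypothetical, and it relies only on the convention that public key files end in .pub.

```shell
#!/bin/sh
# Hypothetical helper: print the names of public key files in a directory.
# $1: directory to inspect (defaults to ~/.ssh)
list_public_keys() {
    dir=${1:-$HOME/.ssh}
    for f in "$dir"/*.pub; do
        # The unexpanded pattern is returned when nothing matches, so test it.
        if [ -e "$f" ]; then
            basename "$f"
        fi
    done
    return 0
}

# Usage:
#   list_public_keys            # inspects ~/.ssh
#   list_public_keys /some/dir  # inspects another directory
```

Empty output means no public key was found and a new key pair has to be generated.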

Testing your SSH connection

When you test your connection, you'll need to authenticate this action using your password, which is the SSH key passphrase you created earlier. For more information on working with SSH key passphrases, see "Working with SSH key passphrases".
  1. Open the command line and enter:
    ssh -T
    # Attempts to ssh to GitHub
    You may see one of these warnings:
    The authenticity of host ' (' can't be established.
    RSA key fingerprint is 16:27:ac:a5:76:28:2d:36:63:1b:56:4d:eb:df:a6:48.
    Are you sure you want to continue connecting (yes/no)?
    The authenticity of host ' (' can't be established.
    RSA key fingerprint is nThbg6kXUpJWGl7E1IGOCspRomTxdCARLviKw6E5SY8.
    Are you sure you want to continue connecting (yes/no)?
    Note: The example above lists the GitHub IP address as When pinging GitHub, you may see a range of IP addresses. For more information, see "What IP addresses does GitHub use that I should whitelist?"
  2. Verify that the fingerprint in the message you see matches the following message, then type yes:
    Hi username! You've successfully authenticated, but GitHub does not
    provide shell access.
  3. Verify that the resulting message contains your username. If you see a message that contains "access denied," see "Error: Permission denied (publickey)".
    If you receive a message about "access denied," you can read these instructions for diagnosing the issue.
  4. If you're switching from HTTPS to SSH, you'll need to update your remote repository URLs.


SSH (Secure Shell) is a network protocol for secure data communication, remote shell services, command execution and other secure network services between two networked computers, which it connects via a secure channel over an insecure network. The ssh server runs on one machine (the server) and the ssh client runs on another machine (the client).

ssh has 2 main components :
1- ssh : The command we use to connect to remote machines - the client. 
2- sshd : The daemon that is running on the server and allows clients to connect to the server.
The ssh client is pre-enabled on Linux, but in order to start the sshd daemon, we need to install ssh first. Use this command to do that :

$ sudo apt-get install ssh
This will install ssh on your machine. In order to check if ssh is set up properly do this :

$ which ssh
It will throw this line on your terminal

$ which sshd
It will throw this line on your terminal

SSH uses public-key cryptography to authenticate the remote computer and allow it to authenticate the user, if necessary. There are numerous posts and links that explain ssh in much more detail; you can just google ssh if you want to learn about it. I'll now show the steps required to configure ssh.

1- First of all create a ssh-keypair using this command :
$ ssh-keygen -t rsa -P ""
Once you issue this command it will ask you for the name of the file in which to store the key. Simply hit enter without giving any name, and your key will be created and saved to the default location, i.e. the .ssh directory inside your home directory. (Files and directories whose names start with a dot (.) are hidden files and directories in Linux. To see these files and directories just go to your home folder and press Ctrl+h.)

cluster@ubuntu:~$ ssh-keygen -t rsa -P ""
Generating public/private rsa key pair.
Enter file in which to save the key (/home/cluster/.ssh/id_rsa):
Hit enter, and you will see something like this :

Your identification has been saved in /home/cluster/.ssh/id_rsa.
Your public key has been saved in /home/cluster/.ssh/
The key fingerprint is:
66:4f:72:26:2b:18:57:43:64:4f:3e:5a:58:d1:2c:30 cluster@ubuntu
The key's randomart image is:
+--[ RSA 2048]-----+
|             .E.++          |
|              o B. o         |
|              + =.           |
|              . + .            |
|           . . S +            |
|           + o O            |
|           . . . .              |
|               .                 |
|                                 |

Your keypair has been created now. The keypair contains the keys in 2 different files present under the .ssh directory. These are id_rsa (the private key) and (the public key). 

To connect to a remote machine, just give ssh command along with the hostname of that machine. 

NOTE : Hostname of the machine to which you want to ssh must be present in your /etc/hosts file along with its IP address.

For example, if you want to connect to machine called 'client', do this :

cluster@ubuntu:~$ ssh client
cluster@client's password:

Enter the password of the client machine to log in. Once you give the password and hit enter you will see something like the output below, and you will be at the terminal of the client machine. Just use a few commands like ls or cat to cross check:

cluster@ubuntu:~$ ssh client
cluster@client's password: 
Welcome to Ubuntu 12.04 LTS (GNU/Linux 3.2.0-26-generic x86_64)

 * Documentation:

90 packages can be updated.
10 updates are security updates.

Last login: Fri Jul 20 01:08:28 2012 from client

NOTE : In some cases you may want to use passwordless ssh (for example while working with Apache Hadoop). To do that you just have to copy the public key, i.e. the content of your public key file, to the authorized_keys file in the .ssh directory of the client machine. Use the following command to do that :

cluster@ubuntu:~$ cat $HOME/.ssh/ >> $HOME/.ssh/authorized_keys

Now, if you do ssh client, you won't be asked for any password.

cluster@ubuntu:~$ ssh client 
Welcome to Ubuntu 12.04 LTS (GNU/Linux 3.2.0-26-generic x86_64)

 * Documentation:

90 packages can be updated.
10 updates are security updates.

Last login: Fri Jul 20 01:08:28 2012 from client

Friday, February 12, 2016

Setting up a Hadoop cluster - Part 1: Manual Installation


In the last few months I was tasked several times with setting up Hadoop clusters. Those weren't huge - two to thirteen machines - but from what I read and hear this is a common use case especially for companies just starting with Hadoop or setting up a first small test cluster. While there is a huge amount of documentation in the form of official documentation, blog posts, articles and books, most of it stops just where it gets interesting: dealing with all the stuff you really have to do to set up a cluster, cleaning logs, maintaining the system, knowing what and how to tune etc.

I'll try to describe all the hoops we had to jump through and all the steps involved to get our Hadoop cluster up and running. Probably trivial stuff for experienced Sysadmins but if you're a Developer and finding yourself in the "Devops" role all of a sudden I hope it is useful to you.

While working at GBIF I was asked to set up a Hadoop cluster on 15 existing and 3 new machines. So the first interesting thing about this setup is that it is a heterogeneous environment: Three different configurations at the moment. This is where our first goal came from: We wanted some kind of automated configuration management. We needed to try different cluster configurations and we needed to be able to shift roles around the cluster without having to do a lot of manual work on each machine. We decided to use a tool called Puppet for this task.

While Hadoop is not currently in production at GBIF there are mid- to long-term plans to switch parts of our infrastructure to various components of the HStack. Namely MapReduce jobs with Hive and perhaps Pig (there is already strong knowledge of SQL here) and also storing of large amounts of raw data in HBase to be processed asynchronously (~500 million records until next year) and indexed in a Lucene/Solr solution possibly using something like Katta to distribute indexes. For good measure we also have fairly complex geographic calculations and map-tile rendering that could be done on Hadoop. So we have those 18 machines and no real clue how they'll be used and which services we'd need in the end.


As mentioned before we have three different server configurations. We've put those machines in three logical clusters c1, c2 and c3 and simply number the nodes within each of them (our master for example is currently running on c1n1):

  • c1 10: Intel(R) Xeon(R) CPU X3363 @ 2.83GHz, 2x6MB (quad), 8 GB RAM, 2 x 500GB SATA 7.2K
  • c2 3: 2 x Intel(R) Xeon(R) CPU E5630 @ 2.53GHz (quad), 24 GB RAM, 6 x 250 GB SATA 5.4K
  • c3 5: Intel(R) Xeon(R) CPU X3220 @ 2.40GHz (quad), 4 GB RAM, 2 x 160 GB SATA 7.2K
  • CentOS 5.5
  • The machines are in different racks but connected to only one switch
We realize that this is a very heterogeneous cluster configuration. We also realize that some people highly discourage use of old machines or machines with little RAM but the c1 and c3 clusters were old unused machines and this way they still serve a purpose and we've had no problems so far using this setup.


These were the goals we set out to achieve on our cluster and these are also all the things I'll try to describe in this or a following post:
  • Puppet for setting up the services and configuring machine state
  • CDH3 (Beta 3)
    • Hadoop HDFS + MapReduce incl. Hadoop LZO
    • Hue
    • Zookeeper
    • HBase
  • Easily distributable packages for Hadoop, Hive and Pig to be used by the employees to access the cluster from their own workstations
  • Benchmarks & Optimizations
Be warned: This is going to be a very long post and unfortunately it is the nature of these things that some of the information is bound to be outdated pretty quickly so let me know if something has changed and I'll alter the post.

Manual Installation

Before we use Puppet to do everything automatically I will show how it can be done manually. I think it is important to know all the steps in case something goes wrong or you decide not to use Puppet at all. When I talk about "the server" I always mean "all servers in your cluster" except when noted otherwise. I highly recommend not skipping this part even if you want to use Puppet.

Operating System

For now I'll just assume a vanilla CentOS 5.5 installation. There's nothing special you need. I recommend just the bare minimum, everything else needed can be installed at a later time. A few words though about things you might want to do: Your servers probably have multiple disks. You shouldn't use any RAID or LVM on any of your slaves (i.e. DataNodes/TaskTracker). Just use a JBOD configuration. In our cluster all disks are in a simple structure:
  • /mnt/disk1
  • /mnt/disk2
  • ...
There are also two tweaks for your slaves you can do:
  • Mount your data disks with noatime (e.g. /dev/sdc1 /mnt/disk3 ext3 defaults,noatime 1 2 which btw. implies nodiratime)
  • By default a certain number of blocks is reserved on ext file systems (I'm not familiar with others); check by running tune2fs -l /dev/sdc1 and looking at the Reserved block count. This is useful on system disks, so that critical processes can still write some data when the disk is full, but on our data disks it is wasted space. By default 5% of a HDD is reserved for this. I recommend setting this down to 1% by running tune2fs -m 1 on all your data disks (i.e. tune2fs -m 1 /dev/sdc1), which frees up quite a bit of disk space. You can also set it to 0% if you want, though I went with 1% for our cluster. Keep the default setting for your system disks though!
On your NameNode however use any means you feel necessary to secure your data. You know your requirements better than I do. Use RAID and/or LVM however you like. We don't have any special resources so our NameNode is running on one of our regular servers at the moment. We might change that in the future.
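
To see how much space is currently reserved on a data disk, the two counters printed by tune2fs -l can be turned into a percentage. This is a minimal sketch: reserved_block_percent is a hypothetical helper name, and it relies on the "Block count" and "Reserved block count" lines of that output.

```shell
#!/bin/sh
# Hypothetical helper: read `tune2fs -l` output on stdin and print the
# percentage of blocks that are reserved.
reserved_block_percent() {
    awk -F: '
        /^Block count:/          { total = $2 + 0 }
        /^Reserved block count:/ { reserved = $2 + 0 }
        END {
            if (total > 0) printf "%.1f\n", reserved * 100 / total
        }'
}

# Usage (as root, on a data disk):
#   tune2fs -l /dev/sdc1 | reserved_block_percent
```

With the default setting the helper should print a value close to 5.0, and close to 1.0 after running tune2fs -m 1 on the same device.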

A note on Cloudera's Package system & naming

Cloudera provides the various components of Hadoop in different packages but they follow a simple structure: There is one hadoop-0.20 package which contains all the jars, config files, directories, etc. needed for all the roles. And then there are packages like hadoop-0.20-namenode which are only a few kilobytes in size and contain only the appropriate start and stop scripts for the role in question.

1. Common Requirements

Most of the commands in this guide need to be executed as root. I've chosen the easy route here and just logged in as root. If you're operating as a non-privileged user remember to use su, sudo or any other means to ensure you have the proper rights.

Repository

As all the packages we're going to install are provided by Cloudera we need to add their repository to our cluster:
curl > /etc/yum.repos.d/cloudera-cdh3.repo

Java installation

You have to download the JDK from Oracle's website yourself as license issues prevent it from being added to the repositories. Choose the correct system (probably Linux x64) and make sure to download the file ending in -rpm.bin (i.e. jdk-6u23-linux-x64-rpm.bin). You might have to do this from a client machine because you need a browser that works with the Oracle site. So on any one machine execute the following:
unzip jdk-6u23-linux-x64-rpm.bin
You should now have a bunch of .rpm files but you only need one of them: jdk-6u23-linux-amd64.rpm. Copy this file to your servers and install it as root using rpm:
rpm -Uvh ./jdk-6u23-linux-amd64.rpm
While not a hard requirement it makes a lot of things easier if the clocks on your servers are synchronized. I added this part at the last minute because we just realized that ntpd was disabled on three of our machines (c2) by accident and had some problems with it. It is worth taking a look at the clocks now and set up ntp properly before you start.

It doesn't matter if you use a DNS server or hosts files or any other means for the servers to find each other. But make sure this works! Do it now! Even if you think everything's set up correctly. Another thing that you should check is if the local hostname resolves to the public IP address. If you're using a DNS server you can use dig to test this but that doesn't take into account the /etc/hosts file so here is a simple test to see if it is correct:
ping -c 1 `hostname`
This should resolve to the public IP and not to
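
The same check can be scripted. The sketch below assumes that the address to avoid is a loopback one (127.x.x.x or ::1); that is my reading of the test, and the helper names are made up for this example. getent hosts is used because it consults /etc/hosts as well as DNS.

```shell
#!/bin/sh
# Hypothetical helper: succeed if $1 is a loopback address.
is_loopback() {
    case "$1" in
        127.*|::1) return 0 ;;
        *)         return 1 ;;
    esac
}

# Hypothetical helper: report what a host name resolves to.
# $1: host name to check (defaults to the local host name)
check_hostname_resolution() {
    name=${1:-$(hostname)}
    ip=$(getent hosts "$name" | awk 'NR == 1 { print $1 }')
    if [ -z "$ip" ]; then
        echo "$name does not resolve"
        return 2
    fi
    if is_loopback "$ip"; then
        echo "$name resolves to loopback address $ip"
        return 1
    fi
    echo "$name resolves to $ip"
}

# Usage, on every server in the cluster:
#   check_hostname_resolution
```

A non-zero exit status on any server is a sign that name resolution should be fixed before continuing.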

Hadoop uses a lot of ports for its internal and external communications. We've just allowed all traffic between the servers in the cluster and clients. But if you don't want to do that you can also selectively open the required ports. I try to mention them but they can all be changed in the configuration files. I might also miss some due to our config so I'd be glad if someone could point those out to me.

We're going to use lzo compression, the Hadoop native libraries as well as hue so there are a few common dependencies on all machines in the cluster which can be easily installed:
rpm -Uvh
yum install -y lzo hue-plugins hadoop-0.20-native
We also need some directories later on so we can just create them now:
mkdir /hadoop
chown root:hadoop /hadoop
Cloudera uses the alternatives system to manage configuration. In /etc/hadoop/conf is the currently activated configuration. Look at the contents of /etc/hadoop and you'll find all the installed configurations. At the moment there is only a conf.empty directory which we'll use as our starting point:
cp -R /etc/hadoop/conf.empty /etc/hadoop/conf.cluster
Now feel free to edit the configuration files in /etc/hadoop/conf.cluster but we'll go through them as well later in this post. The last step is to activate this configuration:
/usr/sbin/alternatives --install /etc/hadoop-0.20/conf hadoop-0.20-conf /etc/hadoop-0.20/conf.cluster 50
LZO

Due to licensing issues the LZO bindings for Hadoop cannot be distributed the same way as the rest of the packages. So this - once again - involves a few manual steps. After these bindings were removed from Hadoop itself a few versions ago they moved to the hadoop-gpl-compression project on Google Code which (as far as I know) still works but hasn't seen any development for over a year. Thankfully though Twitter's Kevin Weil and Cloudera's Todd Lipcon have picked up the project and maintained it. They regularly sync their github repositories so both should have almost the same code. I'm going to use Todd's version here as it should be better synced with CDH releases. You have to download the code from the repository, build the native libraries as well as the jar file and distribute those files on your cluster. You need to do this only on one machine which ideally should run the same OS version as the servers in your cluster. When you're finished you can just copy the result to all servers. We're using version 0.4.9 so we use this to download and build:
rpm -Uvh
yum install -y lzo-devel
wget --no-check-certificate
tar xvfz toddlipcon-hadoop-lzo-0.4.9-0-g0e70051.tar.gz
tar xvfz apache-ant-1.8.2-bin.tar.gz
cd toddlipcon-hadoop-lzo-0e70051
JAVA_HOME=/usr/java/latest/ BUILD_REVISION="0.4.9" ../apache-ant-1.8.2/bin/ant tar
The ant version that comes with CentOS 5.5 didn't work for me, which is why I downloaded a new one. This should leave you with a hadoop-lzo-0.4.9.tar.gz file in the build directory which you can extract to get all the necessary files for your servers:
  • hadoop-lzo-0.4.9.jar needs to be copied into /usr/lib/hadoop/lib on each server
  • lib/native/Linux-amd64-64 needs to be copied into /usr/lib/hadoop/lib/native on each server
cron & log cleaning
We've had a problem with unintentional debug logs filling up our hard drives. The investigations that followed that incident resulted in a blog post by Lars George explaining all the log files Hadoop writes. It is a worthwhile read. Hadoop writes tons of logs in various processes and phases and you should make sure that these don't fill up your hard drives. There are two instances in the current CDH3b3 where you have to manually interfere:
  • Hadoop daemon logs
  • Job XML files on the JobTracker
Hadoop uses a DailyRollingFileAppender which unfortunately doesn't have a maxBackupIndex setting like the RollingFileAppender. So either change the appender or manually clean up logs after a few days. We chose the second path and added a very simple cron job to run daily:
find /var/log/hadoop/ -type f -mtime +14 -name "hadoop-hadoop-*" -delete
This job deletes old log files after 14 days. We'll take care of the Job XML files in a similar way at the JobTracker.
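
If you want the retention period and directory to be adjustable, the find command can be wrapped in a small function. This is only a sketch: clean_old_logs is a hypothetical name, and the crontab line in the comment uses an arbitrary example time and script path.

```shell
#!/bin/sh
# Hypothetical helper: delete Hadoop daemon logs older than N days.
# $1: log directory, $2: retention in days (defaults to 14)
clean_old_logs() {
    find "$1" -type f -mtime +"${2:-14}" -name 'hadoop-hadoop-*' -delete
}

# Usage:
#   clean_old_logs /var/log/hadoop/ 14
#
# Example crontab entry (the time and the script path are assumptions):
#   0 3 * * * /usr/local/bin/clean-hadoop-logs.sh
```

Only files matching hadoop-hadoop-* are touched, so other logs in the same directory are left alone.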


One property needs to be set for both the NameNode and the DataNodes in the file /etc/hadoop/conf/ So just add this and replace $namenode with the IP or name of your NameNode:

2.1. NameNode
Installing the NameNode is straightforward:
yum install -y hadoop-0.20-namenode
This installs the startup scripts for the NameNode. The core package was already installed in the previous step. Now we need to change the configuration, create some directories and format the NameNode.

In /etc/hadoop/conf/hdfs-site.xml add the property which "determines where on the local filesystem the DFS name node should store the name table(fsimage). If this is a comma-delimited list of directories then the name table is replicated in all of the directories, for redundancy." We mentioned before that we're using a JBOD configuration. We do this even for our NameNode. So in our case the NameNode has two disks mounted at /mnt/disk1 and /mnt/disk2 but you might want to write to just one location if you use RAID. As it says in the documentation the NameNode will write to each of the locations. You can also write to a third location: an NFS mount which serves as a backup. Our configuration looks like this:
Make sure to create the dfs directories before starting the NameNode. They need to belong to hdfs:hadoop. Formatting the NameNode is all that's left:
su hdfs -c "/usr/bin/hadoop namenode -format"
Once you've done all that you can enable the service so it will be started upon system boot and start the NameNode:
chkconfig hadoop-0.20-namenode on
service hadoop-0.20-namenode start
You should be able to see the web interface on your namenode at port 50070 now. Ports that need to be opened to clients on the NameNode are 50070 (web interface, 50470 if you enabled SSL) and 8020 (for HDFS command line interaction). Only port 8020 needs to be enabled for all other servers in the cluster.

We also use a cron job to run the HDFS Balancer every evening:
/usr/lib/hadoop-0.20/bin/ -threshold 5

2.2 DataNodes
The DataNodes handle all the data by storing it and serving it to clients. You can run a DataNode on your NameNode, and for small or test clusters this is often done, but as soon as you have more than three to five machines or rely on your cluster for production use you should use a dedicated NameNode. After all our preparations, setting up the DataNodes is easy. We need to set the property in the file /etc/hadoop/conf/hdfs-site.xml. It "determines where on the local filesystem an DFS data node should store its blocks. If this is a comma-delimited list of directories, then data will be stored in all named directories, typically on different devices. Directories that do not exist are ignored." These are the directories where the actual data bytes of HDFS will be written. If you specify multiple directories the DataNode will write to them in turn, which gives good performance when reading the data.

This is an example of what we are using:
Make sure to create the dfs directories before starting the DataNodes. They need to belong to hdfs:hadoop. When that's done you just need to install the DataNode, activate the startup scripts and start it:
yum install -y hadoop-0.20-datanode
chkconfig hadoop-0.20-datanode on
service hadoop-0.20-datanode start
Your DataNode should now be up and running. If you have configured it correctly, it should also have connected to the NameNode and be visible in the Live Nodes list of the web interface, and the configured capacity should go up. Ports that need to be opened to clients are 50075 (web interface, 50475 if you enabled SSL) and 50010 (for data transfer). For the cluster you need to open ports 50010 and 50020.

3. MapReduce

MapReduce is split into two parts as well: a JobTracker and multiple TaskTrackers. For small-ish clusters the NameNode and the JobTracker can run on the same server, but depending on your usage and available memory you might need to run them on separate servers. We have 18 servers, 17 slaves and 1 master (with NameNode, JobTracker and other services), which hasn't been a problem so far. We need three properties set on all servers (in mapred-site.xml) to get started.
  • mapred.job.tracker: "The host and port that the MapReduce job tracker runs at. If 'local', then jobs are run in-process as a single map and reduce task."
    • This just points to your JobTracker. There is no default port for this in Hadoop 0.20 but 8021 is often used.
    • Our value (replace $jobtracker with the name or IP of your designated JobTracker): $jobtracker:8021
  • mapred.local.dir: "The local directory where MapReduce stores intermediate data files. May be a comma-separated list of directories on different devices in order to spread disk i/o. Directories that do not exist are ignored."
    • As it says, this is a local directory where MapReduce stores stuff, and we spread it out over all our disks.
    • Our value: /mnt/disk1/hadoop/mapreduce,/mnt/disk2/hadoop/mapreduce
    • Create the directories on each server with the owner mapred:hadoop
  • mapred.system.dir: "The shared directory where MapReduce stores control files."
    • This is a path in HDFS where MapReduce stores stuff
    • Our value: /hadoop/mapreduce/system
    • If dfs.permissions are on you need to create this directory in HDFS. Execute this command on any server in your cluster: su hdfs -c "/usr/bin/hadoop fs -mkdir /hadoop/mapreduce && /usr/bin/hadoop fs -chown mapred:hadoop /hadoop/mapreduce"

3.1 JobTracker
The JobTracker is very easy to set up and start:
yum install -y hadoop-0.20-jobtracker
chkconfig hadoop-0.20-jobtracker on
service hadoop-0.20-jobtracker start
The web interface should now be available at port 50030 on your JobTracker. Ports 50030 (web interface) and 8021 (not well defined in Hadoop 0.20 but if you followed my configuration this is correct) need to be opened to clients. Only 8021 is necessary for the TaskTrackers.

If the JobTracker is restarted some old files will not be cleaned up. That's why we added another small cronjob to run daily:
find /var/log/hadoop/ -type f -mtime +3 -name "job_*_conf.xml" -delete

3.2 TaskTracker
The TaskTrackers are as easy to install as the JobTracker:
yum install -y hadoop-0.20-tasktracker
chkconfig hadoop-0.20-tasktracker on
service hadoop-0.20-tasktracker start
The TaskTracker should now be up and running and visible in the JobTracker's Nodes list. Only port 50060 needs to be opened to clients for a minimalistic web interface. Other than that no other ports are needed as TaskTrackers check in at the JobTracker regularly (heartbeat) and get assigned Tasks at the same time.

4. Configuration

I'll discuss a few configuration properties here that range from "necessary to change" to "nice to know about". I'll mention the following things for each property:
  • The default value,
  • the value we use for our cluster at GBIF if it differs from the default,
  • some of the defaults are quite old and have never been changed so I might mention a value I deem safe to use for everybody,
  • if we set the property to final so it can't be overridden by clients (we set a lot of the parameters to final for purely documentary reasons, even those that can't be overwritten in the first place),
  • if the property has been renamed or deprecated in Hadoop 0.21,
  • and if this property is required in a client configuration file or only on the cluster, if I don't mention it it's not needed.
Here are the default configuration files for Hadoop 0.20.2 and 0.21:
And I know that there are some duplications to the section above but I want to keep this Configuration section as a reference.

  • Default: file:///
  • We: hdfs://$namenode:8020
  • We set this to final
  • Renamed to fs.defaultFS in Hadoop 0.21
  • Needed on the clients
This is used to specify the default file system. It defaults to your local file system, which is why it needs to be set to an HDFS address. This is important for client configuration as well, so your local configuration file should include this element.

  • Default: /tmp/hadoop-${}
  • CDH3 Default: /var/lib/hadoop-0.20/cache/${}
  • We: Left it at the CDH3 default
  • We set this to final
As mentioned in the default file this is mainly a base for other temporary directories. If all other configuration options are set correctly there shouldn't be too much data in here.

  • Default: 0
  • We: 10080
  • We set this to final
Hadoop has a Trash feature where files removed using the command line tools are moved to a .Trash folder in the user's home folder. If set to 0 this feature is disabled, but if set to a non-zero value this is the number of minutes between Trash cleaner runs. As we have a lot of users in our system using Hadoop for the first time we chose a safe value here.
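As a quick sanity check on the value we use, here is a minimal Python sketch that converts an interval in minutes into days. The helper name is made up for illustration and is not part of Hadoop.

```python
def minutes_to_days(minutes):
    """Convert an interval given in minutes into days."""
    return minutes / (60 * 24)


# Our value of 10080 minutes corresponds to one week.
print(minutes_to_days(10080))  # 7.0
```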

  • Default: ${hadoop.tmp.dir}/dfs/namesecondary
  • We: /mnt/disk1/hadoop/dfs/namesecondary,/mnt/disk2/hadoop/dfs/namesecondary
  • We set this to final
The secondary NameNode stores its images to merge here. If it is a comma separated list the data is replicated to all these locations on the local disks.

  • Default: 4096
  • Safe: 65536
  • We: 131072 (32 * 4096)
  • Can be overwritten by clients
  • We set this to final
This is used for buffers all over the place to copy, store and write data to. It should be a multiple of 4096 and it should be safe to use 65536 today but we use double that. The performance gain is not enormous but there have been blog posts in the past measuring the impact and it was positive. We've also done our own tests and saw a small performance gain. If you use HBase be careful not to set this too high.

  • Default:,,
  • We:,,,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec
  • We set this to final
This lists all installed compression codecs. If you followed my manual you've got to add two more to the default list of codecs: LzoCodec and LzopCodec.

  • We: com.hadoop.compression.lzo.LzopCodec
I have actually no idea why this setting is needed as I couldn't find any reference where it is actually used in the code but I didn't look very hard so I might be wrong. All I know is that the documentation mentions that this property needs to be set.

  • Default: false
  • We: true
  • We set this to final
By setting this to true the web interfaces for the JobTracker and NameNode gain some advanced options like killing a job. It makes life a lot easier while still in development or evaluation. But you probably should set this to false once you rely on your Hadoop cluster for production use.

  • Default: ${hadoop.tmp.dir}/dfs/name
  • We: /mnt/disk1/hadoop/dfs/name,/mnt/disk2/hadoop/dfs/name
  • We set this to final
This is an important setting, which is why I've already mentioned it above. The NameNode stores stuff in these directories by replicating all information to all these disks. One of them could be a mount on a remote disk (e.g. NFS) to have a backup.

  • Default: ${hadoop.tmp.dir}/dfs/data
  • We: /mnt/disk1/hadoop/dfs/data,/mnt/disk2/hadoop/dfs/data
  • We set this to final
This is another important setting as explained above. Unlike the NameNode directories, the data is not replicated to all disks but distributed among all those locations. The DataNodes save the actual data in these locations, so more space is better. The easiest thing is to use dedicated disks for this. If you save other stuff than Hadoop data on the disks make sure to set dfs.datanode.du.reserved (see below).

  • Default: 10
  • We: 20
  • Safe: 10-20
  • We set this to final
The number of threads the NameNode uses to serve requests. This depends highly on your usage and size of your cluster. We've tried a bunch of different values and settled on 20 without seeing any notable differences. nnbench is probably a good tool to benchmark this. If you've got a large cluster or many file operations (create or delete) you can try upping this value.

  • Default: 3
  • We: 5
  • Safe: 5-10
  • We set this to final
The number of threads DataNodes use. I can't tell what a good value is for large clusters but the TestDFSIO benchmark seems like a good test to run to find a good value here. Just play around. We've tried a bunch of different values up to 20 and didn't see a difference so we chose a value slightly larger than the default.

  • Default: 0
  • We: Left the default
This many bytes will be left free on the volumes used by the DataNodes. As our drives are dedicated to Hadoop we left this at 0, but if the drives host other stuff as well set this to an appropriate value.

  • Default: true
  • We: true
  • Renamed to dfs.permissions.enabled in Hadoop 0.21
  • We set this to final
This enables permission checking in HDFS. Unless you use Secure Hadoop (which we don't, which is why I don't cover it here) it is still easy for anyone to read, write and delete anything on the cluster as there is no authentication of users. So this is purely a safety measure to avoid messing with the wrong data by accident.

  • Default: 3
  • We: 3
  • Can be used in the client configuration
This is the default replication level used for new files in HDFS. If you change this value later on, no existing files will be changed (that can be done on the command line though). Every file in HDFS can have a different replication level. This just sets the default.

  • Default: 67108864
  • We: 134217728
  • Renamed to dfs.blocksize in Hadoop 0.21
  • Can be used in the client configuration
This setting is applied on a per-file basis and only used for new files. Files saved to HDFS are split into blocks at most this large (64 MB by default). This has multiple implications. The more blocks you have, the more load there is on your NameNode. So if you have many files that are larger than the block size you might set this larger. If your files are mostly smaller than this you waste no space: all files only take as much space as they actually have data (this is unlike other file systems where a file takes up at least one block no matter how large it really is). So NameNode load (and its memory requirements) is one factor. The deciding factor for us to set this higher by default is that a lot of our calculations in MapReduce are very fast and Mappers finish quickly. As one Mapper usually processes one block and Mappers take a while to set up, we chose a higher block size so that each Mapper has more data to process.

This can be set on a per file basis so you really have to find your own perfect value, perhaps even on a per dataset basis.
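To make the trade-off concrete, here is a small Python sketch that counts how many blocks a file occupies for the default block size and for ours. The 1 GB file size is a made-up example, and the assumption that one Mapper processes one block is the usual case described above, not a guarantee.

```python
MB = 1024 * 1024


def block_count(file_size, block_size):
    """Number of HDFS blocks a file is split into (the last block may be partial)."""
    return (file_size + block_size - 1) // block_size  # integer ceiling division


file_size = 1024 * MB  # hypothetical 1 GB input file
for block_size in (67108864, 134217728):  # default value and our value
    blocks = block_count(file_size, block_size)
    print(f"block size {block_size}: {blocks} blocks (roughly {blocks} Mappers)")
```

Doubling the block size halves the number of blocks the NameNode has to track for large files and, in the usual case, the number of Mappers that need to be set up.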

  • Default: 1048576
  • We: 2097152
  • Renamed to dfs.datanode.balance.bandwidthPerSec in Hadoop 0.21
  • We set this to final
This property configures the number of bytes per second (default is 1 MB/s) that a DFS balancing operation can use per DataNode. The default is pretty low so we doubled it. We don't use a lot of bandwidth in our cluster at the moment so this is not a problem, but the right value depends on your use case. The higher this number the faster balancing operations will complete. We run balancing every night on a cron job so we want it to be finished by morning.
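A rough way to judge whether a nightly balancing run can finish by morning is to divide the amount of data a single DataNode has to move by this bandwidth. The sketch below is only a back-of-the-envelope estimate: the 20 GB figure is hypothetical, and it ignores any overhead the balancer itself adds.

```python
def balancing_seconds(bytes_to_move, bandwidth_per_sec):
    """Lower bound on the time one DataNode needs to move the given amount of data."""
    return bytes_to_move / bandwidth_per_sec


to_move = 20 * 1024**3  # hypothetical: 20 GB to move off one DataNode
for bandwidth in (1048576, 2097152):  # default value and our value
    hours = balancing_seconds(to_move, bandwidth) / 3600
    print(f"{bandwidth} bytes/s: about {hours:.1f} hours")
```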

  • Default: no default set
  • We: /etc/hadoop/conf/allowed_hosts
  • We set this to final
This file has to contain one name per line. Every name is the name of a DataNode that is allowed to connect to the NameNode. This prevents accidents like what happened to me: I test everything in Virtual Machines so I started a bunch of them, deployed the live config and forgot to change the NameNode, so all of a sudden a bunch of Virtual Machines joined our HDFS cluster and blocks began replicating there... So it is a good thing to explicitly list all allowed hosts in this file.

  • Default: false
  • We: true
  • As far as I know this option has been removed in Hadoop 0.21 and is enabled by default
  • We set this to final
This option has quite a history. To make it short: If you're using CDH3 set this to true, otherwise leave it false. You want/need this on true if you plan to use HBase.

  • Default: 256
  • Safe: 1024
  • We: 2048
  • Yes, this is misspelt in Hadoop and it hasn't been fixed in Hadoop 0.21.
  • We set this to final
This is the maximum number of threads a DataNode may use (for example for file access to the local file system). There used to be bugs in Hadoop so that the default was a bit too low and needed to be set higher. Even today it's worth setting it higher, which carries little risk, especially if you're using HBase.


HDFS is pretty straightforward to configure and benchmark. MapReduce is more of a black art unfortunately. I'll describe the MapReduce process here because it is important to understand where all the properties come in so you can safely change their values and tweak the performance. In my first draft of this post I wrote that I wouldn't go into much detail on the internals of the MapReduce process. (Un-)fortunately this wasn't as easy as I thought and it has grown into a full-blown explanation of everything I know. It is very possible that something's wrong here so please correct me if you see something that is off. And if you're not interested in how this works just skip to the descriptions of the properties themselves.

All of this is valid for Hadoop 0.20.2+737 (the CDH version). I know that some things have changed in Hadoop 0.21 but that's left for another time.

The Map side

While a Map is running it collects output records in an in-memory buffer called MapOutputBuffer. (If there are no reducers a DirectMapOutputCollector is used, which makes most of the rest obsolete as it writes immediately to disk.) The total size of this in-memory buffer is set by the io.sort.mb property and defaults to 100 MB (which is converted to a byte value using a bit shift operation [100 << 20 = 104857600]). Out of these 100 MB, a fraction of io.sort.record.percent is reserved for tracking record boundaries. This property defaults to 0.05 (i.e. 5%, which means 5 MB in the default case). Each record to track takes 16 bytes (4 integers of 4 bytes each) of memory, which means the buffer can track 327680 map output records with the default settings. The rest of the memory (104857600 bytes - (16 bytes * 327680) = 99614720 bytes) is used to store the actual bytes to be collected (in the default case this will be 95 MB). While Map outputs are collected they are stored in the remaining memory and their location in the in-memory buffer is tracked as well. Once one of these two buffers reaches a threshold specified by io.sort.spill.percent, which defaults to 0.8 (i.e. 80%), the buffer is flushed to disk:
0.8 * 99614720 = 79691776
0.8 * 327680 = 262144
Look in the log output of your Maps and you'll see these three lines at the beginning of every log:
2010-12-05 01:33:04,912 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
2010-12-05 01:33:04,996 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
2010-12-05 01:33:04,996 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
You should recognize these numbers!
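If you want to predict these numbers for your own settings, the arithmetic described above can be written down as a short Python sketch. The function and key names are my own and not Hadoop's; the percentages are passed as strings so that the calculation is exact.

```python
from fractions import Fraction

RECORD_META_BYTES = 16  # 4 integers of 4 bytes each per tracked record


def map_buffer_layout(io_sort_mb=100, record_percent="0.05", spill_percent="0.8"):
    """Split the map output buffer into its record-tracking and data parts."""
    total_bytes = io_sort_mb << 20  # MB to bytes via bit shift
    meta_bytes = int(total_bytes * Fraction(record_percent))
    max_records = meta_bytes // RECORD_META_BYTES
    data_bytes = total_bytes - max_records * RECORD_META_BYTES
    return {
        "total_bytes": total_bytes,
        "max_records": max_records,
        "data_bytes": data_bytes,
        # Spill thresholds for the data buffer and the record buffer.
        "data_spill_at": int(data_bytes * Fraction(spill_percent)),
        "record_spill_at": int(max_records * Fraction(spill_percent)),
    }


for name, value in map_buffer_layout().items():
    print(name, value)
```

With the defaults this reproduces the "data buffer" and "record buffer" values from the log lines above.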

Now while the Map is running you might see log lines like these:
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 19361312; bufvoid = 99614720
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 262144; length = 327680
2010-12-05 01:33:09,558 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
This means we've reached the maximum number of records we can track even though our buffer is still pretty empty (99614720 - 19361312 bytes still free). If however your buffer is the cause of your spill you'll see a line like this:
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: buffer full = true
All of this spilling to disk is done in a separate thread so that the Map can continue running. That's also the reason why the spill begins early (when the buffer is only 80% full) so it doesn't fill up before a spill is finished. If one single Map output is too large to fit into the in memory buffer a single spill is done for this one value. A spill actually consists of one file per partition, meaning one file per Reducer.
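To find out which of the two buffers causes the spills in a given task log, you can count the two kinds of "Spilling map output" lines. The following Python sketch does that for a list of log lines; the function name is made up, and the pattern only matches the message format shown above.

```python
import re
from collections import Counter

# Matches the two spill messages shown above and captures the cause.
SPILL_RE = re.compile(r"Spilling map output: (record|buffer) full = true")


def count_spill_causes(log_lines):
    """Count how many spills were caused by the record buffer or the data buffer."""
    causes = Counter()
    for line in log_lines:
        match = SPILL_RE.search(line)
        if match:
            causes[match.group(1)] += 1
    return causes


sample = [
    "2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true",
    "2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: buffer full = true",
    "2010-12-05 01:33:09,558 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0",
]
causes = count_spill_causes(sample)
print("record full:", causes["record"], "buffer full:", causes["buffer"])
```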

After a Map task has finished there may be multiple spills on the TaskTracker. Those files have to be merged into one single sorted file per partition which is then fetched by the Reducers. The property io.sort.factor says how many of those spill files will be merged into one file at a time. The lower the number is the more passes will be required to arrive at the goal. The default is very low, and raising the default to 100 has been considered (in fact, looking at the code, it sometimes is set to 100 by default). This property can make a pretty huge difference if your Mappers output a lot of data. Not much memory is needed for this property but the larger it is the more open files there will be so make sure to set this to a reasonable value. To find such a value you should run a few MapReduce jobs that you'd expect to see in production use and carefully monitor the log files.

Watch out for log messages like these:
  • Merging sorted segments
  • Down to the last merge-pass, with  segments left of total size: bytes
  • Merging  intermediate segments out of a total of
This is the process on the Map side where this factor is used. If your Mappers only have one spill file all of this doesn't matter. So if you try to benchmark this make sure to use a job with a lot of Map output data. If you only see a line like "Finished spill 0" but none of the above you're only producing one spill file which doesn't require any merging or further sorting. This is the ideal situation and you should try to get the number of spilled records/files as low as possible.
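To get a feeling for why a low io.sort.factor costs extra passes, here is a deliberately simplified Python model. It assumes that every round merges groups of up to io.sort.factor files into one file each; Hadoop's actual merge code is smarter than that, so treat the result as an illustration, not as a prediction of what you will see in your logs.

```python
def merge_rounds(spill_files, io_sort_factor):
    """Rounds needed if each round merges groups of up to io_sort_factor files."""
    if io_sort_factor < 2:
        raise ValueError("io_sort_factor must be at least 2")
    rounds = 0
    while spill_files > 1:
        spill_files = -(-spill_files // io_sort_factor)  # ceiling division
        rounds += 1
    return rounds


for factor in (10, 100):
    print(f"50 spill files, factor {factor}: {merge_rounds(50, factor)} round(s)")
```

In this model 50 spill files need two rounds with the default of 10 but only one round with a factor of 100, and a single spill file needs no merging at all.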

The Reduce side

The reduce phase has three different steps: Copy, Sort (which should really be called Merge) and Reduce.

During the Copy phase the Reducer tries to fetch the output of the Maps from the TaskTrackers and store it on the Reducer either in memory or on disk. The property mapred.reduce.parallel.copies (which defaults to 5) defines how many Threads are started per Reduce task to fetch Map output from the TaskTrackers.

Here's an example log from the beginning of a Reducer log:
2010-12-05 01:53:03,846 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=334063200, MaxSingleShuffleLimit=83515800
2010-12-05 01:53:03,879 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Need another 1870 map output(s) where 0 is already in progress
2010-12-05 01:53:03,880 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread started: Thread for merging on-disk files
2010-12-05 01:53:03,880 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread waiting: Thread for merging on-disk files
2010-12-05 01:53:03,880 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread started: Thread for merging in memory files
2010-12-05 01:53:03,881 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread started: Thread for polling Map Completion Events
You can see two things in these log lines. First of all the ShuffleRamManager is started and afterwards you see that this Reducer needs to fetch 1870 map outputs (meaning we had 1870 Mappers). The map output is fetched and shuffled into memory (that's what the ShuffleRamManager is for). You can control its behavior using the mapred.job.shuffle.input.buffer.percent (default is 0.7). Runtime.getRuntime().maxMemory() is used to get the available memory which unfortunately returns slightly incorrect values so be careful when setting this. We'll get back to the last four lines later.

Our child tasks are running with -Xmx512m (536870912 bytes) so 70% of that should be 375809638 bytes but the ShuffleRamManager reports 334063200. No big deal, just be aware of it. There's a hardcoded limit of 25% of the buffer that a single map output may not surpass. If it is larger than that it will be written to disk (see the MaxSingleShuffleLimit value above: 334063200 * 0.25 = 83515800).
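The two limits can be checked with a few lines of Python. This is only a sketch of the arithmetic described here, with function names of my own choosing; it does not reproduce how the JVM arrives at the smaller value it reports.

```python
from fractions import Fraction


def expected_memory_limit(heap_bytes, input_buffer_percent="0.7"):
    """Shuffle buffer size you would expect from the configured heap size."""
    return int(heap_bytes * Fraction(input_buffer_percent))


def max_single_shuffle(memory_limit):
    """A single map output may use at most 25% of the shuffle buffer."""
    return memory_limit // 4


print(expected_memory_limit(512 * 1024 * 1024))  # what -Xmx512m suggests
print(max_single_shuffle(334063200))             # from the reported MemoryLimit
```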

Now that everything's set up the copiers will start their work and fetch the output. You'll see a bunch of log lines like these:
2010-12-05 01:53:11,114 INFO org.apache.hadoop.mapred.ReduceTask: header: attempt_201012031527_0021_m_000011_0, compressed len: 454055, decompressed len: 454051
2010-12-05 01:53:11,114 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 454051 bytes (454055 raw bytes) into RAM from attempt_201012031527_0021_m_000011_0
2010-12-05 01:53:11,133 INFO org.apache.hadoop.mapred.ReduceTask: Read 454051 bytes from map-output for attempt_201012031527_0021_m_000011_0
2010-12-05 01:53:11,133 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_201012031527_0021_m_000011_0 -> (70, 6) from
In the first line you see that a map output was successfully copied and it could read the size of the data from the headers. The next line is actually what we've talked about earlier: The map output will now be decompressed (if it was compressed) and saved into memory using the ShuffleRamManager. The third line acknowledges that this succeeded. And the last line is information for a bug and should have been removed already according to a comment in the source code.

If for whatever reason the map output doesn't fit into memory you will see a similar log line to the second one above but "RAM" will be replaced by "Local-FS" and the fourth line will be missing. You obviously want as much data into memory as possible so shuffling on to the Local-FS is a warning sign or at least a sign for possible optimizations.

While all this goes on until all map outputs have been fetched there are two threads (Thread for merging on-disk files and Thread for merging in memory files) waiting for some conditions until they become active. The conditions are as follows:
  • The used memory in the in-memory buffer is above mapred.job.shuffle.merge.percent (default is 66%, in our example that would mean 334063200 * 0.66 = 220481712 bytes) and there are at least two map outputs in the buffer
  • or there are more than mapred.inmem.merge.threshold (defaults to 1000) map outputs in the in-memory buffer, independent of the size
  • or if there are at least io.sort.factor * 2 - 1 files on disk.
When one of the first two conditions triggers you'll see something like this:
2010-12-05 01:53:42,106 INFO org.apache.hadoop.mapred.ReduceTask: Initiating in-memory merge with 501 segments...
2010-12-05 01:53:42,114 INFO org.apache.hadoop.mapred.Merger: Merging 501 sorted segments
2010-12-05 01:53:46,492 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Merge of the 501 files in-memory complete. Local file is /mnt/disk1/hadoop/mapreduce/local/taskTracker/lfrancke/jobcache/job_201012031527_0021/attempt_201012031527_0021_r_000103_0/output/map_1.out of size 220545981
This could actually trigger the third condition as it writes a new file to disk. When that happens you'll see something like this:
2010-12-10 14:28:23,289 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012101346_0001_r_000012_0We have  19 map outputs on disk. Triggering merge of 10 files
The io.sort.factor was set to the default of 10. 10 (out of the 19) files will be merged into one, leaving 10 on disk (i.e. io.sort.factor).
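The merge conditions can be summarized in a short Python sketch. The function names and the sample numbers are made up for illustration. For the on-disk condition the sketch uses "at least io.sort.factor * 2 - 1", because the log line above shows a merge being triggered with exactly 19 files and a factor of 10.

```python
from fractions import Fraction


def in_memory_merge_due(used_bytes, memory_limit, outputs_in_memory,
                        merge_percent="0.66", inmem_threshold=1000):
    """True if one of the two in-memory conditions described above holds."""
    over_size = (used_bytes > memory_limit * Fraction(merge_percent)
                 and outputs_in_memory >= 2)
    over_count = outputs_in_memory > inmem_threshold
    return over_size or over_count


def on_disk_merge_due(files_on_disk, io_sort_factor=10):
    """True once enough files have piled up on disk."""
    return files_on_disk >= 2 * io_sort_factor - 1


def files_left_after_merge(files_on_disk, io_sort_factor=10):
    """io_sort_factor files are merged into one new file."""
    return files_on_disk - io_sort_factor + 1


print(in_memory_merge_due(230_000_000, 334063200, 501))  # hypothetical usage
print(on_disk_merge_due(19), files_left_after_merge(19))
```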

Both of these (the in-memory and the on-disk merge, the latter is also called Interleaved on-disk merge) will produce a new single output file and write it to disk. All of this is only going on as long as map outputs are still fetched. When that's finished we wait for running merges to finish but won't start any new ones in these threads:
2010-12-05 01:59:10,598 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 3 files left.
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 314 files left.
As you can see from the timestamps, no merges were running in our case so everything just shut down. During the copy phase we finished a total of three in-memory merges, which is why we currently have three files on the disk. 314 more map outputs are still in the in-memory buffer. This concludes the Copy phase and the Sort phase begins:
2010-12-05 01:59:10,605 INFO org.apache.hadoop.mapred.Merger: Merging 314 sorted segments
2010-12-05 01:59:10,605 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 314 segments left of total size: 127512782 bytes
2010-12-05 01:59:13,903 INFO org.apache.hadoop.mapred.ReduceTask: Merged 314 segments, 127512782 bytes to disk to satisfy reduce memory limit
2010-12-05 01:59:13,904 INFO org.apache.hadoop.mapred.ReduceTask: Merging 4 files, 788519164 bytes from disk
2010-12-05 01:59:13,905 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
2010-12-05 01:59:13,905 INFO org.apache.hadoop.mapred.Merger: Merging 4 sorted segments
2010-12-05 01:59:14,493 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 4 segments left of total size: 788519148 bytes
There are two things happening here. First of all the remaining 314 files that are still in memory are merged into one file on the disk (the first three lines). So now there are four files on the disk. These four files are merged into one.

There is an option mapred.job.reduce.input.buffer.percent, set to 0 by default, which allows the Reducer to keep some map output files in memory. The following is a snippet with this property set to 0.7:
2010-12-05 23:11:55,657 INFO org.apache.hadoop.mapred.ReduceTask: Merging 3 files, 661137901 bytes from disk
2010-12-05 23:11:55,660 INFO org.apache.hadoop.mapred.ReduceTask: Merging 312 segments, 127381881 bytes from memory into reduce
2010-12-05 23:11:55,661 INFO org.apache.hadoop.mapred.Merger: Merging 3 sorted segments
2010-12-05 23:11:55,688 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 3 segments left of total size: 661137889 bytes
2010-12-05 23:11:55,688 INFO org.apache.hadoop.mapred.Merger: Merging 313 sorted segments
2010-12-05 23:11:55,689 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 313 segments left of total size: 788519778 bytes
You can see that instead of merging the 312 segments from memory to disk they are kept in memory while the three files on disk are merged into one and all of the resulting 313 segments are streamed into the reducer.

There seems to be a bug in Hadoop though. I'm not 100% sure about this one so any insight would be appreciated. When the following conditions are true segments from the memory don't seem to be written to disk even if they should be according to the configuration:
  • There are segments in memory that should be written to disk before the reduce task begins according to mapred.job.reduce.input.buffer.percent
  • and there are more files on disk than io.sort.factor
If this happens you see this:
2010-12-10 16:39:40,671 INFO org.apache.hadoop.mapred.ReduceTask: Keeping 14 segments, 18888592 bytes in memory for intermediate, on-disk merge
2010-12-10 16:39:40,673 INFO org.apache.hadoop.mapred.ReduceTask: Merging 10 files, 4143441520 bytes from disk
2010-12-10 16:39:40,674 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
2010-12-10 16:39:40,674 INFO org.apache.hadoop.mapred.Merger: Merging 24 sorted segments
2010-12-10 16:39:40,859 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 24 segments left of total size: 4143441480 bytes
So the steps being done in the Sort phase are the following:
  1. Merge all segments (= map outputs) that are still in memory and don't fit into the memory specified by mapred.job.reduce.input.buffer.percent into one file on disk if there are fewer than io.sort.factor files on disk, so we end up with at most io.sort.factor files on the disk after this step. If there are already io.sort.factor or more files on disk but there are map outputs that need to be written out of memory, keep them in memory for now.
    a. In the first case you'll see a log message like this: Merged 314 segments, 127512782 bytes to disk to satisfy reduce memory limit
    b. In the second case you'll see this: Keeping 14 segments, 18888592 bytes in memory for intermediate, on-disk merge
  2. All files on disk and all remaining files in memory that need to be merged (case 1.b) are determined. You'll see a log message like this: "Merging 4 files, 788519164 bytes from disk".
  3. All files that remain in memory during the Reduce phase are determined: "Merging 312 segments, 127381881 bytes from memory into reduce".
  4. All files (on disk + in-memory) from step 2. are merged together using io.sort.factor as the merge factor. Which means that there might be intermediate merges to disk.
  5. Merge all remaining in-memory (from step 3.) and on-disk files (from step 4.) into one stream to be read by the Reducer. This is done in a streaming fashion without writing new data to disk and just returning an Iterator to the Reduce phase.
This Iterator is given to the Reducer and so the Reduce phase starts.
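The decision made in the first step can be sketched in Python as follows. This is my reading of the behavior described above and not a copy of the Hadoop code; the function name and the returned tuple are made up for illustration.

```python
def plan_final_merge(files_on_disk, segments_to_evict, io_sort_factor=10):
    """Return (files on disk, in-memory segments kept for the on-disk merge).

    segments_to_evict is the number of in-memory segments that do not fit
    into the memory allowed by mapred.job.reduce.input.buffer.percent.
    """
    if segments_to_evict == 0:
        return files_on_disk, 0
    if files_on_disk < io_sort_factor:
        # First case: the segments are merged into one new file on disk.
        return files_on_disk + 1, 0
    # Second case: the segments stay in memory and join the on-disk merge.
    return files_on_disk, segments_to_evict


for disk, evict in ((3, 314), (10, 14)):
    files, kept = plan_final_merge(disk, evict)
    print(f"{disk} files, {evict} segments -> merging {files + kept} sorted segments")
```

The two sample inputs correspond to the logs above: three files plus 314 segments end up as four files to merge, while ten files plus 14 segments end up as the 24 sorted segments seen in the second log.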

Well, it turned out to be a rather detailed description of the process which is helpful to understand the configuration properties available to you. See below for a detailed list of all the relevant properties:
  • Default: 10
  • We: 100
  • Safe: 20-100
  • Renamed to in Hadoop 0.21
  • Can be used in the client configuration
I've explained pretty thoroughly what this parameter does so I won't go into detail here. The whole situation with io.sort.factor and io.sort.mb is not ideal but as long as they are the options we have and the defaults are very low it is pretty safe to change them to a more reasonable value. It is worthwhile to take a look at your logs and search for the lines mentioned in the explanation above. This can be set on a per-job basis and for jobs that run frequently it's worth finding a good job-specific value.

  • Default: 100
  • Renamed to in Hadoop 0.21
  • Can be used in the client configuration
You can adjust the amount of memory used in the Mappers to collect Map outputs with this parameter. This parameter obviously depends heavily on the amount of memory you have available in total for your child VMs and on the memory requirements of your tasks. Your goal should be to minimize the amount of spilling that has to be performed, as explained above, and to utilize the available memory as well as possible. If your Map tasks don't need a lot of memory themselves you can use almost all available memory here. The default settings allocate 200 MB for child VMs and half of that is used for the output buffer, so your Map tasks have about 100 MB available by default.

  • Default: 0.05
  • This has been removed in favor of automatic configuration in Hadoop 0.21
  • Can be used in the client configuration
The output buffer on the map side is split into two parts. One stores the actual bytes of the output data and the other stores 16 bytes of metadata per output. This property specifies how much memory of the buffer (io.sort.mb) is used for tracking the metadata. The default is 5%, which is often too low for jobs which output only small amounts of data in their map tasks. Look for log lines indicating that a spill to disk occurred because of record full = true. If this happens, try to increase this value. This is another property which is very specific to the jobs you're running, so it might need tuning for each and every job. Thankfully this mechanism has been replaced in Hadoop 0.21.
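The split between data and metadata can be worked out with a little arithmetic. The following Python sketch assumes the 16 bytes of metadata per output mentioned above and nothing else about Hadoop's internals. The function names are hypothetical and the model ignores the spill threshold.

```python
METADATA_BYTES_PER_RECORD = 16  # taken from the description above


def record_capacity(io_sort_mb, record_percent):
    """Number of outputs whose metadata fits into the metadata part."""
    buffer_bytes = io_sort_mb * 1024 * 1024
    metadata_bytes = round(buffer_bytes * record_percent)
    return metadata_bytes // METADATA_BYTES_PER_RECORD


def break_even_record_size(io_sort_mb, record_percent):
    """Average output size in bytes below which the metadata part fills first."""
    buffer_bytes = io_sort_mb * 1024 * 1024
    metadata_bytes = round(buffer_bytes * record_percent)
    data_bytes = buffer_bytes - metadata_bytes
    return data_bytes / record_capacity(io_sort_mb, record_percent)


if __name__ == "__main__":
    # Defaults discussed in this post: io.sort.mb = 100, record percent = 0.05
    print(record_capacity(100, 0.05))
    print(break_even_record_size(100, 0.05))
```

With the defaults this works out to about 328,000 outputs and a break-even size of roughly 300 bytes, so under these assumptions a job whose map outputs are smaller than that on average will run out of metadata space before the data part is full.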

  • Default: 0.8
  • Renamed to in Hadoop 0.21
  • Can be used in the client configuration
This property just configures when the data from the map output buffer will be written (spilled) to disk. The spilling process runs in a separate thread and output continues to be collected while it is running. It is therefore important to start this process before the buffer is completely full, because otherwise the map tasks will pause until there is space available.
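As a rough illustration, the following Python sketch estimates how many spills a map task would perform for a given output size. It is a simplified model under stated assumptions: it only looks at the data part of the buffer, ignores the metadata limit, and the function names are invented for this example.

```python
import math


def spill_threshold_bytes(io_sort_mb, record_percent, spill_percent):
    """Bytes of output data collected before a spill is started."""
    buffer_bytes = io_sort_mb * 1024 * 1024
    data_bytes = buffer_bytes - round(buffer_bytes * record_percent)
    return round(data_bytes * spill_percent)


def estimated_spills(map_output_bytes, io_sort_mb=100,
                     record_percent=0.05, spill_percent=0.8):
    """Rough number of spills, counting the final flush as one spill."""
    threshold = spill_threshold_bytes(io_sort_mb, record_percent, spill_percent)
    return max(1, math.ceil(map_output_bytes / threshold))


if __name__ == "__main__":
    # A map task producing 200 MB of output with the default settings.
    print(estimated_spills(200 * 1024 * 1024))
```

If the estimate is larger than one for a typical map task, increasing io.sort.mb is probably the first thing to try.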

  • Default: local
  • We: :8021
  • We set this to final
  • Renamed to mapreduce.jobtracker.address in Hadoop 0.21
  • Needed on the clients
This lets the client know where to find the JobTracker and it lets the JobTracker know which port to bind to.

  • Default: ${hadoop.tmp.dir}/mapred/local
  • We: /mnt/disk1/hadoop/mapreduce/local,/mnt/disk2/hadoop/mapreduce/local
  • We set this to final
  • Renamed to mapreduce.cluster.local.dir in Hadoop 0.21
This lets the MapReduce servers know where to store intermediate files. This may be a comma-separated list of directories to spread the load. Make sure there's enough space here for all your intermediate files. We share the same disks for MapReduce and HDFS.
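Several properties in this list are marked as "We set this to final". In a Hadoop configuration file that corresponds to a final element inside the property. The following Python sketch builds such a property element with the standard library. It uses the Hadoop 0.21 name given in the list above, the helper name make_property is made up, and on older versions you would use the pre-rename property name instead.

```python
import xml.etree.ElementTree as ET


def make_property(name, value, final=False):
    """Build a <property> element in the Hadoop configuration file format."""
    prop = ET.Element("property")
    ET.SubElement(prop, "name").text = name
    ET.SubElement(prop, "value").text = value
    if final:
        # Marks the value so that it cannot be overridden later.
        ET.SubElement(prop, "final").text = "true"
    return prop


if __name__ == "__main__":
    prop = make_property(
        "mapreduce.cluster.local.dir",
        "/mnt/disk1/hadoop/mapreduce/local,/mnt/disk2/hadoop/mapreduce/local",
        final=True,
    )
    print(ET.tostring(prop, encoding="unicode"))
```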

  • Default: ${hadoop.tmp.dir}/mapred/system
  • We: /hadoop/mapred/system
  • We set this to final
  • Renamed to mapreduce.jobtracker.system.dir in Hadoop 0.21
This is a folder in the defaultFS where MapReduce stores some control files. In our case that would be a directory in HDFS. If you have dfs.permissions enabled (which it is by default), make sure that this directory exists and is owned by mapred:hadoop.

  • Default: ${hadoop.tmp.dir}/mapred/temp
  • We: /tmp/mapreduce
  • We set this to final
  • Renamed to mapreduce.cluster.temp.dir in Hadoop 0.21
This is a folder to store temporary files in. It is hardly used, if at all. If I understand the description correctly this is supposed to be in HDFS, but I'm not entirely sure from reading the source code. So we set this to a directory that exists on the local filesystem as well as in HDFS.
  • Default: 2
  • Renamed to mapreduce.job.maps in Hadoop 0.21
It is important to realize that this is just a hint for MapReduce as to the number of Maps it should use. In most cases this value is ignored and the actual number of Maps depends on the input data and is generated automatically. For those rare cases where this value is used we set it to about 90% of our map slot capacity. This can be set client-side per job, so if you have a job that relies on this property you had better set it there to an appropriate value.

  • Default: 1
  • Renamed to mapreduce.job.reduces in Hadoop 0.21
This is different from the property for map tasks in that it is often not possible to calculate a "native" or optimal number of reduce tasks for a job. With this property you can specify the number of reduce tasks to start for a given job. The default is very low. The description suggests setting this to 99% of the cluster capacity so that all reduces finish in one wave. This is sensible when you use the default scheduler, but as soon as multiple jobs run in parallel it's hard to guarantee that all reduces of one job finish in one wave. We're constantly playing around with this and currently have this at about 50% of our capacity.

This too can be specified on a per-job basis.
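The percentages mentioned for map and reduce tasks are easy to turn into concrete numbers. This Python sketch is only an illustration: the function name and the example cluster size (20 TaskTrackers with 4 reduce slots each) are assumptions and not our actual setup.

```python
def tasks_for_capacity(total_slots, fraction):
    """Number of tasks that fills the given fraction of the slot capacity."""
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in (0, 1]")
    return max(1, int(total_slots * fraction))


if __name__ == "__main__":
    total_reduce_slots = 20 * 4  # hypothetical cluster
    print(tasks_for_capacity(total_reduce_slots, 0.99))  # one-wave suggestion
    print(tasks_for_capacity(total_reduce_slots, 0.5))   # what we currently use
```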

  • Default: org.apache.hadoop.mapred.JobQueueTaskScheduler
  • We: org.apache.hadoop.mapred.FairScheduler
  • We set this to final
  • Renamed to mapreduce.jobtracker.taskscheduler in Hadoop 0.21
With the default configuration all jobs are placed in a priority FIFO queue and submitted one after the other. This is fine for testing but it doesn't utilize the available resources very well. This property allows you to change the scheduler used. These are the available schedulers in CDH3b3:
Depending on the scheduler you decide to use there may be additional properties which I'm not going to mention here. Have a look at the dedicated documentation.

  • Default: 5
  • We: ~20-50
  • We set this to final
  • Renamed to mapreduce.reduce.shuffle.parallelcopies in Hadoop 0.21
The reduce tasks have to fetch the map outputs from the remote servers. They have to fetch the output from each map, of which there may be thousands. This option allows you to parallelize the copy process. Tuning this value is very worthwhile. In our first tests this property gave us one of the best performance increases of all properties. We started to increase this property in steps of 5 and looked very carefully at the logs and our monitoring system to find a value that works for us. We've not yet finished this process but values between 20 and 50 seem to mostly work without problems.

& mapred.tasktracker.reduce.tasks.maximum
  • Default: 2
  • We set this to final
  • Renamed to & in Hadoop 0.21
This setting is very important and we've yet to find values that we are comfortable with. This setting can be different on each TaskTracker and defines how many map or reduce task "slots" there are on a specific TaskTracker. You need to set these to values that don't overload your servers while still fully utilizing them. You also need to make sure that there's enough memory for all tasks and services running on a server (see By setting this property to different values depending on your server configuration you can easily use heterogeneous hardware in your cluster. Each distinct hardware configuration will have these properties set to different values. A general rule from the Hadoop Definitive Guide book says that these properties can be set to number of cores - 1. We've tried various settings now but found the load on the servers to be very high with those settings so we'll have to do more benchmarking.
  • Default: -Xmx200m
  • Can be used in the client configuration
These are the options given to each child JVM started (map- and reduce tasks). The default just sets the maximum memory to 200 MB. This can be set on the client to pass options needed for a specific job. GC logging for example can be enabled as well. This isn't configurable on a per TaskTracker basis so you have to make sure that every machine in your cluster fulfills the requirements. Available memory needs to be at least ( + mapred.tasktracker.reduce.tasks.maximum) * Xmx.
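The memory requirement at the end of the previous paragraph is simple arithmetic, sketched here in Python. The parameter names are placeholders for the map and reduce slot settings of a TaskTracker and the -Xmx value. The result covers only the task JVMs, so the memory for the other services running on the machine has to be added on top.

```python
def required_task_memory_mb(map_slots, reduce_slots, xmx_mb):
    """Memory needed if every slot runs a child JVM using its full heap."""
    return (map_slots + reduce_slots) * xmx_mb


if __name__ == "__main__":
    # Defaults from this post: 2 map slots, 2 reduce slots, -Xmx200m
    print(required_task_memory_mb(2, 2, 200))
```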

  • Default: 1000
  • We: 0
  • Renamed to mapreduce.reduce.merge.inmem.threshold in Hadoop 0.21
  • Can be used in the client configuration
I've explained the effect of this property in the MapReduce description above but to reiterate: The reduce side fetches map outputs to memory. Once the memory is full or this many map outputs are in memory they are merged together into one file on disk. This can be set on a per-job basis but as a default we've disabled this behavior and just flush to disk when the memory is full. This seems to have been better for all our jobs so far but it's definitely a property to look out for.

  • Default: 0.66
  • Renamed to mapreduce.reduce.shuffle.merge.percent in Hadoop 0.21
  • Can be used in the client configuration
Once the memory buffer in the copy (shuffle) phase of the reduce task is this full, a background thread will start to merge all map outputs collected in memory so far and write them to a single file on disk. This is similar to what's happening on the map side. In the default configuration the mapred.inmem.merge.threshold parameter might actually trigger a merge before this value is hit. We haven't yet played around with this property, but you'd have to be careful not to set it too high, because otherwise the copy processes have to wait for the buffer to be emptied again. That could be a huge performance hit.
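The interplay of the two merge triggers on the reduce side can be sketched like this in Python. This is a minimal model of the behavior described above and not Hadoop's actual code: the function name is made up, and the exact comparisons (for example >= versus >) are assumptions.

```python
def should_start_merge(bytes_in_memory, buffer_bytes, segments_in_memory,
                       merge_percent=0.66, inmem_threshold=1000):
    """Return True if an in-memory merge to disk would be triggered.

    merge_percent models the buffer fill level trigger, inmem_threshold
    models mapred.inmem.merge.threshold. A threshold of 0 disables the
    count-based trigger, which is what we use.
    """
    buffer_trigger = bytes_in_memory >= buffer_bytes * merge_percent
    count_trigger = inmem_threshold > 0 and segments_in_memory >= inmem_threshold
    return buffer_trigger or count_trigger


if __name__ == "__main__":
    # Many tiny map outputs: the count trigger fires long before the buffer is full.
    print(should_start_merge(10, 100, 1000))
    # Same situation with the count trigger disabled.
    print(should_start_merge(10, 100, 1000, inmem_threshold=0))
```

This shows why the count-based trigger can lead to many small files on disk for jobs with lots of small map outputs.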

An addition to Hadoop's logging would be nice that lets us know how full the buffer is the moment a merge finishes.

  • Default: 0.7
  • Renamed to mapreduce.reduce.shuffle.input.buffer.percent in Hadoop 0.21
  • Can be used in the client configuration
This is the amount of memory from the total available memory (specified by that's allocated for collecting map outputs in memory on the reduce side. Another parameter we haven't played around with but my guess would be that this can be easily set a little bit higher.

  • Default: 0.0
  • Renamed to mapreduce.reduce.input.buffer.percent in Hadoop 0.21
  • Can be used in the client configuration
Usually map outputs would be written to disk when the sort phase (on the reduce side) ends. If you have reduce tasks that don't need a lot of memory themselves you can set this to a higher value so that map outputs up to this amount of memory (in percent of the total available memory) aren't written to disk but kept in memory. This is obviously faster than an intermediate spill to disk. This should be considered on a per-job basis.

& mapred.reduce.tasks.speculative.execution
  • Default: true
  • We: false
  • We will set this to final once we're in production
  • Renamed to & mapreduce.reduce.speculative in Hadoop 0.21
  • Can be used in the client configuration
Speculative execution starts multiple instances of certain map or reduce tasks when it detects certain circumstances (like an unusually slow task or node) to avoid waiting too long for stragglers. This sounds like a good idea and we've got it enabled at the moment, but when we go to production this will probably be disabled, as it uses valuable resources on the cluster that mostly go to waste, and while one job may finish faster all the others have to wait longer.

  • Default: 1
  • Renamed to mapreduce.job.jvm.numtasks in Hadoop 0.21
  • Can be used in the client configuration
Child JVMs are spawned for the map and reduce tasks. This parameter lets you reuse these VMs for multiple tasks. The default value creates a new JVM for each task which has some overhead (the book says about one second per JVM). We've played around with it a bit and it can make things faster but you've got to be careful with memory leaks and shared state. Basically you should be sure that your jobs can handle this. If you have a performance critical job you can play around with this but we've had some OutOfMemory errors when using this so we're conservative at the moment. If you set it to -1 a JVM will never be destroyed.

  • Default: 40
  • We: 80
  • We set this to final
  • Renamed to mapreduce.tasktracker.http.threads in Hadoop 0.21
The map output is fetched by the reducers from the TaskTrackers via HTTP. This property lets you adjust the number of threads that serve those requests. When we upped the parallel copies we had some errors about fetch failures, so we slowly increased this value. Those two parameters need to be carefully tuned together. 80 seemed to cause no problems for us so we stuck with it for now. You have to restart your TaskTrackers after changing this value.
  • Default: false
  • We: true
  • Renamed to in Hadoop 0.21
  • Can be used in the client configuration
Turning this on will compress the output of your Mappers using SequenceFile compression. Depending on the codec you choose, this may be CPU intensive and result in varying degrees of compression. We've benchmarked jobs of different sizes with this intermediate compression enabled and disabled, and while some of them took slightly longer than before it is still good to enable it. The cost isn't too high and there is a lot less intermediate data generated. Less I/O in general is good, especially if multiple jobs are running.
  • Default:
  • We: com.hadoop.compression.lzo.LzoCodec
  • Renamed to in Hadoop 0.21
  • Can be used in the client configuration
With this property you specify the compression codec to use for Map output compression. So far we've only tried LZO. This choice was based on the experience of others and the general properties of the algorithm, which is very fast but sacrifices a bit of compression efficiency for its speed. We plan to test the other algorithms as well.

  • Default: no default set
  • We: /etc/hadoop/conf/allowed_hosts
  • Renamed to mapreduce.jobtracker.hosts.filename in Hadoop 0.21
  • We set this to final
This works the same way as dfs.hosts but specifies which TaskTrackers are allowed to get work from the JobTracker. Both files have the same format, so it's quite common for them to be the same file.


After setting up all these parameters the way you like them you should have a fully functional but basic Hadoop cluster running. You can submit jobs, use HDFS etc. But there are a few more things that we can do like installing Hive, Hue, Pig, Sqoop, etc. We've also yet to cover Puppet. All of this is hopefully forthcoming in more blog posts in the future.

We're also very interested in other users (or interested people and companies) of Hadoop, HBase & Co. in Scandinavia who would be interested in a Hadoop Meetup. We're located in Copenhagen. Contact us if you're interested.

If you have any questions or spot any problems or mistakes please let me know in the comments or by mail.