Introduction
In the last few months I was tasked several times with setting up Hadoop
clusters. Those weren't huge (two to thirteen machines), but from what
I read and hear this is a common use case, especially for companies just
starting with Hadoop or setting up a first small test cluster. While
there is a huge amount of documentation in the form of official
documentation, blog posts, articles and books, most of it stops just
where it gets interesting: dealing with all the stuff you really have to
do to set up a cluster, cleaning logs, maintaining the system, knowing
what and how to tune etc.
I'll try to describe all the hoops we had to jump through and all the
steps involved to get our Hadoop cluster up and running. This is probably
trivial stuff for experienced sysadmins, but if you're a developer who
suddenly finds yourself in the "DevOps" role, I hope it is useful to you.
While working at GBIF I was asked to set up a Hadoop cluster on 15
existing and 3 new machines. So the first interesting thing about this
setup is that it is a heterogeneous environment: three different
configurations at the moment. This is where our first goal came from: we
wanted some kind of automated configuration management. We needed to try
different cluster configurations and we needed to be able to shift roles
around the cluster without a lot of manual work on each machine. We
decided to use a tool called Puppet for this task.
While Hadoop is not currently in production at GBIF, there are mid- to
long-term plans to switch parts of our infrastructure to various
components of the HStack: MapReduce jobs with Hive and perhaps Pig (there
is already strong knowledge of SQL here), storing large amounts of raw
data in HBase to be processed asynchronously (~500 million records by
next year) and indexing it in a Lucene/Solr solution, possibly using
something like Katta to distribute indexes. For good measure we also
have fairly complex geographic calculations and map-tile rendering that
could be done on Hadoop. So we have those 18 machines and no real clue
how they'll be used and which services we'll need in the end.
Environment
As mentioned before, we have three different server configurations. We've put those machines in three logical clusters, c1, c2 and c3, and simply number the machines within each (our master, for example, is currently running on c1n1):
- c1 (10 machines): Intel(R) Xeon(R) CPU X3363 @ 2.83GHz, 2x6MB cache (quad), 8 GB RAM, 2 x 500GB SATA 7.2K
- c2 (3 machines): 2 x Intel(R) Xeon(R) CPU E5630 @ 2.53GHz (quad), 24 GB RAM, 6 x 250 GB SATA 5.4K
- c3 (5 machines): Intel(R) Xeon(R) CPU X3220 @ 2.40GHz (quad), 4 GB RAM, 2 x 160 GB SATA 7.2K
- CentOS 5.5
- The machines are in different racks but connected to only one switch
We realize that this is a very heterogeneous cluster configuration. We
also realize that some people strongly discourage the use of old machines
or machines with little RAM, but the c1 and c3 clusters were old, unused
machines; this way they still serve a purpose, and we've had no problems
so far using this setup.
Goal
These were the goals we set out to achieve on our cluster and these are
also all the things I'll try to describe in this or a following post:
- Puppet for setting up the services and configuring machine state
- CDH3 (Beta 3)
- Hadoop HDFS + MapReduce incl. Hadoop LZO
- Hue
- Zookeeper
- HBase
- Easily distributable packages for Hadoop, Hive and Pig to be used by
the employees to access the cluster from their own workstations
- Benchmarks & Optimizations
Be warned: This is going to be a very long post and unfortunately it is
the nature of these things that some of the information is bound to be
outdated pretty quickly so let me know if something has changed and I'll
alter the post.
Manual Installation
Before we use Puppet to do everything automatically I will show how it
can be done manually. I think it is important to know all the steps in
case something goes wrong or you decide not to use Puppet at all. When I
talk about "the server" I always mean "all servers in your cluster"
except when noted otherwise. I highly recommend not skipping this part
even if you want to use Puppet.
Operating System
For now I'll just assume a vanilla CentOS 5.5 installation. There's
nothing special you need. I recommend just the bare minimum; everything
else can be installed later. A few words though about things you might
want to do: your servers probably have multiple disks. You shouldn't use
any RAID or LVM on any of your slaves (i.e. DataNodes/TaskTrackers). Just
use a JBOD configuration. In our cluster all disks are in a simple
structure:
- /mnt/disk1
- /mnt/disk2
- ...
There are also two tweaks you can apply to your slaves:
- Mount your data disks with noatime (e.g. /dev/sdc1 /mnt/disk3 ext3 defaults,noatime 1 2 in /etc/fstab), which, by the way, implies nodiratime.
- By default a certain number of blocks are reserved on ext file systems (I'm not familiar with others). You can check this by running tune2fs -l /dev/sdc1 and looking at the Reserved block count.
While this is useful on system disks, so that critical processes can
still write some data when the disk is full, it is wasted space on our
data disks. By default 5% of a disk is reserved. I recommend lowering
this to 1% by running tune2fs -m 1 on all your data disks (e.g.
tune2fs -m 1 /dev/sdc1), which frees up quite a bit of disk space. You
can also set it to 0% if you want, though I went with 1% for our
cluster. Keep the default setting for your system disks though!
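To get a feeling for how much space this is, here is a small POSIX shell sketch of the arithmetic. It assumes the disk size is known in bytes and treats the reservation as a plain percentage of it; tune2fs actually works on the file system's block count, so the real figure will be somewhat lower. The function names are made up for illustration.

```shell
#!/bin/sh
# Rough estimate of the space reserved by ext2/3 on a disk of $1 bytes when
# $2 percent is reserved (the value passed to tune2fs -m).
# Assumption: the percentage applies to the raw disk size; tune2fs works on
# the file system's block count, so real numbers are a bit lower.
reserved_bytes() {
  echo $(( $1 * $2 / 100 ))
}

# Bytes gained on a disk of $1 bytes by lowering the reservation
# from $2 percent to $3 percent.
freed_bytes() {
  echo $(( $(reserved_bytes "$1" "$2") - $(reserved_bytes "$1" "$3") ))
}

# Example: one of the 500 GB disks in c1 (500 * 10^9 bytes, as sold).
disk=$(( 500 * 1000 * 1000 * 1000 ))
echo "reserved at 5%: $(reserved_bytes "$disk" 5) bytes"
echo "freed by tune2fs -m 1: $(freed_bytes "$disk" 5 1) bytes"
```

For a 500 GB data disk that is roughly 20 GB gained per disk, which adds up quickly across a cluster.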
On your NameNode however use any means you feel necessary to secure your
data. You know your requirements better than I do. Use RAID and/or LVM
however you like. We don't have any special resources so our NameNode is
running on one of our regular servers at the moment. We might change
that in the future.
A note on Cloudera's Package system & naming
Cloudera provides the various components of Hadoop in different packages, but they follow a simple structure: there is one hadoop-0.20 package which contains all the jars, config files, directories, etc. needed for all the roles. Then there are packages like hadoop-0.20-namenode which are only a few kilobytes and only contain the start and stop scripts for the role in question.
1. Common Requirements
Most of the commands in this guide need to be executed as root. I've chosen the easy route here and just logged in as root. If you're operating as a non-privileged user, remember to use su, sudo or any other means to ensure you have the proper rights.
Repository
As all the packages we're going to install are provided by Cloudera we need to add their repository to our cluster:
curl http://archive.cloudera.com/redhat/cdh/cloudera-cdh3.repo > /etc/yum.repos.d/cloudera-cdh3.repo
Java installation
You have to download the JDK from Oracle's website yourself, as license
issues prevent it from being added to the repositories. Choose the
correct system (probably Linux x64) and make sure to download the file
ending in -rpm.bin (e.g. jdk-6u23-linux-x64-rpm.bin).
You might have to do this from a client machine because you need a
browser that works with the Oracle site. So on any one machine execute
the following:
unzip jdk-6u23-linux-x64-rpm.bin
You should now have a bunch of .rpm files, but you only need one of them: jdk-6u23-linux-amd64.rpm. Copy this file to your servers and install it as root using rpm:
rpm -Uvh ./jdk-6u23-linux-amd64.rpm
Time
While not a hard requirement, it makes a lot of things easier if the
clocks on your servers are synchronized. I added this part at the last
minute because we just realized that ntpd was disabled by accident on
three of our machines (c2), which caused us some problems. It is worth
taking a look at the clocks now and setting up ntp properly before you
start.
DNS
It doesn't matter whether you use a DNS server, hosts files or any other
means for the servers to find each other. But make sure this works! Do
it now! Even if you think everything's set up correctly. Another thing
you should check is whether the local hostname resolves to the public IP
address. If you're using a DNS server you can use dig to test this, but
that doesn't take the /etc/hosts file into account, so here is a simple
test to see if it is correct:
ping -c 1 `hostname`
This should resolve to the public IP and not to 127.0.0.1.
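If you want something scriptable instead of eyeballing the ping output, the check can be written as a small POSIX shell sketch. It uses getent, which resolves names the same way most programs do (following /etc/nsswitch.conf, which normally means /etc/hosts and then DNS). The function names are made up for illustration.

```shell
#!/bin/sh
# Return success if the address is a loopback address
# (anything in 127.0.0.0/8, or the IPv6 loopback ::1).
is_loopback() {
  case "$1" in
    127.*|::1) return 0 ;;
    *)         return 1 ;;
  esac
}

# Resolve this machine's own hostname through NSS (so /etc/hosts is taken
# into account, unlike with dig) and complain about loopback answers.
check_own_hostname() {
  local name ip
  name=$(hostname)
  ip=$(getent hosts "$name" | awk '{ print $1; exit }')
  if [ -z "$ip" ]; then
    echo "ERROR: $name does not resolve at all" >&2
    return 2
  fi
  if is_loopback "$ip"; then
    echo "ERROR: $name resolves to loopback address $ip" >&2
    return 1
  fi
  echo "OK: $name resolves to $ip"
}
```

Run check_own_hostname on each server; a non-zero exit status means the hostname entry in /etc/hosts or DNS needs fixing before you continue.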
Firewall
Hadoop uses a lot of ports for its internal and external communications.
We've just allowed all traffic between the servers in the cluster and
clients. But if you don't want to do that, you can also selectively open
the required ports. I'll try to mention them, but they can all be changed
in the configuration files. I might also miss some because of our
config, so I'd be glad if someone could point those out to me.
Packages
We're going to use LZO compression, the Hadoop native libraries as well
as Hue, so there are a few common dependencies on all machines in the
cluster which can be easily installed:
rpm -Uvh http://download.fedora.redhat.com/pub/epel/5/x86_64/epel-release-5-4.noarch.rpm
yum install -y lzo hue-plugins hadoop-0.20-native
Directories
We also need some directories later on so we can just create them now:
mkdir /hadoop
chown root:hadoop /hadoop
Cloudera uses the alternatives system to manage configuration. /etc/hadoop/conf is the currently activated configuration. Look at the contents of /etc/hadoop and you'll find all the installed configurations. At the moment there is only a conf.empty directory, which we'll use as our starting point:
cp -R /etc/hadoop/conf.empty /etc/hadoop/conf.cluster
Now feel free to edit the configuration files in /etc/hadoop/conf.cluster, but we'll also go through them later in this post. The last step is to activate this configuration:
/usr/sbin/alternatives --install /etc/hadoop-0.20/conf hadoop-0.20-conf /etc/hadoop-0.20/conf.cluster 50
LZO
Due to licensing issues the LZO bindings for Hadoop cannot be
distributed the same way as the rest of the packages, so this once
again involves a few manual steps. After these bindings were removed
from Hadoop itself a few versions ago, they moved to the
hadoop-gpl-compression project on Google Code, which (as far as I know)
still works but hasn't seen any development for over a year.
Thankfully, Twitter's Kevin Weil and Cloudera's Todd Lipcon have picked
up and maintained the project. They regularly sync their GitHub
repositories, so both should have almost the same code. I'm going to use
Todd's version here as it should be better synced with CDH releases.
You have to download the code from the repository, build the native
libraries as well as the jar file and distribute those files on your
cluster. You need to do this only on one machine which ideally should
run the same OS version as the servers in your cluster. When you're
finished you can just copy the result to all servers. We're using
version 0.4.9 so we use this to download and build:
rpm -Uvh http://download.fedora.redhat.com/pub/epel/5/x86_64/epel-release-5-4.noarch.rpm
yum install -y lzo-devel
wget --no-check-certificate -O toddlipcon-hadoop-lzo-0.4.9-0-g0e70051.tar.gz https://github.com/toddlipcon/hadoop-lzo/tarball/0.4.9
tar xvfz toddlipcon-hadoop-lzo-0.4.9-0-g0e70051.tar.gz
wget http://www.apache.org/dist/ant/binaries/apache-ant-1.8.2-bin.tar.bz2
tar jxvf apache-ant-1.8.2-bin.tar.bz2
cd toddlipcon-hadoop-lzo-0e70051
JAVA_HOME=/usr/java/latest/ BUILD_REVISION="0.4.9" ../apache-ant-1.8.2/bin/ant tar
The ant version that comes with CentOS 5.5 didn't work for me, which is why I downloaded a newer one. This should leave you with a hadoop-lzo-0.4.9.tar.gz file in the build directory, which you can extract to get all the files your servers need:
- hadoop-lzo-0.4.9.jar needs to be copied into /usr/lib/hadoop/lib on each server
- lib/native/Linux-amd64-64 needs to be copied into /usr/lib/hadoop/lib/native on each server
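Copying these two items to every server by hand gets tedious. Below is a minimal POSIX shell sketch, assuming the tarball has been extracted to ./hadoop-lzo-0.4.9, that the host names are listed one per line in a file, and that root can scp to each server. The function name and the DRY_RUN switch are made up for illustration.

```shell
#!/bin/sh
# Copy the hadoop-lzo jar and native libraries to every host listed (one per
# line) in the file $1. $2 is the extracted build directory (assumed default:
# ./hadoop-lzo-0.4.9). With DRY_RUN=1 the scp commands are only printed.
distribute_lzo() {
  local hosts_file="$1" src="${2:-hadoop-lzo-0.4.9}" run=""
  if [ "${DRY_RUN:-0}" = "1" ]; then
    run="echo"
  fi
  while IFS= read -r host; do
    [ -n "$host" ] || continue   # skip empty lines
    # </dev/null keeps scp from reading the host list on stdin
    $run scp "$src/hadoop-lzo-0.4.9.jar" "root@$host:/usr/lib/hadoop/lib/" </dev/null || return 1
    $run scp -r "$src/lib/native/Linux-amd64-64" "root@$host:/usr/lib/hadoop/lib/native/" </dev/null || return 1
  done < "$hosts_file"
}
```

Running DRY_RUN=1 distribute_lzo hosts.txt first shows exactly what would be copied where before anything touches the servers.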
cron & log cleaning
We've had a problem with unintentional debug logs filling up our hard drives. The investigation that followed resulted in a blog post by Lars George explaining all the log files Hadoop writes. It is a worthwhile read.
Hadoop writes tons of logs in various processes and phases, and you
should make sure that these don't fill up your hard drives. There are
two places in the current CDH3b3 where you have to intervene manually:
- Hadoop daemon logs
- Job XML files on the JobTracker
Hadoop uses a DailyRollingFileAppender, which unfortunately doesn't have a maxBackupIndex setting like the RollingFileAppender.
So either change the appender or manually clean up logs after a few
days. We chose the second path and added a very simple cron job to run
daily:
find /var/log/hadoop/ -type f -mtime +14 -name "hadoop-hadoop-*" -delete
This job deletes log files older than 14 days. We'll take care of the Job XML files in a similar way on the JobTracker.
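The same find call can be wrapped in a small function, which makes it easy to try out on a scratch directory before trusting it with /var/log/hadoop. This is a sketch: the function name is made up, and the cron.d file and schedule in the comment are only an example.

```shell
#!/bin/sh
# Delete regular files below directory $1 whose names match the glob $2 and
# that were last modified more than $3 days ago (find -mtime +N semantics).
clean_old_files() {
  find "$1" -type f -mtime +"$3" -name "$2" -delete
}

# Example /etc/cron.d entry (hypothetical file, runs daily at 03:00 as root):
# 0 3 * * * root find /var/log/hadoop/ -type f -mtime +14 -name "hadoop-hadoop-*" -delete
```

The same function covers the JobTracker case later on, e.g. clean_old_files /var/log/hadoop "job_*_conf.xml" 3.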
2. HDFS
One property needs to be set for both the NameNode and the DataNodes in the file /etc/hadoop/conf/core-site.xml: fs.default.name. Just add this and replace $namenode with the IP or name of your NameNode:
<property>
  <name>fs.default.name</name>
  <value>hdfs://$namenode:8020</value>
</property>
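Since every setting in this post ends up as one of these property elements, a tiny generator can save some typing. The following POSIX shell sketch is a hypothetical helper, not part of CDH, and it does not XML-escape values.

```shell
#!/bin/sh
# Hypothetical helper (not part of CDH): print one Hadoop <property> element.
# Pass "final" as the third argument to add <final>true</final>, which stops
# client configurations from overriding the value.
# Note: values are not XML-escaped, so avoid &, < and > in them.
hadoop_property() {
  printf '  <property>\n'
  printf '    <name>%s</name>\n' "$1"
  printf '    <value>%s</value>\n' "$2"
  if [ "${3:-}" = "final" ]; then
    printf '    <final>true</final>\n'
  fi
  printf '  </property>\n'
}

# Print a minimal core-site.xml for the NameNode host given as $1.
write_core_site() {
  printf '<?xml version="1.0"?>\n<configuration>\n'
  hadoop_property fs.default.name "hdfs://$1:8020" final
  printf '</configuration>\n'
}
```

For example, write_core_site c1n1 prints a complete file you can review before redirecting it into /etc/hadoop/conf.cluster/core-site.xml.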
2.1 NameNode
Installing the NameNode is straightforward:
yum install -y hadoop-0.20-namenode
This installs the startup scripts for the NameNode. The core package was
already installed in the previous step. Now we need to change the
configuration, create some directories and format the NameNode.
In /etc/hadoop/conf/hdfs-site.xml add the dfs.name.dir property, which "determines where on the local filesystem the DFS name node should store the name table(fsimage). If this is a comma-delimited list of directories then the name table is replicated in all of the directories, for redundancy."
We mentioned before that we're using a JBOD configuration. We do this
even for our NameNode, so in our case the NameNode has two disks mounted
at /mnt/disk1 and /mnt/disk2, but you might want to write to just one
location if you use RAID. As the documentation says, the NameNode will
write to each of the locations. You can also add a third location, such
as an NFS mount, which serves as a backup. Our configuration looks like
this:
<property>
  <name>dfs.name.dir</name>
  <value>/mnt/disk1/hadoop/dfs/name,/mnt/disk2/hadoop/dfs/name</value>
</property>
Make sure to create the dfs directories before starting the NameNode. They need to belong to hdfs:hadoop. Formatting the NameNode is all that's left:
su hdfs -c "/usr/bin/hadoop namenode -format"
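The directory preparation can be scripted from the same comma-separated value you put into the configuration. This is a POSIX shell sketch with a made-up function name; it only creates and chowns the listed directories, while any parent directories created along the way stay owned by root.

```shell
#!/bin/sh
# Create every directory in a comma-separated Hadoop directory list (such as
# the value of dfs.name.dir, dfs.data.dir or mapred.local.dir) and, if an
# owner like hdfs:hadoop is given as $2, chown each of them.
create_hadoop_dirs() {
  local list="$1" owner="${2:-}"
  # The while loop runs in a subshell (it is part of a pipeline), so "exit 1"
  # ends only the loop and becomes the function's return status.
  printf '%s\n' "$list" | tr ',' '\n' | while IFS= read -r dir; do
    [ -n "$dir" ] || continue
    mkdir -p "$dir" || exit 1
    if [ -n "$owner" ]; then
      chown "$owner" "$dir" || exit 1
    fi
  done
}
```

On the NameNode that would be create_hadoop_dirs /mnt/disk1/hadoop/dfs/name,/mnt/disk2/hadoop/dfs/name hdfs:hadoop; the same call with the dfs.data.dir value works for the DataNodes, and with mapred:hadoop for mapred.local.dir.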
Once you've done all that you can enable the service so it will be started upon system boot and start the NameNode:
chkconfig hadoop-0.20-namenode on
service hadoop-0.20-namenode start
You should now be able to see the web interface on your NameNode at port
50070. Ports that need to be opened to clients on the NameNode are
50070 (web interface, 50470 if you enabled SSL) and 8020 (for HDFS
command line interaction). Only port 8020 needs to be opened to all
other servers in the cluster.
We also use a cron job to run the HDFS Balancer every evening:
/usr/lib/hadoop-0.20/bin/start-balancer.sh -threshold 5
2.2 DataNodes
The DataNodes handle all the data by storing it and serving it to
clients. You can run a DataNode on your NameNode, and this is often done
for small or test clusters, but as soon as you have more than three to
five machines or rely on your cluster for production use you should use
a dedicated NameNode. Setting up the DataNodes is easy, though, after all
our preparations. We need to set the property dfs.data.dir in the file
/etc/hadoop/conf/hdfs-site.xml. It "determines where on the local
filesystem an DFS data node should store its blocks. If this is a
comma-delimited list of directories, then data will be stored in all
named directories, typically on different devices. Directories that do
not exist are ignored." These are the directories the actual data bytes
of HDFS are written to. If you specify multiple directories, the
DataNode will write to them in turn, which spreads the disk I/O and
gives good performance when reading the data.
This is an example of what we are using:
<property>
  <name>dfs.data.dir</name>
  <value>/mnt/disk1/hadoop/dfs/data,/mnt/disk2/hadoop/dfs/data</value>
</property>
Make sure to create the dfs directories before starting the DataNodes. They need to belong to hdfs:hadoop. When that's done you just need to install the DataNode, activate the startup scripts and start it:
yum install -y hadoop-0.20-datanode
chkconfig hadoop-0.20-datanode on
service hadoop-0.20-datanode start
Your DataNode should now be up and running and, if you have configured it
correctly, should also have connected to the NameNode and be visible in
the Live Nodes list of the web interface; the configured capacity should
go up. Ports that need to be opened to clients are 50075 (web interface,
50475 if you enabled SSL) and 50010 (for data transfer). For the cluster
you need to open ports 50010 and 50020.
3. MapReduce
MapReduce is split into two parts as well: a JobTracker and multiple
TaskTrackers. For small-ish clusters the NameNode and the JobTracker can
run on the same server, but depending on your usage and available memory
you might need to run them on separate servers. We have 18 servers, 17
slaves and 1 master (with NameNode, JobTracker and other services), which
hasn't been a problem so far. We need three properties set on all servers
(in mapred-site.xml) to get started.
mapred.job.tracker: "The host and port that the MapReduce job tracker runs at. If 'local', then jobs are run in-process as a single map and reduce task."
- This just points to your JobTracker. There is no default port for this in Hadoop 0.20 but 8021 is often used.
- Our value (replace $jobtracker with the name or IP of your designated JobTracker): $jobtracker:8021
mapred.local.dir: "The local directory where MapReduce stores intermediate data files. May be a comma-separated list of directories on different devices in order to spread disk i/o. Directories that do not exist are ignored."
- As it says, this is a local directory where MapReduce stores intermediate data, and we spread it out over all our disks.
- Our value: /mnt/disk1/hadoop/mapreduce,/mnt/disk2/hadoop/mapreduce
- Create the directories on each server with the owner mapred:hadoop
mapred.system.dir: "The shared directory where MapReduce stores control files."
- This is a path in HDFS (not on the local disks) where MapReduce stores its control files
- Our value: /hadoop/mapreduce/system
- If dfs.permissions is enabled, you need to create the parent directory in HDFS and give it to the mapred user so the JobTracker can create the system directory. Execute this command on any server in your cluster: su hdfs -c "/usr/bin/hadoop fs -mkdir /hadoop/mapreduce && /usr/bin/hadoop fs -chown mapred:hadoop /hadoop/mapreduce"
3.1 JobTracker
The JobTracker is very easy to set up and start:
yum install -y hadoop-0.20-jobtracker
chkconfig hadoop-0.20-jobtracker on
service hadoop-0.20-jobtracker start
The web interface should now be available at port 50030 on your
JobTracker. Ports 50030 (web interface) and 8021 (not well defined in
Hadoop 0.20 but if you followed my configuration this is correct) need
to be opened to clients. Only 8021 is necessary for the TaskTrackers.
If the JobTracker is restarted, some old job configuration files are not cleaned up. That's why we added another small cron job to run daily:
find /var/log/hadoop/ -type f -mtime +3 -name "job_*_conf.xml" -delete
3.2 TaskTracker
The TaskTrackers are as easy to install as the JobTracker:
yum install -y hadoop-0.20-tasktracker
chkconfig hadoop-0.20-tasktracker on
service hadoop-0.20-tasktracker start
The TaskTracker should now be up and running and visible in the
JobTracker's Nodes list. Only port 50060 needs to be opened to clients
for a minimalistic web interface. Other than that no other ports are
needed as TaskTrackers check in at the JobTracker regularly (heartbeat)
and get assigned Tasks at the same time.
4. Configuration
I'll discuss a few configuration properties here that range from
"necessary to change" to "nice to know about". I'll mention the
following things for each property:
- The default value,
- the value we use for our cluster at GBIF if it differs from the default,
- some of the defaults are quite old and have never been changed, so I might mention a value I deem safe to use for everybody,
- if we set the property to final so it can't be overridden by clients
(we set a lot of the parameters to final for purely documentary
reasons, even those that can't be overridden in the first place),
- if the property has been renamed or deprecated in Hadoop 0.21,
- and if this property is required in a client configuration file or only on the cluster (if I don't mention it, it's not needed).
Here are the default configuration files for Hadoop 0.20.2 and 0.21:
I know that some of this duplicates the sections above, but I want to keep this Configuration section as a reference.
core-site.xml
fs.default.name
- Default: file:///
- We: hdfs://$namenode:8020
- We set this to final
- Renamed to fs.defaultFS in Hadoop 0.21
- Needed on the clients
This is used to specify the default file system. It defaults to your
local file system, which is why it needs to be set to an HDFS address.
This is important for the client configuration as well, so your local
configuration file should include this element.
hadoop.tmp.dir
- Default: /tmp/hadoop-${user.name}
- CDH3 Default: /var/lib/hadoop-0.20/cache/${user.name}
- We: Left it at the CDH3 default
- We set this to final
As mentioned in the default file this is mainly a base for other
temporary directories. If all other configuration options are set
correctly there shouldn't be too much data in here.
fs.trash.interval
- Default: 0
- We: 10080 (one week)
- We set this to final
Hadoop has a Trash feature where files removed with the command line
tools are moved to a .Trash folder in the user's home folder. If set to
0 this feature is disabled, but if set to a non-zero value this is the
number of minutes between Trash cleaner runs. As we have a lot of users
in our system using Hadoop for the first time, we chose a safe value
here.
fs.checkpoint.dir
- Default: ${hadoop.tmp.dir}/dfs/namesecondary
- We: /mnt/disk1/hadoop/dfs/namesecondary,/mnt/disk2/hadoop/dfs/namesecondary
- We set this to final
The secondary NameNode stores the images it merges here. If this is a
comma-separated list, the data is replicated to all of these locations
on the local disks.
io.file.buffer.size
- Default: 4096
- Safe: 65536
- We: 131072 (32 * 4096)
- Can be overridden by clients
- We set this to final
This is used for buffers all over the place to copy, store and write
data. It should be a multiple of the hardware page size (4096 on x86),
and it should be safe to use 65536 today, but we use double that. The
performance gain is not enormous, but there have been blog posts in the
past measuring the impact and it was positive. We've also done our own
tests and saw a small performance gain. If you use HBase, be careful not
to set this too high.
io.compression.codecs
- Default: org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec
- We: org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec
- We set this to final
This lists all installed compression codecs. If you followed my
instructions, you have to add two more to the default list of codecs:
LzoCodec and LzopCodec.
io.compression.codec.lzo.class
- We: com.hadoop.compression.lzo.LzopCodec
I actually have no idea why this setting is needed, as I couldn't find
any reference to where it is used in the code, but I didn't look very
hard, so I might be wrong. All I know is that the documentation mentions
that this property needs to be set.
webinterface.private.actions
- Default: false
- We: true
- We set this to final
By setting this to true the web interfaces for the JobTracker and NameNode gain some advanced options like killing a job.
It makes life a lot easier while still in development or evaluation, but
you should probably set this to false once you rely on your Hadoop
cluster for production use.
hdfs-site.xml
dfs.name.dir
- Default: ${hadoop.tmp.dir}/dfs/name
- We: /mnt/disk1/hadoop/dfs/name,/mnt/disk2/hadoop/dfs/name
- We set this to final
This is an important setting, which is why I've already mentioned it
above. The NameNode stores its metadata in these directories,
replicating all information to each of them. One of them could be a
mount of a remote disk (e.g. NFS) to have a backup.
dfs.data.dir
- Default: ${hadoop.tmp.dir}/dfs/data
- We: /mnt/disk1/hadoop/dfs/data,/mnt/disk2/hadoop/dfs/data
- We set this to final
This is another important setting, as explained above. Unlike
dfs.name.dir, the data is not replicated to all disks but distributed
among all those locations. The DataNodes save the actual data in these
locations, so more space is better. The easiest thing is to use
dedicated disks for this. If you store anything other than Hadoop data
on these disks, make sure to set dfs.datanode.du.reserved (see below).
dfs.namenode.handler.count
- Default: 10
- We: 20
- Safe: 10-20
- We set this to final
The number of threads the NameNode uses to serve requests. This depends
highly on your usage and the size of your cluster. We've tried a bunch of
different values and settled on 20 without seeing any notable
differences. nnbench is probably a good tool to benchmark this. If
you've got a large cluster or many file operations (create or delete),
you can try increasing this value.
dfs.datanode.handler.count
- Default: 3
- We: 5
- Safe: 5-10
- We set this to final
The number of server threads the DataNodes use. I can't tell what a good value is for large clusters, but the TestDFSIO benchmark seems like a good test to run to find one. Just
play around. We've tried a bunch of different values up to 20 and
didn't see a difference, so we chose a value slightly larger than the
default.
dfs.datanode.du.reserved
- Default: 0
- We: Left the default
This many bytes will be left free on each volume used by the DataNodes (see dfs.data.dir).
As our drives are dedicated to Hadoop we left this at 0, but if the
drives host other stuff as well, set this to an appropriate value.
dfs.permissions
- Default: true
- We: true
- Renamed to dfs.permissions.enabled in Hadoop 0.21
- We set this to final
This enables permission checking in HDFS. Unless you use Secure Hadoop
(which we don't, so I don't cover it here), it is still easy for anyone
to read, write and delete anything on the cluster, as users are not
authenticated. So this is purely a safety measure to avoid messing with
the wrong data by accident.
dfs.replication
- Default: 3
- We: 3
- Can be used in the client configuration
This is the default replication level used for new files in HDFS. If you
change this value later on, no existing files will be changed (that can
be done on the command line, though, with hadoop fs -setrep). Every file
in HDFS can have a different replication level; this just sets the
default.
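Because every block is stored dfs.replication times, the replication level also divides the usable capacity of the cluster. The sketch below applies that to the hardware from the Environment section, under two simplifying assumptions: the master (c1n1) runs no DataNode, and every disk is fully available to HDFS (ignoring the system partition, reserved blocks and dfs.datanode.du.reserved). The numbers are therefore an upper bound, and the function name is made up.

```shell
#!/bin/sh
# Rough usable HDFS capacity in GB: every block is stored dfs.replication
# times, so the raw DataNode capacity is divided by the replication factor.
# Ignores dfs.datanode.du.reserved, ext reserved blocks and non-DFS usage.
usable_capacity_gb() {
  echo $(( $1 / $2 ))
}

# Raw disk capacity of the slaves listed in the Environment section,
# assuming the master (c1n1) is not a DataNode and every disk is fully
# available to HDFS (both are simplifications):
#   c1: 9 machines * 2 * 500 GB, c2: 3 * 6 * 250 GB, c3: 5 * 2 * 160 GB
raw=$(( 9 * 2 * 500 + 3 * 6 * 250 + 5 * 2 * 160 ))
echo "raw: $raw GB, usable at replication 3: $(usable_capacity_gb "$raw" 3) GB"
```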
dfs.block.size
- Default: 67108864 (64 MB)
- We: 134217728 (128 MB)
- Renamed to dfs.blocksize in Hadoop 0.21
- Can be used in the client configuration
This setting applies per file and only to new files. Files saved to
HDFS are split into blocks at most this large (64 MB by default). This
has multiple implications. The more blocks you have, the more load there
is on your NameNode, so if you have many files that are larger than the
block size you might set this higher. If your files are mostly smaller
than this, you don't waste any space: all files only take up as much
space as they actually have data (unlike other file systems, where a
file takes up at least one block no matter how large it really is). So
NameNode load (including its memory requirements) is one factor. The
deciding factor for us to set this higher by default is that a lot of
our MapReduce calculations are very fast and Mappers finish quickly. As
one Mapper usually processes one block and Mappers take a while to set
up, we chose a larger block size so that each Mapper has more data to
process.
Because the block size can be set per file, you really have to find your own perfect value, perhaps even per dataset.
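The effect of the block size on the number of blocks (and therefore on NameNode load and, roughly, on the number of map tasks, assuming the usual one input split per block) is simple ceiling division. A minimal POSIX shell sketch with a made-up function name:

```shell
#!/bin/sh
# Number of HDFS blocks a file of $1 bytes occupies with a block size of $2
# bytes: ceiling division, since the last block only holds the remainder.
num_blocks() {
  echo $(( ($1 + $2 - 1) / $2 ))
}

one_gib=1073741824
echo "1 GiB file, 64 MB blocks:  $(num_blocks "$one_gib" 67108864) blocks"
echo "1 GiB file, 128 MB blocks: $(num_blocks "$one_gib" 134217728) blocks"
```

Doubling the block size halves the block count for large files (16 versus 8 blocks for 1 GiB), while a small file still occupies a single block either way.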
dfs.balance.bandwidthPerSec
- Default: 1048576 (1 MB/s)
- We: 2097152 (2 MB/s)
- Renamed to dfs.datanode.balance.bandwidthPerSec in Hadoop 0.21
- We set this to final
This property configures the number of bytes per second that a DFS
balancing operation can use per DataNode. The default is pretty low, so
we doubled it. We don't use a lot of bandwidth in our cluster at the
moment, so this is not a problem, but it depends on your use case. The
higher this number, the faster balancing operations will complete. We
run balancing every night on a cron job, so we want it to be finished by
morning.
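To check whether a nightly balancer run can plausibly finish by morning, divide the amount of data a DataNode has to move by the bandwidth cap. This POSIX shell sketch (made-up function name) only gives a lower bound: real runs are slower because of balancer iterations and other traffic.

```shell
#!/bin/sh
# Lower bound on how long (in seconds) a DataNode needs to move $1 bytes
# when the balancer is capped at $2 bytes per second
# (dfs.balance.bandwidthPerSec). Real runs take longer.
min_balance_seconds() {
  echo $(( ($1 + $2 - 1) / $2 ))
}

ten_gib=10737418240
echo "10 GiB at 1 MB/s: $(min_balance_seconds "$ten_gib" 1048576) s"
echo "10 GiB at 2 MB/s: $(min_balance_seconds "$ten_gib" 2097152) s"
```

Moving 10 GiB takes at least about 2.8 hours at the default setting and about 1.4 hours at our doubled value.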
dfs.hosts
- Default: no default set
- We: /etc/hadoop/conf/allowed_hosts
- We set this to final
This file has to contain one name per line. Every name is the name of a
DataNode that is allowed to connect to the NameNode. This prevents
accidents like the one that happened to me: I test everything in
virtual machines, so I started a bunch of them, deployed the live config
and forgot to change the NameNode. All of a sudden a bunch of virtual
machines joined our HDFS cluster and blocks began replicating there. So
it is a good idea to explicitly list all allowed hosts in this file.
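With a regular naming scheme the allowed hosts file can be generated instead of typed. This POSIX shell sketch is a hypothetical helper based on the cXnY names from the Environment section; if your DataNodes register with fully qualified names, you need those instead.

```shell
#!/bin/sh
# Hypothetical helper: print host names following the cXnY scheme from the
# Environment section (c1n1, c1n2, ...). If your DataNodes register with
# fully qualified names, list those instead.
cluster_hosts() {
  local prefix="$1" count="$2" i=1
  while [ "$i" -le "$count" ]; do
    echo "${prefix}n${i}"
    i=$(( i + 1 ))
  done
}

# Example: all 18 machines except the master c1n1, which runs no DataNode:
# { cluster_hosts c1 10; cluster_hosts c2 3; cluster_hosts c3 5; } \
#   | grep -vx c1n1 > /etc/hadoop/conf/allowed_hosts
```

After changing the file on a running cluster, su hdfs -c "hadoop dfsadmin -refreshNodes" makes the NameNode re-read it without a restart.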
dfs.support.append
- Default: false
- We: true
- As far as I know this option has been removed in Hadoop 0.21 and is enabled by default
- We set this to final
This option has quite a history. To make it short: if you're using CDH3,
set this to true; otherwise leave it false. You need this set to true if
you plan to use HBase.
dfs.datanode.max.xcievers
- Default: 256
- Safe: 1024
- We: 2048
- Yes, this is misspelt in Hadoop and it hasn't been fixed in Hadoop 0.21.
- We set this to final
This is the maximum number of threads a DataNode may use for transferring
data (each concurrent block read or write takes one). There used to be
bugs in Hadoop that made the default a bit too low, so it needed to be
set higher. Even today it's worth setting it higher, without a lot of
risk, especially if you're using HBase.
mapred-site.xml
HDFS is pretty straightforward to configure and benchmark. MapReduce is
more of a black art unfortunately. I'll describe the MapReduce process
here because it is important to understand where all the properties come
in so you can safely change their values and tweak the performance. In
my first draft of this post I wrote that I wouldn't go into much detail on
the internals of the MapReduce process. (Un-)fortunately this wasn't as
easy as I thought, and it has grown into a full-blown explanation of
everything I know. It is very possible that something's wrong here, so
please correct me if you see something that is off. And if you're not
interested in how this works, just skip to the descriptions of the
properties themselves.
All of this is valid for Hadoop 0.20.2+737 (the CDH version). I know
that some things have changed in Hadoop 0.21 but that's left for another
time.
The Map side
While a Map is running it collects output records in an in-memory buffer called MapOutputBuffer. If there are no reducers, a DirectMapOutputCollector is used instead, which makes most of the rest of this section irrelevant because it writes the records directly to the job output. The total size of this in-memory buffer is set by the io.sort.mb property and defaults to 100 MB (converted to bytes using a bit shift: 100 << 20 = 104857600). Of these 100 MB, the fraction set by io.sort.record.percent is reserved for tracking record boundaries. This property defaults to 0.05 (i.e. 5%, which means 5 MB in the default case). Each tracked record takes 16 bytes (4 integers of 4 bytes each) of memory, which means the buffer can track 327680 map output records with the default settings. The rest of the memory (104857600 bytes - (16 bytes * 327680) = 99614720 bytes) is used to store the actual bytes being collected (95 MB in the default case).
While Map outputs are collected they are stored in the remaining memory,
and their location in the in-memory buffer is tracked as well. Once one
of these two buffers reaches the threshold specified by io.sort.spill.percent, which defaults to 0.8 (i.e. 80%), the buffered output is spilled to disk:
0.8 * 99614720 = 79691776 (data buffer)
0.8 * 327680 = 262144 (record buffer)
Look in the log output of your Maps and you'll see these three lines at the beginning of every log:
2010-12-05 01:33:04,912 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
2010-12-05 01:33:04,996 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
2010-12-05 01:33:04,996 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
You should recognize these numbers!
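A sketch that reproduces the two threshold lines of that log output from the buffer sizes, assuming the default io.sort.spill.percent of 0.8 and truncation to whole bytes and records (`spill_thresholds` is a made-up name):

```python
# Sketch: recompute the soft limits logged as "data buffer = soft/total" and
# "record buffer = soft/total", assuming io.sort.spill.percent = 0.8.
def spill_thresholds(data_bytes, max_records, spill_percent=0.8):
    soft_data = int(data_bytes * spill_percent)      # truncated to whole bytes
    soft_records = int(max_records * spill_percent)  # truncated to whole records
    return soft_data, soft_records


data_bytes, max_records = 99614720, 327680  # default sizes derived above
soft_data, soft_records = spill_thresholds(data_bytes, max_records)
print(f"data buffer = {soft_data}/{data_bytes}")        # data buffer = 79691776/99614720
print(f"record buffer = {soft_records}/{max_records}")  # record buffer = 262144/327680
```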
Now while the Map is running you might see log lines like these:
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 19361312; bufvoid = 99614720
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 262144; length = 327680
2010-12-05 01:33:09,558 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
This means we've reached the record threshold (kvend = 262144, i.e. 80% of the records we can track) even though our data buffer is still pretty empty (99614720 - 19361312 = 80253408 bytes still free). If, however, the data buffer is the cause of your spill, you'll see a line like this:
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: buffer full = true
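The following hypothetical helper models which of the two soft limits fires first, using the default thresholds derived above. It is a simplification for reasoning about these log lines, not MapTask's actual collect logic.

```python
# Hypothetical model of which soft limit triggers a spill; a simplification,
# not MapTask's actual collect() logic. Thresholds are the defaults above.
SOFT_DATA_BYTES = 79691776  # 0.8 * 99614720
SOFT_RECORDS = 262144       # 0.8 * 327680


def spill_reason(bytes_collected, records_collected):
    if records_collected >= SOFT_RECORDS:
        return "record full = true"
    if bytes_collected >= SOFT_DATA_BYTES:
        return "buffer full = true"
    return None  # neither threshold reached yet, keep collecting


# The "record full" excerpt: kvend = 262144 records, bufend = 19361312 bytes.
print(spill_reason(19361312, 262144))  # record full = true
print(99614720 - 19361312)             # 80253408 bytes of the data buffer unused
```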
All of this spilling to disk is done in a separate thread so that the
Map can continue running. That's also why the spill begins early (when
the buffer is only 80% full): so that the buffer doesn't fill up before
the spill has finished. If a single Map output is too large to fit into
the in-memory buffer, a separate spill is done for that one value. A
spill contains one sorted segment per partition, meaning one segment per
Reducer, stored together in a spill file with an index.
After a Map task has finished there may be multiple spills on the
TaskTracker. Those files have to be merged into one sorted output file
(again with one segment per partition), which is then fetched by the
Reducers. The property io.sort.factor determines how many of those spill
files are merged at a time. The lower the number, the more merge passes
are required to arrive at the final file. The default (10) is very low;
raising the default to 100 has been considered, and in fact, looking at
the code, 100 is sometimes used as the default. This property can make a
pretty huge difference if your Mappers output a lot of data. Not much
memory is needed for this property, but the larger it is, the more files
are open at once, so make sure to set it to a reasonable value. To find
such a value, run a few MapReduce jobs that you'd expect to see in
production use and carefully monitor the log files.
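To get a feeling for how io.sort.factor affects the number of merge passes, here is a rough model that merges at most `factor` segments into one per level. Hadoop's Merger sizes its first pass differently, so treat the counts as an approximation; `merge_rounds` is a hypothetical name.

```python
def merge_rounds(num_segments, factor):
    """Rough number of merge levels when at most `factor` segments are merged at once.

    Simplified model: every level merges groups of up to `factor` segments.
    Hadoop's Merger sizes its first pass differently, so this is an approximation.
    """
    if factor < 2:
        raise ValueError("factor must be at least 2")
    rounds = 0
    while num_segments > 1:
        # ceiling division without floats: groups of up to `factor` segments
        num_segments = (num_segments + factor - 1) // factor
        rounds += 1
    return rounds


for spills in (1, 19, 100, 1000):
    print(spills, merge_rounds(spills, 10), merge_rounds(spills, 100))
```

With a single spill no merge is needed at all, which matches the "Finished spill 0" case below.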
Watch out for log messages like these:
Merging <n> sorted segments
Down to the last merge-pass, with <n> segments left of total size: <n> bytes
Merging <n> intermediate segments out of a total of <n>
This is the merge process on the Map side where this factor is used. If
your Mappers only produce one spill file, none of this matters, so if you
try to benchmark this, make sure to use a job with a lot of Map output
data. If you only see a line like "Finished spill 0" but none of the
above, you're only producing one spill file, which doesn't require any
merging or further sorting. This is the ideal situation, and you should
try to keep the number of spilled records/files as low as possible.
The Reduce side
The reduce phase has three different steps: Copy, Sort (which should really be called Merge) and Reduce.
During the Copy phase the Reducer tries to fetch the output of the Maps
from the TaskTrackers and store it on the Reducer, either in memory or on
disk. The property mapred.reduce.parallel.copies (which defaults to 5)
defines how many threads are started per Reduce task to fetch Map output
from the TaskTrackers.
Here's an example from the beginning of a Reducer log:
2010-12-05 01:53:03,846 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=334063200, MaxSingleShuffleLimit=83515800
2010-12-05 01:53:03,879 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Need another 1870 map output(s) where 0 is already in progress
2010-12-05 01:53:03,880 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread started: Thread for merging on-disk files
2010-12-05 01:53:03,880 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread waiting: Thread for merging on-disk files
2010-12-05 01:53:03,880 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread started: Thread for merging in memory files
2010-12-05 01:53:03,881 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread started: Thread for polling Map Completion Events
You can see two things in these log lines. First, the ShuffleRamManager
is started; afterwards you see that this Reducer needs to fetch 1870 map
outputs (meaning we had 1870 Mappers). We'll get back to the last four
lines later. The map output is fetched and shuffled into memory (that's
what the ShuffleRamManager is for). You can control its behavior using
mapred.job.shuffle.input.buffer.percent (default is 0.7), the fraction of
the available memory that may be used for this buffer.
Runtime.getRuntime().maxMemory() is used to determine the available
memory, and unfortunately it returns a value somewhat lower than the
configured heap size, so be careful when setting this.
Our child tasks are running with -Xmx512m (536870912 bytes), so 70% of
that should be 375809638 bytes, but the ShuffleRamManager reports
334063200. No big deal, just be aware of it. There's also a hardcoded
limit: a single map output may not take up more than 25% of the buffer.
If it is larger than that, it will be written to disk (see the
MaxSingleShuffleLimit value above: 334063200 * 0.25 = 83515800).
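The shuffle buffer numbers can be checked with a few lines of Python. The helper names are hypothetical; `max_memory` stands in for whatever Runtime.getRuntime().maxMemory() returns, which, as noted, is less than -Xmx.

```python
# Sketch of the ShuffleRamManager limits described above. `max_memory` stands
# in for Runtime.getRuntime().maxMemory(); helper names are hypothetical.
MAX_SINGLE_SHUFFLE_FRACTION = 0.25  # hardcoded share per map output


def shuffle_memory_limit(max_memory, input_buffer_percent=0.7):
    return int(max_memory * input_buffer_percent)


def max_single_shuffle_limit(memory_limit):
    return int(memory_limit * MAX_SINGLE_SHUFFLE_FRACTION)


print(shuffle_memory_limit(512 << 20))      # 375809638: what -Xmx512m alone suggests
print(max_single_shuffle_limit(334063200))  # 83515800: matches MaxSingleShuffleLimit
print(round(334063200 / 0.7))               # 477233143: roughly what maxMemory() returned
```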
Now that everything's set up the copiers will start their work and fetch the output. You'll see a bunch of log lines like these:
2010-12-05 01:53:11,114 INFO org.apache.hadoop.mapred.ReduceTask: header: attempt_201012031527_0021_m_000011_0, compressed len: 454055, decompressed len: 454051
2010-12-05 01:53:11,114 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 454051 bytes (454055 raw bytes) into RAM from attempt_201012031527_0021_m_000011_0
2010-12-05 01:53:11,133 INFO org.apache.hadoop.mapred.ReduceTask: Read 454051 bytes from map-output for attempt_201012031527_0021_m_000011_0
2010-12-05 01:53:11,133 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_201012031527_0021_m_000011_0 -> (70, 6) from c1n7.gbif.org
In the first line you see that a map output was successfully copied and
its size could be read from the headers. The next line is what we talked
about earlier: the map output will now be decompressed (if it was
compressed) and saved into memory using the ShuffleRamManager. The third
line acknowledges that this succeeded. The last line is debugging
information for a bug and, according to a comment in the source code,
should already have been removed.
If for whatever reason the map output doesn't fit into memory, you will
see a log line similar to the second one above, but "RAM" will be replaced
by "Local-FS" and the fourth line will be missing. You obviously want to
keep as much data in memory as possible, so shuffling to the Local-FS is a
warning sign, or at least a hint at possible optimizations.
While all of this goes on, and until all map outputs have been fetched,
two threads ("Thread for merging on-disk files" and "Thread for merging in
memory files") wait for certain conditions before they become active. The
conditions are as follows:
- The used memory in the in-memory buffer is above mapred.job.shuffle.merge.percent (default is 66%, in our example that means 334063200 * 0.66 = 220481712 bytes) and there are at least two map outputs in the buffer
- or there are more than mapred.inmem.merge.threshold (defaults to 1000) map outputs in the in-memory buffer, independent of their size
- or there are at least io.sort.factor * 2 - 1 files on disk.
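The three wake-up conditions can be written down as a simplified predicate. This is an illustrative model using the defaults named above and the values from our example, not Hadoop's actual ReduceTask code (boundary comparisons are simplified); `merge_triggers` is a made-up name.

```python
# Simplified, hypothetical model of the three conditions that wake the
# reduce-side merge threads during the copy phase (defaults as described above).
def merge_triggers(mem_used, mem_limit, outputs_in_memory, files_on_disk,
                   merge_percent=0.66, inmem_threshold=1000, io_sort_factor=10):
    triggers = []
    # 1. buffer fuller than mapred.job.shuffle.merge.percent, with at least two outputs
    if mem_used > mem_limit * merge_percent and outputs_in_memory >= 2:
        triggers.append("in-memory merge (size)")
    # 2. more outputs than mapred.inmem.merge.threshold; 0 disables this check
    if inmem_threshold > 0 and outputs_in_memory > inmem_threshold:
        triggers.append("in-memory merge (count)")
    # 3. at least 2 * io.sort.factor - 1 files on disk
    if files_on_disk >= 2 * io_sort_factor - 1:
        triggers.append("on-disk merge")
    return triggers


# Our example: roughly 220545981 bytes in 501 outputs (the size of the merged file).
print(merge_triggers(220545981, 334063200, 501, 0))
print(merge_triggers(0, 334063200, 0, 19))
```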
When one of the first two conditions triggers, you'll see something like this:
2010-12-05 01:53:42,106 INFO org.apache.hadoop.mapred.ReduceTask: Initiating in-memory merge with 501 segments...
2010-12-05 01:53:42,114 INFO org.apache.hadoop.mapred.Merger: Merging 501 sorted segments
...
2010-12-05 01:53:46,492 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Merge of the 501 files in-memory complete. Local file is /mnt/disk1/hadoop/mapreduce/local/taskTracker/lfrancke/jobcache/job_201012031527_0021/attempt_201012031527_0021_r_000103_0/output/map_1.out of size 220545981
This can actually trigger the third condition, as it writes a new file
to disk. When that happens you'll see something like this:
2010-12-10 14:28:23,289 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012101346_0001_r_000012_0We have 19 map outputs on disk. Triggering merge of 10 files
io.sort.factor was set to the default of 10. Ten of the 19 files are merged into one, leaving 10 on disk (i.e. io.sort.factor).
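A minimal model of the interleaved on-disk merge, assuming each round merges exactly io.sort.factor files into one while the trigger condition holds (a simplification of what the on-disk merge thread does; the function name is hypothetical):

```python
# Minimal model of the interleaved on-disk merge: while the trigger condition
# holds, io.sort.factor files are merged into one (a simplification).
def interleaved_disk_merge(files_on_disk, io_sort_factor=10):
    while files_on_disk >= 2 * io_sort_factor - 1:
        files_on_disk -= io_sort_factor - 1  # factor files in, one file out
    return files_on_disk


print(interleaved_disk_merge(19))  # 10, as in the log line above
```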
Both of these (the in-memory merge and the on-disk merge, the latter also
called Interleaved on-disk merge) produce a new single output file and
write it to disk. All of this only goes on while map outputs are still
being fetched. When that's finished, we wait for running merges to finish
but won't start any new ones in these threads:
2010-12-05 01:59:10,598 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 3 files left.
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 314 files left.
As you can see from the timestamps, no merges were running in our case,
so everything just shut down. During the copy phase we finished a total
of three in-memory merges, which is why there are currently three files
on disk. 314 more map outputs are still in the in-memory buffer. This
concludes the Copy phase, and the Sort phase begins:
2010-12-05 01:59:10,605 INFO org.apache.hadoop.mapred.Merger: Merging 314 sorted segments
2010-12-05 01:59:10,605 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 314 segments left of total size: 127512782 bytes
2010-12-05 01:59:13,903 INFO org.apache.hadoop.mapred.ReduceTask: Merged 314 segments, 127512782 bytes to disk to satisfy reduce memory limit
2010-12-05 01:59:13,904 INFO org.apache.hadoop.mapred.ReduceTask: Merging 4 files, 788519164 bytes from disk
2010-12-05 01:59:13,905 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
2010-12-05 01:59:13,905 INFO org.apache.hadoop.mapred.Merger: Merging 4 sorted segments
2010-12-05 01:59:14,493 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 4 segments left of total size: 788519148 bytes
There are two things happening here. First, the remaining 314 map
outputs that are still in memory are merged into one file on disk (the
first three lines), so now there are four files on disk. Then these four
files are merged (the remaining lines); since four is below
io.sort.factor, this is the last merge pass and its result is streamed
into the reducer.
There is an option, mapred.job.reduce.input.buffer.percent (0 by
default), which allows the Reducer to keep some map output files in
memory. The following is a snippet with this property set to 0.7:
2010-12-05 23:11:55,657 INFO org.apache.hadoop.mapred.ReduceTask: Merging 3 files, 661137901 bytes from disk
2010-12-05 23:11:55,660 INFO org.apache.hadoop.mapred.ReduceTask: Merging 312 segments, 127381881 bytes from memory into reduce
2010-12-05 23:11:55,661 INFO org.apache.hadoop.mapred.Merger: Merging 3 sorted segments
2010-12-05 23:11:55,688 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 3 segments left of total size: 661137889 bytes
2010-12-05 23:11:55,688 INFO org.apache.hadoop.mapred.Merger: Merging 313 sorted segments
2010-12-05 23:11:55,689 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 313 segments left of total size: 788519778 bytes
You can see that instead of merging the 312 segments from memory to disk
they are kept in memory while the three files on disk are merged into
one and all of the resulting 313 segments are streamed into the reducer.
There seems to be a bug in Hadoop, though. I'm not 100% sure about this
one, so any insight would be appreciated. When the following conditions
are true, segments in memory don't seem to be written to disk even if
they should be according to the configuration:
- There are segments in memory that should be written to disk before the reduce task begins, according to mapred.job.reduce.input.buffer.percent
- and there are at least io.sort.factor files on disk
If this happens you see this:
2010-12-10 16:39:40,671 INFO org.apache.hadoop.mapred.ReduceTask: Keeping 14 segments, 18888592 bytes in memory for intermediate, on-disk merge
2010-12-10 16:39:40,673 INFO org.apache.hadoop.mapred.ReduceTask: Merging 10 files, 4143441520 bytes from disk
2010-12-10 16:39:40,674 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
2010-12-10 16:39:40,674 INFO org.apache.hadoop.mapred.Merger: Merging 24 sorted segments
2010-12-10 16:39:40,859 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 24 segments left of total size: 4143441480 bytes
So the steps performed in the Sort phase are the following:
1. Merge all segments (= map outputs) that are still in memory and don't
   fit into the memory specified by mapred.job.reduce.input.buffer.percent
   into one file on disk if there are fewer than io.sort.factor files on
   disk, so that we end up with at most io.sort.factor files on disk after
   this step. If there are already io.sort.factor or more files on disk but
   some map outputs need to be written out of memory, keep them in memory
   for now.
   a. In the first case you'll see a log message like this:
      Merged 314 segments, 127512782 bytes to disk to satisfy reduce memory limit
   b. In the second case you'll see this:
      Keeping 14 segments, 18888592 bytes in memory for intermediate, on-disk merge
2. All files on disk and all remaining segments in memory that need to be
   merged (case 1.b) are determined. You'll see a log message like this:
   "Merging 4 files, 788519164 bytes from disk".
3. All segments that remain in memory during the Reduce phase are
   determined: "Merging 312 segments, 127381881 bytes from memory into reduce".
4. All files (on disk + in memory) from step 2 are merged together using
   io.sort.factor as the merge factor, which means that there might be
   intermediate merges to disk.
5. All remaining in-memory segments (from step 3) and on-disk files (from
   step 4) are merged into one stream to be read by the Reducer. This is
   done in a streaming fashion, without writing new data to disk, and just
   returns an Iterator to the Reduce phase.
This Iterator is given to the Reducer and so the Reduce phase starts.
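The Sort-phase decisions in steps 1 to 3 can be sketched as follows. This is a simplified model under stated assumptions: segment sizes are plain byte counts, segments are evicted from memory oldest-first until the rest fits under the reduce memory limit, and the names (`plan_sort_phase`, `SortPlan`) are hypothetical. The usage lines replay the three log excerpts above with made-up per-segment sizes and a made-up limit for the 0.7 case.

```python
from dataclasses import dataclass


@dataclass
class SortPlan:
    merged_to_disk: int        # step 1a: in-memory segments merged into one new disk file
    kept_for_disk_merge: int   # step 1b: segments kept in memory but merged with disk files
    disk_files: int            # files on disk entering the merge (step 2)
    in_memory_for_reduce: int  # segments streamed to the reducer from memory (step 3)


def plan_sort_phase(mem_segments, disk_files, reduce_mem_limit, io_sort_factor=10):
    """Simplified model of Sort-phase steps 1-3; sizes are in bytes."""
    remaining = list(mem_segments)
    in_memory_bytes = sum(remaining)
    evicted = 0
    # Step 1: evict segments (oldest first) until the rest fits under the limit
    # derived from mapred.job.reduce.input.buffer.percent.
    while remaining and in_memory_bytes > reduce_mem_limit:
        in_memory_bytes -= remaining.pop(0)
        evicted += 1
    if evicted and disk_files < io_sort_factor:
        # 1a: "Merged N segments, B bytes to disk to satisfy reduce memory limit"
        return SortPlan(evicted, 0, disk_files + 1, len(remaining))
    # 1b (or nothing to evict): "Keeping N segments ... for intermediate, on-disk merge"
    return SortPlan(0, evicted, disk_files, len(remaining))


print(plan_sort_phase([1000] * 314, 3, 0))          # default buffer.percent of 0
print(plan_sort_phase([1000] * 312, 3, 334063200))  # buffer.percent raised (hypothetical limit)
print(plan_sort_phase([1000] * 14, 10, 0))          # the suspected-bug case
```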
Well, this turned out to be a rather detailed description of the process,
but it helps to understand the configuration properties available to
you. See below for a detailed list of all the relevant properties:
io.sort.factor
- Default: 10
- We: 100
- Safe: 20-100
- Renamed to mapreduce.task.io.sort.factor in Hadoop 0.21
- Can be used in the client configuration
I've explained pretty thoroughly what this parameter does, so I won't go
into detail here. The whole situation with io.sort.factor and io.sort.mb
is not ideal, but as long as these are the options we have and the
defaults are very low, it is pretty safe to change them to more
reasonable values. It is worthwhile to take a look at your logs and
search for the lines mentioned in the explanation above. This can be set
on a per-job basis, and for jobs that run frequently it's worth finding a
good job-specific value.
io.sort.mb
- Default: 100
- Renamed to mapreduce.task.io.sort.mb in Hadoop 0.21
- Can be used in the client configuration
You can adjust the amount of memory used in the Mappers to collect Map
outputs with this parameter. It obviously depends heavily on the amount
of memory you have available in total for your child VMs and on the
memory requirements of your tasks. Your goal should be to minimize the
amount of spilling, as explained above, and to use the available memory
as well as possible. If your Map tasks don't need a lot of memory
themselves, you can use almost all available memory here. The default
settings allocate 200 MB for child VMs and half of that is used for the
output buffer, so your Map tasks have about 100 MB available by default.
io.sort.record.percent
- Default: 0.05
- This has been removed in favor of automatic configuration in Hadoop 0.21
- Can be used in the client configuration
The output buffer on the map side is split into two parts. One stores the
actual bytes of the output data and the other stores 16 bytes of
metadata per output record. This property specifies how much of the
buffer (io.sort.mb) is used for tracking the metadata. The default is 5%,
which is often too low for jobs whose map tasks emit many small records.
Look for lines indicating whether a spill to disk occurs because of
record full = true. If this happens, try to increase this value. This is
another property which is very specific to the jobs you're running, so it
might need tuning for each and every job.
Thankfully this mechanism has been replaced in Hadoop 0.21.
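A rule of thumb follows from the 16-byte metadata cost (our own back-of-the-envelope derivation, not something Hadoop computes for you): if map output records average r bytes, both parts of the buffer fill up at the same time when io.sort.record.percent = 16 / (16 + r). You can estimate r from the job's "Map output bytes" and "Map output records" counters or, as below, from a "record full" spill log.

```python
# Rule-of-thumb sketch (our own derivation, not a Hadoop API): choose the
# metadata share so the record and data parts of io.sort.mb fill together.
RECORD_META_BYTES = 16  # metadata per record in Hadoop 0.20


def suggested_record_percent(avg_record_bytes):
    return RECORD_META_BYTES / (RECORD_META_BYTES + avg_record_bytes)


# From the "record full" spill earlier: 19361312 bytes for 262144 records.
avg = 19361312 / 262144                         # about 73.9 bytes per record
print(round(suggested_record_percent(avg), 2))  # 0.18
print(suggested_record_percent(304))            # 0.05: the default fits ~304-byte records
```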
io.sort.spill.percent
- Default: 0.8
- Renamed to mapreduce.map.sort.spill.percent in Hadoop 0.21
- Can be used in the client configuration
This property configures when the data from the map output buffer will
be written (spilled) to disk. The spilling process runs in a separate
thread and output continues to be collected while it is running, so it
is important to start this process before the buffer is completely full;
otherwise the map task pauses until space is available again.
mapred.job.tracker
- Default: local
- We: :8021
- We set this to final
- Renamed to mapreduce.jobtracker.address in Hadoop 0.21
- Needed on the clients
This lets the client know where to find the JobTracker, and it lets the JobTracker know which port to bind to.
mapred.local.dir
- Default: ${hadoop.tmp.dir}/mapred/local
- We: /mnt/disk1/hadoop/mapreduce/local,/mnt/disk2/hadoop/mapreduce/local
- We set this to final
- Renamed to mapreduce.cluster.local.dir in Hadoop 0.21
This tells the MapReduce daemons where to store intermediate files.
It may be a comma-separated list of directories to spread the load.
Make sure there's enough space here for all your intermediate files. We
share the same disks for MapReduce and HDFS.
mapred.system.dir
- Default: ${hadoop.tmp.dir}/mapred/system
- We: /hadoop/mapred/system
- We set this to final
- Renamed to mapreduce.jobtracker.system.dir in Hadoop 0.21
This is a folder in the defaultFS where MapReduce stores some control
files. In our case that would be a directory in HDFS. If you have
dfs.permissions enabled (which it is by default), make sure that this
directory exists and is owned by mapred:hadoop.
mapred.temp.dir
- Default: ${hadoop.tmp.dir}/mapred/temp
- We: /tmp/mapreduce
- We set this to final
- Renamed to mapreduce.cluster.temp.dir in Hadoop 0.21
This is a folder for temporary files. It is hardly used, if at all. If I
understand the description correctly, it is supposed to be in HDFS, but
from reading the source code I'm not entirely sure. So we set this to a
directory that exists on the local filesystem as well as in HDFS.
mapred.map.tasks
- Default: 2
- Renamed to mapreduce.job.maps in Hadoop 0.21
It is important to realize that this is just a hint to MapReduce as to
the number of Maps it should use. In most cases this value is ignored,
and the actual number of Maps depends on the input data and is
determined automatically. For those rare cases where this value is used,
we set it to about 90% of our map slot capacity. This can be set
client-side per job, so if you have a job that relies on this property
you'd better set it there to an appropriate value.
mapred.reduce.tasks
- Default: 1
- Renamed to mapreduce.job.reduces in Hadoop 0.21
This differs from the property for map tasks in that it is often not
possible to calculate a "native" or optimal number of reduce tasks for a
job. With this property you specify the number of reduce tasks to start
for a given job. The default is very low. The description suggests
setting this to 99% of the cluster capacity so that all reduces finish
in one wave. This is sensible when you use the default scheduler, but as
soon as multiple jobs run in parallel it's hard to guarantee that all
reduces of one job finish in one wave. We're constantly experimenting
with this and currently have it at about 50% of our capacity.
This too can be specified on a per-job basis.
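Turning these percentages into task counts is simple arithmetic. In this sketch the cluster shape (18 TaskTrackers with 7 map and 7 reduce slots each) is purely hypothetical, and `task_hint` is a made-up helper, not a Hadoop API:

```python
# Hypothetical cluster shape; task_hint is a made-up helper, not a Hadoop API.
def task_hint(nodes, slots_per_node, fraction):
    """Number of tasks for a given fraction of the cluster's slot capacity."""
    return max(1, int(nodes * slots_per_node * fraction))


nodes, map_slots, reduce_slots = 18, 7, 7
print(task_hint(nodes, map_slots, 0.9))      # 113: mapred.map.tasks at ~90%
print(task_hint(nodes, reduce_slots, 0.99))  # 124: one wave of reduces
print(task_hint(nodes, reduce_slots, 0.5))   # 63: our current ~50%
```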
mapred.jobtracker.taskScheduler
- Default: org.apache.hadoop.mapred.JobQueueTaskScheduler
- We: org.apache.hadoop.mapred.FairScheduler
- We set this to final
- Renamed to mapreduce.jobtracker.taskscheduler in Hadoop 0.21
With the default configuration all jobs are placed in a priority FIFO
queue and submitted one after the other. This is fine for testing but it
doesn't utilize the available resources very well. This property allows
you to change the scheduler used. These are the available schedulers in
CDH3b3:
Depending on the scheduler you decide to use there may be additional
properties which I'm not going to mention here. Have a look at the
dedicated documentation.
mapred.reduce.parallel.copies
- Default: 5
- We: ~20-50
- We set this to final
- Renamed to mapreduce.reduce.shuffle.parallelcopies in Hadoop 0.21
The reduce tasks have to fetch the map outputs from the remote servers,
one output from each map, and there may be thousands of maps. This
option lets you parallelize the copy process. Tuning this value is very
worthwhile: in our first tests this property gave us one of the biggest
performance increases of all properties. We increased it in steps of 5
and looked very carefully at the logs and our monitoring system to find
a value that works for us. We haven't finished this process yet, but
values between 20 and 50 seem to mostly work without problems.
mapred.tasktracker.map.tasks.maximum & mapred.tasktracker.reduce.tasks.maximum
- Default: 2
- We set this to final
- Renamed to mapreduce.tasktracker.map.tasks.maximum & mapreduce.tasktracker.reduce.tasks.maximum in Hadoop 0.21
This setting is very important, and we've yet to find values that we are
comfortable with. It can be different on each TaskTracker and defines
how many map or reduce task "slots" there are on a specific TaskTracker.
You need to set these to values that don't overload your servers while
still fully utilizing them. You also need to make sure that there's
enough memory for all tasks and services running on a server (see
mapred.child.java.opts).
By setting these properties to different values depending on your server
configuration, you can easily use heterogeneous hardware in your
cluster: each distinct hardware configuration gets its own values.
A general rule from the book Hadoop: The Definitive Guide says that these
properties can be set to the number of cores - 1.
We've tried various settings, but found the load on the servers to be
very high with those settings, so we'll have to do more benchmarking.
mapred.child.java.opts
- Default: -Xmx200m
- Can be used in the client configuration
These are the options given to each child JVM that is started (map and
reduce tasks). The default just sets the maximum heap size to 200 MB.
This can be set on the client to pass options needed for a specific job;
GC logging, for example, can be enabled as well. This isn't configurable
on a per-TaskTracker basis, so you have to make sure that every machine
in your cluster fulfills the requirements. Available memory needs to be
at least (mapred.tasktracker.map.tasks.maximum + mapred.tasktracker.reduce.tasks.maximum) * Xmx.
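A quick check of the memory formula above, combined with the "cores - 1" rule from the previous property. The node size and heap are hypothetical, and daemon heaps (such as the TaskTracker and DataNode) plus OS overhead come on top of this figure.

```python
# Back-of-the-envelope memory check; node size and heap are hypothetical.
def min_task_memory_mb(map_slots, reduce_slots, xmx_mb):
    """Lower bound on RAM needed for child JVM heaps alone on one TaskTracker."""
    return (map_slots + reduce_slots) * xmx_mb


cores = 8
slots = cores - 1  # "number of cores - 1" rule of thumb for each slot type
print(min_task_memory_mb(slots, slots, 512))  # 7168 MB for task heaps, before daemons and OS
```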
mapred.inmem.merge.threshold
- Default: 1000
- We: 0
- Renamed to mapreduce.reduce.merge.inmem.threshold in Hadoop 0.21
- Can be used in the client configuration
I've explained the effect of this property in the MapReduce description
above, but to reiterate: the reduce side fetches map outputs into memory.
Once the memory is full or this many map outputs are in memory, they are
merged together into one file on disk. This can be set on a per-job
basis, but as a default we've disabled this behavior (with a value of 0)
and just flush to disk when the memory is full. This seems to have worked
better for all our jobs so far, but it's definitely a property to keep an
eye on.
mapred.job.shuffle.merge.percent
- Default: 0.66
- Renamed to mapreduce.reduce.shuffle.merge.percent in Hadoop 0.21
- Can be used in the client configuration
Once the memory buffer in the copy (shuffle) phase of the reduce task is
this full, a background thread starts to merge all map outputs collected
in memory so far and writes them to a single file on disk. This is
similar to what happens on the map side. In the default configuration
the mapred.inmem.merge.threshold parameter might actually trigger a merge
before this value is reached. We haven't played around with this property
yet, but you'd have to be careful not to set it too high, or the copy
threads may have to wait for the buffer to be emptied again. That could
be a huge performance hit.
It would be nice if Hadoop logged how full the buffer is the moment a merge finishes.
mapred.job.shuffle.input.buffer.percent
- Default: 0.7
- Renamed to mapreduce.reduce.shuffle.input.buffer.percent in Hadoop 0.21
- Can be used in the client configuration
This is the fraction of the total available memory (as specified by
mapred.child.java.opts) that's allocated for collecting map outputs in
memory on the reduce side. It's another parameter we haven't played
around with, but my guess is that it can easily be set a little higher.
mapred.job.reduce.input.buffer.percent
- Default: 0.0
- Renamed to mapreduce.reduce.input.buffer.percent in Hadoop 0.21
- Can be used in the client configuration
Usually map outputs are written to disk when the sort phase (on the
reduce side) ends. If you have reduce tasks that don't need a lot of
memory themselves, you can set this to a higher value so that map outputs
up to this amount of memory (as a fraction of the total available memory)
aren't written to disk but kept in memory. This is obviously faster than
an intermediate spill to disk. It should be considered on a per-job basis.
mapred.map.tasks.speculative.execution & mapred.reduce.tasks.speculative.execution
- Default: true
- We: false
- We will set this to final once we're in production
- Renamed to mapreduce.map.speculative & mapreduce.reduce.speculative in Hadoop 0.21
- Can be used in the client configuration
Speculative execution starts multiple instances of certain map or reduce
tasks when it detects certain circumstances (like an unusually slow task
or node), to avoid waiting too long for stragglers. This sounds like a
good idea and we've got it enabled at the moment, but when we go to
production it will probably be disabled, as it uses valuable cluster
resources that mostly go to waste, and while one job may finish faster,
all the others have to wait longer.
mapred.job.reuse.jvm.num.tasks
- Default: 1
- Renamed to mapreduce.job.jvm.numtasks in Hadoop 0.21
- Can be used in the client configuration
Child JVMs are spawned for the map and reduce tasks. This parameter lets
you reuse these JVMs for multiple tasks. The default value creates a new
JVM for each task, which has some overhead (the book says about one
second per JVM). We've played around with it a bit, and it can make
things faster, but you've got to be careful with memory leaks and shared
state. Basically, you should be sure that your jobs can handle this. If
you have a performance-critical job you can experiment with this, but
we've had some OutOfMemory errors when using it, so we're conservative at
the moment. If you set it to -1, a JVM will never be destroyed.
tasktracker.http.threads
- Default: 40
- We: 80
- We set this to final
- Renamed to mapreduce.tasktracker.http.threads in Hadoop 0.21
The map output is fetched by the reducers from the TaskTrackers via
HTTP. This property lets you adjust the number of threads that serve
those requests. When we raised the parallel copies we got some errors
about fetch failures, so we slowly increased this value. Those two
parameters need to be tuned together carefully. 80 seemed to cause no
problems for us, so we've stuck with it for now. You have to restart
your TaskTrackers after changing this value.
mapred.compress.map.output
- Default: false
- We: true
- Renamed to mapreduce.map.output.compress in Hadoop 0.21
- Can be used in the client configuration
Turning this on compresses the output of your Mappers using SequenceFile
compression. Depending on the codec you choose, this may be CPU-intensive
and result in varying degrees of compression. We've benchmarked jobs of
different sizes with this intermediate compression enabled and disabled,
and while some of them took slightly longer than before, it is still
worth enabling. The cost isn't too high, and much less intermediate data
is generated. Less I/O is generally good, especially if multiple jobs are
running.
mapred.map.output.compression.codec
- Default: org.apache.hadoop.io.compress.DefaultCodec
- We: com.hadoop.compression.lzo.LzoCodec
- Renamed to mapreduce.map.output.compress.codec in Hadoop 0.21
- Can be used in the client configuration
With this property you specify the compression codec to use for Map
output compression. So far we've only tried LZO. This choice was based on
the experience of others and on the general properties of the algorithm:
it is very fast but sacrifices a bit of compression efficiency for its
speed. We plan to test the other algorithms as well.
mapred.hosts
- Default: no default set
- We: /etc/hadoop/conf/allowed_hosts
- Renamed to mapreduce.jobtracker.hosts.filename in Hadoop 0.21
- We set this to final
This is the MapReduce counterpart of dfs.hosts: it specifies which
TaskTrackers are allowed to get work from the JobTracker. Both files have
the same format, so it's quite common for them to be the same file.
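We manage our configuration with Puppet, but to illustrate the file format, here is a Python sketch that writes some of the "We:" values from this list into a mapred-site.xml-style document, with `<final>true</final>` for the properties marked as final. mapred.job.tracker is left out because its host name isn't given here, and the helper names are hypothetical.

```python
import xml.etree.ElementTree as ET

# (name, value, final) taken from the "We:" entries above; mapred.job.tracker
# is omitted because its host name isn't given in this post.
OUR_SETTINGS = [
    ("io.sort.factor", "100", False),
    ("mapred.local.dir",
     "/mnt/disk1/hadoop/mapreduce/local,/mnt/disk2/hadoop/mapreduce/local", True),
    ("mapred.system.dir", "/hadoop/mapred/system", True),
    ("mapred.jobtracker.taskScheduler", "org.apache.hadoop.mapred.FairScheduler", True),
    ("mapred.inmem.merge.threshold", "0", False),
    ("tasktracker.http.threads", "80", True),
    ("mapred.compress.map.output", "true", False),
    ("mapred.map.output.compression.codec", "com.hadoop.compression.lzo.LzoCodec", False),
    ("mapred.hosts", "/etc/hadoop/conf/allowed_hosts", True),
]


def build_config(settings):
    """Build a Hadoop-style <configuration> element from (name, value, final) tuples."""
    root = ET.Element("configuration")
    for name, value, final in settings:
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
        if final:
            # Final properties can't be overridden by later resources such as job configs.
            ET.SubElement(prop, "final").text = "true"
    return root


xml_text = ET.tostring(build_config(OUR_SETTINGS), encoding="unicode")
print(xml_text)
```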
Conclusion
After setting up all these parameters the way you like them you should
have a fully functional but basic Hadoop cluster running. You can submit
jobs, use HDFS etc. But there are a few more things that we can do like
installing Hive, Hue, Pig, Sqoop, etc. We've also yet to cover Puppet.
All of this is hopefully forthcoming in more blog posts in the future.
We'd also love to hear from other users of Hadoop, HBase & Co. in
Scandinavia (or people and companies interested in them) who would like
to join a Hadoop Meetup. We're located in Copenhagen. Contact us if
you're interested.
If you have any questions or spot any problems or mistakes please let me know in the comments or by mail.