
Friday 23 May 2014

Kafka Installation

Step 1: Download the Binary

Download the 0.8.1.1 Release

We build for multiple versions of Scala. This only matters if you are using Scala and want a build that matches your Scala version. Otherwise any version should work (2.9.2 is recommended, i.e. kafka_2.9.2-0.8.1.1.tgz).

tar xzf kafka-<VERSION>.tgz
cd kafka-<VERSION>



This tutorial assumes you are starting on a fresh zookeeper instance with no pre-existing data.

Step 2: Start the server

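Kafka uses ZooKeeper, so you need to start a ZooKeeper server first if you don't already have one running. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.
  • Start the ZooKeeper server if it is not already running. The command below points the script at an existing ZooKeeper configuration in /etc/zookeeper/conf/zoo.cfg; the configuration bundled with Kafka is config/zookeeper.properties.

bin/zookeeper-server-start.sh /etc/zookeeper/conf/zoo.cfg

  • Now start the Kafka server:

bin/kafka-server-start.sh config/server.properties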


Step 3: Create a topic

Let's create a topic named "test" with a single partition and only one replica:

bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1

We can now see that topic if we run the list topic command:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Alternatively, you can also configure your brokers to auto-create topics when a non-existent topic is published to.

Step 4: Send some messages

Kafka comes with a command line client that will take input from a file or standard input and send it out as messages to the Kafka cluster. By default each line will be sent as a separate message.

Run the producer and then type a few messages to send to the server.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is msg 1
This is msg 2


Step 5: Start a consumer
Kafka also has a command line consumer that will dump out messages to standard output.
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is msg 1
This is msg 2

If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
All the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.

Step 6: Setting up a multi-broker cluster
So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get a feel for it, let's expand our cluster to three nodes (still all on our local machine).
First we make a config file for each of the brokers:
cp config/server.properties config/server-1.properties 
cp config/server.properties config/server-2.properties
 
Now edit these new files and set the following properties:

config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2

The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from trying to all register on the same port or overwrite each other’s data.
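
The manual edits above could also be scripted. The following is only a minimal sketch: the function name make_broker_config is made up for illustration, and it assumes the template sets each of these keys on a line of its own in key=value form. It also drops any log.dirs line, in case the template uses that spelling instead of log.dir.

```shell
#!/bin/sh
# make_broker_config TEMPLATE ID PORT LOGDIR   (hypothetical helper)
# Prints a copy of TEMPLATE with broker.id, port and log.dir overridden.
make_broker_config() {
    template=$1 id=$2 port=$3 logdir=$4
    # Copy every line except existing definitions of the keys we override
    # (log.dirs is filtered as well; that spelling is an assumption).
    awk '!/^(broker\.id|port|log\.dirs?)=/' "$template"
    # Append the per-broker values.
    printf 'broker.id=%s\nport=%s\nlog.dir=%s\n' "$id" "$port" "$logdir"
}

# Possible usage for the two extra brokers of this step:
# for i in 1 2; do
#     make_broker_config config/server.properties "$i" "$((9092 + i))" \
#         "/tmp/kafka-logs-$i" > "config/server-$i.properties"
# done
```

Generating the files from the original template keeps all other settings identical across brokers, so the only differences are the three values that must not clash.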

We already have Zookeeper and our single node started, so we just need to start the two new nodes. However, this time we have to override the JMX port used by java too to avoid clashes with the running node:
JMX_PORT=9997 bin/kafka-server-start.sh config/server-1.properties &
JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &

 
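Okay, but now that we have a cluster, how can we know which broker is doing what? The "list topics" command shows only the topic names:

bin/kafka-topics.sh --list --zookeeper localhost:2181
test
topic1
topic2
topic3

Next, create a new topic with a replication factor of three (called myTopic here, the name used in the rest of this step):

bin/kafka-topics.sh --create --topic myTopic --zookeeper localhost:2181 --partitions 1 --replication-factor 3

To see leaders and replicas, run the "describe topics" command:

bin/kafka-topics.sh --describe --zookeeper localhost:2181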

Here is an explanation of output:
  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that are supposed to serve the log for this partition, regardless of whether they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught up to the leader.
Note that both topics we created have only a single partition (partition 0). The original topic has no replicas and so is present only on the leader (node 0). The replicated topic is present on all three nodes, with node 1 currently acting as leader and all replicas in sync.

Some Tips
As before, let's publish a few messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic
...
my test message 1
my test message 2
^C

Now consume these messages:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic myTopic

my test message 1
my test message 2
^C

Now let's test out fault-tolerance. Kill the broker acting as leader for this topic's only partition:

pkill -9 -f server-1.properties

Leadership should switch to one of the slaves:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

topic: myTopic partition: 0   leader: 2      replicas: 1,2,0 isr: 2
topic: test    partition: 0   leader: 0      replicas: 0    isr: 0
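
Output like this can also be checked mechanically. The sketch below is an illustration only: the function name under_replicated is made up, and it assumes lines in exactly the "topic: ... partition: ... replicas: ... isr: ..." layout printed above (other releases of the tools may label or space the fields differently). It reports every partition whose in-sync set is smaller than its replica list.

```shell
#!/bin/sh
# under_replicated (hypothetical helper): reads topic description lines on
# stdin and prints the partitions that have fewer in-sync replicas than
# assigned replicas.
under_replicated() {
    awk '{
        topic = ""; part = ""; reps = ""; isr = ""
        # Each label is followed by its value in the next field.
        for (i = 1; i < NF; i++) {
            if ($i == "topic:") { topic = $(i + 1) }
            if ($i == "partition:") { part = $(i + 1) }
            if ($i == "replicas:") { reps = $(i + 1) }
            if ($i == "isr:") { isr = $(i + 1) }
        }
        if (topic == "") { next }
        # Both lists are comma separated broker ids.
        n_reps = split(reps, r, ",")
        n_isr = split(isr, s, ",")
        if (n_isr < n_reps) {
            printf "%s-%s: %d of %d replicas in sync\n", topic, part, n_isr, n_reps
        }
    }'
}

# Possible usage: pipe the output of the describe command into the function,
# e.g.  <describe command> | under_replicated
```

For the two lines shown above, only myTopic would be reported, since its replica list has three entries and its isr has one.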

And the messages should still be available for consumption even though the leader that took the writes originally is down:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic myTopic

my test message 1
my test message 2



Saturday 17 May 2014

Setting up and installing Hadoop daemons on Windows 7


Steps:

1. Download 'setup.exe' from Cygwin website
2. Right-click on 'setup.exe'
3. Leave settings as they are, click through until you come to the package selection window
                    3.1 - Make sure that the installation directory is 'C:\cygwin'
4. In the package selection window, under 'Net', click on 'openssh' to install it
5. Click 'Next', then go do something productive while installation runs its course.
6. Once installed, go to Start -> All Programs -> Cygwin, right-click on the subsequent shortcut and select the option to 'Run as Administrator'

7. In your cygwin window, type in the following commands:
                    $ chmod +r  /etc/passwd
                    $ chmod u+w /etc/passwd
                    $ chmod +r  /etc/group
                    $ chmod u+w /etc/group
                    $ chmod 755 /var
                    $ touch /var/log/sshd.log
                    $ chmod 664 /var/log/sshd.log

                    This is needed in order to allow the sshd account to operate without a passphrase, which is required for Hadoop to work.

8. Once the prompt window opens up, type 'ssh-host-config' and hit Enter
9. Should privilege separation be used? NO
10. Name of service to install: sshd
11. Do you want to install sshd as a service? YES
12. Enter the value of CYGWIN for the daemon: <LEAVE BLANK, JUST HIT ENTER>
13. Do you want to use a different name? (default is 'cyg_server'): NO
14. Please enter the password for user 'cyg_server': <LEAVE BLANK, JUST HIT ENTER>
15. Reenter: <LEAVE BLANK, JUST HIT ENTER>

At this point the ssh service should be installed and set to run under the 'cyg_server' account. Don't worry, this will all be handled under the hood.

To start the ssh service, type in 'net start sshd' in your cygwin window. When you log in next time, this will automatically run.

To test, type in 'ssh localhost' in your cygwin window. You should not be prompted for anything.

=================================================================
INSTALLING AND CONFIGURING HADOOP
=================================================================

This assumes version 0.20.2 of Hadoop. More recent versions (e.g. 0.20.20x.x) do not get along with Windows 7, mainly because their tasktracker daemon requires permissions to be set that Windows 7 inherently does not allow.

1. Download the stable version 0.20.2 of Hadoop
2. Using 7-Zip (you should download this if you have not already, and it should be your default archive browser), open up the archive file. Copy the top level directory from the archive file and paste it into your home directory in C:/cygwin. This is usually something like C:/cygwin/home/{username}
3. Once copied into your cygwin home directory, navigate to {hadoop-home}/conf. Open the following files for editing in your favorite editor (I strongly suggest Notepad++ ... why would you use anything else):
                    * core-site.xml
                    * hdfs-site.xml
                    * mapred-site.xml
                    * hadoop-env.sh
                   
4. Make the following additions to the corresponding files:
                    * core-site.xml (inside the configuration tags)
                                         <property>
                                                             <name>fs.default.name</name>
                                                             <value>localhost:9100</value>
                                         </property>
                    * mapred-site.xml (inside the configuration tags)
                                         <property>
                                                             <name>mapred.job.tracker</name>
                                                             <value>localhost:9101</value>
                                         </property>
                    * hdfs-site.xml (inside the configuration tags)
                                         <property>
                                                             <name>dfs.replication</name>
                                                             <value>1</value>
                                         </property>
                    * hadoop-env.sh
                                         * uncomment the JAVA_HOME export command, and set the path to your Java home (typically C:/Program Files/Java/{java-home})

5. In a cygwin window, inside your top-level hadoop directory, it's time to format your Hadoop file system. Type in 'bin/hadoop namenode -format' and hit enter. This will create and format the HDFS.

6. Now it is time to start all of the Hadoop daemons that will simulate a distributed system. Type in 'bin/start-all.sh' and hit enter.

You should not receive any errors (there may be some messages about not being able to change to the home directory, but this is ok).

Double check that your HDFS and JobTracker are up and running properly by visiting http://localhost:50070 and http://localhost:50030, respectively.

To make sure everything is up and running properly, let's try a regex example.

7. From the top level hadoop directory, type in the following set of commands:

                    $ bin/hadoop dfs -mkdir input
                    $ bin/hadoop dfs -put conf input
                    $ bin/hadoop jar hadoop-*-examples.jar grep input output 'dfs[a-z.]+'
                    $ bin/hadoop dfs -cat output/*
                   
                    This should display the output of the job (finding any word that matched the regex pattern above).
                   
8. Assuming no errors, you are all set to set up your Eclipse environment.

FYI, you can stop all your daemons by typing in 'bin/stop-all.sh', but keep it running for now as we move on to the next step.
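
To get a feel for what the grep job in step 7 computes, the same regex can be applied to a local directory with ordinary shell tools. This is only a rough local analogue, not what Hadoop runs internally: the function name local_grep_count is made up, and the -o and -h options assume GNU or BSD grep (Cygwin ships GNU grep). It prints each distinct match with its count, most frequent first, which should be comparable to the job output for the same files.

```shell
#!/bin/sh
# local_grep_count DIR REGEX   (hypothetical helper)
# Counts every distinct match of the extended regex REGEX in the files
# directly inside DIR and lists them with the highest count first.
local_grep_count() {
    # -o prints only the matched text, -h omits file names, -E enables
    # extended regular expressions such as 'dfs[a-z.]+'.
    grep -ohE "$2" "$1"/* | sort | uniq -c | sort -rn
}

# Possible usage from the top level hadoop directory:
# local_grep_count conf 'dfs[a-z.]+'
```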

=================================================================
CONFIGURING  HADOOP PLUGIN FOR ECLIPSE
=================================================================

1. Download Eclipse Indigo
2. Download the hadoop plugin jar located at: https://issues.apache.org/jira/browse/MAPREDUCE-1280
                    The file name is 'hadoop-eclipse-plugin-0.20.3-SNAPSHOT.jar'
                    Normally you could use the plugin provided in the 0.20.2 contrib folder that comes with Hadoop, however that plugin is out of date.
3. Copy the downloaded jar and paste it into your Eclipse plugins directory (e.g. C:/eclipse/plugins)
4. In a regular command prompt, navigate to your eclipse folder (e.g. 'cd C:/eclipse')
5. Type in 'eclipse -clean' and hit enter
6. Once Eclipse is open, open a new perspective (top right corner) and select 'Other'. From the list, select 'MapReduce'.
7. Go to Window -> Show View, and select Map/Reduce. This will open a view window for Map/Reduce Locations
8. Now you are ready to tie in Eclipse with your existing HDFS that you formatted and configured earlier. Right click in the Map/Reduce Locations view and select 'New Hadoop Location'
9. In the window that appears, type in 'localhost' for the Location name. Under Map/Reduce Master, type in localhost for the Host, and 9101 for the Port. For DFS Master, make sure the 'Use M/R Master host' checkbox is selected, and type in 9100 for the Port. For the User name, type in 'User'. Click 'Finish'
10. In the Project Explorer window on the left, you should now be able to expand the DFS Locations tree and see your new location. Continue to expand it and you should see something like the following file structure:

                    DFS Locations
                                         -> (1)
                                                             -> tmp (1)
                                                                                 -> hadoop-{username} (1)
                                                                                                      -> mapred (1)
                                                                                                                          -> system(1)
                                                                                                                                              -> jobtracker.info


At this point you can create directories and files and upload them to the HDFS from Eclipse, or you can create them through the cygwin window as you did in step 7 in the previous section.

=================================================================
CREATING YOUR FIRST HADOOP PROJECT IN ECLIPSE
=================================================================
1. Open up the Java perspective
2. In the Project Explorer window, select New -> Project...
3. From the list that appears, select Map/Reduce Project
4. Provide a project name, and then click on the link that says 'Configure Hadoop install directory'
                    4.1 Browse to the top-level hadoop directory that is located in cygwin (e.g. C:\cygwin\home\{username}\{hadoop directory})
                    4.2 Click 'OK'
5. Click 'Finish'
6. You will notice that Hadoop projects in Eclipse are simple Java projects in terms of file structure. Now let's add a class.
7. Right click on the project, and select New -> Other
8. From the Map/Reduce folder in the list, select 'MapReduce Driver'. This will generate a class for you.

At this point you are all set. Now it's time to learn all about MapReduce, which is outside the scope of this documentation. Enjoy and have fun.


Friday 16 May 2014

Multiple threads in a mapper i.e. MultithreadedMapper


As the name suggests, MultithreadedMapper is a map task that spawns multiple threads. A map task can be thought of as a process running in its own JVM; MultithreadedMapper runs multiple threads within that one map task. Don't confuse this with running multiple tasks within the same JVM (that is achieved with JVM reuse). A multithreaded task still processes the input split defined by the input format, and the record reader reads the input just as in a normal map task. The multithreading happens after this stage: once a record has been read, it is handed to one of several threads for processing. In other words, the input I/O is not multithreaded; the threads only come into the picture afterwards.
MultithreadedMapper is a good fit if your operation is highly CPU-intensive and running several threads in parallel could speed up the task. If the job is I/O-intensive, running multiple tasks is much better than multithreading, because with multiple tasks the I/O reads happen in parallel.
Let us see how to use MultithreadedMapper. It is done differently in the old MapReduce API and the new API.

Old API
Enable the multithreaded map runner with
-D mapred.map.runner.class=org.apache.hadoop.mapred.lib.MultithreadedMapRunner
Or
jobConf.setMapRunnerClass(org.apache.hadoop.mapred.lib.MultithreadedMapRunner.class);

New API
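Set org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper as the job's mapper class and tell it which mapper to run inside its threads. Your own mapper (called MyMapper here for illustration) still extends org.apache.hadoop.mapreduce.Mapper and must be thread-safe. MultithreadedMapper has a different implementation of the run() method, which hands the records to a pool of threads.

job.setMapperClass(MultithreadedMapper.class);
MultithreadedMapper.setMapperClass(job, MyMapper.class);

You can set the number of threads within a mapper by

MultithreadedMapper.setNumberOfThreads(job, n); or
mapred.map.multithreadedrunner.threads = n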


Note: Don't assume that a multithreaded mapper is better than a normal map task just because it spawns fewer JVMs and fewer processes. If a mapper is loaded with lots of threads, the chances of that JVM crashing are higher, and the cost of re-executing such a Hadoop task would be terribly high.
            Don't use MultithreadedMapper to control the number of JVMs spawned. If that is your goal, tweak the mapred.job.reuse.jvm.num.tasks parameter instead; its default value is 1, which means no JVM reuse across tasks.
            The threads live at the bottom level, i.e. within a map task; the higher levels of the Hadoop framework, such as the job, know nothing about them.


Friday 2 May 2014

Hadoop Commands

hadoop command [genericOptions] [commandOptions]

hadoop fs
Usage: java FsShell
 [-ls <path>]
 [-lsr <path>]
 [-df [<path>]]
 [-du <path>]
 [-dus <path>]
 [-count[-q] <path>]
 [-mv <src> <dst>]
 [-cp <src> <dst>]
 [-rm [-skipTrash] <path>]
 [-rmr [-skipTrash] <path>]
 [-expunge]
 [-put <localsrc> ... <dst>]
 [-copyFromLocal <localsrc> ... <dst>]
 [-moveFromLocal <localsrc> ... <dst>]
 [-get [-ignoreCrc] [-crc] <src> <localdst>]
 [-getmerge <src> <localdst> [addnl]]
 [-cat <src>]
 [-text <src>]
 [-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>]
 [-moveToLocal [-crc] <src> <localdst>]
 [-mkdir <path>]
 [-setrep [-R] [-w] <rep> <path/file>]
 [-touchz <path>]
 [-test -[ezd] <path>]
 [-stat [format] <path>]
 [-tail [-f] <file>]
 [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
 [-chown [-R] [OWNER][:[GROUP]] PATH...]
 [-chgrp [-R] GROUP PATH...]
 [-help [cmd]]

hadoop fs -ls /
hadoop fs -ls /kiran1/
hadoop fs -cat /kiran1/foo.txt
hadoop fs -rm /kiran1/foo.txt
hadoop fs -mkdir /kiran6/
hadoop fs -put foo.txt /kiran6/
hadoop fs -put /etc/note.txt /kiran2/note_fs.txt
hadoop fs -get /user/kiran/passwd ./
hadoop fs -setrep -R 5 /user/kiran/tmp/
hadoop fsck /user/kiran/tmp -files -blocks -locations
hadoop fs -chmod 1777 /tmp
hadoop fs -touchz /user/kiran/kiran/foo
hadoop fs -rmr /user/kiran/kiran/foo
hadoop fs -touchz /user/kiran/kiran/bar
hadoop fs -count -q /user/kiran

start-all.sh

hadoop job -list
hadoop job -kill jobID
hadoop job -list-attempt-ids jobID taskType taskState
hadoop job -kill-task taskAttemptId

hadoop namenode -format

hadoop jar <jar_file> wordcount <input_path> <output_path>
hadoop jar /opt/hadoop/hadoop-examples-1.0.4.jar wordcount <input_path> /out/wc_output

hadoop dfsadmin -report
hadoop dfsadmin -setSpaceQuota 10737418240 /user/esammer
hadoop dfsadmin -refreshNodes
hadoop dfsadmin -upgradeProgress status
hadoop dfsadmin -finalizeUpgrade

hadoop fsck
Usage: DFSck <path> [-move | -delete | -openforwrite] [-files [-blocks [-locations | -racks]]]
 <path>  -- start checking from this path
 -move  -- move corrupted files to /lost+found
 -delete  -- delete corrupted files
 -files  -- print out files being checked
 -openforwrite  -- print out files opened for write
 -blocks  -- print out block report
 -locations  -- print out locations for every block
 -racks  -- print out network topology for data-node locations
By default fsck ignores files opened for write, use -openforwrite to report such files. They are usually tagged CORRUPT or HEALTHY depending on their block allocation status.
hadoop fsck / -files -blocks -locations
hadoop fsck /user/kiran -files -blocks -locations

hadoop distcp                -- Distributed Copy (distcp)
distcp [OPTIONS] <srcurl>* <desturl>
 OPTIONS:
 -p[rbugp] Preserve status
 r: replication number
 b: block size
 u: user
 g: group
 p: permission
 -p alone is equivalent to -prbugp
 -i Ignore failures
 -log <logdir> Write logs to <logdir>
 -m <num_maps> Maximum number of simultaneous copies
 -overwrite Overwrite destination
 -update Overwrite if src size different from dst size
 -skipcrccheck Do not use CRC check to determine if src is different from dest. Relevant only if -update is specified
 -f <urilist_uri> Use list at <urilist_uri> as src list
 -filelimit <n> Limit the total number of files to be <= n
 -sizelimit <n> Limit the total size to be <= n bytes
 -delete Delete the files existing in the dst but not in src
 -mapredSslConf <f> Filename of SSL configuration for mapper task

NOTE 1: if -overwrite or -update are set, each source URI is interpreted as an isomorphic update to an existing directory.
For example:
hadoop distcp -p -update "hdfs://A:8020/user/foo/bar" "hdfs://B:8020/user/foo/baz"
would update all descendants of 'baz' also in 'bar'; it would *not* update /user/foo/baz/bar

NOTE 2: The parameter <n> in -filelimit and -sizelimit can be specified with symbolic representation. For example,
1230k = 1230 * 1024 = 1259520
891g = 891 * 1024^3 = 956703965184
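
The arithmetic behind these two examples can be reproduced in the shell. This is a small sketch for illustration, not distcp's own parser: the function name to_bytes is made up, only the k and g suffixes come from the examples above, and the m suffix is assumed to follow the same powers-of-1024 pattern.

```shell
#!/bin/sh
# to_bytes N[k|m|g]   (hypothetical helper)
# Expands a symbolic size into a plain number using powers of 1024.
to_bytes() {
    value=$1
    case $value in
        # ${value%?} strips the trailing suffix character.
        *[kK]) echo $(( ${value%?} * 1024 )) ;;
        *[mM]) echo $(( ${value%?} * 1024 * 1024 )) ;;          # assumed suffix
        *[gG]) echo $(( ${value%?} * 1024 * 1024 * 1024 )) ;;
        *)     echo "$value" ;;                                  # no suffix
    esac
}

# Possible usage:
# to_bytes 1230k
# to_bytes 891g
```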

hadoop distcp hdfs://A:8020/path/one hdfs://B:8020/path/two
hadoop distcp /path/one /path/two