
Monday, 2 February 2015

Loading Twitter Data into Hive and processing

In this post, we will load our tweets into Hive and query them to learn about our little world.
To load our tweet-JSON into Hive, we’ll use the rcongiu Hive-JSON-Serde. Clone it from GitHub (github.com/rcongiu/Hive-JSON-Serde) and build it; the jdo2-api jar is a build dependency that first has to be installed into your local Maven repository:
wget http://www.datanucleus.org/downloads/maven2/javax/jdo/jdo2-api/2.3-ec/jdo2-api-2.3-ec.jar
mvn install:install-file -DgroupId=javax.jdo -DartifactId=jdo2-api \
  -Dversion=2.3-ec -Dpackaging=jar -Dfile=jdo2-api-2.3-ec.jar
mvn package
Find the jar it generated via:
find . | grep jar
./target/json-serde-1.1.4-jar-with-dependencies.jar
./target/json-serde-1.1.4.jar
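If you don't want to repeat the add jar step in every Hive session, you can optionally register the jar once in ~/.hiverc, which the Hive CLI runs at startup (a small convenience sketch, assuming the path below is where your build put the jar):
echo "ADD JAR /path/to/my/Hive-Json-Serde/target/json-serde-1.1.4-jar-with-dependencies.jar;" >> ~/.hiverc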
Run hive, and create our table with the following commands:
add jar /path/to/my/Hive-Json-Serde/target/json-serde-1.1.4-jar-with-dependencies.jar;

create table tweets (
   created_at string,
   entities struct <
      hashtags: array <struct <
            text: string>>,
      media: array <struct <
            media_url: string,
            media_url_https: string,
            sizes: array <struct <
                  h: int,
                  resize: string,
                  w: int>>,
            url: string>>,
      urls: array <struct <
            url: string>>,
      user_mentions: array <struct <
            name: string,
            screen_name: string>>>,
   geo struct <
      coordinates: array <float>,
      type: string>,
   id bigint,
   id_str string,
   in_reply_to_screen_name string,
   in_reply_to_status_id bigint,
   in_reply_to_status_id_str string,
   in_reply_to_user_id int,
   in_reply_to_user_id_str string,
   retweeted_status struct <
      created_at: string,
      entities: struct <
         hashtags: array <struct <
               text: string>>,
         media: array <struct <
               media_url: string,
               media_url_https: string,
               sizes: array <struct <
                     h: int,
                     resize: string,
                     w: int>>,
               url: string>>,
         urls: array <struct <
               url: string>>,
         user_mentions: array <struct <
               name: string,
               screen_name: string>>>,
      geo: struct <
         coordinates: array <float>,
         type: string>,
      id: bigint,
      id_str: string,
      in_reply_to_screen_name: string,
      in_reply_to_status_id: bigint,
      in_reply_to_status_id_str: string,
      in_reply_to_user_id: int,
      in_reply_to_user_id_str: string,
      source: string,
      text: string,
      user: struct <
         id: int,
         id_str: string,
         name: string,
         profile_image_url_https: string,
         protected: boolean,
         screen_name: string,
         verified: boolean>>,
   source string,
   text string,
   user struct <
      id: int,
      id_str: string,
      name: string,
      profile_image_url_https: string,
      protected: boolean,
      screen_name: string,
      verified: boolean>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
Load it full of data from the tweet JSON file we created in the last tutorial:
LOAD DATA LOCAL INPATH '/path/to/all_tweets.json' OVERWRITE INTO TABLE tweets;
Verify our data loaded with a count:
SELECT COUNT(*) from tweets;
OK
24655
Our tweets are loaded! Some fun queries to run:
    • Sample some tweets
SELECT text from tweets limit 5;
Which gets us:
OK
Paddled out, tried to psyche myself into wave for 30 minutes...
Waves twice as tall as me are scary
No waves here yet, nap time
Doin 80 on i10w
Gustav and panama city beach here I come
    • Top people we reply to:
SELECT in_reply_to_screen_name, 
  COUNT(*) as total from tweets 
  GROUP BY in_reply_to_screen_name 
  ORDER BY total DESC 
  LIMIT 30;
Which gets us the top N people I reply to:
OK
NULL 13447
sanjay 356
Urvaksh 282
ChrisDiehl 268
pfreet 230
mikeschinkel 222
mmealling 193
keithmcgreggor 191
peteskomoroch 183
semil 183
...
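The big NULL bucket is simply every tweet that is not a reply. To leave those out, add a WHERE clause; here is a sketch of the same query run from the shell with hive -e (assuming the SerDe jar is registered, e.g. via the add jar step or the ~/.hiverc trick above):
hive -e "SELECT in_reply_to_screen_name, COUNT(*) AS total
         FROM tweets
         WHERE in_reply_to_screen_name IS NOT NULL
         GROUP BY in_reply_to_screen_name
         ORDER BY total DESC
         LIMIT 30;"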
Hive has some built-in n-gram analysis utilities (documented in the Hive statistics and data mining UDF documentation) that we can use. For example:
SELECT sentences(lower(text)) FROM tweets;
[["dear","twitter","send","me","my","tweets","plz","you","promised","me"]]
[["pig","eye","for","the","sql","guy","http","t.co","vjx4rcugix","via","sharethis"]]
[["rt","hortonworks","pig","eye","for","the","sql","guy","with","mortardata","http","t.co","vnkwsswnkv","hadoop"]]
We can use these to do n-gram analysis:
SELECT ngrams(sentences(lower(text)), 3, 10) FROM tweets;
Which is kind of amusing:
[{"ngram":["http","instagr.am","p"],"estfrequency":136.0},
{"ngram":["i","want","to"],"estfrequency":100.0},
{"ngram":["on","hacker","news"],"estfrequency":92.0},
{"ngram":["you","have","to"],"estfrequency":66.0},
{"ngram":["a","lot","of"],"estfrequency":65.0},
{"ngram":["i","need","to"],"estfrequency":63.0},
{"ngram":["is","looking","for"],"estfrequency":59.0},
{"ngram":["hortonworks","is","looking"],"estfrequency":59.0},
{"ngram":["there","is","no"],"estfrequency":56.0},{"ngram":["is","there","a"],"estfrequency":53.0}]
You can see common phrases, as well as Hortonworks job openings that are auto-tweeted, and of course 'on hacker news', talking about Hacker News :)
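A related built-in, context_ngrams, estimates the most frequent words that complete a given phrase; a null marks the slot to fill. For example, a sketch that looks for the ten most common words following "i want" (again assuming the SerDe jar is registered):
hive -e "SELECT context_ngrams(sentences(lower(text)), array('i', 'want', null), 10) FROM tweets;"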
We can also check out our tweets that are RTs.
SELECT retweeted_status.user.screen_name, COUNT(*) as total 
  FROM tweets 
  WHERE retweeted_status.user is not null 
  GROUP BY retweeted_status.user.screen_name 
  ORDER BY total desc 
  LIMIT 20;
This gets me:
OK
peteskomoroch 99
hortonworks 97
ChrisDiehl 56
newsycombinator 55
newsyc20 38
adamnash 31
bradfordcross 29
kjurney 29
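The nested entities struct is queryable as well. For example, here is a sketch that counts our most-used hashtags by exploding the entities.hashtags array (assuming the SerDe jar is registered):
hive -e "SELECT hashtag.text, COUNT(*) AS total
         FROM tweets
         LATERAL VIEW explode(entities.hashtags) tags AS hashtag
         GROUP BY hashtag.text
         ORDER BY total DESC
         LIMIT 20;"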
Once we have our tweets in Hive, there’s no limit to what we can do to them! This is what Hive excels at.

Note: The objective of copying this blog is to build a central repository for Hadoop developers.

Friday, 23 May 2014

Kafka Installation

Step 1: Download the Binary

Download the 0.8.1.1 Release

Kafka is built for multiple versions of Scala. This only matters if you are using Scala and want a build that matches the Scala version you use; otherwise any version should work (2.9.2 is recommended, i.e. kafka_2.9.2-0.8.1.1.tgz).

tar xzf kafka-<VERSION>.tgz
cd kafka-<VERSION>
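For example, with the recommended Scala 2.9.2 build mentioned above, the concrete commands are:

tar xzf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1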



This tutorial assumes you are starting on a fresh zookeeper instance with no pre-existing data.

Step 2: Start the server

Kafka uses zookeeper so you need to first start a zookeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node zookeeper instance.
  • Start the ZooKeeper server if it is not already running:


bin/zookeeper-server-start.sh /etc/zookeeper/conf/zoo.cfg
  • Now start the Kafka server:


bin/kafka-server-start.sh config/server.properties
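As a quick sanity check (not part of the original quickstart), the broker JVM should show up in jps and, with the default config, listen on port 9092:

jps | grep -i kafka
netstat -an | grep 9092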


Step 3: Create a topic

Let's create a topic named "test" with a single partition and only one replica:
 bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1
 
We can now see that topic if we run the list topic command:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Alternatively, you can also configure your brokers to auto-create topics when a non-existent topic is published to.

Step 4: Send some messages

Kafka comes with a command-line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate message.

Run the producer and then type a few messages to send to the server.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is msg 1
This is msg 2


Step 5: Start a consumer
Kafka also has a command line consumer that will dump out messages to standard out.
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is msg 1
This is msg 2

If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
All the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.

Step 6: Setting up a multi-broker cluster
So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get a feel for it, let's expand our cluster to three nodes (still all on our local machine).
First we make a config file for each of the brokers:
cp config/server.properties config/server-1.properties 
cp config/server.properties config/server-2.properties
 
Now edit these new files and set the following properties:

config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2

The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from trying to all register on the same port or overwrite each other’s data.

We already have Zookeeper and our single node started, so we just need to start the two new nodes. However, this time we have to override the JMX port used by java too to avoid clashes with the running node:
JMX_PORT=9997 bin/kafka-server-start.sh config/server-1.properties &
JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &

 
Okay, but now that we have a cluster, how can we know which broker is doing what? Start by listing the topics:

bin/kafka-topics.sh --list --zookeeper localhost:2181
test
topic1
topic2
topic3
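The list command only shows topic names. To see which broker is handling which partition, kafka-topics.sh also has a --describe option; for example, for the test topic created earlier:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test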

Here is an explanation of the describe output:
  • "leader" is the node responsible for all reads and writes for the given partition. Each node would be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that are supposed to server the log for this partition regardless of whether they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
Note that the test topic we created has only a single partition (partition 0) and no extra replicas, so it is present only on its leader (node 0). A topic created with a replication factor of 3, such as myTopic used below, is present on all three nodes, with one node acting as leader and all replicas in sync.

Some Tips
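The examples below assume a topic named myTopic that is replicated across all three brokers. If you have not created it yet, here is a sketch of the create command (replication factor 3 so that every broker holds a copy):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic myTopic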
As before, let's publish a few messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic
...
my test message 1
my test message 2
^C

Now consume this message:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic myTopic

my test message 1
my test message 2
^C

Now let's test out fault-tolerance. Kill the broker acting as leader for this topic's only partition:

pkill -9 -f server-1.properties

Leadership should switch to one of the other replicas:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

topic: myTopic partition: 0   leader: 2      replicas: 1,2,0 isr: 2
topic: test    partition: 0   leader: 0      replicas: 0    isr: 0

And the messages should still be available for consumption even though the leader that took the writes originally is down:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic myTopic

my test message 1
my test message 2
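To bring the killed broker back, restart it the same way it was started earlier; after a few seconds the describe command above should show broker 1 back in the isr list:

JMX_PORT=9997 bin/kafka-server-start.sh config/server-1.properties &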



Saturday, 17 May 2014

Setting up and installing Hadoop daemons on Windows 7


Steps:

1. Download 'setup.exe' from the Cygwin website
2. Right-click on 'setup.exe' and run it as administrator
3. Leave the settings as they are and click through until you come to the package selection window
                    3.1 - Make sure that the installation directory is 'C:\cygwin'
4. In the package selection window, under 'Net', click on 'openssh' to install it
5. Click 'Next', then go do something productive while installation runs its course.
6. Once installed, go to Start -> All Programs -> Cygwin, right-click on the subsequent shortcut and select the option to 'Run as Administrator'

7. In your cygwin window, type in the following commands:
                    $ chmod +r  /etc/passwd
                    $ chmod u+w /etc/passwd
                    $ chmod +r  /etc/group
                    $ chmod u+w /etc/group
                    $ chmod 755 /var
                    $ touch /var/log/sshd.log
                    $ chmod 664 /var/log/sshd.log

                    This is needed in order to allow the sshd account to operate without a passphrase, which is required for Hadoop to work.

8. Once the prompt window opens up, type 'ssh-host-config' and hit Enter
9. Should privilege separation be used? NO
10. Name of service to install: sshd
11. Do you want to install sshd as a service? YES
12. Enter the value of CYGWIN for the daemon: <LEAVE BLANK, JUST HIT ENTER>
13. Do you want to use a different name? (default is 'cyg_server'): NO
14. Please enter the password for user 'cyg_server': <LEAVE BLANK, JUST HIT ENTER>
15. Reenter: <LEAVE BLANK, JUST HIT ENTER>

At this point the sshd service should be installed and configured to run under the 'cyg_server' account. Don't worry, this will all be handled under the hood.

To start the ssh service, type in 'net start sshd' in your cygwin window. When you log in next time, this will automatically run.

To test, type in 'ssh localhost' in your cygwin window. You should not be prompted for anything.
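If 'ssh localhost' still prompts you for a password, you may need to set up key-based login first; a minimal sketch (run inside the Cygwin window):

                    $ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
                    $ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
                    $ chmod 600 ~/.ssh/authorized_keys
                    $ ssh localhost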

=================================================================
INSTALLING AND CONFIGURING HADOOP
=================================================================

This assumes you are installing Hadoop version 0.20.2. Newer releases (e.g. 0.20.20x.x) do not get along with Windows 7, mainly because the TaskTracker daemon requires permissions that Windows 7 inherently does not allow.

1. Download the stable version 0.20.2 of Hadoop
2. Using 7-Zip (you should download this if you have not already, and it should be your default archive browser), open up the archive file. Copy the top level directory from the archive file and paste it into your home directory in C:/cygwin. This is usually something like C:/cygwin/home/{username}
3. Once copied into your cygwin home directory, navigate to {hadoop-home}/conf. Open the following files for editing in your favorite editor (I strongly suggest Notepad++ ... why would you use anything else):
                    * core-site.xml
                    * hdfs-site.xml
                    * mapred-site.xml
                    * hadoop-env.sh
                   
4. Make the following additions to the corresponding files:
                    * core-site.xml (inside the configuration tags)
                                         <property>
                                                             <name>fs.default.name</name>
                                                             <value>localhost:9100</value>
                                         </property>
                    * mapred-site.xml (inside the configuration tags)
                                         <property>
                                                             <name>mapred.job.tracker</name>
                                                             <value>localhost:9101</value>
                                         </property>
                    * hdfs-site.xml (inside the configuration tags)
                                         <property>
                                                             <name>dfs.replication</name>
                                                             <value>1</value>
                                         </property>
                    * hadoop-env.sh
                                         * uncomment the JAVA_HOME export command and set the path to your Java home (typically C:/Program Files/Java/{java-home})
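For example, a working hadoop-env.sh line might look like the one below (a sketch only: the JDK folder name is an illustration, so use the one installed on your machine, and prefer the 8.3 short form 'Progra~1' to avoid the space in 'Program Files'):

                    export JAVA_HOME=/cygdrive/c/Progra~1/Java/jdk1.6.0_45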

5. In a cygwin window, inside your top-level hadoop directory, it's time to format your Hadoop file system. Type in 'bin/hadoop namenode -format' and hit enter. This will create and format the HDFS.

6. Now it is time to start all of the Hadoop daemons that simulate a distributed system. Type in 'bin/start-all.sh' and hit enter.

You should not receive any errors (there may be some messages about not being able to change to the home directory, but this is ok).

Double-check that your HDFS and JobTracker are up and running properly by visiting http://localhost:50070 and http://localhost:50030, respectively.

To make sure everything is up and running properly, let's try a regex example.

7. From the top level hadoop directory, type in the following set of commands:

                    $ bin/hadoop dfs -mkdir input
                    $ bin/hadoop dfs -put conf input
                    $ bin/hadoop jar hadoop-*-examples.jar grep input output 'dfs[a-z.]+'
                    $ bin/hadoop dfs -cat output/*
                   
                    This should display the output of the job (finding any word that matched the regex pattern above).
                   
8. Assuming no errors, you are all set to set up your Eclipse environment.

FYI, you can stop all your daemons by typing in 'bin/stop-all.sh', but keep it running for now as we move on to the next step.

=================================================================
CONFIGURING  HADOOP PLUGIN FOR ECLIPSE
=================================================================

1. Download Eclipse Indigo
2. Download the hadoop plugin jar located at: https://issues.apache.org/jira/browse/MAPREDUCE-1280
                    The file name is 'hadoop-eclipse-plugin-0.20.3-SNAPSHOT.jar'
                    Normally you could use the plugin provided in the 0.20.2 contrib folder that comes with Hadoop; however, that plugin is out of date.
3. Copy the downloaded jar and paste it into your Eclipse plugins directory (e.g. C:/eclipse/plugins)
4. In a regular command prompt, navigate to your eclipse folder (e.g. 'cd C:/eclipse')
5. Type in 'eclipse -clean' and hit enter
6. Once Eclipse is open, open a new perspective (top right corner) and select 'Other'. From the list, select 'Map/Reduce'.
7. Go to Window -> Show View, and select Map/Reduce. This will open a view window for Map/Reduce Locations
8. Now you are ready to tie in Eclipse with your existing HDFS that you formatted and configured earlier. Right click in the Map/Reduce Locations view and select 'New Hadoop Location'
9. In the window that appears, type in 'localhost' for the Location name. Under Map/Reduce Master, type in localhost for the Host, and 9101 for the Port. For DFS Master, make sure the 'Use M/R Master host' checkbox is selected, and type in 9100 for the Port. For the User name, type in 'User'. Click 'Finish'
10. In the Project Explorer window on the left, you should now be able to expand the DFS Locations tree and see your new location. Continue to expand it and you should see something like the following file structure:

                    DFS Locations
                                         -> (1)
                                                             -> tmp (1)
                                                                                 -> hadoop-{username} (1)
                                                                                                      -> mapred (1)
                                                                                                                          -> system(1)
                                                                                                                                              -> jobtracker.info


At this point you can create directories and files and upload them to the HDFS from Eclipse, or you can create them through the cygwin window as you did in step 7 in the previous section.

=================================================================
CREATING YOUR FIRST HADOOP PROJECT IN ECLIPSE
=================================================================
1. Open up the Java perspective
2. In the Project Explorer window, select New -> Project...
3. From the list that appears, select Map/Reduce Project
4. Provide a project name, and then click on the link that says 'Configure Hadoop install directory'
                    4.1 Browse to the top-level hadoop directory that is located in cygwin (e.g. C:\cygwin\home\{username}\{hadoop directory})
                    4.2 Click 'OK'
5. Click 'Finish'
6. You will notice that Hadoop projects in Eclipse are simple Java projects in terms of file structure. Now let's add a class.
7. Right click on the project, and select New -> Other
8. From the Map/Reduce folder in the list, select 'MapReduce Driver'. This will generate a class for you.

At this point you are all set to go; now it's time to learn all about MapReduce, which is outside the scope of this documentation. Enjoy and have fun.