
Tuesday 17 November 2015

POC #: Analyse social bookmarking sites to find insights

Industry: Social Media


Data: The data set comprises information gathered from social bookmarking sites, which let users bookmark, review, rate and search links on any topic. The data is in XML format and contains the links/posts, the categories that describe them and the ratings associated with them; a representative record is shown under Input Data (XML Files) below.
Problem Statement: Analyse the data in the Hadoop eco-system to:
1. Fetch the data into the Hadoop Distributed File System and analyse it with MapReduce, Pig and Hive to find the top-rated links based on the user comments, likes etc.
2. Using MapReduce, convert the semi-structured XML data into a structured format and categorise the user rating as positive or negative for each link.
3. Push the MapReduce output to HDFS and feed it into PIG, which splits the data into two parts: Category data and Ratings data.
4. Write a fancy Hive query to analyse the data further and push the output into a relational database (RDBMS) using Sqoop.

POC Coding Details

Input Data (XML Files)
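A representative input record, using the tag names that MyMapper.java parses below (the values are made up for illustration, and the <book> elements sit inside whatever root element the source files use):

<book>
  <id>bk101</id>
  <author>John Doe</author>
  <title>Hadoop in Practice</title>
  <genre>Computer</genre>
  <price>39.95</price>
  <publish_date>2015-10-01</publish_date>
  <descriptions>Practical recipes for Hadoop</descriptions>
  <review>Very useful for beginners</review>
  <rate>8</rate>
  <comments>120</comments>
</book>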


MapReduce code to convert the XML file into a flat, comma-separated file.

MyMapper.java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private static final Log LOG = LogFactory.getLog(MyMapper.class);

    // Parse the <book> record handed to this call (one record per map invocation)
    // and emit a single comma-separated line per book.
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        try {

            InputStream is = new ByteArrayInputStream(value.toString().getBytes());
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(is);

            doc.getDocumentElement().normalize();

            NodeList nList = doc.getElementsByTagName("book");

            for (int temp = 0; temp < nList.getLength(); temp++) {

                Node nNode = nList.item(temp);

                if (nNode.getNodeType() == Node.ELEMENT_NODE) {

                    Element eElement = (Element) nNode;

                    String id = eElement.getElementsByTagName("id").item(0).getTextContent();
                    String author = eElement.getElementsByTagName("author").item(0).getTextContent();
                    String title = eElement.getElementsByTagName("title").item(0).getTextContent();
                    String genre = eElement.getElementsByTagName("genre").item(0).getTextContent();
                    String price = eElement.getElementsByTagName("price").item(0).getTextContent();
                    String publish_date = eElement.getElementsByTagName("publish_date").item(0).getTextContent();
                    String descriptions = eElement.getElementsByTagName("descriptions").item(0).getTextContent();
                    String review = eElement.getElementsByTagName("review").item(0).getTextContent();
                    String rate = eElement.getElementsByTagName("rate").item(0).getTextContent();
                    String comments = eElement.getElementsByTagName("comments").item(0).getTextContent();
            
                    context.write(new Text(id + "," + author + "," + title + "," + genre + "," + price + "," + publish_date + "," + descriptions + "," + review + "," + rate + "," + comments), NullWritable.get());

                }
            }
        } catch (Exception e) {
              throw new IOException(e);
        }

    }

}

XMLDriver.java

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class XMLDriver {

    /** Bhavesh - for processing XML file using Hadoop MapReduce
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        try {

            Configuration conf = new Configuration();

            // Generic Hadoop options are stripped; the remaining args are the input and output paths
            String[] arg = new GenericOptionsParser(conf, args).getRemainingArgs();

            // Tags that delimit one record in the XML input (read back by XMLInputFormat)
            conf.set("START_TAG_KEY", "<book>");
            conf.set("END_TAG_KEY", "</book>");

            Job job = Job.getInstance(conf, "XML Processing");
            job.setJarByClass(XMLDriver.class);
            job.setMapperClass(MyMapper.class);

            // Map-only job: the mapper already emits the final CSV records
            job.setNumReduceTasks(0);

            job.setInputFormatClass(XMLInputFormat.class);

            // The mapper writes a Text key (the CSV line) with a NullWritable value
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path(arg[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg[1]));

            // Propagate the job status so the calling shell script can check $?
            System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (Exception e) {
            throw new IOException(e);
        }

    }

}

XMLInputFormat.java

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class XMLInputFormat extends TextInputFormat {
    public static final String START_TAG_KEY = "<book>";
    public static final String END_TAG_KEY = "</book>";

    /*Bhavesh - Creating XMLInputformat Class for reading XML File*/
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends
            RecordReader<LongWritable, Text> {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) is;

            // The record-delimiting tags come from the job configuration when set,
            // otherwise the <book> defaults declared above are used
            startTag = tac.getConfiguration().get("START_TAG_KEY", START_TAG_KEY).getBytes("utf-8");
            endTag = tac.getConfiguration().get("END_TAG_KEY", END_TAG_KEY).getBytes("utf-8");

            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();

            FileSystem fs = file.getFileSystem(tac.getConfiguration());
            fsin = fs.open(file);
            fsin.seek(start);

        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {

                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsin.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,
                InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;

        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock)
                throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();

                if (b == -1)
                    return false;

                if (withinBlock)
                    buffer.write(b);

                if (b == match[i]) {
                    i++;
                    if (i >= match.length)
                        return true;
                } else
                    i = 0;

                if (!withinBlock && i == 0 && fsin.getPos() >= end)
                    return false;
            }
        }

    }

}
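The three classes above are packaged into the XMLProcessing.jar that the shell script below runs. A minimal build-and-run sketch, assuming the three .java files are in the current directory and the Hadoop client tools are on the PATH:

mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes MyMapper.java XMLDriver.java XMLInputFormat.java
jar cf XMLProcessing.jar -C classes .

# first argument: HDFS input directory, second: output directory (must not exist yet)
hadoop jar XMLProcessing.jar XMLDriver /BookMarkInput /BookMarkOutput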


PIG Script

bookmarkanalysis.pig

Input_File = LOAD '/BookMarkOutput/' USING PigStorage(',') AS
    (book_id:chararray, author:chararray, title:chararray, genre:chararray, price:float,
     publish_date:chararray, description:chararray, review:chararray, rate:float, comments:chararray);

orderedfile1 = ORDER Input_File BY genre;
SPLIT orderedfile1 INTO computer_file IF genre == 'Computer',
                        database_file IF genre == 'Database';
STORE computer_file INTO '/BookMarkOutput/Type_Computer/';
STORE database_file INTO '/BookMarkOutput/Type_Database/';

orderedfile2 = ORDER Input_File BY rate DESC;
SPLIT orderedfile2 INTO rate5plus IF rate >= 5 AND rate <= 10,
                        rate5minus IF rate >= 1 AND rate < 5;
STORE rate5plus INTO '/BookMarkOutput/Rating5+/';
STORE rate5minus INTO '/BookMarkOutput/Rating5-/';


Shell Script
#####################################################################
#########################  COMPLETE SCRIPT  #########################
### HEADER - PROGRAM NAME - <bookmarkanalysis.sh>
### AUTHOR - BHAVESH BHADRICHA
### DATE - 12/NOV/2015
### VERSION - 1.0
### DESCRIPTION - Data: information gathered from social bookmarking
### sites, which let users bookmark, review, rate and search links on
### any topic. The data is in XML format and contains the links/posts
### URLs, the categories defining them and the ratings linked with
### them.
### Problem Statement: Analyse the data in the Hadoop eco-system to:
### 1. Fetch the data into the Hadoop Distributed File System and
###    analyse it with MapReduce, Pig and Hive to find the top-rated
###    books based on the user comments, likes etc.
### 2. Using MapReduce, convert the semi-structured format (XML data)
###    into structured format and categorise the user rating as
###    positive or negative for each of the thousand links.
### 3. Push the output to HDFS and feed it into PIG, which splits the
###    data into two parts: Category data and Ratings data.
### 4. Write a fancy Hive query to analyse the data further and push
###    the output into a relational database (RDBMS) using Sqoop.
#####################################################################
#####################################################################
##################################
###DEFINING THE LOCAL VARIABLES###
##################################
DATE=$(date +"%Y%m%d_%H%M%S")
LOGFILE="/home/bhavesh/Bookmark_POC/LOG/"$DATE".log"
##################################################################################
############## Converting XML to Flatfile using Mapreduce ########################
##################################################################################
echo "Mapreduce Program starts here"

echo "Converting XML to Flatfile using Mapreduce" >> $LOGFILE

hadoop fs -rmr /BookMarkOutput

hadoop jar /home/bhavesh/Bookmark_POC/Mapreduce/XMLProcessing.jar XMLDriver /BookMarkInput/* /BookMarkOutput

if [ $? -eq 0 ]; then
    echo "Successfully finished MapReduce processing" >> $LOGFILE
else
    echo "XMLProcessing MapReduce Failed Please check the Log " >> $LOGFILE
fi

########################## PIG Processing ########################################
#### PIG, which splits the data into two parts: Category data and Ratings data ###
##################################################################################

echo "Pig Script starts here"

echo "PIG Script,which splits the data into two parts: Category data and Ratings data" >> $LOGFILE

hadoop fs -rmr /BookMarkOutput/Type_Computer
hadoop fs -rmr /BookMarkOutput/Type_Database
hadoop fs -rmr /BookMarkOutput/Rating5+
hadoop fs -rmr /BookMarkOutput/Rating5-

pig /home/bhavesh/Bookmark_POC/PIG/bookmarkanalysis.pig

if [ $? -eq 0 ]; then
    echo "Successfully finished PIG processing" >> $LOGFILE
else
    echo "PIG Processing Failed Please check the Log " >> $LOGFILE
fi


############################ HIVE Processing #######################################
###### HIVE will load the Category data and Rating Data into Hive Tables  ##########
####################################################################################

echo "HIVE Script starts here"

echo "HIVE loads the data into two sets of tables: Category data tables and Ratings data tables" >> $LOGFILE

hive -e 'drop table if exists ComputerBooks';
hive -e 'drop table if exists DatabaseBooks';
hive -e 'drop table if exists Highest_Rating';
hive -e 'drop table if exists Lowest_Rating';

hive -e "create external table ComputerBooks
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/Computerbooks'";

hive -e "create external table DatabaseBooks
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/Databasebooks'";

hive -e "create external table Highest_Rating
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/HighestRating'";

hive -e "create external table Lowest_Rating
(Bookid string,
author string,
title string,
genre string,
price float,
publish_date string,
descriptions string,
review string,
rate float,
comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/LowestRating'";

hive -e "load data inpath '/BookMarkOutput/Type_Computer/part-r-00000' overwrite into table ComputerBooks";
hive -e "load data inpath '/BookMarkOutput/Type_Database/part-r-00000' overwrite into table DatabaseBooks";
hive -e "load data inpath '/BookMarkOutput/Rating5+/part-r-00000' overwrite into table Highest_Rating";
hive -e "load data inpath '/BookMarkOutput/Rating5-/part-r-00000' overwrite into table Lowest_Rating";
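# Optional sanity check (illustrative): confirm that rows landed in the Hive tables
hive -e "select count(*) from ComputerBooks" >> $LOGFILE
hive -e "select count(*) from Highest_Rating" >> $LOGFILE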

############################ SQOOP Processing #######################################
###### Pushing the HIVE table data into RDBMS tables via SQOOP #######################
#####################################################################################

# The exported files are the comma-separated records written by the MapReduce job,
# so the fields are ','-terminated rather than tab-terminated
sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table ComputerBooks --export-dir /BookMarkOutput/hive/Computerbooks/part-r-00000 --input-fields-terminated-by ','

sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table Databasebooks --export-dir /BookMarkOutput/hive/Databasebooks/part-r-00000 --input-fields-terminated-by ','

sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table HighestRating --export-dir /BookMarkOutput/hive/HighestRating/part-r-00000 --input-fields-terminated-by ','

sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table LowestRating --export-dir /BookMarkOutput/hive/LowestRating/part-r-00000 --input-fields-terminated-by ','

####################################################################################
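The script above loads the Hive tables and exports them with Sqoop. A minimal sketch of the further analysis query mentioned in the problem statement, run against the Highest_Rating table defined above (the exact query is an assumption):

hive -e "select title, author, genre, rate from Highest_Rating order by rate desc limit 10"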

Execution of Shell Script


MapReduce Output (XML Converted to Comma Separated file)
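Each output record is the comma-joined string built in MyMapper.java, one line per book, for example (values are illustrative and match the sample record above):

bk101,John Doe,Hadoop in Practice,Computer,39.95,2015-10-01,Practical recipes for Hadoop,Very useful for beginners,8,120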


PIG Script Execution


The Pig script generates 4 output files
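Once the script has run, the four outputs can be inspected directly on HDFS (part file names may vary), for example:

hadoop fs -ls /BookMarkOutput/Type_Computer
hadoop fs -cat /BookMarkOutput/Rating5+/part-r-00000 | head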


HIVE Execution


HIVE Output





Sqoop Execution


Sqoop Output in RDBMS Tables
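For the Sqoop exports to succeed, the target tables must already exist in MySQL, since sqoop export does not create them. A minimal sketch for one of them, mirroring the Hive schema above (the VARCHAR sizes are assumptions):

mysql -u root -p mysql -e "CREATE TABLE ComputerBooks (
  Bookid VARCHAR(20),
  author VARCHAR(100),
  title VARCHAR(200),
  genre VARCHAR(50),
  price FLOAT,
  publish_date VARCHAR(20),
  descriptions VARCHAR(500),
  review VARCHAR(500),
  rate FLOAT,
  comments INT
)"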




