Industry: Social Media
Data: The data set comprises information gathered from social bookmarking sites. A bookmarking site allows you to bookmark, review, rate and search links on any topic. The data is in XML format and contains the categories defining each link and the ratings associated with it.
Problem Statement: Analyse the data in the Hadoop ecosystem to:
1. Fetch the data into the Hadoop Distributed File System and analyse it with the help of MapReduce, Pig and Hive to find the top-rated links based on the user comments, likes etc.
2. Using MapReduce, convert the semi-structured format (XML data) into a structured format and categorise the user rating as positive or negative for each of the thousand links.
3. Push the MapReduce output to HDFS and then feed it into Pig, which splits the data into two parts: Category data and Ratings data.
4. Write a Hive query to analyse the data further and push the output into a relational database (RDBMS) using Sqoop.
POC Coding Details
Input Data (XML Files)
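The actual XML files appear only as a screenshot in the original post. Judging from the fields parsed in MyMapper below, each record is assumed to look roughly like the following (all values here are purely illustrative):

<catalog>
  <book>
    <id>bk101</id>
    <author>Matthew Gambardella</author>
    <title>XML Developer's Guide</title>
    <genre>Computer</genre>
    <price>44.95</price>
    <publish_date>2000-10-01</publish_date>
    <descriptions>An in-depth look at creating applications with XML.</descriptions>
    <review>Good introduction for beginners</review>
    <rate>8</rate>
    <comments>25</comments>
  </book>
</catalog>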
MapReduce Code to convert the XML file to a flat file (comma-separated file).
MyMapper.java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

/* Mapper: parses each <book> XML record and emits it as one comma-separated line. */
public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private static final Log LOG = LogFactory.getLog(MyMapper.class);

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // Each input value is one <book>...</book> block supplied by XMLInputFormat
            InputStream is = new ByteArrayInputStream(value.toString().getBytes());
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(is);
            doc.getDocumentElement().normalize();
            NodeList nList = doc.getElementsByTagName("book");
            for (int temp = 0; temp < nList.getLength(); temp++) {
                Node nNode = nList.item(temp);
                if (nNode.getNodeType() == Node.ELEMENT_NODE) {
                    Element eElement = (Element) nNode;
                    String id = eElement.getElementsByTagName("id").item(0).getTextContent();
                    String author = eElement.getElementsByTagName("author").item(0).getTextContent();
                    String title = eElement.getElementsByTagName("title").item(0).getTextContent();
                    String genre = eElement.getElementsByTagName("genre").item(0).getTextContent();
                    String price = eElement.getElementsByTagName("price").item(0).getTextContent();
                    String publish_date = eElement.getElementsByTagName("publish_date").item(0).getTextContent();
                    String descriptions = eElement.getElementsByTagName("descriptions").item(0).getTextContent();
                    String review = eElement.getElementsByTagName("review").item(0).getTextContent();
                    String rate = eElement.getElementsByTagName("rate").item(0).getTextContent();
                    String comments = eElement.getElementsByTagName("comments").item(0).getTextContent();
                    // Emit the record as one CSV line; this is a map-only job, so the value is NullWritable
                    context.write(new Text(id + "," + author + "," + title + "," + genre + "," + price + ","
                            + publish_date + "," + descriptions + "," + review + "," + rate + "," + comments),
                            NullWritable.get());
                }
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
XMLDriver.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class XMLDriver {
    /**
     * Bhavesh - for processing XML file using Hadoop MapReduce
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        try {
            Configuration conf = new Configuration();
            String[] arg = new GenericOptionsParser(conf, args).getRemainingArgs();
            // Tags that delimit one record in the input XML
            conf.set("START_TAG_KEY", "<book>");
            conf.set("END_TAG_KEY", "</book>");
            Job job = new Job(conf, "XML Processing");
            job.setJarByClass(XMLDriver.class);
            job.setMapperClass(MyMapper.class);
            // Map-only job: the mapper already writes the final CSV lines
            job.setNumReduceTasks(0);
            job.setInputFormatClass(XMLInputFormat.class);
            // Output types must match the mapper's Text/NullWritable output
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path(arg[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg[1]));
            job.waitForCompletion(true);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
XMLInputFormat.java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/* Bhavesh - Creating XMLInputFormat class for reading XML files */
public class XMLInputFormat extends TextInputFormat {

    public static final String START_TAG_KEY = "<book>";
    public static final String END_TAG_KEY = "</book>";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {

        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) is;
            // Record delimiters, taken from the job configuration set in XMLDriver,
            // falling back to the <book> defaults
            startTag = tac.getConfiguration().get("START_TAG_KEY", START_TAG_KEY).getBytes("utf-8");
            endTag = tac.getConfiguration().get("END_TAG_KEY", END_TAG_KEY).getBytes("utf-8");
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(tac.getConfiguration());
            fsin = fs.open(fileSplit.getPath());
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // Scan forward to the next start tag, then buffer everything up to the matching end tag
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsin.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }

        // Read bytes one at a time until the match sequence is found (or the split/file ends)
        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                if (b == -1)
                    return false;
                if (withinBlock)
                    buffer.write(b);
                if (b == match[i]) {
                    i++;
                    if (i >= match.length)
                        return true;
                } else
                    i = 0;
                if (!withinBlock && i == 0 && fsin.getPos() >= end)
                    return false;
            }
        }
    }
}
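The original post does not show how the XMLProcessing.jar referenced later in the shell script was built. One possible way to compile and package the three classes above, assuming the Hadoop client libraries are available on the node (the jar name matches the one used by the script; paths are illustrative):

# Compile the three classes against the Hadoop libraries and package them into the jar
javac -classpath $(hadoop classpath) MyMapper.java XMLDriver.java XMLInputFormat.java
jar cvf XMLProcessing.jar *.class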
PIG Script
bookmarkanalysis.pig
-- Load the comma-separated MapReduce output
Input_File = LOAD '/BookMarkOutput/' USING PigStorage(',') AS
    (book_id:chararray, author:chararray, Title:chararray, genre:chararray, price:float,
     publish_date:chararray, description:chararray, review:chararray, rate:float, comments:chararray);

-- Split by category (genre)
orderedfile1 = ORDER Input_File BY genre;
SPLIT orderedfile1 INTO
    computer_file IF genre == 'Computer',
    database_file IF genre == 'Database';
STORE computer_file INTO '/BookMarkOutput/Type_Computer/' USING PigStorage(',');
STORE database_file INTO '/BookMarkOutput/Type_Database/' USING PigStorage(',');

-- Split by rating
orderedfile2 = ORDER Input_File BY rate DESC;
SPLIT orderedfile2 INTO
    rate5plus IF rate >= 5 AND rate <= 10,
    rate5minus IF rate >= 1 AND rate < 5;
STORE rate5plus INTO '/BookMarkOutput/Rating5+/' USING PigStorage(',');
STORE rate5minus INTO '/BookMarkOutput/Rating5-/' USING PigStorage(',');
Shell Script
###############################################################################
############################## COMPLETE SCRIPT ###############################
### HEADER - PROGRAM NAME - <bookmarkanalysis.sh>                          ###
### AUTHOR  - BHAVESH BHADRICHA                                            ###
### DATE    - 12/NOV/2015                                                  ###
### VERSION - 1.0                                                          ###
### DESCRIPTION - Data: The data set comprises information gathered from  ###
### social bookmarking sites. A bookmarking site allows you to bookmark,  ###
### review, rate and search links on any topic. The data is in XML format ###
### and contains various links/post URLs, the categories defining them    ###
### and the ratings linked with them.                                     ###
### Problem Statement: Analyse the data in the Hadoop eco-system to:      ###
### 1. Fetch the data into the Hadoop Distributed File System and analyse ###
###    it with the help of MapReduce, Pig and Hive to find the top-rated  ###
###    books based on the user comments, likes etc.                       ###
### 2. Using MapReduce, convert the semi-structured format (XML data)     ###
###    into a structured format and categorise the user rating as         ###
###    positive or negative for each of the thousand links.               ###
### 3. Push the output to HDFS and then feed it into PIG, which splits    ###
###    the data into two parts: Category data and Ratings data.           ###
### 4. Write a Hive query to analyse the data further and push the        ###
###    output into a relational database (RDBMS) using Sqoop.             ###
###############################################################################
###############################################################################
####################################
### DEFINING THE LOCAL VARIABLES ###
####################################
DATE=$(date +"%Y%m%d_%H%M%S")
LOGFILE="/home/bhavesh/Bookmark_POC/LOG/"$DATE".log"
###############################################################################
############## Converting XML to Flatfile using MapReduce ####################
###############################################################################
echo "Mapreduce Program starts here"
echo "Converting XML to Flatfile using Mapreduce" >> $LOGFILE
hadoop fs -rmr /BookMarkOutput
hadoop jar /home/bhavesh/Bookmark_POC/Mapreduce/XMLProcessing.jar XMLDriver /BookMarkInput/* /BookMarkOutput
if [ $? -eq 0 ]; then
    echo "Successfully finished Mapreduce Processing" >> $LOGFILE
else
    echo "XMLProcessing MapReduce Failed. Please check the Log" >> $LOGFILE
fi
###############################################################################
############################# PIG Processing #################################
#### PIG, which splits the data into two parts: Category data and Ratings data
###############################################################################
echo "Pig Script starts here"
echo "PIG Script, which splits the data into two parts: Category data and Ratings data" >> $LOGFILE
hadoop fs -rmr /BookMarkOutput/Type_Computer
hadoop fs -rmr /BookMarkOutput/Type_Database
hadoop fs -rmr /BookMarkOutput/Rating5+
hadoop fs -rmr /BookMarkOutput/Rating5-
pig /home/bhavesh/Bookmark_POC/PIG/bookmarkanalysis.pig
if [ $? -eq 0 ]; then
    echo "Successfully finished PIG Processing" >> $LOGFILE
else
    echo "PIG Processing Failed. Please check the Log" >> $LOGFILE
fi
###############################################################################
############################ HIVE Processing #################################
###### HIVE will load the Category data and Rating data into Hive tables #####
###############################################################################
echo "HIVE Script starts here"
echo "HIVE, which loads the data into two parts: Category data tables and Ratings data tables" >> $LOGFILE
hive -e 'drop table if exists ComputerBooks';
hive -e 'drop table if exists DatabaseBooks';
hive -e 'drop table if exists Highest_Rating';
hive -e 'drop table if exists Lowest_Rating';
hive -e "create external table ComputerBooks
(Bookid string, author string, title string, genre string, price float,
publish_date string, descriptions string, review string, rate float, comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/Computerbooks'";
hive -e "create external table DatabaseBooks
(Bookid string, author string, title string, genre string, price float,
publish_date string, descriptions string, review string, rate float, comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/Databasebooks'";
hive -e "create external table Highest_Rating
(Bookid string, author string, title string, genre string, price float,
publish_date string, descriptions string, review string, rate float, comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/HighestRating'";
hive -e "create external table Lowest_Rating
(Bookid string, author string, title string, genre string, price float,
publish_date string, descriptions string, review string, rate float, comments int)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile location '/BookMarkOutput/hive/LowestRating'";
hive -e "load data inpath '/BookMarkOutput/Type_Computer/part-r-00000' overwrite into table ComputerBooks";
hive -e "load data inpath '/BookMarkOutput/Type_Database/part-r-00000' overwrite into table DatabaseBooks";
hive -e "load data inpath '/BookMarkOutput/Rating5+/part-r-00000' overwrite into table Highest_Rating";
hive -e "load data inpath '/BookMarkOutput/Rating5-/part-r-00000' overwrite into table Lowest_Rating";
###############################################################################
############################ SQOOP Processing ################################
###### Pushing the Hive table data into RDBMS tables via SQOOP ###############
###############################################################################
# The exported files are the comma-delimited Pig outputs that were loaded into
# the Hive external table locations, so the input field terminator is ','.
sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root \
    --table ComputerBooks --export-dir /BookMarkOutput/hive/Computerbooks/part-r-00000 \
    --input-fields-terminated-by ','
sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root \
    --table Databasebooks --export-dir /BookMarkOutput/hive/Databasebooks/part-r-00000 \
    --input-fields-terminated-by ','
sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root \
    --table HighestRating --export-dir /BookMarkOutput/hive/HighestRating/part-r-00000 \
    --input-fields-terminated-by ','
sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root \
    --table LowestRating --export-dir /BookMarkOutput/hive/LowestRating/part-r-00000 \
    --input-fields-terminated-by ','
###############################################################################
Execution of Shell Script
MapReduce Output (XML Converted to Comma Separated file)
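The converted file itself appears only as a screenshot in the original post. Based on the mapper's output format, each <book> record becomes one comma-separated line; an illustrative example (values invented to match the sample XML above) would be:

bk101,Matthew Gambardella,XML Developer's Guide,Computer,44.95,2000-10-01,An in-depth look at creating applications with XML.,Good introduction for beginners,8,25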
PIG Script Execution
The Pig script generates 4 output files.
HIVE Execution
HIVE Output
Sqoop Execution
Sqoop Output in RDBMS Tables
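The further-analysis Hive query mentioned in the problem statement is not reproduced in the original post. As one illustrative example only (not part of the original script), a query against the Highest_Rating table defined above could list the top-rated books before they are exported with Sqoop:

# Illustrative example: list the ten top-rated books with their genre and rating
hive -e "select Bookid, title, genre, rate from Highest_Rating order by rate desc limit 10"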