Industry: Financial
Data:
Input Format: .PDF (our input data is in PDF format)
I created 3,000 records of my own for this POC, with the attributes shown below.
Input dataset attributes:

Column | Description
1      | SENSEXID
2      | SENSEXNAME
3      | TYPEOFTRADING
4      | SENSEXLOC
5      | OPEN_BALANCE
6      | CLOSING_BAL
7      | FLTUATION_RATE
Problem Statement: Analyse the data in the Hadoop ecosystem to:

1. Load the complete PDF input data onto HDFS.

2. Develop a MapReduce use case to get the below filtered results from the HDFS input data:

   If TYPEOFTRADING is 'SIP':
   - OPEN_BALANCE > 25000 and FLTUATION_RATE > 10 --> store as "HighDemandMarket"
   - CLOSING_BAL < 22000 and FLTUATION_RATE between 20 and 30 --> store as "OnGoingMarketStretegy"

   If TYPEOFTRADING is 'SHORTTERM':
   - OPEN_BALANCE < 5000 --> store as "WealthyProducts"
   - SENSEXLOC is "NewYork" or "Mumbai" --> store as "ReliableProducts"

   Otherwise, store the record in "OtherProducts".

   NOTE: Only the five output files named above have to be generated.

3. Develop a Pig script to filter the MapReduce output as follows:
   - Provide the unique data
   - Sort the unique data based on SENSEXID

4. Export the same Pig output from HDFS to MySQL using Sqoop.

5. Store the same Pig output in a Hive external table.
POC Coding Details
MapReduce Code
PdfInputDriver.java
package com.bhavesh.poc.sensex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();

        // GenericOptionsParser is a utility to parse command-line arguments
        // generic to the Hadoop framework.
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        // Returns an array of Strings containing only application-specific arguments.
        args = parser.getRemainingArgs();

        Job job = new Job(conf, "PdfSensexDetails");
        job.setJarByClass(PdfInputDriver.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Custom InputFormat class for reading the PDF input
        job.setInputFormatClass(PdfInputFormat.class);

        // LazyOutputFormat avoids creating empty default part files,
        // since all records are written through MultipleOutputs in the reducer.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
                IntWritable.class, Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(SensexTradeMapper.class);
        job.setReducerClass(SensexTradeReducer.class);

        System.out.println(job.waitForCompletion(true));
    }
}
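An equivalent way to wire up the same job is the Tool/ToolRunner idiom, which handles the generic Hadoop options for you. This is only a sketch of that variant (class name PdfInputTool is mine, not part of the original POC); it reuses the same mapper, reducer and input format as above.

package com.bhavesh.poc.sensex;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch only: a ToolRunner-based variant of the driver above.
public class PdfInputTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "PdfSensexDetails");
        job.setJarByClass(PdfInputTool.class);
        job.setInputFormatClass(PdfInputFormat.class);
        job.setMapperClass(SensexTradeMapper.class);
        job.setReducerClass(SensexTradeReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
                IntWritable.class, Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips the generic Hadoop options before calling run().
        System.exit(ToolRunner.run(new PdfInputTool(), args));
    }
}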
PdfInputFormat.java
package com.bhavesh.poc.sensex;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new PdfRecordReader();
    }
}
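One addition worth considering (not in the original code): a PDF cannot be parsed from an arbitrary byte offset, so it is safer to tell the framework not to split PDF files, ensuring each file goes to a single mapper. A minimal sketch of the override that could be added inside PdfInputFormat:

// Sketch: prevents Hadoop from splitting a PDF file across mappers,
// since PDFBox needs the whole document to extract its text.
@Override
protected boolean isSplitable(org.apache.hadoop.mapreduce.JobContext context,
        org.apache.hadoop.fs.Path filename) {
    return false;
}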
PdfRecordReader.java
package com.bhavesh.poc.sensex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class PdfRecordReader extends RecordReader<LongWritable, Text> {

    private String[] lines = null;
    private LongWritable key = null;
    private Text value = null;

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        final Path file = split.getPath();

        /*
         * Open the file for the split. This is where the PDF parsing
         * logic is applied: the whole document is converted to text.
         */
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());

        // Convert the PDF content to plain text using PDFBox.
        PDDocument pdf = PDDocument.load(fileIn);
        PDFTextStripper stripper = new PDFTextStripper();
        String parsedText = stripper.getText(pdf);
        pdf.close(); // release the document once the text has been extracted
        this.lines = parsedText.split("\n");
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            // First record: key 1, first parsed line.
            key = new LongWritable();
            key.set(1);
            value = new Text();
            value.set(lines[0]);
        } else {
            int count = (int) key.get();
            if (count < lines.length) {
                // Emit the next line; the last valid index is lines.length - 1.
                value = new Text();
                value.set(lines[count]);
                key = new LongWritable(count + 1);
            } else {
                return false;
            }
        }
        return (key != null && value != null);
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Report how far through the parsed lines we are.
        if (lines == null || lines.length == 0 || key == null) {
            return 0;
        }
        return Math.min(1.0f, (float) key.get() / (float) lines.length);
    }

    @Override
    public void close() throws IOException {
    }
}
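Before running the full job, it can help to confirm that PDFBox extracts the records one per line from the generated PDF. Below is a minimal standalone check (a sketch only; the file path is hypothetical, and the PDFBox 1.x jars must be on the classpath).

package com.bhavesh.poc.sensex;

import java.io.File;

import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

// Sketch: quick local check that the PDF parses into one record per line.
public class PdfParseCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical local path; replace with your own input PDF.
        PDDocument pdf = PDDocument.load(new File("/tmp/sensexinputfile.pdf"));
        try {
            String text = new PDFTextStripper().getText(pdf);
            String[] lines = text.split("\n");
            System.out.println("Parsed " + lines.length + " lines");
            for (int i = 0; i < Math.min(5, lines.length); i++) {
                System.out.println(lines[i]); // show the first few records
            }
        } finally {
            pdf.close();
        }
    }
}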
SensexTradeMapper.java
package com.bhavesh.poc.sensex;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SensexTradeMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text word = new Text();

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] str = value.toString().split(",");
        // Skip lines that do not contain all seven expected fields.
        if (str.length < 7) {
            return;
        }

        String tradeType = str[2].trim();
        String location = str[3].trim();
        int openBal;
        int closeBal;
        int fluctRate;
        try {
            openBal = Integer.parseInt(str[4].trim());
            closeBal = Integer.parseInt(str[5].trim());
            fluctRate = Integer.parseInt(str[6].trim());
        } catch (NumberFormatException e) {
            // Ignore records whose balance/rate fields are not numeric.
            return;
        }

        // Classify the record; thresholds follow the problem statement above.
        // Records that match none of the specific rules go to "OtherProducts".
        word.set("OtherProducts");
        if (tradeType.equals("SIP")) {
            if (openBal > 25000 && fluctRate > 10) {
                word.set("HighDemandMarket");
            } else if (closeBal < 22000 && fluctRate >= 20 && fluctRate <= 30) {
                word.set("OnGoingMarketStretegy");
            }
        } else if (tradeType.equals("SHORTTERM")) {
            if (openBal < 5000) {
                word.set("WealthyProducts");
            } else if (location.equalsIgnoreCase("NewYork")
                    || location.equalsIgnoreCase("Mumbai")) {
                word.set("ReliableProducts");
            }
        }

        String rec = str[0] + "\t" + str[1] + "\t" + str[2] + "\t" + str[3]
                + "\t" + str[4] + "\t" + str[5] + "\t" + str[6];
        context.write(word, new Text(rec));
    }
}
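The mapper assumes each extracted line is a comma-separated record in the column order listed at the top of this post. A quick way to sanity-check that assumption is to feed one line through the same split logic; the sample values below are purely hypothetical, not from the real dataset.

// Sketch: verify the expected field positions on one hypothetical record.
public class RecordLayoutCheck {
    public static void main(String[] args) {
        // Hypothetical record in the documented column order:
        // SENSEXID,SENSEXNAME,TYPEOFTRADING,SENSEXLOC,OPEN_BALANCE,CLOSING_BAL,FLTUATION_RATE
        String line = "101,SAMPLENAME,SIP,Mumbai,26000,21000,25";
        String[] str = line.split(",");
        System.out.println("TYPEOFTRADING = " + str[2]);
        System.out.println("SENSEXLOC     = " + str[3]);
        System.out.println("OPEN_BALANCE  = " + str[4]);
    }
}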
SensexTradeReducer.java
package com.bhavesh.poc.sensex;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class SensexTradeReducer extends Reducer<Text, Text, IntWritable, Text> {

    private MultipleOutputs<IntWritable, Text> multipleOutputs;

    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            String[] str = value.toString().split("\t");
            int ssxid = Integer.parseInt(str[0].trim());
            String str1 = str[1] + "\t" + str[2] + "\t" + str[3] + "\t"
                    + str[4] + "\t" + str[5] + "\t" + str[6];
            // Write each record to a file named after its category (the reduce key),
            // e.g. HighDemandMarket-r-00000, inside the job output directory.
            multipleOutputs.write(new IntWritable(ssxid), new Text(str1),
                    generateFileName(key));
        }
    }

    String generateFileName(Text key) {
        return key.toString();
    }

    @Override
    public void setup(Context context) {
        multipleOutputs = new MultipleOutputs<IntWritable, Text>(context);
    }

    @Override
    public void cleanup(final Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}
PIG Script
SENSEX.pig
A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/HighDemandMarket-r-00000' using PigStorage('\t')
    as (Sid:int, Sname:chararray, Ttrading:chararray, Sloc:chararray, OBal:int, CBal:int, Frate:int);
disHM = DISTINCT A;
orHM = ORDER disHM by Sid;
STORE orHM into '/hdfs/bhavesh/SENSEX/HM' using PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/ReliableProducts-r-00000' using PigStorage('\t')
    as (Sid:int, Sname:chararray, Ttrading:chararray, Sloc:chararray, OBal:int, CBal:int, Frate:int);
disRP = DISTINCT A;
orRP = ORDER disRP by Sid;
STORE orRP into '/hdfs/bhavesh/SENSEX/RP' using PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/OtherProducts-r-00000' using PigStorage('\t')
    as (Sid:int, Sname:chararray, Ttrading:chararray, Sloc:chararray, OBal:int, CBal:int, Frate:int);
disOP = DISTINCT A;
orOP = ORDER disOP by Sid;
STORE orOP into '/hdfs/bhavesh/SENSEX/OP' using PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/WealthyProducts-r-00000' using PigStorage('\t')
    as (Sid:int, Sname:chararray, Ttrading:chararray, Sloc:chararray, OBal:int, CBal:int, Frate:int);
disWP = DISTINCT A;
orWP = ORDER disWP by Sid;
STORE orWP into '/hdfs/bhavesh/SENSEX/WP' using PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/OnGoingMarketStretegy-r-00000' using PigStorage('\t')
    as (Sid:int, Sname:chararray, Ttrading:chararray, Sloc:chararray, OBal:int, CBal:int, Frate:int);
disOMS = DISTINCT A;
orOMS = ORDER disOMS by Sid;
STORE orOMS into '/hdfs/bhavesh/SENSEX/OMS' using PigStorage(',');
Shell Script
SENSEX.sh
###############################################################################
############################# COMPLETE SCRIPT ################################
### HEADER - PROGRAM NAME - <SENSEX.sh>                                    ###
### AUTHOR  - BHAVESH BHADRICHA                                            ###
### DATE    - 27/DEC/2015                                                  ###
### VERSION - 1.0                                                          ###
### DESCRIPTION - Data: Sensex Log Data Processing                         ###
###               (PDF File Processing in MapReduce)                       ###
###############################################################################

##################################
### DEFINING THE LOCAL VARIABLES ###
##################################
DATE=$(date +"%Y%m%d_%H%M%S")
LOGFILE="/home/bhavesh/POC/SENSEX/LOG/"$DATE".log"

####### Removing any existing output directories #######
hadoop fs -rmr /hdfs/bhavesh/SENSEX/RP
hadoop fs -rmr /hdfs/bhavesh/SENSEX/WP
hadoop fs -rmr /hdfs/bhavesh/SENSEX/OP
hadoop fs -rmr /hdfs/bhavesh/SENSEX/OMS
hadoop fs -rmr /hdfs/bhavesh/SENSEX/HM

##################################################################################
############## PDF File Processing USING MapReduce ##############################
##################################################################################
echo "MapReduce program starts here"
echo "PDF File Processing in MapReduce Started" >> $LOGFILE
hadoop fs -rmr /hdfs/bhavesh/SENSEX/OUTPUT
hadoop jar /home/bhavesh/POC/SENSEX/Mapreduce/SENSEX.jar \
    com.bhavesh.poc.sensex.PdfInputDriver \
    /hdfs/bhavesh/SENSEX/INPUT/sensexinputfile.pdf \
    /hdfs/bhavesh/SENSEX/OUTPUT
if [ $? -eq 0 ]; then
    echo "Successfully finished MapReduce processing" >> $LOGFILE
else
    echo "SENSEX MapReduce failed, please check the log" >> $LOGFILE
fi
#################################################################################
############### PIG Processing for SENSEX DATA #################################
#################################################################################
echo "SENSEX Pig processing started"
echo "SENSEX PIG Processing Started" >> $LOGFILE
pig -f /home/bhavesh/POC/SENSEX/PIG/SENSEX.pig
if [ $? -eq 0 ]; then
    echo "PIG successfully finished SENSEX processing" >> $LOGFILE
else
    echo "PIG SENSEX processing failed, please check the log" >> $LOGFILE
fi
################################################################################
############# EXPORTING DATA TO MYSQL USING SQOOP ##############################
################################################################################
echo "Exporting the data to MySQL using SQOOP"
echo "Exporting the data to MySQL" >> $LOGFILE

##### Creating the database and tables in MySQL #####
sqoop eval --connect jdbc:mysql://localhost/ --username root --password root \
    --query "create database if not exists SENSEX;"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root \
    --query "use SENSEX;"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root \
    --query "grant all privileges on SENSEX.* to '%'@'localhost'"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root \
    --query "grant all privileges on SENSEX.* to ''@'localhost'"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root \
    --query "drop table if exists HighDemandMarket"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root \
    --query "drop table if exists WealthyProducts"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root \
    --query "drop table if exists OngoingMarketSt"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root \
    --query "drop table if exists ReliableProducts"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root \
    --query "drop table if exists OtherProducts"
echo " MYSQL table creation"
sqoop eval --connect
jdbc:mysql://localhost/SENSEX -username root -password root --query
"create table HighDemandMarket (Sid int,Sname varchar(30),TType
varchar(20),TLoc varchar(20),OpenBal int,CloseBal int,FlucRate int)";
sqoop eval --connect
jdbc:mysql://localhost/SENSEX -username root -password root --query
"create table WealthyProducts(Sid int,Sname varchar(30),TType
varchar(20),TLoc varchar(20),OpenBal int,CloseBal int,FlucRate int)";
sqoop eval --connect
jdbc:mysql://localhost/SENSEX -username root -password root --query
"create table OngoingMarketSt(Sid int,Sname varchar(30),TType
varchar(20),TLoc varchar(20),OpenBal int,CloseBal int,FlucRate int)";
sqoop eval --connect
jdbc:mysql://localhost/SENSEX -username root -password root --query
"create table ReliableProducts(Sid int,Sname varchar(30),TType
varchar(20),TLoc varchar(20),OpenBal int,CloseBal int,FlucRate int)";
sqoop eval --connect
jdbc:mysql://localhost/SENSEX -username root -password root --query
"create table OtherProducts(Sid int,Sname varchar(30),TType
varchar(20),TLoc varchar(20),OpenBal int,CloseBal int,FlucRate int)";
echo "data exporting";
#### exporting the data into MYSQL
sqoop export --connect
jdbc:mysql://localhost/SENSEX -username root -password root --table
HighDemandMarket --export-dir /hdfs/bhavesh/SENSEX/HM/part-r-00000
--fields-terminated-by ',';
sqoop export --connect
jdbc:mysql://localhost/SENSEX -username root -password root --table
WealthyProducts --export-dir /hdfs/bhavesh/SENSEX/WP/part-r-00000
--fields-terminated-by ',';
sqoop export --connect
jdbc:mysql://localhost/SENSEX -username root -password root --table
OngoingMarketSt --export-dir /hdfs/bhavesh/SENSEX/OMS/part-r-00000
--fields-terminated-by ',';
sqoop export --connect
jdbc:mysql://localhost/SENSEX -username root -password root --table
ReliableProducts --export-dir /hdfs/bhavesh/SENSEX/RP/part-r-00000
--fields-terminated-by ',';
sqoop export --connect
jdbc:mysql://localhost/SENSEX -username root -password root --table
OtherProducts --export-dir /hdfs/bhavesh/SENSEX/OP/part-r-00000 --fields-terminated-by ',';
if [ $? -eq 0 ]; then
    echo "Exporting of data to MySQL is done"
    echo "Exporting of data to MySQL is done" >> $LOGFILE
fi

echo "Creation of Hive tables started"
echo "Creation of Hive tables started" >> $LOGFILE
hive -f /home/bhavesh/POC/SENSEX/HIVE/SENSEX.hql
echo "Hive process is done"
echo "HIVE PROCESSING is done" >> $LOGFILE
exit;
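After the script has run, a quick way to confirm that the Sqoop export actually landed rows in MySQL is a small JDBC check. This is a sketch only; it assumes the MySQL Connector/J driver is on the classpath and reuses the root/root credentials from the script above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Sketch: count the rows Sqoop exported into one of the MySQL tables.
public class ExportRowCount {
    public static void main(String[] args) throws Exception {
        Connection con = DriverManager.getConnection(
                "jdbc:mysql://localhost/SENSEX", "root", "root");
        try {
            Statement st = con.createStatement();
            ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM HighDemandMarket");
            if (rs.next()) {
                System.out.println("HighDemandMarket rows: " + rs.getInt(1));
            }
        } finally {
            con.close();
        }
    }
}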
HIVE
SENSEX.hql
create database if not exists SENSEX;
use SENSEX;

Drop table HDM;
Drop table WP;
Drop table RP;
Drop table OP;
Drop table OMS;

create external table HDM (Sid int, Sname string, TTrading string, Sloc string, OpenBal int, CloseBal int, FlucRate int)
row format delimited
fields terminated by ","
stored as textfile;
load data inpath '/hdfs/bhavesh/SENSEX/HM/part-r-00000' into table HDM;

create external table WP (Sid int, Sname string, TTrading string, Sloc string, OpenBal int, CloseBal int, FlucRate int)
row format delimited
fields terminated by ","
stored as textfile;
load data inpath '/hdfs/bhavesh/SENSEX/WP/part-r-00000' into table WP;

create external table RP (Sid int, Sname string, TTrading string, Sloc string, OpenBal int, CloseBal int, FlucRate int)
row format delimited
fields terminated by ","
stored as textfile;
load data inpath '/hdfs/bhavesh/SENSEX/RP/part-r-00000' into table RP;

create external table OP (Sid int, Sname string, TTrading string, Sloc string, OpenBal int, CloseBal int, FlucRate int)
row format delimited
fields terminated by ","
stored as textfile;
load data inpath '/hdfs/bhavesh/SENSEX/OP/part-r-00000' into table OP;

create external table OMS (Sid int, Sname string, TTrading string, Sloc string, OpenBal int, CloseBal int, FlucRate int)
row format delimited
fields terminated by ","
stored as textfile;
load data inpath '/hdfs/bhavesh/SENSEX/OMS/part-r-00000' into table OMS;
Execution