Breaking

Monday, 28 December 2015

POC #: Sensex Log Data Processing (PDF File Processing in Map Reduce)

Industry: Financial
Data: Input Format - .PDF (Our Input Data is in PDF Format)
Like this below created 3000 records on my own 




















Input Dataset with attributes like:

Column
Description
1
SENSEXID
2
SENSEXNAME
3
TYPEOFTRADING
4
SENSEXLOC
5
OPEN_BALANCE
6
CLOSING_BAL
7
FLTUATION_RATE


Problem Statement: Analyse the data in Hadoop Eco-system to:

1.       Take the complete PDF Input data on HDFS

2.       Develop a Map Reduce Use Case to get the below filtered results from the HDFS Input       
data(Excel data)

     If TYPE OF TRADING is -->'SIP'

          - OPEN_BALANCE > 25000 & FLTUATION_RATE > 10  --> store "HighDemandMarket"
          -CLOSING_BALANCE<22000 & FLTUATION_RATE IN BETWEEN 20 - 30  --> store "OnGoingMarketStretegy"
              
     If TYPE OF TRADING is -->'SHORTTERM
   
          - OPEN_BALANCE < 5000 --> store "WealthyProducts"
          - SensexLoc --> "NewYork OR Mumbai"  --> “ReliableProducts
    else
          store in "OtherProducts"

  NOTE: In the mentioned file names only 5 outputs have to be generated

  3. Develop a PIG Script to filter the Map Reduce Output in the below fashion

       - Provide the Unique data

       - Sort the Unique data based on SensexID.

  4. EXPORT the same PIG Output from HDFS to MySQL using SQOOP

  5. Store the same PIG Output in a HIVE External Table

POC Coding Details


MapReduce Code

PdfInputDriver.java

package com.bhavesh.poc.sensex;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

            public static void main(String[] args) throws IOException,
                                    InterruptedException, ClassNotFoundException {
                        Configuration conf = new Configuration();
                        //GenericOptionsParser is a utility to parse command line arguments generic to the Hadoop framework
                        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
                       
                        //Returns an array of Strings containing only application-specific arguments
                        args = parser.getRemainingArgs();
                       
                        Job job = new Job(conf, "PdfSensexDetails");
                        job.setJarByClass(PdfInputDriver.class);
                        job.setOutputKeyClass(Text.class);
                       
                        job.setOutputValueClass(Text.class);
                        // Custom InputFormat class
                        job.setInputFormatClass(PdfInputFormat.class);
                       
                        //job.setOutputFormatClass(TextOutputFormat.class);
                        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
                        MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,IntWritable.class, Text.class);
                       
                        FileInputFormat.setInputPaths(job, new Path(args[0]));
                        FileOutputFormat.setOutputPath(job, new Path(args[1]));
                        job.setMapperClass(SensexTradeMapper.class);
                        job.setReducerClass(SensexTradeReducer.class);
                        System.out.println(job.waitForCompletion(true));
            }
}

PdfInputFormat.java

package com.bhavesh.poc.sensex;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class PdfInputFormat extends FileInputFormat {
            @Override
            public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,
                                    InterruptedException {

                        return new PdfRecordReader();
            }
}

PdfRecordReader.java

package com.bhavesh.poc.sensex;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;
public class PdfRecordReader extends RecordReader{
            private String[] lines = null;
            private LongWritable key = null;
            private Text value = null;
            @Override
            public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                                    throws IOException, InterruptedException {
                        FileSplit split = (FileSplit) genericSplit;
                        Configuration job = context.getConfiguration();
                        final Path file = split.getPath();

                        /*
                         * The below code contains the logic for opening the file and seek to
                         * the start of the split. Here we are applying the Pdf Parsing logic
                         */

                        FileSystem fs = file.getFileSystem(job);
                        FSDataInputStream fileIn = fs.open(split.getPath());
                        PDDocument pdf = null;
                        String parsedText = null;
                        PDFTextStripper stripper;               // Here we are coverting PDF to textinputformate
                        pdf = PDDocument.load(fileIn);
                        stripper = new PDFTextStripper();
                        parsedText = stripper.getText(pdf);                      
                        this.lines = parsedText.split("\n");
            }
            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                        if (key == null) {
                                    key = new LongWritable();
                                    key.set(1);
                                    value = new Text();
                                    value.set(lines[0]);
                        } else {
                                    int temp = (int) key.get();
                                    if (temp < (lines.length - 1)) {
                                                int count = (int) key.get();
                                                value = new Text();
                                                value.set(lines[count]);
                                                count = count + 1;
                                                key = new LongWritable(count);
                                    } else {
                                                return false;
                                    }
                        }
                        if (key == null || value == null) {
                                    return false;
                        } else {
                                    return true;
                        }
            }

            @Override
            public LongWritable getCurrentKey() throws IOException,
                                    InterruptedException {
                        return key;
            }
            @Override
            public Text getCurrentValue() throws IOException, InterruptedException {
                        return value;
            }
            @Override
            public float getProgress() throws IOException, InterruptedException {
                        return 0;
            }
            @Override
            public void close() throws IOException {
            }
}

SensexTradeMapper.java

package com.bhavesh.poc.sensex;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SensexTradeMapper extends Mapper<LongWritable, Text, Text, Text> {
            private Text word = new Text();
            protected void map(LongWritable key, Text value, Context context)
                                    throws IOException, InterruptedException {
                        String TradeType;
                        String Location;
                        int Open_bal;
                        int Close_bal;
                        int Fluct_rate;
                        String str[] = value.toString().split(",");
                        TradeType =str[2].toString();
                        Location = str[3].toString();
                        String Opbal = str[4];
                        Open_bal = Integer.parseInt(Opbal);
                        String Clbal = str[5];
                        Close_bal = Integer.parseInt(Clbal);
                        String fcrate = str[6].trim();
                        Fluct_rate = Integer.parseInt(fcrate);
                        if (TradeType.equals(new String("SIP")))
                        {
                                    if (Open_bal > 20000 && Fluct_rate <= 15)
                                    {
                                                word.set(new String("HighDemandMarket"));                              //word == key
                                    }
                                    else if(Close_bal > 30000 && Fluct_rate > 15 && Fluct_rate < 25)
                                    {
                                                word.set(new String("OnGoingMarketStretegy"));
                                    }
                        }
                        else if(TradeType.equals(new String("SHORTTERM")))
                        {
                                    if (Open_bal > 5000 && Open_bal < 30000)
                                    {
                                                word.set(new String("WealthyProducts"));
                                    }
                                    else if((Location.equals(new String("NEWYORK"))) ||
                                                            (Location.equals(new String("MUMBAI"))))
                                    {
                                                word.set(new String("ReliableProducts"));
                                    }
                        }
                        else
                        {
                                    word.set(new String("OtherProducts"));
                        }
                        String rec = str[0]+"\t"+str[1]+"\t"+str[2]+"\t"+str[3]+"\t"+str[4]+"\t"+str[5]+"\t"+str[6];
                       
                        //context.progress();
                        context.write(word, new Text(rec));
            }
}

SensexTradeReducer.java

package com.poc.ssx;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class SensexTradeReducer extends
                        Reducer<Text, Text, IntWritable, Text> {
            private MultipleOutputs<IntWritable, Text> multipleOutputs;
            protected void reduce(Text key, Iterable<Text> values,
                                    Context context) throws IOException, InterruptedException {
                        String str1 = null;
                        int ssxid = 0;
                        for (Text value : values) {
                                    String str[] = value.toString().split("\t");
                                    String sxid = str[0].trim();
                                    ssxid = Integer.parseInt(sxid);
                                    str1 = str[1]+"\t"+str[2]+"\t"+str[3]+"\t"+str[4]+"\t"+str[5]+"\t"+str[6];
                                    //context.write(new IntWritable(ssxid), new Text(ssxname+","+TradeType+","+Location+","+Open_bal+","+Close_bal+","+Fluct_rate));
                                    multipleOutputs.write(new IntWritable(ssxid),
                                                            new Text(str1),
                                                            generateFileName(key));
                        }
            }
            String generateFileName(Text key){
                       
                        return key.toString();                      
            }
            @Override
            public void setup(Context context){
                        multipleOutputs = new MultipleOutputs<IntWritable, Text>(context);
            }
            @Override
            public void cleanup(final Context context) throws IOException, InterruptedException{
                        multipleOutputs.close();
            }          
}

PIG Script

SENSEX.pig

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/HighDemandMarket-r-00000' using PigStorage('\t') as (Sid:int,Sname:chararray,Ttrading:chararray,Sloc:chararray,OBal:int,CBal:int,Frate:int);
disHM = DISTINCT A;
orHM = ORDER disHM by Sid;
STORE orHM into '/hdfs/bhavesh/SENSEX/HM' using PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/ReliableProducts-r-00000' using PigStorage('\t') as (Sid:int,Sname:chararray,Ttrading:chararray,Sloc:chararray,OBal:int,CBal:int,Frate:int);
disRP = DISTINCT A;
orRP = ORDER disRP by Sid;
STORE orRP into '/hdfs/bhavesh/SENSEX/RP' using PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/OtherProducts-r-00000' using PigStorage('\t') as (Sid:int,Sname:chararray,Ttrading:chararray,Sloc:chararray,OBal:int,CBal:int,Frate:int);
disOP = DISTINCT A;
orOP = ORDER disOP by Sid;
STORE orOP into '/hdfs/bhavesh/SENSEX/OP' using PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/WealthyProducts-r-00000' using PigStorage('\t') as (Sid:int,Sname:chararray,Ttrading:chararray,Sloc:chararray,OBal:int,CBal:int,Frate:int);
disWP= DISTINCT A;
orWP = ORDER disWP by Sid;
STORE orWP into '/hdfs/bhavesh/SENSEX/WP' using PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/OnGoingMarketStretegy-r-00000' using PigStorage('\t') as (Sid:int,Sname:chararray,Ttrading:chararray,Sloc:chararray,OBal:int,CBal:int,Frate:int);
disOMS = DISTINCT A;
orOMS = ORDER disOMS by Sid;
STORE orOMS into '/hdfs/bhavesh/SENSEX/OMS' using PigStorage(',');

Shell Script

SENSEX.sh

###############################################################################
#############################  COMPLETE SCRIPT   ##############################
### HEADER - PROGRAM NAME - <SENSEX.sh>                                                       ###
### AUTHOR - BHAVESH BHADRICHA                                                                        ###
### DATE  - 27/DEC/2015                                                                                                    ###
### VERSION - 1.0                                                                                                                  ###
### DESCRIPTION - Data: Sensex Log Data Processing                                            ###
### (PDF File Processing in Map Reduce)                                                                       ###
###############################################################################
###############################################################################
##################################
###DEFINING THE LOCAL VARIABLES###
##################################
DATE=$(date +"%Y%m%d_%H%M%S")
LOGFILE="/home/bhavesh/POC/SENSEX/LOG/"$DATE".log"

####### Removing if any existent directories ##################################

hadoop fs -rmr /hdfs/bhavesh/SENSEX/RP
hadoop fs -rmr /hdfs/bhavesh/SENSEX/WP
hadoop fs -rmr /hdfs/bhavesh/SENSEX/OP
hadoop fs -rmr /hdfs/bhavesh/SENSEX/OMS
hadoop fs -rmr /hdfs/bhavesh/SENSEX/HM

##################################################################################
############## PDF File Processing USING Map Reduce ##############################
##################################################################################
echo "Mapreduce Program starts here"

echo "PDF File Processing in Map Reduce Started" >> $LOGFILE

hadoop fs -rmr /hdfs/bhavesh/SENSEX/OUTPUT

hadoop jar /home/bhavesh/POC/SENSEX/Mapreduce/SENSEX.jar com.bhavesh.poc.sensex.PdfInputDriver /hdfs/bhavesh/SENSEX/INPUT/sensexinputfile.pdf /hdfs/bhavesh/SENSEX/OUTPUT

if [ $? -eq 0 ]; then
    echo "Succesfully finished Mapreduce Processing " >> $LOGFILE
else
    echo "SENSEX MapReduce Failed Please check the Log " >> $LOGFILE
fi

#################################################################################
############### PIG Processing for SEXSEX DATA  #################################
#################################################################################

echo "SENSEX Pig Processing started "

echo "SENSEX PIG Processing Started" >> $LOGFILE

pig -f /home/bhavesh/POC/SENSEX/PIG/SENSEX.pig

if [ $? -eq 0 ]; then
    echo "PIG Succesfully finished SENSEX Processing " >> $LOGFILE
else
    echo "PIG SENSEX Processing Failed Please check the Log " >> $LOGFILE
fi

################################################################################
############# IMPORTING DATA in SQOOP ##########################################
################################################################################

echo "Importing the data to MYSQL  using SQOOP ";

echo "Importing the data to MYSQL " >> $LOGFILE

##### Creating the tables in MySql
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "create database if not exists  SENSEX;";
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "use SENSEX;";
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "grant all privileges on SENSEX.* to '%'@'localhost'";
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "grant all privileges on SENSEX.* to ''@'localhost'";

sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "drop table if exists HighDemandMarket";
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "drop table if exists WealthyProducts";
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "drop table if exists OngoingMarketSt";
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "drop table if exists ReliableProducts";
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "drop table if exists OtherProducts";

echo " MYSQL table creation"

sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "create table HighDemandMarket (Sid int,Sname varchar(30),TType varchar(20),TLoc varchar(20),OpenBal int,CloseBal int,FlucRate int)";
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "create table WealthyProducts(Sid int,Sname varchar(30),TType varchar(20),TLoc varchar(20),OpenBal int,CloseBal int,FlucRate int)";
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "create table OngoingMarketSt(Sid int,Sname varchar(30),TType varchar(20),TLoc varchar(20),OpenBal int,CloseBal int,FlucRate int)";
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "create table ReliableProducts(Sid int,Sname varchar(30),TType varchar(20),TLoc varchar(20),OpenBal int,CloseBal int,FlucRate int)";
sqoop eval --connect jdbc:mysql://localhost/SENSEX -username root -password root --query "create table OtherProducts(Sid int,Sname varchar(30),TType varchar(20),TLoc varchar(20),OpenBal int,CloseBal int,FlucRate int)";

echo "data exporting";

#### exporting the data into MYSQL
sqoop export --connect jdbc:mysql://localhost/SENSEX -username root -password root --table HighDemandMarket --export-dir /hdfs/bhavesh/SENSEX/HM/part-r-00000 --fields-terminated-by ',';
sqoop export --connect jdbc:mysql://localhost/SENSEX -username root -password root --table WealthyProducts --export-dir /hdfs/bhavesh/SENSEX/WP/part-r-00000 --fields-terminated-by ',';
sqoop export --connect jdbc:mysql://localhost/SENSEX -username root -password root --table OngoingMarketSt --export-dir /hdfs/bhavesh/SENSEX/OMS/part-r-00000 --fields-terminated-by ',';
sqoop export --connect jdbc:mysql://localhost/SENSEX -username root -password root --table ReliableProducts --export-dir /hdfs/bhavesh/SENSEX/RP/part-r-00000 --fields-terminated-by ',';
sqoop export --connect jdbc:mysql://localhost/SENSEX -username root -password root --table OtherProducts --export-dir /hdfs/bhavesh/SENSEX/OP/part-r-00000  --fields-terminated-by ',';

if[$? -eq 0]
echo "exporting of data to MYSQL is done";

echo "exporting of data to MYSQL is done" >> $LOGFILE

echo "creation of hive tables started";

echo "creation of hive tables started " >> $LOGFILE

hive -f /home/bhavesh/POC/SENSEX/HIVE/SENSEX.hql

echo "Hive process is done";
echo "HIVE PROCESSING is done" >> $LOGFILE
exit;

HIVE

SENSEX.hql

use SENSEX;

Drop table HDM;
Drop table WP;
Drop table RP;
Drop table OP;
Drop table OMS;

create table HDM(Sid int,Sname string,TTrading string,Sloc String,OpenBal int,CloseBal int,FlucRate int)
row format delimited
fields terminated by ","
stored as textfile;

load data inpath '/hdfs/bhavesh/SENSEX/HM/part-r-00000' into table HDM;

create table WP(Sid int,Sname string,TTrading string,Sloc String,OpenBal int,CloseBal int,FlucRate int)
row format delimited
fields terminated by ","
stored as textfile;

load data inpath '/hdfs/bhavesh/SENSEX/WP/part-r-00000' into table WP;

create table RP(Sid int,Sname string,TTrading string,Sloc String,OpenBal int,CloseBal int,FlucRate int)
row format delimited
fields terminated by ","
stored as textfile;

load data inpath '/hdfs/bhavesh/SENSEX/RP/part-r-00000' into table RP;

create table OP(Sid int,Sname string,TTrading string,Sloc String,OpenBal int,CloseBal int,FlucRate int)
row format delimited
fields terminated by ","
stored as textfile;

load data inpath '/hdfs/bhavesh/SENSEX/OP/part-r-00000' into table OP;

create table OMS(Sid int,Sname string,TTrading string,Sloc String,OpenBal int,CloseBal int,FlucRate int)
row format delimited
fields terminated by ","
stored as textfile;

load data inpath '/hdfs/bhavesh/SENSEX/OMS/part-r-00000' into table OMS;

Exection








MapReduce Output


MYSQL Output






10 comments:

  1. Hi, good & thank you for the nice work, can you also please share the pdf document that you have used for this sample, I would like to try it out please.

    ReplyDelete
  2. Hi Bhavesh,
    Can you please share the document u used as the raw file to test this

    ReplyDelete
  3. Can you please provide me with JAR files of PDDocument?

    ReplyDelete
  4. plz upload the input file.or give a link so we can download that input file.

    ReplyDelete
  5. Heyy Bhavesh,
    Can you please share the row documents you used as the raw file.

    ReplyDelete
  6. please send me your dataset

    ReplyDelete