
Sunday 21 August 2016

How to Analyze Data in Apache Spark

In this activity, we will load data into Apache Spark and inspect it using Spark transformations and actions. In this section, we use the SparkContext method textFile to load the data into a Resilient Distributed Dataset (RDD).
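A minimal sketch of that loading step is shown below (the local master setting and the auctiondata.csv path are placeholders for illustration; the full program later in this post takes the file path as a command-line argument instead):

SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

// textFile creates an RDD with one element per line of the input file
JavaRDD<String> inputRDD = sc.textFile("auctiondata.csv");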

Our dataset is a .csv file that consists of online auction data. Each auction has an auction id associated with it and can have multiple bids. Each row represents a bid. For each bid, we have the following information:

Column     Type     Description
Aucid      String   Auction ID
Bid        Float    Bid amount
Bidtime    Float    Time of bid from start of auction
Bidder     String   The bidder's userid
Bidrate    Int      The bidder's rating
Openbid    Float    Opening price
Price      Float    Final price
Itemtype   String   Item type
Dtl        Int      Days to live


We load this data into Spark using RDDs.
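Since each line of the file is a comma-separated record, individual fields can be reached by splitting on commas and indexing by position; in particular, column 0 is the auction id and column 7 is the item type, which the code below relies on. A small sketch (the sample line is made up to match the schema above, not taken from the real file):

// Hypothetical example line, invented to illustrate the schema:
String line = "1000001,50.0,1.5,bidder1,10,25.0,120.0,xbox,7";
String[] fields = line.split(",");
String aucid = fields[0];                 // auction id
float bid = Float.parseFloat(fields[1]);  // bid amount
String itemtype = fields[7];              // item type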

Objectives
• Load data into Spark
• Use transformations and actions to inspect the data

What transformations and actions would you use in each case?

1. How do you see the first element of the inputRDD? 
2. What do you use to see the first 5 elements of the RDD?
3. What is the total number of bids?
4. What is the total number of distinct items that were auctioned?
5. What is the total number of item types that were auctioned?
6. What is the total number of bids per item type?
7. Across all auctioned items, what is the minimum number of bids?
8. Across all auctioned items, what is the maximum number of bids?
9. What is the average number of bids? 
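As a rough guide (assuming the file has been loaded into a JavaRDD<String> named inputRDD, as in the program below), the first few questions map directly onto built-in RDD operations:

// 1-2: actions that return elements to the driver
inputRDD.first();   // first element
inputRDD.take(5);   // first five elements

// 3: every line is one bid, so counting lines counts bids
long totalBids = inputRDD.count();

// 4-5: project one column, then count the distinct values
long distinctAuctions = inputRDD.map(line -> line.split(",")[0]).distinct().count();
long itemTypes = inputRDD.map(line -> line.split(",")[7]).distinct().count();

// 6-9 combine mapToPair with reduceByKey and reduce; see the full program below.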

Input file (contains 10,654 records)

Spark Code written in Java 

package BiddingDataAnalysis;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class Bidding
{
    public static void main(String[] args) throws Exception
    {
        if (args.length < 1)
        {
            System.err.println("Usage: Bidding <auction data file>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load the CSV file: one RDD element per line, i.e. one element per bid
        JavaRDD<String> diskfile = sc.textFile(args[0]);

        /* 1. How do you see the first element of the input RDD? */
        System.out.println("First element is = " + diskfile.first());

        /* 2. What do you use to see the first 5 elements of the RDD? */
        System.out.println("First five elements are = " + diskfile.take(5));

        /* 3. What is the total number of bids? Every line is one bid. */
        System.out.println("Total number of bids = " + diskfile.count());
        /* 4. What is the total number of distinct items that were auctioned?
           Project the auction id (column 0), then count the distinct values. */
        JavaRDD<String> auctionids = diskfile.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) {
                return Arrays.asList(s.split(",")[0]).iterator();
            }
        });
        System.out.println("Total number of distinct items that were auctioned = " + auctionids.distinct().count());

        /* 5. What is the total number of item types that were auctioned?
           Project the item type (column 7), then count the distinct values. */
        JavaRDD<String> items = diskfile.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) {
                return Arrays.asList(s.split(",")[7]).iterator();
            }
        });
        System.out.println("Total number of item types that were auctioned = " + items.distinct().count());
        /* 6. What is the total number of bids per item type?
           Map each item type to (type, 1) and sum the counts per key. */
        JavaPairRDD<String, Integer> pairs = items.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String t) throws Exception {
                return new Tuple2<>(t, 1);
            }
        });
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println("Total number of bids per item type = " + counts.collect());
        /* 7. Across all auctioned items, what is the minimum number of bids?
           Key each bid by its auction id, count bids per auction, and sort the counts ascending. */
        JavaPairRDD<String, Integer> auctionidkey = diskfile.mapToPair(s -> new Tuple2<>(s.split(",")[0], 1));
        JavaPairRDD<String, Integer> countgroup = auctionidkey.reduceByKey((a, b) -> a + b);
        JavaRDD<Integer> onlycount = countgroup.values();
        JavaRDD<Integer> sortedcount = onlycount.sortBy(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1;
            }
        }, true, 1);
        System.out.println("Across all auctioned items, the minimum number of bids = " + sortedcount.first());
 
        /* 8. Across all auctioned items, what is the maximum number of bids?
           Reduce the per-auction counts, keeping the larger value each time. */
        Integer max = sortedcount.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return Math.max(v1, v2);
            }
        });
        System.out.println("Across all auctioned items, the maximum number of bids = " + max);
 
        /* 9. What is the average number of bids?
           Sum the per-auction counts and divide by the number of auctions. */
        Integer summationcount = sortedcount.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        int totalcount = (int) sortedcount.count();
        float average = (float) summationcount / totalcount;

        System.out.println("Across all auctioned items, the average number of bids = " + average);

        sc.stop();
    }
}
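As a side note, the minimum, maximum, and average could also be obtained in a single pass with the built-in statistics on JavaDoubleRDD, instead of sorting and reducing separately. A minimal sketch, reusing the countgroup pair RDD from the program above (requires importing org.apache.spark.api.java.JavaDoubleRDD and org.apache.spark.util.StatCounter):

// Sketch only: convert the per-auction bid counts to a JavaDoubleRDD
// and let StatCounter compute min, max, and mean in one pass.
JavaDoubleRDD bidCounts = countgroup.values().mapToDouble(c -> c);
StatCounter stats = bidCounts.stats();
System.out.println("min = " + stats.min()
        + ", max = " + stats.max()
        + ", average = " + stats.mean());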

Code Output