In this activity, you we will load data into Apache Spark and inspect the data using the Spark In this section, we use the SparkContext method, textFile, to load the data into a Resilient Distributed Dataset (RDD).
Our dataset is a .csv file that consists of online auction data. Each auction has an auction id associated with it and can have multiple bids. Each row represents a bid. For each bid, we have the following information:
Column | Type | Description |
Aucid | String | Auction ID |
Bid | Float | Bid amount |
Bidtime | Float | Time of bid from start of auction |
Bidder | String | The bidder’s userid |
Bidrate | Int | The bidder’s rating |
Openbid | Float | Opening price |
Price | Float | Final price |
Itemtype | String | Item type |
Dtl | Int | Days to live |
We load this data into Spark using RDDs
Objectives
• Load data into Spark
• Use transformations and actions to inspect the data
What transformations and actions would you use in each case?
1. How do you see the first element of the inputRDD?
2. What do you use to see the first 5 elements of the RDD?
3. What is the total number of bids?
4. What is the total number of distinct items that were auctioned?
5. What is the total number of item types that were auctioned?
6. What is the total number of bids per item type?
7. Across all auctioned items, what is the minimum number of bids?
8. Across all auctioned items, what is the maximum number of bids?
9. What is the average number of bids?
Input file (contains 10654 Records)
Spark Code written in Java
package BiddingDataAnalysis;
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Function;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
public class Bidding
{
public static void main(String[] args) throws Exception
{
if (args.length<1)
{
System.err.println("Usage: Auction Bid <File> ");
System.exit(1);
}
SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> diskfile = sc.textFile(args[0]);
/* How do you see the first element of the Input RDD? */
System.out.println("First Element is = " + diskfile.first());
/* How do you use to see the first 5 element of the RDD? */
System.out.println("First Five Element are = " + diskfile.take(5));
/* What is the total number of bid? */
System.out.println("Total Number of Bids are = " + diskfile.count());
JavaRDD<String> words = diskfile.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) {
return Arrays.asList(s.split(",")[0]).iterator();
}
});
System.out.println("Total number of distinct item that were auctioned =" +words.distinct().count());
/* What is the total number of item types that were auctioned? */
JavaRDD<String> items = diskfile.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s){
return Arrays.asList(s.split(",")[7]).iterator();
}
});
System.out.println("Total number of Item types that were auctioned = " + items.distinct().count());
/* What is the Total number of Bids per item type */
JavaPairRDD<String,Integer> pairs = items.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String t) throws Exception {
// TODO Auto-generated method stub
return new Tuple2(t,1);
}
});
JavaPairRDD<String,Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
// TODO Auto-generated method stub
return v1 + v2;
}
});
System.out.println("Total number of Bids per item type" + counts.collect());
JavaPairRDD<Integer,Integer> auctionidkey = diskfile.mapToPair(s-> new Tuple2(s.split(",")[0],1));
JavaPairRDD<Integer, Integer> countgroup = auctionidkey.reduceByKey((a,b)->a+b);
JavaRDD<Integer> onlycount = countgroup.values();
JavaRDD<Integer> sortedcount = onlycount.sortBy(new org.apache.spark.api.java.function.Function<Integer,Integer>() {
@Override
public Integer call(Integer v1) throws Exception {
// TODO Auto-generated method stub
return v1.intValue();
}
}, true, 1 );
System.out.println("Across all auctioned items,the minimum number of bids" + sortedcount.first());
Integer max = sortedcount.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
// TODO Auto-generated method stub
if (v1 > v2)
{
return v1;
}
else
{
return v2;
}
}
});
System.out.println("Across all auctioned items,the maximum number of bids = " + max);
Integer summationcount = sortedcount.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
// TODO Auto-generated method stub
return v1 + v2;
}
});
int totalcount = (int) sortedcount.count();
float average = (float) summationcount/totalcount;
System.out.println("Across all auctioned items,the avarage number of bids = " + average);
}
}
Code Output
This code is working for single column of data, can you please update the code for multicolumn operations ?
ReplyDeleteexcellent work
ReplyDeleteMore Knowledge about Hadoop Here