

Table of Contents
- 1. Overview
- 2. Development environment
- 3. Problem Statement
- 4. Project Structure
- 5. Solution
- 5.1 Loading the External Dataset
- 5.2 Separate Adults and Minors for Counting
- 5.2.1 Passing Function to Spark
- 5.2.2 Create Paired RDD in Spark for Adult, Minor Counts
- 5.2.3 Aggregation using reduceByKey() for Adult & Minor Counts
- 5.2.4 Save the RDD to External File
- 5.3 Create Family Aggregator
- 5.3.1 Create Paired RDD in Spark for Family, FamilyID Pairing
- 5.3.2 Aggregation using reduceByKey() for Family
- 5.3.3 Save the RDD to External File
- 6. Output
- 7. API & References
1. Overview
In Apache Spark, most transformations and many actions rely on passing a function to Spark.
Languages such as Java, Python, and Scala provide multiple ways of passing functions to Spark. In this article, we discuss how to pass a function to Spark using Java.
2. Development environment
Java : Oracle JDK 1.8
Spark : Apache Spark 2.0.0-bin-hadoop2.6
IDE : Eclipse
Build Tool: Gradle 4.4.1
3. Problem Statement
Consider a hypothetical case where our best friend John is planning to host a New Year party at his farm.
John is preparing the guest list and trying to estimate the number of guests for the party.
He has prepared the list, covering all his friends and their families.
Now John is curious to know the answers to the following questions.
From the guest list:
- How many adults and minors are there? (He is planning to have games for the kids!)
- How many families will participate in the party? (He has to book hotel rooms for the families.)
John's guest list is in a CSV file, guests.csv, which looks like this:
1,John,35,1
2,Jinni,32,1
3,Ronan,3,1
4,Alex,40,2
5,Maria,36,2
6,Shira,4,2
7,Hugo,2,2
8,Erik,26,3
9,Robert,42,4
10,Anna,40,4
11,Antonio,6,4
12,Marco,4,4
13,Daniel,31,5
14,Milena,30,5
15,Brayden,2,5
16,Sergey,28,6
17,Elen,29,6
18,Eduard,32,7
19,Levon,34,8
20,Mark,30,9
Each entry in the guests.csv file has the following format:
<Sequence>,<MemberName>,<Age>,<FamilyID>
Each row represents an individual guest's information: name, age, family id, and so on. The last column, FamilyID, is a unique id assigned to each family, so guests with the same family id belong to the same family.
4. Project Structure
5. Solution
Now, being a senior Big Data developer, John tries to solve his problem using his Big Data toolkit.
He divides the work into multiple steps, so his checklist looks like this:
- Loading the external dataset (guests.csv)
- Separating adults and minors for counting (to know the minor count for the games!)
- Creating a family aggregator (to know the number of families for hotel booking!)
5.1 Loading the External Dataset
Loading an external dataset and creating an RDD is fairly easy in Spark; you may check the Spark RDD Example to load the CSV file. The snippet looks like this:
JavaRDD<String> guestsRDD = sparkContext.textFile(args[0]);
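The snippet above assumes sparkContext has already been created. A minimal driver bootstrap could look like the sketch below (the class name JohnPartyApp and the local master are illustrative; on a real cluster the master is usually supplied by spark-submit):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class JohnPartyApp {
    public static void main(String[] args) {
        // Local master for experimentation; spark-submit sets this on a cluster.
        SparkConf conf = new SparkConf()
                .setAppName("JohnPartyApp")
                .setMaster("local[*]");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        // args[0] is the path to guests.csv
        JavaRDD<String> guestsRDD = sparkContext.textFile(args[0]);

        // ... transformations and actions go here ...

        sparkContext.stop();
    }
}
```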
5.2 Separate Adults and Minors for Counting
To separate adults from minors, John wrote a custom function by implementing Spark's Function interface. The code for the adult/minor separator looks like this:
static class AdultMinorSeparator implements Function<String, String> {
    @Override
    public String call(String input) {
        int age = Integer.parseInt(input.split(",")[2]);
        return (age >= 18 ? "Adult" : "Minor");
    }
}
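The body of call() is plain Java, so the classification logic can be sanity-checked without a Spark cluster. A small standalone sketch (the class name AdultMinorSeparatorCheck is ours; the sample rows are from guests.csv):

```java
public class AdultMinorSeparatorCheck {
    // Same logic as AdultMinorSeparator.call(), extracted for a local check.
    static String classify(String csvRow) {
        // Column 2 of <Sequence>,<MemberName>,<Age>,<FamilyID> is the age.
        int age = Integer.parseInt(csvRow.split(",")[2]);
        return (age >= 18 ? "Adult" : "Minor");
    }

    public static void main(String[] args) {
        System.out.println(classify("1,John,35,1"));  // Adult
        System.out.println(classify("3,Ronan,3,1"));  // Minor
    }
}
```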
5.2.1 Passing Function to Spark
/* Java function passing with a named class. */
JavaRDD<String> adultMinorRDD = guestsRDD.map(new AdultMinorSeparator());
5.2.2 Create Paired RDD in Spark for Adult, Minor Counts
/* Java function passing with an anonymous inner class. */
JavaPairRDD<String, Integer> pairedRDD = adultMinorRDD.mapToPair(
        new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String input) throws Exception {
                return new Tuple2<String, Integer>(input, 1);
            }
        });
5.2.3 Aggregation using reduceByKey() for Adult & Minor Counts
JavaPairRDD<String, Integer> adultMinorCountsRDD = pairedRDD.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
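To see what this map → mapToPair → reduceByKey pipeline computes, the same counting can be simulated in plain Java with a HashMap. This is only an illustration of the semantics, not Spark code; the ages are taken from guests.csv:

```java
import java.util.HashMap;
import java.util.Map;

public class AdultMinorCountSim {
    public static Map<String, Integer> countAdultsMinors(int[] ages) {
        Map<String, Integer> counts = new HashMap<>();
        for (int age : ages) {
            String key = (age >= 18 ? "Adult" : "Minor");  // the map step
            counts.merge(key, 1, Integer::sum);            // the reduceByKey step
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] ages = {35, 32, 3, 40, 36, 4, 2, 26, 42, 40,
                      6, 4, 31, 30, 2, 28, 29, 32, 34, 30};
        System.out.println(countAdultsMinors(ages));  // Adult=14, Minor=6
    }
}
```

Unlike this single-JVM sketch, Spark performs the reduce step per partition and then merges partial counts across the cluster.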
5.2.4 Save the RDD to External File
adultMinorCountsRDD.saveAsTextFile(args[1]);
5.3 Create Family Aggregator
To create the family aggregation, John wrote a Family class, which looks like this:
import java.io.Serializable;
import java.util.List;
import java.util.Objects;

public class Family implements Serializable {

    private int familyId;
    private List<String> membersList;

    public Family() {
    }

    public Family(int familyId, List<String> membersList) {
        this.familyId = familyId;
        this.membersList = membersList;
    }

    public int getFamilyId() {
        return familyId;
    }

    public void setFamilyId(int familyId) {
        this.familyId = familyId;
    }

    public List<String> getMembersList() {
        return membersList;
    }

    public void setMembersList(List<String> membersList) {
        this.membersList = membersList;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Family)) return false;
        Family that = (Family) o;
        return familyId == that.familyId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(familyId);
    }

    @Override
    public String toString() {
        return membersList.toString().replace("[", "").replace("]", "");
    }
}
5.3.1 Create Paired RDD in Spark for Family, FamilyID Pairing
JavaPairRDD<Integer, Family> familyJavaPairRDD = guestsRDD.mapToPair(
        new PairFunction<String, Integer, Family>() {
            @Override
            public Tuple2<Integer, Family> call(String input) {
                Family family = new Family();
                String[] values = input.split(",");
                int familyId = Integer.parseInt(values[values.length - 1]);
                family.setFamilyId(familyId);
                List<String> memberList = new ArrayList<String>();
                memberList.add(values[1]);
                family.setMembersList(memberList);
                return new Tuple2<Integer, Family>(familyId, family);
            }
        });
5.3.2 Aggregation using reduceByKey() for Family
JavaPairRDD<Integer, Family> aggregatedFamilyPairsRDD = familyJavaPairRDD.reduceByKey(
        new Function2<Family, Family, Family>() {
            @Override
            public Family call(Family v1, Family v2) throws Exception {
                Family result = new Family();
                result.setFamilyId(v1.getFamilyId());
                List<String> membersList = new ArrayList<>(
                        v1.getMembersList().size() + v2.getMembersList().size());
                membersList.addAll(v1.getMembersList());
                membersList.addAll(v2.getMembersList());
                result.setMembersList(membersList);
                return result;
            }
        });
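Conceptually, this pairing-plus-reduce groups member names under their family id. The result can be previewed in plain Java with a Map, again as a semantics sketch rather than Spark code (the class name FamilyAggregationSim is ours):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FamilyAggregationSim {
    // Groups member names by family id, mirroring mapToPair + reduceByKey.
    public static Map<Integer, List<String>> groupByFamily(String[] csvRows) {
        Map<Integer, List<String>> families = new LinkedHashMap<>();
        for (String row : csvRows) {
            String[] values = row.split(",");
            int familyId = Integer.parseInt(values[values.length - 1]);
            families.computeIfAbsent(familyId, id -> new ArrayList<>())
                    .add(values[1]);  // member name
        }
        return families;
    }

    public static void main(String[] args) {
        String[] rows = {"1,John,35,1", "2,Jinni,32,1", "4,Alex,40,2"};
        System.out.println(groupByFamily(rows));  // {1=[John, Jinni], 2=[Alex]}
    }
}
```

Note that in the Spark version the correct equals()/hashCode() on Family (based on familyId) is what lets tuples with the same key be merged.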
5.3.3 Save the RDD to External File
aggregatedFamilyPairsRDD.saveAsTextFile(args[2]);
6. Output
After running the program, John got the output shown below.
6.1 Adult Minor Count output
(Minor,6)
(Adult,14)
6.2 Family Aggregation Output
(4,Robert, Anna, Antonio, Marco)
(1,John, Jinni, Ronan)
(6,Sergey, Elen)
(3,Erik)
(7,Eduard)
(9,Mark)
(8,Levon)
(5,Daniel, Milena, Brayden)
(2,Alex, Maria, Shira, Hugo)
So now John has answers to all of his questions and is happy to host the party, thanks to Apache Spark!
In this article, we learned the use of the Function and Function2 Java APIs in Spark; apart from those, we also learned the use of the RDD operations mapToPair() and reduceByKey().
7. API & References
We have used the Spark API for Java to write this article. You can find the complete John's Party problem solution in our Git repository.