1. Overview

In Apache Spark, most transformations and many actions rely on functions that the caller passes to Spark.
Java, Python, and Scala each provide several ways to pass functions to Apache Spark. In this article, we discuss how to pass functions to Spark using Java.

2. Development environment

Java : Oracle JDK 1.8
Spark : Apache Spark 2.0.0-bin-hadoop2.6
IDE : Eclipse
Build Tool: Gradle 4.4.1

3. Problem Statement

Consider a hypothetical case where our best friend John is planning to host a New Year party on his farm.
John is preparing a guest list and trying to estimate the number of guests for the party.
He has listed all his friends and their families.
Now John is curious to know the answers to the following questions about the guest list:

  • How many adults and minors are there? He is planning to have games for the kids.
  • How many families will attend the party? He has to book hotel rooms for the families.

John has saved the guest list as a CSV file, guests.csv, which looks like this:

1,John,35,1
2,Jinni,32,1
3,Ronan,3,1
4,Alex,40,2
5,Maria,36,2
6,Shira,4,2
7,Hugo,2,2
8,Erik,26,3
9,Robert,42,4
10,Anna,40,4
11,Antonio,6,4
12,Marco,4,4
13,Daniel,31,5
14,Milena,30,5
15,Brayden,2,5
16,Sergey,28,6
17,Elen,29,6
18,Eduard,32,7
19,Levon,34,8
20,Mark,30,9

Each entry in the guests.csv file has the following format:
<Sequence>,<MemberName>,<Age>,<FamilyID>

Each row holds one guest’s information: sequence number, name, age, and family id.
The last column is the family id, a unique id assigned to each family.
Guests with the same family id belong to the same family.
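
The row layout described above can be captured in a small parser. The following is a minimal sketch in plain Java with no Spark dependency; the `Guest` and `GuestRowParser` names are hypothetical and are not part of John's project.

```java
import java.util.Objects;

/** Hypothetical value class for one row of guests.csv (not part of the original project). */
final class Guest {
    final int sequence;
    final String name;
    final int age;
    final int familyId;

    Guest(int sequence, String name, int age, int familyId) {
        this.sequence = sequence;
        this.name = Objects.requireNonNull(name);
        this.age = age;
        this.familyId = familyId;
    }
}

/** Hypothetical helper that turns "<Sequence>,<MemberName>,<Age>,<FamilyID>" into a Guest. */
final class GuestRowParser {
    static Guest parse(String row) {
        String[] values = row.split(",");
        if (values.length != 4) {
            throw new IllegalArgumentException(
                    "Expected 4 columns but got " + values.length + ": " + row);
        }
        // Integer.parseInt throws NumberFormatException for non-numeric input.
        return new Guest(
                Integer.parseInt(values[0].trim()),
                values[1].trim(),
                Integer.parseInt(values[2].trim()),
                Integer.parseInt(values[3].trim()));
    }

    public static void main(String[] args) {
        Guest ronan = parse("3,Ronan,3,1");
        System.out.println(ronan.name + " is " + ronan.age
                + " and belongs to family " + ronan.familyId);
    }
}
```

Checking the column count up front gives a clearer error for malformed rows than an `ArrayIndexOutOfBoundsException` raised later inside a Spark task.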

4. Project Structure

[Figure: project structure of the "Passing Function To Spark" project]

5. Solution

As a senior Big Data developer, John decides to solve his problem with his Big Data toolkit.
He divides the problem into steps that together answer both questions.
His checklist looks like this:

  • Load the external dataset (guests.csv)
  • Separate adults and minors and count each group (to know how many minors to plan games for)
  • Create a family aggregator (to know the number of families for the hotel booking)

5.1 Loading the External Dataset

Loading an external dataset and creating an RDD is fairly easy in Spark. You may check Spark RDD Example to see how to load a CSV file; the snippet looks like this:

JavaRDD<String> guestsRDD =  sparkContext.textFile(args[0]);

5.2 Separate the Adults and Minors for Counting

To separate adults and minors, John wrote a custom function by implementing Spark's Function interface. The code for the adult/minor separator looks like this:

// Maps one CSV row to "Adult" or "Minor" based on the age column (index 2).
static class AdultMinorSeparator implements Function<String,String>{
    @Override
    public String call(String input){
        int age = Integer.parseInt(input.split(",")[2]);
        return ( age >= 18 ? "Adult" : "Minor");
    }
}

5.2.1 Passing Function to Spark

/*Java Function Passing with named class.*/
JavaRDD<String> adultMinorRDD = guestsRDD.map(new AdultMinorSeparator());
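
This article uses a named class and an anonymous inner class. With Java 8, a lambda expression is a third option, because Spark's `Function` interface declares a single `call` method. The sketch below compares the three styles without requiring a Spark installation: `RowFunction` and `applyToAll` are hypothetical stand-ins for Spark's `Function` interface and `JavaRDD.map()`, defined locally so the example is self-contained.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Local stand-in shaped like Spark's Function<T, R>: serializable, with a single call method. */
interface RowFunction<T, R> extends Serializable {
    R call(T input) throws Exception;
}

/** Style 1: a named class, as in AdultMinorSeparator. */
final class NamedSeparator implements RowFunction<String, String> {
    @Override
    public String call(String input) {
        int age = Integer.parseInt(input.split(",")[2]);
        return age >= 18 ? "Adult" : "Minor";
    }
}

final class FunctionPassingStyles {
    /** Hypothetical stand-in for JavaRDD.map(): applies fn to every row of a local list. */
    static <T, R> List<R> applyToAll(List<T> rows, RowFunction<T, R> fn) {
        List<R> out = new ArrayList<>(rows.size());
        for (T row : rows) {
            try {
                out.add(fn.call(row));
            } catch (Exception e) {
                throw new RuntimeException("Function failed for row: " + row, e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("1,John,35,1", "3,Ronan,3,1");

        // Style 1: named class.
        List<String> byNamedClass = applyToAll(rows, new NamedSeparator());

        // Style 2: anonymous inner class.
        List<String> byAnonymousClass = applyToAll(rows, new RowFunction<String, String>() {
            @Override
            public String call(String input) {
                return Integer.parseInt(input.split(",")[2]) >= 18 ? "Adult" : "Minor";
            }
        });

        // Style 3: Java 8 lambda expression.
        List<String> byLambda = applyToAll(rows,
                input -> Integer.parseInt(input.split(",")[2]) >= 18 ? "Adult" : "Minor");

        // All three styles produce the same labels for the same rows.
        System.out.println(byNamedClass + " " + byAnonymousClass + " " + byLambda);
    }
}
```

With an actual `JavaRDD<String>`, the lambda style would read along the lines of `guestsRDD.map(line -> Integer.parseInt(line.split(",")[2]) >= 18 ? "Adult" : "Minor")`, assuming the project is compiled with Java 8 as listed in the development environment. Whichever style is chosen, the function object is shipped to the executors, so it and anything it captures must be serializable.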

5.2.2 Create Paired RDD in Spark For Adult, Minor Counts

/*Java function passing with anonymous inner class*/
JavaPairRDD<String,Integer> pairedRDD = adultMinorRDD.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String input) throws Exception {
        // Pair each "Adult"/"Minor" label with a count of 1.
        return new Tuple2<String, Integer>(input, 1);
    }
});

5.2.3 Aggregation using reduceByKey() for Adults & Minors counts

JavaPairRDD<String,Integer> adultMinorCountsRDD = pairedRDD.reduceByKey(new Function2<Integer,Integer,Integer>(){
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        // Sum the counts of two pairs that share the same key.
        return v1 + v2;
    }
});
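
To see what the map, mapToPair, and reduceByKey steps compute together, the same aggregation can be traced on a local map. This is a plain-Java sketch rather than Spark code: `LocalAdultMinorCount` and `countByCategory` are hypothetical names, and `Map.merge` stands in for the per-key folding that `reduceByKey` performs. Spark additionally distributes the work across partitions, which is why the reduce function should be associative and commutative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class LocalAdultMinorCount {
    /** Counts adults and minors by folding a 1 into the running total of each label. */
    static Map<String, Integer> countByCategory(List<Integer> ages) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int age : ages) {
            // map step: classify the guest.
            String label = age >= 18 ? "Adult" : "Minor";
            // mapToPair + reduceByKey steps: emit (label, 1) and sum values per label.
            counts.merge(label, 1, (v1, v2) -> v1 + v2);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Ages taken from the Age column of guests.csv, in file order.
        List<Integer> ages = Arrays.asList(
                35, 32, 3, 40, 36, 4, 2, 26, 42, 40,
                6, 4, 31, 30, 2, 28, 29, 32, 34, 30);
        System.out.println(countByCategory(ages)); // prints {Adult=14, Minor=6}
    }
}
```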

5.2.4 Save the RDD to External file

adultMinorCountsRDD.saveAsTextFile(args[1]);

5.3 Create family aggregator

To aggregate families, John wrote a Family class, which looks like this:

import java.io.Serializable;
import java.util.List;
import java.util.Objects;

public class Family implements Serializable {
    private int familyId;
    private List<String> membersList;
    public Family() {
    }
    public Family(int familyId, List<String> membersList) {
        this.familyId = familyId;
        this.membersList = membersList;
    }
    public int getFamilyId() {
        return familyId;
    }
    public void setFamilyId(int familyId) {
        this.familyId = familyId;
    }
    public List<String> getMembersList() {
        return membersList;
    }
    public void setMembersList(List<String> membersList) {
        this.membersList = membersList;
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Family)) return false;
        Family that = (Family) o;
        return familyId == that.familyId;
    }
    @Override
    public int hashCode() {
        return Objects.hash(familyId);
    }
    @Override
    public String toString() {
        return membersList.toString().replace("[","").replace("]","");
    }
}

5.3.1 Create Paired RDD in Spark for Family, FamilyID Pairing

JavaPairRDD<Integer,Family> familyJavaPairRDD = guestsRDD.mapToPair(
  new PairFunction<String, Integer, Family>() {
    @Override
    public Tuple2<Integer, Family> call(String input) {
      Family family = new Family();
      String[] values = input.split(",");
      int familyId = Integer.parseInt(values[values.length-1]);
      family.setFamilyId(familyId);
      List<String> memberList = new ArrayList<String>();
      memberList.add(values[1]);
      family.setMembersList(memberList);
      return new Tuple2<Integer, Family>(familyId,family);
    }
  }
);

5.3.2 Aggregation using reduceByKey() for Family

JavaPairRDD<Integer,Family> aggregatedFamilyPairsRDD = familyJavaPairRDD.reduceByKey(new Function2<Family, Family, Family>() {
  @Override
  public Family call(Family v1, Family v2) throws Exception {
    Family result = new Family();
    result.setFamilyId(v1.getFamilyId());
    List<String> membersList = new ArrayList<>(v1.getMembersList().size() + v2.getMembersList().size());
    membersList.addAll(v1.getMembersList());
    membersList.addAll(v2.getMembersList());
    result.setMembersList(membersList);
    return result;
  }
});
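
The aggregation groups member names by family id, while John's second question asks how many families there are; that number is simply the count of distinct keys after the reduce. The sketch below traces the same merge logic on a local map in plain Java. `LocalFamilyAggregator` and `aggregate` are hypothetical names, and only a few rows of guests.csv are used to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class LocalFamilyAggregator {
    /** Groups member names by family id, merging member lists pair by pair. */
    static Map<Integer, List<String>> aggregate(List<String> rows) {
        Map<Integer, List<String>> families = new TreeMap<>();
        for (String row : rows) {
            String[] values = row.split(",");
            // The family id is the last column; the member name is the second.
            int familyId = Integer.parseInt(values[values.length - 1]);
            List<String> single = new ArrayList<>();
            single.add(values[1]);
            // Same idea as the Function2 passed to reduceByKey: build a new merged list.
            families.merge(familyId, single, (v1, v2) -> {
                List<String> merged = new ArrayList<>(v1.size() + v2.size());
                merged.addAll(v1);
                merged.addAll(v2);
                return merged;
            });
        }
        return families;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "1,John,35,1", "2,Jinni,32,1", "3,Ronan,3,1",
                "4,Alex,40,2", "8,Erik,26,3");
        Map<Integer, List<String>> families = aggregate(rows);
        System.out.println(families);
        System.out.println("Families: " + families.size());
    }
}
```

In the Spark program, calling `count()` on `aggregatedFamilyPairsRDD` should give the equivalent number of families for the full guest list.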

5.3.3 Save the RDD to External file

aggregatedFamilyPairsRDD.saveAsTextFile(args[2]);

6. Output

After running the program, John found the output shown below.

6.1 Adult Minor Count output

(Minor,6)
(Adult,14)

6.2 Family Aggregation Output

(4,Robert, Anna, Antonio, Marco)
(1,John, Jinni, Ronan)
(6,Sergey, Elen)
(3,Erik)
(7,Eduard)
(9,Mark)
(8,Levon)
(5,Daniel, Milena, Brayden)
(2,Alex, Maria, Shira, Hugo)

John now has answers to both of his questions and is happy to host the party, thanks to Apache Spark.

In this article, we learned how to use the Function, Function2, and PairFunction interfaces of the Spark Java API, along with the RDD operations map(), mapToPair(), and reduceByKey().

7. API & References

We used the Spark API for Java in this article. You can find the complete solution to John's party problem in our Git repository.

 

 
