1. Overview

Apache Spark is a first choice among big data processing frameworks. Joining two or more datasets is a fundamental operation when extracting meaningful business insights from massive data, so in this article we are going to explain Spark joins, with working examples in both Java and Scala.

2. Development environment

Java : Oracle JDK 1.8
Scala : 2.11.7
Spark : Apache Spark 2.0.0-bin-hadoop2.6
IDE : Eclipse
Build Tool: Gradle 4.4.1

3. Input Files

To explain Spark joins, we are going to use a dataset from an online retail store.
The dataset contains two files.

i). UserDetails.csv
This file contains user data in <UserID>,<FirstName>,<LastName> format.

ii). AddressDetails.csv
This file contains address data in <AddressID>,<City>,<State>,<Country> format.

In the above files, the Address ID maps to the User ID: if a User ID is 1, that user's address is stored under Address ID 1. Note that the address file below has no entry with ID 5, so user 5 has no matching address; this difference shows up in the outer join results later.

3.1 User details sample input file for Spark Joins:

1,Shyama,Patni
2,Paul,Kam
3,Jayesh,Patel
4,Sid,Dave
5,Prakash,Aarya
6,Jastin,Cohelo

3.2 Address details sample input file for Spark Joins:

1,Kolkata,W.Bengal,India
2,Atlanta,Georgia,USA
3,Rajkot,Gujarat,India
4,Mumbai,Maharashtra,India
6,San Francisco,California,USA

 

4. Project Structure

(Screenshot: Spark Joins project structure)

5. Solution

We are using JavaRDD, JavaPairRDD and PairFunction from the Spark Java API to explain Spark joins in Java. As a quick reference for the types that appear below: joining a JavaPairRDD<K, V> with a JavaPairRDD<K, W> returns JavaPairRDD<K, Tuple2<V, W>> for join, JavaPairRDD<K, Tuple2<V, Optional<W>>> for leftOuterJoin, and JavaPairRDD<K, Tuple2<Optional<V>, W>> for rightOuterJoin.

Java source file
SparkJoins.java

Scala source file
SparkJoins.scala

5.1 Build File : build.gradle

apply plugin: 'java'
apply plugin: 'scala'

description """Spark Joins"""

sourceCompatibility = 1.8
targetCompatibility = 1.8

/* In this section you declare where to find the dependencies of your project.
 * Maven Central already hosts the Spark artifacts, so no additional repository is required. */
repositories {
    jcenter()
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
}

5.2 Java Code: SparkJoins.java

package com.javadeveloperzone.spark.java;

import java.io.FileNotFoundException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkJoins {

    
    public static void main(String[] args) throws FileNotFoundException {

        SparkConf sparkConf = new SparkConf().setAppName("Apache Spark example - Spark Joins");

        /*Setting the master so the example can be run directly from the IDE.
         *Use local[n] with n > 1 to run with more threads on a multicore processor.*/
        sparkConf.setMaster("local[1]");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        /*Reading input from the user details file*/
        JavaRDD<String> userInputRDD = sparkContext.textFile(args[0]);

        /*Method 1 :: We create a PairFunction first, in which each Tuple2 contains the key and value:
         *KEY   :: UserID
         *VALUE :: <FirstName,LastName> */
        PairFunction<String, String, String> userKeyValueData = new PairFunction<String, String, String>() {

            public Tuple2<String, String> call(String s) {
                String[] userValues = s.split(",");
                return new Tuple2<String, String>(userValues[0], userValues[1] + "," + userValues[2]);
            }
        };

        /*Once userKeyValueData is ready, we map the input with mapToPair and remove duplicates with distinct(),
         *which returns a JavaPairRDD.*/
        JavaPairRDD<String, String> userPairs = userInputRDD.mapToPair(userKeyValueData).distinct();
        
        JavaRDD<String> addressInputRDD = sparkContext.textFile(args[1]);
       
        /*Method 2 :: Here we create the JavaPairRDD directly with mapToPair, passing an anonymous PairFunction
         *whose call method returns a Tuple2 containing:
         *KEY   :: AddressID
         *VALUE :: <City,State,Country> */
        
        JavaPairRDD<String, String> contactDetailPairs = addressInputRDD.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] contactDetailValues = s.split(",");
                return new Tuple2<String, String>(contactDetailValues[0], contactDetailValues[1]+","+contactDetailValues[2]+","+contactDetailValues[3]);
            }
        });

        /*Default Join operation (Inner join)*/
        JavaPairRDD<String, Tuple2<String, String>> joinsOutput = userPairs.join(contactDetailPairs);
        
        /*Storing the result of inner Join values*/
        joinsOutput.saveAsTextFile(args[2]+"/InnerJoin");

        /*Left Outer join operation*/
        JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = userPairs.leftOuterJoin(contactDetailPairs).groupByKey().sortByKey();
        
        /*Storing values of Left Outer join*/
        leftJoinOutput.saveAsTextFile(args[2]+"/LeftOuterJoin");

        /*Right Outer join operation*/
        JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = userPairs.rightOuterJoin(contactDetailPairs).groupByKey().sortByKey();
        
        /*Storing values of Right Outer join*/
        rightJoinOutput.saveAsTextFile(args[2]+"/RightOuterJoin");

        /*close() also stops the underlying SparkContext*/
        sparkContext.close();
    }
}

 

5.3 Scala Code: SparkJoins.scala

package com.javadeveloperzone.spark.scala

import org.apache.spark._
import org.apache.spark.SparkContext._

object SparkJoins {
  
  def main(args: Array[String]) 
   {
     
     /*By default we are setting local so it will run locally with one thread 
      *Specify: "local[2]" to run locally with 2 cores, OR 
      *        "spark://master:7077" to run on a Spark standalone cluster */
     
      val sparkContext = new SparkContext("local","Spark Joins",
          System.getenv("SPARK_HOME"))
      
      /*Reading input from User Data File*/
      val userInputRDD = sparkContext.textFile(args(0))
      
      /*We create the user pairs, where each pair contains:
       *KEY :: UserID
       *VALUE :: <FirstName,LastName> */
      val userPairs = userInputRDD.map { line =>
        val fields = line.split(",")
        (fields(0), fields(1) + "," + fields(2))
      }
      
      /*Reading input from Address Data File*/
      val addressInputRDD = sparkContext.textFile(args(1))
      
      /*We directly create the address pairs, where each pair contains:
       *KEY :: AddressID
       *VALUE :: <City,State,Country> */
      val addressPairs = addressInputRDD.map { line =>
        val fields = line.split(",")
        (fields(0), fields(1) + "," + fields(2) + "," + fields(3))
      }
      
      /*Default Join operation (Inner join)*/
      val innerJoinResult = userPairs.join(addressPairs);
      
      innerJoinResult.saveAsTextFile(args(2)+"/InnerJoin")
      
      /*Left Outer join operation*/
      val leftOuterJoin = userPairs.leftOuterJoin(addressPairs)
      
      /*Storing values of Left Outer join*/
      leftOuterJoin.saveAsTextFile(args(2)+"/LeftOuterJoin")
      
      /*Right Outer join operation*/
      val rightOuterJoin = userPairs.rightOuterJoin(addressPairs)
      
      /*Storing values of Right Outer join*/
      rightOuterJoin.saveAsTextFile(args(2)+"/RightOuterJoin")
      
      sparkContext.stop();
      
    }
}

6. Build & Run Spark Joins example

We need to pass three arguments to run the program.
The first argument is the path of the user details input file.
The second argument is the path of the address details input file.
The third argument is the output path where the Spark Joins results are saved.
In our example, the program creates the following three folders under the output path,

InnerJoin
LeftOuterJoin
RightOuterJoin

All three folders contain the output of the respective joins from the Java / Scala programs; a sample run command is shown below.
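
After building the project with gradle build, the job can be submitted with spark-submit. The command below is only an illustration: the jar path and the input/output paths are example values and should be adjusted to your own build output and data locations.

spark-submit \
  --class com.javadeveloperzone.spark.java.SparkJoins \
  --master local[1] \
  build/libs/spark-joins.jar \
  /path/to/UserDetails.csv \
  /path/to/AddressDetails.csv \
  /path/to/output

For the Scala version, change the --class value to com.javadeveloperzone.spark.scala.SparkJoins. Note that both programs also set the master in code, so the --master flag mainly matters if you remove that line and run on a cluster.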

7. Output (Portion)

If you run SparkJoins.java, after successful completion of the program you will get output that looks like the following:

7.1 Inner Join output

(4,(Sid,Dave,Mumbai,Maharashtra,India))
(6,(Jastin,Cohelo,San Francisco,California,USA))
(2,(Paul,Kam,Atlanta,Georgia,USA))
(3,(Jayesh,Patel,Rajkot,Gujarat,India))
(1,(Shyama,Patni,Kolkata,W.Bengal,India))

 

7.2 Left Outer Join output

(1,[(Shyama,Patni,Optional[Kolkata,W.Bengal,India])])
(2,[(Paul,Kam,Optional[Atlanta,Georgia,USA])])
(3,[(Jayesh,Patel,Optional[Rajkot,Gujarat,India])])
(4,[(Sid,Dave,Optional[Mumbai,Maharashtra,India])])
(5,[(Prakash,Aarya,Optional.empty)])
(6,[(Jastin,Cohelo,Optional[San Francisco,California,USA])])

 

7.3 Right Outer Join output

(1,[(Optional[Shyama,Patni],Kolkata,W.Bengal,India)])
(2,[(Optional[Paul,Kam],Atlanta,Georgia,USA)])
(3,[(Optional[Jayesh,Patel],Rajkot,Gujarat,India)])
(4,[(Optional[Sid,Dave],Mumbai,Maharashtra,India)])
(6,[(Optional[Jastin,Cohelo],San Francisco,California,USA)])

As you can see in the above output, Spark wraps the values coming from the optional side in Optional for the left outer join and the right outer join. User 5 has no matching address, so that user is missing from the inner join, appears with Optional.empty in the left outer join, and does not appear in the right outer join at all; likewise, an address without a matching user would only appear in the right outer join.

8. API & References

Optional (Java class) : This class represents a value of a given type that may or may not exist. It is used by methods that optionally return a value, in preference to returning null.
Some / None (Scala classes) : Similarly, if you run SparkJoins.scala you will see that the Scala version wraps the values from the optional side in Option: Some(value) when a matching value is present and None when it is not.
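
As a minimal sketch of how these Optional values can be consumed on the Java side, the fragment below continues from the variables and imports already defined in SparkJoins.java; the flattening step itself is illustrative and not part of the original example.

        /*Flatten the left outer join result into plain CSV-style strings,
         *substituting a default marker when no matching address exists.*/
        JavaRDD<String> usersWithAddress = userPairs.leftOuterJoin(contactDetailPairs)
                .map(pair -> {
                    String user = pair._2()._1();
                    Optional<String> address = pair._2()._2();
                    String resolvedAddress = address.isPresent() ? address.get() : "No Address Found";
                    return pair._1() + "," + user + "," + resolvedAddress;
                });
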
Reference: https://spark.apache.org/docs/2.0.0/

9. Source Code

You can download the source code of the Spark Joins example from our git repository, which gives you a step-by-step understanding of writing Spark joins using Java / Scala.
