1. Overview

  • In this article, we explain a Reduce Side Join MapReduce example using Java.
  • Joining two or more data sets is one of the most common problems in the Big Data world.
  • Hadoop shines when it comes to processing petabyte-scale data with distributed processing frameworks.
  • To experience the true computing power of Hadoop, you would process huge amounts of data with MapReduce;
    in this article, however, we use a small data set to explain reduce-side joins in Hadoop.

2. Development environment

Java       : Oracle JDK 1.8
Hadoop     : Apache Hadoop 2.6.1
IDE        : Eclipse
Build Tool : Gradle 4.4.1

3. Sample Input

For this tutorial, we use the following text files (UTF-8 encoded) as input.

Input File 1 : 4-UserDetails.csv

This file contains the user data, one record per line, in the following format:
<UserID>,<FirstName>,<LastName>
e.g.

1,Shyama,Patni
2,Paul,Kam
3,Jayesh,Patel
4,Sid,Dave
5,Prakash,Aarya
6,Jastin,Cohelo

Input File 2 : 4-AddressDetails.csv

This file contains the users' address data, one record per line, in the following format:
<UserID>,<City>,<State>,<Country>
e.g.

1,Kolkata,W.Bengal,India
2,Atlanta,Georgia,USA
3,Rajkot,Gujarat,India
4,Mumbai,Maharashtra,India
6,San Francisco,California,USA

4. Solution

We use the following four Java files for this example:

UserFileMapper.java
AddressFileMapper.java
UserDataReducer.java
ReduceSideJoinDriver.java

Project Architecture – Reduce Side Join MapReduce example using Java

4.1 Build File : build.gradle

apply plugin: 'java'

description """Hadoop Joins examples Using Java"""

sourceCompatibility = 1.8
targetCompatibility = 1.8

/* In this section you declare where to find the dependencies of your project*/
repositories {
    jcenter()
    mavenCentral()
}

dependencies {
    
  compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.6.1'
  compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.6.1'
  compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.6.1'
     
}

4.2 Mapper1 Code: UserFileMapper.java

package com.javadeveloperzone.hadoop.reducesidejoin;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UserFileMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

  // Tag identifying records that come from the user details file
  private static final String FILE_TAG = "UD~";

  private static final String DATA_SEPARATOR = ",";

  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    String[] values = value.toString().split(DATA_SEPARATOR);

    // Replace the first field (the join key) with the file tag and re-join
    // the remaining fields, e.g. "1,Shyama,Patni" becomes "UD~Shyama,Patni"
    StringBuilder dataStringBuilder = new StringBuilder(FILE_TAG);

    for (int index = 1; index < values.length; index++) {

      dataStringBuilder.append(values[index].trim());

      if (index < values.length - 1) {
        dataStringBuilder.append(DATA_SEPARATOR);
      }

    }

    // Emit the user ID as the key so both mappers group on the same join key
    context.write(new LongWritable(Long.parseLong(values[0])), new Text(dataStringBuilder.toString()));

  }

}
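Outside Hadoop, the tagging step this mapper performs can be illustrated with a small standalone sketch (the `TagSketch` class and its `tagRecord` helper are hypothetical, written only for this illustration and not part of the job):

```java
// Standalone sketch of the tagging step performed by the mapper:
// drop the leading join key and prefix the rest with a source tag.
public class TagSketch {

    static final String DATA_SEPARATOR = ",";

    // Replaces the first field (the join key) with a file tag such as "UD~",
    // mirroring what the mapper emits as the value.
    static String tagRecord(String line, String fileTag) {
        String[] values = line.split(DATA_SEPARATOR);
        StringBuilder sb = new StringBuilder(fileTag);
        for (int i = 1; i < values.length; i++) {
            sb.append(values[i].trim());
            if (i < values.length - 1) {
                sb.append(DATA_SEPARATOR);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // For input line "1,Shyama,Patni" the mapper would emit
        // key 1 with this tagged value: UD~Shyama,Patni
        System.out.println(tagRecord("1,Shyama,Patni", "UD~"));
    }
}
```

The tag lets the reducer tell which input file each value came from, since both mappers emit the same key and value types.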

4.3 Mapper2 Code: AddressFileMapper.java

package com.javadeveloperzone.hadoop.reducesidejoin;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AddressFileMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

  // Tag identifying records that come from the address details file
  private static final String FILE_TAG = "AD~";

  private static final String DATA_SEPARATOR = ",";

  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    String[] values = value.toString().split(DATA_SEPARATOR);

    // Replace the first field (the join key) with the file tag and re-join
    // the remaining fields, e.g. "1,Kolkata,W.Bengal,India" becomes
    // "AD~Kolkata,W.Bengal,India"
    StringBuilder dataStringBuilder = new StringBuilder(FILE_TAG);

    for (int index = 1; index < values.length; index++) {

      dataStringBuilder.append(values[index].trim());

      if (index < values.length - 1) {
        dataStringBuilder.append(DATA_SEPARATOR);
      }

    }

    // Emit the user ID as the key so both mappers group on the same join key
    context.write(new LongWritable(Long.parseLong(values[0])), new Text(dataStringBuilder.toString()));

  }

}

4.4 Reducer Code: UserDataReducer.java

package com.javadeveloperzone.hadoop.reducesidejoin;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class UserDataReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

  public static final String TAG_SEPARATOR = "~";

  private static final String DATA_SEPARATOR = ",";

  @Override
  protected void reduce(LongWritable key, Iterable<Text> values,
      Context context) throws IOException, InterruptedException {

    String data;

    String userDetails = null, addressDetails = null;

    // Each value arrives tagged with its source file: "UD~..." or "AD~..."
    for (Text txtValue : values) {

      String[] splitValues = txtValue.toString().split(TAG_SEPARATOR, 2);

      String tag = splitValues[0];

      if (tag.equalsIgnoreCase("UD")) {

        userDetails = splitValues[1];

      } else if (tag.equalsIgnoreCase("AD")) {

        addressDetails = splitValues[1];

      }

    }

    // Join the two sides when both are present; otherwise emit whichever
    // side exists, so unmatched records are not dropped
    if (userDetails != null && addressDetails != null) {

      data = userDetails + DATA_SEPARATOR + addressDetails;

    } else if (userDetails != null) {

      data = userDetails;

    } else {

      data = addressDetails;

    }

    context.write(key, new Text(data));
  }

}
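The per-key merge the reducer performs can likewise be sketched in plain Java, outside Hadoop (the `JoinSketch` class and its `joinTaggedValues` helper are hypothetical, written only for this illustration):

```java
import java.util.List;

// Standalone sketch of the per-key merge done by the reducer:
// split each tagged value, then combine the user and address sides.
public class JoinSketch {

    static final String TAG_SEPARATOR = "~";
    static final String DATA_SEPARATOR = ",";

    // Combines the tagged values that arrive for one join key:
    // "UD~..." carries user details, "AD~..." carries address details.
    static String joinTaggedValues(List<String> values) {
        String userDetails = null, addressDetails = null;
        for (String value : values) {
            String[] parts = value.split(TAG_SEPARATOR, 2);
            if ("UD".equalsIgnoreCase(parts[0])) {
                userDetails = parts[1];
            } else if ("AD".equalsIgnoreCase(parts[0])) {
                addressDetails = parts[1];
            }
        }
        if (userDetails != null && addressDetails != null) {
            return userDetails + DATA_SEPARATOR + addressDetails;
        }
        return userDetails != null ? userDetails : addressDetails;
    }

    public static void main(String[] args) {
        // Key 1 receives one value from each input file:
        // prints Shyama,Patni,Kolkata,W.Bengal,India
        System.out.println(joinTaggedValues(
                List.of("UD~Shyama,Patni", "AD~Kolkata,W.Bengal,India")));
    }
}
```

Note that a key with only one side (such as user 5, who has no address record) still produces output, which is why the final result resembles a full outer join.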

4.5 Driver Code: ReduceSideJoinDriver.java

package com.javadeveloperzone.hadoop.reducesidejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceSideJoinDriver extends Configured implements Tool {

  private static final String DATA_SEPARATOR = ",";

  public int run(String[] args) throws Exception {

    // Use the configuration supplied by ToolRunner instead of creating a new one
    Configuration configuration = getConf();

    // Use a comma between key and value so the output stays valid CSV
    configuration.set("mapreduce.output.textoutputformat.separator", DATA_SEPARATOR);

    Job job = Job.getInstance(configuration);

    job.setJobName("Reduce Side Join Mapreduce example using Java");

    job.setJarByClass(ReduceSideJoinDriver.class);

    // Map output types (both mappers emit <LongWritable, Text>)

    job.setMapOutputKeyClass(LongWritable.class);

    job.setMapOutputValueClass(Text.class);

    // Job output types and formats

    job.setOutputKeyClass(LongWritable.class);

    job.setOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);

    job.setOutputFormatClass(TextOutputFormat.class);

    job.setReducerClass(UserDataReducer.class);

    // Each input file gets its own mapper so records can be tagged by source

    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserFileMapper.class);

    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AddressFileMapper.class);

    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    // Propagate the job's success or failure to the caller
    return job.waitForCompletion(true) ? 0 : 1;

  }

  public static void main(String[] args) throws Exception {

    if (args.length == 3) {

      int result = ToolRunner.run(new Configuration(), new ReduceSideJoinDriver(), args);

      if (0 == result) {
        System.out.println("Reduce Side Join Mapreduce example using Java Job Completed Successfully...");

      } else {
        System.out.println("Reduce Side Join Mapreduce example using Java Job Failed...");

      }
    } else {
      System.out.println("USAGE: <InputPath1> <InputPath2> <OutputPath>");
    }

  }

}

 

4.6 Copy files from local file system to HDFS

Make sure your Hadoop cluster is up and running.
The following commands copy the input files from the local file system to HDFS.

hdfs dfs -copyFromLocal 4-UserDetails.csv /input/javadeveloperzone/reducesidejoin/
hdfs dfs -copyFromLocal 4-AddressDetails.csv /input/javadeveloperzone/reducesidejoin/

5. Build & Run Application

Now build the jar file that we will submit to the Hadoop cluster.
Once the jar is built, use the following command to run the job on the cluster.

hadoop jar HadoopJoins.jar com.javadeveloperzone.hadoop.reducesidejoin.ReduceSideJoinDriver /input/javadeveloperzone/reducesidejoin/4-UserDetails.csv /input/javadeveloperzone/reducesidejoin/4-AddressDetails.csv /output/javadeveloperzone/hadoop/reducesidejoin

6. Output

Once the job completes successfully, you will get output like the following. Note that user 5, who has no matching address record, still appears in the output, so the result resembles a full outer join.

1,Shyama,Patni,Kolkata,W.Bengal,India
2,Paul,Kam,Atlanta,Georgia,USA
3,Jayesh,Patel,Rajkot,Gujarat,India
4,Sid,Dave,Mumbai,Maharashtra,India
5,Prakash,Aarya
6,Jastin,Cohelo,San Francisco,California,USA

7. Source Code

You can download the source code of this Reduce Side Join MapReduce example from our git repository; it can serve as boilerplate for writing more complex Hadoop MapReduce programs in Java.
