

Table of Contents
1. Overview
2. Development environment
3. Sample Input
4. Solution
5. Build & Run Application
6. Output
7. Source Code
1. Overview
- In this article, we are going to explain the Reduce Side Join with a MapReduce example written in Java.
- Joining two or more data sets is one of the most common tasks in the Big Data world.
- Hadoop shines when it comes to processing petabyte-scale data with distributed processing frameworks.
- To experience the true computing power of Hadoop, we would normally process a huge amount of data with MapReduce, but in this article we use a small data set so the mechanics of the reduce side join in Hadoop are easy to follow.
2. Development environment
Java : Oracle JDK 1.8
Hadoop : Apache Hadoop 2.6.1
IDE : Eclipse
Build Tool: Gradle 4.4.1
3. Sample Input
For this tutorial, we are using the following text files (UTF-8 encoded) as input:
Input File 1 : 4-UserDetails.csv
This file contains the user data, one record per line, in the following format:
<UserID>,<FirstName>,<LastName>
e.g.
1,Shyama,Patni
2,Paul,Kam
3,Jayesh,Patel
4,Sid,Dave
5,Prakash,Aarya
6,Jastin,Cohelo
Input File 2 : 4-AddressDetails.csv
This file contains the users' address data, one record per line, in the following format:
<UserID>,<City>,<State>,<Country>
e.g.
1,Kolkata,W.Bengal,India
2,Atlanta,Georgia,USA
3,Rajkot,Gujarat,India
4,Mumbai,Maharashtra,India
6,San Francisco,California,USA
4. Solution
We are going to use the following four Java files for this example:
UserFileMapper.java
AddressFileMapper.java
UserDataReducer.java
ReduceSideJoinDriver.java

Project Architecture – Reduce Side Join Mapreduce example using Java
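Before looking at the code, here is roughly how the join works for a single key, using UserID 1 from the sample input (the UD~ and AD~ tags are defined in the mappers below; the reducer may receive the two values in either order):

UserFileMapper emits     : (1, "UD~Shyama,Patni")
AddressFileMapper emits  : (1, "AD~Kolkata,W.Bengal,India")
UserDataReducer receives : (1, ["UD~Shyama,Patni", "AD~Kolkata,W.Bengal,India"])
UserDataReducer writes   : (1, "Shyama,Patni,Kolkata,W.Bengal,India")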
4.1 Build File : build.gradle
apply plugin: 'java'

description """Hadoop Joins examples Using Java"""

sourceCompatibility = 1.8
targetCompatibility = 1.8

/* In this section you declare where to find the dependencies of your project */
repositories {
    jcenter()
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.6.1'
    compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.6.1'
    compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.6.1'
}
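Note: the compile dependency configuration shown here works with the Gradle 4.4.x version used in this tutorial; newer Gradle versions (7 and above) have removed it in favour of implementation, so adjust the build file accordingly if you are on a more recent Gradle.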
4.2 Mapper1 Code: UserFileMapper.java
package com.javadeveloperzone.hadoop.reducesidejoin;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UserFileMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    // tag that marks records coming from 4-UserDetails.csv
    private static final String fileTag = "UD~";
    private static final String DATA_SEPARATOR = ",";

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] values = value.toString().split(DATA_SEPARATOR);
        StringBuilder dataStringBuilder = new StringBuilder();
        for (int index = 0; index < values.length; index++) {
            if (index != 0) {
                dataStringBuilder.append(values[index].trim() + DATA_SEPARATOR);
            } else {
                // replace the UserID column with the file tag; the UserID becomes the output key
                dataStringBuilder.append(fileTag);
            }
        }
        String dataString = dataStringBuilder.toString();
        // remove the trailing separator
        if (dataString != null && dataString.length() > 1) {
            dataString = dataString.substring(0, dataString.length() - 1);
        }
        dataStringBuilder = null;
        context.write(new LongWritable(Long.parseLong(values[0])), new Text(dataString));
    }
}
4.3 Mapper2 Code: AddressFileMapper.java
package com.javadeveloperzone.hadoop.reducesidejoin;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AddressFileMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    // tag that marks records coming from 4-AddressDetails.csv
    private static final String fileTag = "AD~";
    private static final String DATA_SEPARATOR = ",";

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] values = value.toString().split(DATA_SEPARATOR);
        StringBuilder dataStringBuilder = new StringBuilder();
        for (int index = 0; index < values.length; index++) {
            if (index != 0) {
                dataStringBuilder.append(values[index].trim() + DATA_SEPARATOR);
            } else {
                // replace the UserID column with the file tag; the UserID becomes the output key
                dataStringBuilder.append(fileTag);
            }
        }
        String dataString = dataStringBuilder.toString();
        // remove the trailing separator
        if (dataString != null && dataString.length() > 1) {
            dataString = dataString.substring(0, dataString.length() - 1);
        }
        dataStringBuilder = null;
        context.write(new LongWritable(Long.parseLong(values[0])), new Text(dataString));
    }
}
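Both mappers share exactly the same logic; only the fileTag ("UD~" vs "AD~") differs. Each mapper emits the UserID as the key, so during the shuffle all records for the same user, coming from both files, are grouped into a single reduce call, and the tag is what lets the reducer tell user records apart from address records.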
4.4 Reducer Code: UserDataReducer.java
package com.javadeveloperzone.hadoop.reducesidejoin;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class UserDataReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

    public static final String TAG_SEPARATOR = "~";
    private static final String DATA_SEPARATOR = ",";

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values,
            Reducer<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        String value;
        String[] splitValues;
        String tag;
        String data = null, userDetails = null, addressDetails = null;
        for (Text txtValue : values) {
            value = txtValue.toString();
            splitValues = value.split(TAG_SEPARATOR);
            // the tag written by the mappers tells us which file this value came from
            tag = splitValues[0];
            if (tag.equalsIgnoreCase("UD")) {
                userDetails = splitValues[1];
            } else if (tag.equalsIgnoreCase("AD")) {
                addressDetails = splitValues[1];
            }
        }
        // join both sides when present; otherwise fall back to whichever side is available
        if (userDetails != null && addressDetails != null) {
            data = userDetails + DATA_SEPARATOR + addressDetails;
        } else if (userDetails == null) {
            data = addressDetails;
        } else if (addressDetails == null) {
            data = userDetails;
        }
        context.write(key, new Text(data));
    }
}
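This reducer effectively performs a full outer join: if a UserID appears in only one of the two input files, the record is still written with whatever fields are available. In the sample data, UserID 5 has no address record, which is why it appears without address fields in the output shown below.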
4.5 Driver Code: ReduceSideJoinDriver.java
package com.javadeveloperzone.hadoop.reducesidejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceSideJoinDriver extends Configured implements Tool {

    private static final String DATA_SEPARATOR = ",";

    public int run(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // use "," between the output key (UserID) and the joined value
        configuration.set("mapreduce.output.textoutputformat.separator", DATA_SEPARATOR);
        Job job = Job.getInstance(configuration);
        job.setJobName("Reduce Side Join Mapreduce example using Java");
        job.setJarByClass(ReduceSideJoinDriver.class);
        // Map output types
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        // Job output types
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setReducerClass(UserDataReducer.class);
        // each input path gets its own mapper class
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserFileMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AddressFileMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        // return a non-zero exit code if the job fails, so main() reports the failure correctly
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 3) {
            int result = ToolRunner.run(new Configuration(), new ReduceSideJoinDriver(), args);
            if (0 == result) {
                System.out.println("Reduce Side Join Mapreduce example using Java Job Completed Successfully...");
            } else {
                System.out.println("Reduce Side Join Mapreduce example using Java Job Failed...");
            }
        } else {
            System.out.println("USAGE <InputPath1> <InputPath2> <OutputPath>");
        }
    }
}
4.6 Copy files from local file system to HDFS
Make sure your Hadoop cluster is up and running.
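If the target directory does not exist on HDFS yet, create it first (the path below is the one used throughout this example):

hdfs dfs -mkdir -p /input/javadeveloperzone/reducesidejoin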
I have used the following commands to copy the files from the local file system to HDFS:

hdfs dfs -copyFromLocal 4-UserDetails.csv /input/javadeveloperzone/reducesidejoin/
hdfs dfs -copyFromLocal 4-AddressDetails.csv /input/javadeveloperzone/reducesidejoin/
5. Build & Run Application
Now build the jar file which we are going to submit to the Hadoop cluster.
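With the build file above, a standard Gradle build should produce the jar under build/libs/ (the exact jar name depends on your project name and version; the run command below assumes it is HadoopJoins.jar):

gradle clean build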
Once the jar file is built, we can use the following command to run the job on the Hadoop cluster:
hadoop jar HadoopJoins.jar com.javadeveloperzone.hadoop.reducesidejoin.ReduceSideJoinDriver \
    /input/javadeveloperzone/reducesidejoin/4-UserDetails.csv \
    /input/javadeveloperzone/reducesidejoin/4-AddressDetails.csv \
    /output/javadeveloperzone/hadoop/reducesidejoin
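Keep in mind that FileOutputFormat refuses to start a job if the output directory already exists, so if you re-run the example, delete the old output first:

hdfs dfs -rm -r /output/javadeveloperzone/hadoop/reducesidejoin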
6. Output
Once the job completes successfully, you will get output that looks like the following:
1,Shyama,Patni,Kolkata,W.Bengal,India
2,Paul,Kam,Atlanta,Georgia,USA
3,Jayesh,Patel,Rajkot,Gujarat,India
4,Sid,Dave,Mumbai,Maharashtra,India
5,Prakash,Aarya
6,Jastin,Cohelo,San Francisco,California,USA
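Each output line starts with the UserID because the reducer writes it as the output key, and the driver sets mapreduce.output.textoutputformat.separator to "," so that the key and the joined value form one comma-separated record.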
7. Source Code
You can download the source code of the Reduce Side Join MapReduce example using Java from the git repository; it can serve as boilerplate for writing more complex Hadoop MapReduce programs in Java.