

Table of Contents
1. Overview
Sometimes we require that our Hadoop job write data to multiple output locations. Hadoop provides a facility to write the output of a job to different locations, based on our needs, using the MultipleOutputs class. In this article, we will discuss Hadoop MultipleOutputs with an example.
2. Development Environment
Hadoop: 3.1.1
Java: Oracle JDK 1.8
IDE: IntelliJ Idea 2018.3
3. MultipleOutputs
The MultipleOutputs class provides the facility to write Hadoop map/reduce output to more than one folder. Basically, we can use MultipleOutputs when we want to write outputs other than the map reduce job's default output, and write map reduce job output to different files specified by the user.
The MultipleOutputs class has a static method addNamedOutput, which is used to add a named output to the given job. Here is the syntax, followed by an example:
public static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Class<?> keyClass, Class<?> valueClass)
MultipleOutputs.addNamedOutput(job,"AHMEDABAD", TextOutputFormat.class,Text.class,Text.class); MultipleOutputs.addNamedOutput(job,"DELHI", TextOutputFormat.class,Text.class,Text.class); MultipleOutputs.addNamedOutput(job,"MUMBAI", TextOutputFormat.class,Text.class,Text.class);
The write method of the MultipleOutputs class is used to write output to different files. There are multiple forms of this method. In this article, we will discuss the write method in its named-output and baseOutputPath form. Here is the syntax, followed by an example:
public <K, V> void write(String namedOutput, K key, V value, String baseOutputPath)
multipleOutputs.write("AHMEDABAD",key,value, "AHMEDABAD/AHMEDABAD"); multipleOutputs.write("DELHI",key,value, "DELHI/DELHI");
4. Sample Input
1 John Cena,Sr.Software Engineer,TCS,Ahmedabad 2 Peter,Sr.Software Engineer,Infosys,Delhi 3 S. Mathur,Software Engineer,capgemini,Mumbai 4 Ranvir D.,Data scientist,TCS,Ahmedabad 5 Kane Will,Technical Lead,TCS,Ahmedabad
5. Example
In this example, we will write city wise employee records in different HDFS folders using Hadoop MultipleOutputs feature.
5.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>MultipleOutputFormatExample</groupId> <artifactId>MultipleOutputFormatExample</artifactId> <version>1.0-SNAPSHOT</version> <description>Hadoop MultipleOutputs Example</description> <build> <finalName>HadoopMultipleOutputs</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> <useSystemClassLoader>false</useSystemClassLoader> </configuration> </plugin> </plugins> </build> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>3.1.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.1.1</version> </dependency> </dependencies> </project>
5.2 MultipleOutputsDriver
package com.javadeveloperzone; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MultipleOutputsDriver extends Configured implements Tool { public static final String OTHER = "OTHER"; public static final String MUMBAI = "MUMBAI"; public static final String DELHI = "DELHI"; public static final String AHMEDABAD = "AHMEDABAD"; public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new Configuration(), new MultipleOutputsDriver(), args); System.exit(exitCode); } public int run(String[] args) throws Exception { if (args.length != 2) { System.out.println("Please provid two arguments :"); System.out.println("[ 1 ] Input dir path"); System.out.println("[ 2 ] Output dir path"); return -1; } Configuration c=new Configuration(); String[] files=new GenericOptionsParser(c,args).getRemainingArgs(); Path input=new Path(files[0]); Path output=new Path(files[1]); Job job=Job.getInstance(); job.setJarByClass(MultipleOutputsDriver.class); job.setMapperClass(MultipleOutputMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, input); FileOutputFormat.setOutputPath(job, output); 
MultipleOutputs.addNamedOutput(job,"AHMEDABAD", TextOutputFormat.class,Text.class,Text.class); MultipleOutputs.addNamedOutput(job,"DELHI", TextOutputFormat.class,Text.class,Text.class); MultipleOutputs.addNamedOutput(job,"MUMBAI", TextOutputFormat.class,Text.class,Text.class); MultipleOutputs.addNamedOutput(job,"OTHER", TextOutputFormat.class,Text.class,Text.class); boolean success = job.waitForCompletion(true); return (success?0:1); } }
5.3 MultipleOutputsMapper
package com.javadeveloperzone; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import java.io.IOException; import static com.javadeveloperzone.MultipleOutputsDriver.*; public class MultipleOutputsMapper extends Mapper<Text,Text,Text,Text> { MultipleOutputs multipleOutputs; @Override protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs = new MultipleOutputs(context); } @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { String empData[] = value.toString().split(","); if(empData[3].equalsIgnoreCase(AHMEDABAD)){ multipleOutputs.write(AHMEDABAD,key,value, "AHMEDABAD/AHMEDABAD"); }else if (empData[3].equalsIgnoreCase(DELHI)){ multipleOutputs.write(DELHI,key,value,"DELHI/DELHI"); }else if(empData[3].equalsIgnoreCase(MUMBAI)){ multipleOutputs.write(MUMBAI,key,value,"MUMBAI/MUMBAI"); }else { multipleOutputs.write(OTHER,key,value,"OTHER/OTHER"); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } }
6. Steps To Run
6.1 Build & Prepare Jar
Run the Maven clean and install goals. If everything goes well, the HadoopMultipleOutputs.jar
file will be created under the target folder.
6.2 Copy Data to HDFS
Here are the commands to create a new directory and copy a local file into HDFS
for further processing.
hadoop fs -mkdir /user/data/input hadoop fs -put MultipleOutputsInputFile.txt /user/data/input
6.3 Run
Now it’s time to run our Multiple Outputs Example.
Go to your HadoopMultipleOutputs.jar location and run the command below.
hadoop jar HadoopMultipleOutputs.jar com.javadeveloperzone.MultipleOutputsDriver /user/data/input /user/data/output
7. Output
Once we run the above command, the MapReduce job starts, and the output will be written to HDFS in a separate folder for each city.
Once the MapReduce job has completed, check the HDFS /user/data/output folder. You will see three different folders, as below.
8. References
9. Source Code
You can also check our Git repository for Hadoop Multiple Outputs Example and other useful examples.