1. Overview

Sometimes a Hadoop job needs to write data to multiple output locations. Hadoop provides the MultipleOutputs class, which lets a job write its output to different locations based on our needs. In this article, we will discuss Hadoop MultipleOutputs with an example.

2. Development Environment

Hadoop: 3.1.1

Java: Oracle JDK 1.8

IDE: IntelliJ Idea 2018.3

3. MultipleOutputs

The MultipleOutputs class lets a Hadoop map/reduce job write output to more than one folder. Basically, we use MultipleOutputs when we want to write output in addition to the job's default output, or direct the job's output to different files and folders named by the user.

The MultipleOutputs class has a static method, addNamedOutput, which adds a named output to the given job. Here is the signature, followed by an example:

public static void addNamedOutput(Job job,
                  String namedOutput,
                  Class<? extends OutputFormat> outputFormatClass,
                  Class<?> keyClass,
                  Class<?> valueClass)
MultipleOutputs.addNamedOutput(job,"AHMEDABAD", TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"DELHI", TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"MUMBAI", TextOutputFormat.class,Text.class,Text.class);

The write method of the MultipleOutputs class is used to write output to different files. The method has several overloads; in this article, we will use the form that takes a named output and a baseOutputPath. Here is the signature, followed by an example:

public <K, V> void write(String namedOutput, K key, V value, String baseOutputPath)
multipleOutputs.write("AHMEDABAD",key,value, "AHMEDABAD/AHMEDABAD");
multipleOutputs.write("DELHI",key,value, "DELHI/DELHI");

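When a baseOutputPath is supplied, each map task writes its records to a part file whose name is derived from that path. The small class below is an illustration only: partFile is a hypothetical helper written for this article to mimic the {baseOutputPath}-m-{taskId} (map output) and {baseOutputPath}-r-{taskId} (reduce output) naming convention; it is not part of the Hadoop API.

```java
// Illustration only: mimics the part-file naming convention MultipleOutputs
// follows when a baseOutputPath is supplied. partFile() is a hypothetical
// helper, not a Hadoop API method.
public class OutputNameSketch {

    // Map-only tasks produce <baseOutputPath>-m-<taskId>;
    // reduce tasks produce <baseOutputPath>-r-<taskId>.
    static String partFile(String baseOutputPath, boolean mapOnly, int taskId) {
        return String.format("%s-%s-%05d", baseOutputPath, mapOnly ? "m" : "r", taskId);
    }

    public static void main(String[] args) {
        // First map task writing the AHMEDABAD named output:
        System.out.println(partFile("AHMEDABAD/AHMEDABAD", true, 0));
    }
}
```

So the write call with baseOutputPath "AHMEDABAD/AHMEDABAD" produces files such as AHMEDABAD/AHMEDABAD-m-00000 inside the job's output directory.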
4. Sample Input 

1	John Cena,Sr.Software Engineer,TCS,Ahmedabad
2	Peter,Sr.Software Engineer,Infosys,Delhi
3	S. Mathur,Software Engineer,capgemini,Mumbai
4	Ranvir D.,Data scientiest,TCS,Ahmedabad
5	Kane Will,Technical Lead,TCS,Ahmedabad

5. Example

In this example, we will write city-wise employee records to different HDFS folders using the Hadoop MultipleOutputs feature.

5.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>MultipleOutputFormatExample</groupId>
    <artifactId>MultipleOutputFormatExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>Hadoop MultipleOutputs Example</description>
    <build>
        <finalName>HadoopMultipleOutputs</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <useSystemClassLoader>false</useSystemClassLoader>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>
    </dependencies>
</project>

5.2 MultipleOutputsDriver

package com.javadeveloperzone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipleOutputsDriver extends Configured implements Tool {

    public static final String OTHER = "OTHER";
    public static final String MUMBAI = "MUMBAI";
    public static final String DELHI = "DELHI";
    public static final String AHMEDABAD = "AHMEDABAD";
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new MultipleOutputsDriver(), args);
        System.exit(exitCode);
    }
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Please provide two arguments :");
            System.out.println("[ 1 ] Input dir path");
            System.out.println("[ 2 ] Output dir path");
            return -1;
        }
        Configuration c=new Configuration();
        String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
        Path input=new Path(files[0]);
        Path output=new Path(files[1]);
        Job job=Job.getInstance(c);
        job.setJarByClass(MultipleOutputsDriver.class);
        job.setMapperClass(MultipleOutputsMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        MultipleOutputs.addNamedOutput(job,"AHMEDABAD", TextOutputFormat.class,Text.class,Text.class);
        MultipleOutputs.addNamedOutput(job,"DELHI", TextOutputFormat.class,Text.class,Text.class);
        MultipleOutputs.addNamedOutput(job,"MUMBAI", TextOutputFormat.class,Text.class,Text.class);
        MultipleOutputs.addNamedOutput(job,"OTHER", TextOutputFormat.class,Text.class,Text.class);
        boolean success = job.waitForCompletion(true);
        return (success?0:1);
    }
}
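One common refinement, not shown in the driver above: since every record is routed through a named output, the job's default output files end up empty. Hadoop ships LazyOutputFormat for exactly this case; the sketch below shows the driver line that would replace job.setOutputFormatClass(TextOutputFormat.class). This is a config fragment under the assumption that the rest of the driver stays as written.

```java
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Replaces job.setOutputFormatClass(TextOutputFormat.class):
// the default output is only materialized if something is actually
// written to it, so no empty part-m-* files appear alongside the
// named-output folders.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
```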


5.3 MultipleOutputsMapper

package com.javadeveloperzone;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.IOException;
import static com.javadeveloperzone.MultipleOutputsDriver.*;
public class MultipleOutputsMapper extends Mapper<Text,Text,Text,Text> {
    MultipleOutputs<Text, Text> multipleOutputs;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<>(context);
    }
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        String[] empData = value.toString().split(",");
        if(empData[3].equalsIgnoreCase(AHMEDABAD)){
            multipleOutputs.write(AHMEDABAD,key,value, "AHMEDABAD/AHMEDABAD");
        }else if (empData[3].equalsIgnoreCase(DELHI)){
            multipleOutputs.write(DELHI,key,value,"DELHI/DELHI");
        }else if(empData[3].equalsIgnoreCase(MUMBAI)){
            multipleOutputs.write(MUMBAI,key,value,"MUMBAI/MUMBAI");
        }else {
            multipleOutputs.write(OTHER,key,value,"OTHER/OTHER");
        }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}
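The routing decision in the mapper is simple enough to isolate and check without a Hadoop cluster. Below is a minimal sketch of the same rule as a standalone class; CityRouter and its route method are hypothetical names introduced here for illustration, not part of the article's job code. The record layout (name,designation,company,city) is taken from the sample input above.

```java
// Illustration: the same city-routing rule the mapper applies, isolated
// from the Hadoop API so it can be run standalone. CityRouter/route()
// are hypothetical names introduced for this sketch.
public class CityRouter {

    // Returns the named output a CSV employee record should be routed to.
    // Assumed record layout: name,designation,company,city
    static String route(String record) {
        String[] empData = record.split(",");
        String city = empData.length > 3 ? empData[3] : "";
        if (city.equalsIgnoreCase("Ahmedabad")) return "AHMEDABAD";
        if (city.equalsIgnoreCase("Delhi")) return "DELHI";
        if (city.equalsIgnoreCase("Mumbai")) return "MUMBAI";
        return "OTHER";
    }

    public static void main(String[] args) {
        System.out.println(route("John Cena,Sr.Software Engineer,TCS,Ahmedabad"));
    }
}
```

Extracting the decision this way also makes it obvious that any record whose city is not one of the three expected values falls through to the OTHER named output.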


6. Steps To Run

6.1 Build & Prepare Jar

Run the maven clean and install goals. If everything goes well, a HadoopMultipleOutputs.jar file will be created under the target folder.

6.2 Copy Data to HDFS

Here are the commands to create a new HDFS directory and copy a local file into it for further processing:

hadoop fs -mkdir /user/data/input
hadoop fs -put MultipleOutputsInputFile.txt /user/data/input

6.3 Run

Now it’s time to run our MultipleOutputs example.

Go to your HadoopMultipleOutputs.jar location and run the command below.

hadoop jar HadoopMultipleOutputs.jar com.javadeveloperzone.MultipleOutputsDriver /user/data/input /user/data/output

7. Output

Once we run the above command, the MapReduce job starts and the output is written to HDFS in a separate folder per city.

(Image: Multiple Outputs Map Reduce)

Once the MapReduce job completes, check the HDFS /user/data/output folder. You will see three different folders, as below.

(Image: Multiple Outputs HDFS)

8. Source Code

Multiple Outputs Example

You can also check our Git repository for Hadoop Multiple Outputs Example and other useful examples.
