1. Overview

Hadoop provides various Output classes like DBOutputFormat, SequenceFileOutputFormat, TextOutputFormat, etc..It also provides flexibility to its developers to use APIs and extend some of the functionality to achieve organization goals. MapReduce default Output Format is TextOutputFormat, which writes (key, value) pairs on individual lines of text files. By Default, in TextOutputFormat Each key-value pair is separated by a tab character, which can be changed using mapReduce.output.textoutputformat.separator property.

In this example, we will discuss how to create custom output classes and record writers and use that classes as a map reduce program.

2. Development Environment

Hadoop: 3.1.1

Java: Oracle JDK 1.8

IDE: IntelliJ Idea 2018.3

3. Steps for creating custom Output Format

Let’s discuss all the steps to create our own custom output format and record writer class.

3.1 extends OutputFormat class

The first step in creating a custom output format is to extend any inbuild output format classes. In our example, we are going to extend the FileOutputFormat class.

3.2 implements getRecordWriter method

Implement getRecordWriter method and write logic as per need. This method mainly contains the logic of building RecordWriter class as per our need.

3.3 Create Custom RecordWriter class

RecordWriter responsible for writing output key-value pairs from either from the mapper or the Reducer phase to output files. The final step is to create custom RecordWriter class by extending RecordWriter class.

4. Example

4.1 Problem Statement & Solution

As we know default record separator for TextOutputFormat is NEW_LINE. Consider a scenario where our mapper/reducer generate an output value which contains some new lines or our we want to configure different record separator other than NEW_LINE characters. In this example, for a better understanding purpose, we have use word count program with custom output format class. We will set custom record separator using custom configuration property mapreduce.output.textoutputformat.recordseparator

4.2 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>HadoopCustomOutputFormatExample</groupId>
    <artifactId>HadoopCustomOutputFormatExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>Hadoop Custom Output Format Example</description>
    <build>
        <finalName>HadoopCustomOutputFormatExample</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <useSystemClassLoader>false</useSystemClassLoader>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>7</source>
                    <target>7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>

        <!--
        hadoop-mapreduce-client-jobclient dependency for local debug
        -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.1.1</version>
        </dependency>


    </dependencies>

</project>

4.3 WordCountOutputFormat

package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;


public class WordCountOutputFormat<K,V> extends FileOutputFormat<K, V> {
    public static String FIELD_SEPARATOR = "mapreduce.output.textoutputformat.separator";
    public static String RECORD_SEPARATOR = "mapreduce.output.textoutputformat.recordseparator";

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String fieldSeprator = conf.get(FIELD_SEPARATOR, "\t");
        //custom record separator, \n used as a default
        String recordSeprator = conf.get(RECORD_SEPARATOR, "\n");

        //compress output logic
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }

        Path file = this.getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        if(isCompressed){
            return new WordCountLineRecordWriter<>(new DataOutputStream(codec.createOutputStream(fileOut)), fieldSeprator,recordSeprator);
        }else{
            return new WordCountLineRecordWriter<>(fileOut, fieldSeprator,recordSeprator);
        }
    }


}

4.4 WordCountLineRecordWriter

package com.javadeveloperzone;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class WordCountLineRecordWriter<K, V> extends RecordWriter<K, V> {
    protected DataOutputStream out;
    private final byte[] recordSeprator;
    private final byte[] fieldSeprator;

    public WordCountLineRecordWriter(DataOutputStream out, String fieldSeprator,String recordSeprator) {
        this.out = out;
        this.fieldSeprator = fieldSeprator.getBytes(StandardCharsets.UTF_8);
        this.recordSeprator = recordSeprator.getBytes(StandardCharsets.UTF_8);
    }

    public WordCountLineRecordWriter(DataOutputStream out) {
        this(out, "\t","\n");
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text)o;
            this.out.write(to.getBytes(), 0, to.getLength());
        } else {
            this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
        }

    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (!nullKey || !nullValue) {
            if (!nullKey) {
                this.writeObject(key);
            }

            if (!nullKey && !nullValue) {
                this.out.write(this.fieldSeprator);
            }

            if (!nullValue) {
                this.writeObject(value);
            }

            this.out.write(recordSeprator);//write custom record separator instead of NEW_LINE
        }
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        this.out.close();
    }
}

4.5 WordCountMapper

package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();


    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        /*
         * 1. convert to lower case
         * 2. replace all punctuation mark characters with SPACE
         * 3. Tokenize input line
         * 4. write it to HDFS
         *  */
        String line = value.toString().toLowerCase().replaceAll("\\p{Punct}"," ");
        StringTokenizer st = new StringTokenizer(line," ");
        //For each token, write a key value pair with
        //word and 1 as value to context
        while(st.hasMoreTokens()){
            word.set(st.nextToken());
            context.write(word,one);
        }
    }
}

4.6 WordCountReducer

package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        //For each key value pair, get the value and adds to the sum
        //to get the total occurrences of a word
        for(IntWritable value : values){
            sum = sum + value.get();
        }

        //Writes the word and total occurrences as key-value pair to the context
        context.write(key, new IntWritable(sum));
    }
}

4.7 CustomOutputFormatDriver

package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CustomOutputFormatDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {

        int exitCode = ToolRunner.run(new Configuration(),
                new CustomOutputFormatDriver(), args);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.out.println("Please provide two arguments :");
            System.out.println("[ 1 ] Input dir path");
            System.out.println("[ 2 ] Output dir path");
            return -1;
        }

        Configuration c=new Configuration();
        String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
        Path input=new Path(files[0]);
        Path output=new Path(files[1]);
        Configuration conf=new Configuration();

        /*
        * UnComment below three lines to enable local debugging of map reduce job
         * */
        /*conf.set("fs.defaultFS", "local");
        conf.set("mapreduce.job.maps","1");
        conf.set("mapreduce.job.reduces","1");
        */

        Job job=Job.getInstance(conf,"Hadoop Custom Output Format Example");
        job.setJarByClass(CustomOutputFormatDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(WordCountOutputFormat.class);
        job.setNumReduceTasks(1);
        //Custom record separator, set in job configuration
        job.getConfiguration().set("mapreduce.output.textoutputformat.recordseparator","<==>");
        //set field separator other than default \t character 
        job.getConfiguration().set("mapreduce.output.textoutputformat.separator",";");
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        job.setSpeculativeExecution(false);
        boolean success = job.waitForCompletion(true);
        return (success?0:1);
    }
}

5. Build & Run

Hadoop WordCount with Custom Output Format is ready. Compile and create jar files, copy sample input file to HDFS and run below command.

hadoop jar HadoopCustomOutputFormatExample.jar com.javadeveloperzone.CustomOutputFormatDriver /user/data/barakobamawebsite.txt /user/data/wordcountoutput

6. Output

6.1 HDFS Output

a;12<==>about;2<==>african;2<==>after;2<==>america;2<==>american;2<==>and;8<==>around;2<==>at;2<==>background;2<==>became;2<==>begin;2<==>belief;2<==>born;2<==>bring;2<==>by;2<==>career;2<==>chance;2<==>change;2<==>chicago;4<==>child;2<==>churches;2<==>citizenship;2<==>closure;2<==>college;2<==>communities;2<==>compelled;2<==>constitutional;2<==>devastated;2<==>devote;2<==>every;2<==>experience;2<==>father;2<==>few;2<==>first;2<==>from;6<==>gave;2<==>generosity;2<==>giving;2<==>grandparents;2<==>group;2<==>hard;2<==>harvard;2<==>have;2<==>hawaii;2<==>he;6<==>help;6<==>her;2<==>him;4<==>his;12<==>homespun;2<==>honed;2<==>house;2<==>illinois;4<==>improbable;2<==>in;14<==>innate;2<==>instilled;2<==>kansas;2<==>kenya;2<==>law;6<==>life;

6.2 Local Debug Output

Custom Output Format Output

7. Conclusion

Here, In this conclusion of Hadoop Create custom output format and record writer article, we can say that many use cases where default output and record writers classes are not best suited for some requirements. We can create our own custom output classes by extending FileOutputFormatClass class and override its method to achieve our goal.

8. References

9. Source Code

Hadoop Custom Output Format Example

You can also check our Git repository for Hadoop Custom Output Format Example and other useful examples.

Was this post helpful?
Let us know, if you liked the post. Only in this way, we can improve us.
Yes
No

Leave a Reply

Your email address will not be published. Required fields are marked *