

1. Overview
Hadoop provides various output format classes, such as DBOutputFormat, SequenceFileOutputFormat, and TextOutputFormat. It also gives developers the flexibility to use its APIs and extend some of this functionality to achieve organization goals. The default MapReduce output format is TextOutputFormat, which writes (key, value) pairs on individual lines of text files. By default, TextOutputFormat separates the key and the value of each pair with a tab character, which can be changed using the mapreduce.output.textoutputformat.separator property.
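For example, a comma can be used instead of the default tab by setting this property in the driver (a minimal sketch, assuming job is an already created org.apache.hadoop.mapreduce.Job instance):

// use a comma instead of the default tab between each key and value
job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");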
In this example, we will discuss how to create custom output format and record writer classes and use those classes in a MapReduce program.
2. Development Environment
Hadoop: 3.1.1
Java: Oracle JDK 1.8
IDE: IntelliJ Idea 2018.3
3. Steps for creating custom Output Format
Let’s discuss all the steps to create our own custom output format and record writer classes.
3.1 Extend an OutputFormat class
The first step in creating a custom output format is to extend one of the built-in output format classes. In our example, we are going to extend the FileOutputFormat class.
3.2 Implement the getRecordWriter method
Implement the getRecordWriter method and write the logic as per our needs. This method mainly contains the logic for building the RecordWriter instance that will write the job output.
3.3 Create Custom RecordWriter class
The RecordWriter is responsible for writing output key-value pairs from either the Mapper or the Reducer phase to output files. The final step is to create a custom RecordWriter class by extending the RecordWriter class, as shown in the sketch below.
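Putting the three steps together, a minimal skeleton looks like this (the class names here are hypothetical placeholders; the complete example follows in section 4):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Step 3.1: extend a built-in output format class
public class SkeletonOutputFormat<K, V> extends FileOutputFormat<K, V> {

    // Step 3.2: implement getRecordWriter and build our RecordWriter
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Path file = getDefaultWorkFile(job, "");
        FSDataOutputStream out = file.getFileSystem(job.getConfiguration()).create(file, false);
        return new SkeletonRecordWriter<>(out);
    }

    // Step 3.3: a custom RecordWriter that controls how each pair is written
    static class SkeletonRecordWriter<K, V> extends RecordWriter<K, V> {

        private final FSDataOutputStream out;

        SkeletonRecordWriter(FSDataOutputStream out) {
            this.out = out;
        }

        @Override
        public void write(K key, V value) throws IOException {
            // any custom formatting of the output record goes here
            out.writeBytes(key + "\t" + value + "\n");
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }
}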
4. Example
4.1 Problem Statement & Solution
As we know, the default record separator for TextOutputFormat is the NEW_LINE character. Consider a scenario where our mapper/reducer generates an output value that contains new lines, or where we want to configure a record separator other than the NEW_LINE character. In this example, for better understanding, we use the word count program with a custom output format class. We will set a custom record separator using the custom configuration property mapreduce.output.textoutputformat.recordseparator.
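For example, with the field separator set to ; and the record separator set to <==>, the pairs (about, 2) and (african, 2) would be written one after another on the same physical line:

about;2<==>african;2<==>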
4.2 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>HadoopCustomOutputFormatExample</groupId>
    <artifactId>HadoopCustomOutputFormatExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>Hadoop Custom Output Format Example</description>
    <build>
        <finalName>HadoopCustomOutputFormatExample</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <useSystemClassLoader>false</useSystemClassLoader>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>7</source>
                    <target>7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>
        <!-- hadoop-mapreduce-client-jobclient dependency for local debug -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.1.1</version>
        </dependency>
    </dependencies>
</project>
4.3 WordCountOutputFormat
package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;

public class WordCountOutputFormat<K, V> extends FileOutputFormat<K, V> {

    public static String FIELD_SEPARATOR = "mapreduce.output.textoutputformat.separator";
    public static String RECORD_SEPARATOR = "mapreduce.output.textoutputformat.recordseparator";

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String fieldSeparator = conf.get(FIELD_SEPARATOR, "\t");
        // custom record separator, \n used as a default
        String recordSeparator = conf.get(RECORD_SEPARATOR, "\n");
        // compressed output logic
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            codec = ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = this.getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        if (isCompressed) {
            return new WordCountLineRecordWriter<>(
                    new DataOutputStream(codec.createOutputStream(fileOut)),
                    fieldSeparator, recordSeparator);
        } else {
            return new WordCountLineRecordWriter<>(fileOut, fieldSeparator, recordSeparator);
        }
    }
}
4.4 WordCountLineRecordWriter
package com.javadeveloperzone;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class WordCountLineRecordWriter<K, V> extends RecordWriter<K, V> {

    protected DataOutputStream out;
    private final byte[] recordSeparator;
    private final byte[] fieldSeparator;

    public WordCountLineRecordWriter(DataOutputStream out, String fieldSeparator, String recordSeparator) {
        this.out = out;
        this.fieldSeparator = fieldSeparator.getBytes(StandardCharsets.UTF_8);
        this.recordSeparator = recordSeparator.getBytes(StandardCharsets.UTF_8);
    }

    public WordCountLineRecordWriter(DataOutputStream out) {
        this(out, "\t", "\n");
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            this.out.write(to.getBytes(), 0, to.getLength());
        } else {
            this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (!nullKey || !nullValue) {
            if (!nullKey) {
                this.writeObject(key);
            }
            if (!nullKey && !nullValue) {
                this.out.write(this.fieldSeparator);
            }
            if (!nullValue) {
                this.writeObject(value);
            }
            this.out.write(recordSeparator); // write custom record separator instead of NEW_LINE
        }
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        this.out.close();
    }
}
4.5 WordCountMapper
package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        /*
         * 1. convert to lower case
         * 2. replace all punctuation mark characters with SPACE
         * 3. tokenize the input line
         * 4. write each token to the context
         */
        String line = value.toString().toLowerCase().replaceAll("\\p{Punct}", " ");
        StringTokenizer st = new StringTokenizer(line, " ");
        // For each token, write a key-value pair with
        // the word and 1 as value to the context
        while (st.hasMoreTokens()) {
            word.set(st.nextToken());
            context.write(word, one);
        }
    }
}
4.6 WordCountReducer
package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // For each key-value pair, add the value to the sum
        // to get the total occurrences of a word
        for (IntWritable value : values) {
            sum = sum + value.get();
        }
        // Write the word and total occurrences as a key-value pair to the context
        context.write(key, new IntWritable(sum));
    }
}
4.7 CustomOutputFormatDriver
package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CustomOutputFormatDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new CustomOutputFormatDriver(), args);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Please provide two arguments :");
            System.out.println("[ 1 ] Input dir path");
            System.out.println("[ 2 ] Output dir path");
            return -1;
        }
        Configuration c = new Configuration();
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        Path input = new Path(files[0]);
        Path output = new Path(files[1]);
        Configuration conf = new Configuration();
        /*
         * Uncomment the three lines below to enable local debugging of the MapReduce job
         */
        /*conf.set("fs.defaultFS", "local");
        conf.set("mapreduce.job.maps", "1");
        conf.set("mapreduce.job.reduces", "1");
        */
        Job job = Job.getInstance(conf, "Hadoop Custom Output Format Example");
        job.setJarByClass(CustomOutputFormatDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(WordCountOutputFormat.class);
        job.setNumReduceTasks(1);
        // Custom record separator, set in the job configuration
        job.getConfiguration().set("mapreduce.output.textoutputformat.recordseparator", "<==>");
        // Set a field separator other than the default \t character
        job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ";");
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        job.setSpeculativeExecution(false);
        boolean success = job.waitForCompletion(true);
        return (success ? 0 : 1);
    }
}
5. Build & Run
Hadoop WordCount with a custom output format is ready. Compile the project and create the jar file, copy a sample input file to HDFS, and run the command below.
hadoop jar HadoopCustomOutputFormatExample.jar com.javadeveloperzone.CustomOutputFormatDriver /user/data/barakobamawebsite.txt /user/data/wordcountoutput
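Once the job completes, the output can be inspected with the following command (assuming the output path used above; with a single reducer the job produces one part file):

hdfs dfs -cat /user/data/wordcountoutput/part-r-00000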
6. Output
6.1 HDFS Output
a;12<==>about;2<==>african;2<==>after;2<==>america;2<==>american;2<==>and;8<==>around;2<==>at;2<==>background;2<==>became;2<==>begin;2<==>belief;2<==>born;2<==>bring;2<==>by;2<==>career;2<==>chance;2<==>change;2<==>chicago;4<==>child;2<==>churches;2<==>citizenship;2<==>closure;2<==>college;2<==>communities;2<==>compelled;2<==>constitutional;2<==>devastated;2<==>devote;2<==>every;2<==>experience;2<==>father;2<==>few;2<==>first;2<==>from;6<==>gave;2<==>generosity;2<==>giving;2<==>grandparents;2<==>group;2<==>hard;2<==>harvard;2<==>have;2<==>hawaii;2<==>he;6<==>help;6<==>her;2<==>him;4<==>his;12<==>homespun;2<==>honed;2<==>house;2<==>illinois;4<==>improbable;2<==>in;14<==>innate;2<==>instilled;2<==>kansas;2<==>kenya;2<==>law;6<==>life;
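Notice that the entire output is written as one physical line: the <==> record separator replaces the default newline between records, and the ; field separator replaces the default tab between each word and its count.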
7. Conclusion
To conclude this article on creating a custom output format and record writer in Hadoop: there are many use cases where the default output format and record writer classes are not best suited to the requirements. We can create our own custom output format classes by extending the FileOutputFormat class and overriding its methods to achieve our goal.
8. Source Code
Hadoop Custom Output Format Example
You can also check our Git repository for Hadoop Custom Output Format Example and other useful examples.