

1. Overview
Sometimes a Hadoop job needs to write its data to multiple output locations, each with a different compression method. Hadoop provides the facility to write a job's output to different locations via the MultipleOutputs class, and we can create custom output formats and configure them with MultipleOutputs to support multiple compression formats in a single job. In this article, we discuss Hadoop MultipleOutputs with different compression formats, with an example.
2. Development Environment
- Hadoop: 3.1.1
- Java: Oracle JDK 1.8
- IDE: IntelliJ IDEA 2018.3
3. Hadoop Compression Formats
The Hadoop framework supports many compression formats for both input and output data. A compression format, or codec (coder-decoder), is a set of compiled, ready-to-use Java libraries that we can invoke programmatically to perform data compression and decompression in a MapReduce job. Each of these codecs implements a compression/decompression algorithm and has its own characteristics.
Here is the list of supported compression formats; a job-wide configuration sketch follows the list.
- Gzip
- Snappy
- LZO
- bzip2
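Before building per-output compression, note that Hadoop can already compress a job's single default output with one codec and no custom code. The sketch below (the class name is ours; the FileOutputFormat calls are the standard API) enables job-wide Gzip output; this single-codec limit is exactly what the rest of the article works around.
package com.javadeveloperzone;

import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobWideCompressionConfig {
    public static void configure(Job job) {
        // Job-wide output compression: every output file of this job
        // is written with the same codec (Gzip here).
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }
}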
4. Steps to Create MultipleOutputs with Multiple Compression Formats
The MultipleOutputs class provides the facility to write a Hadoop map/reduce job's output to more than one folder. Basically, we use MultipleOutputs when we want to write outputs in addition to the job's default output, or to write the job's output to files and folders named by the user.
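As a preview of the pattern (the full driver and mapper appear in section 6), there are two moving parts: the driver registers a named output with its own output format class, and the mapper writes records against that name. The name "CITY" below is illustrative:
// driver side: register a named output with its own output format class
MultipleOutputs.addNamedOutput(job, "CITY", TextOutputFormat.class, Text.class, Text.class);
// mapper side: route a record to that named output, under an optional base output path
multipleOutputs.write("CITY", key, value, "CITY/CITY");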
4.1 Create Custom Output Format
The first step is to create a custom output format class by extending the FileOutputFormat class and overriding its getRecordWriter method. The codec-specific formats used by this example (LZ4, Deflate, BZip2) appear in section 6; a Gzip-flavoured sketch of the same pattern follows.
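This hypothetical GzipOutputFormat is not part of the final example; it only illustrates the shape of such a class, and it reuses the MultipleCompressionLineRecordWriter built in the next step:
package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;

public class GzipOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        // instantiate the codec and take its default extension (".gz")
        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
        Path file = getDefaultWorkFile(job, codec.getDefaultExtension());
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        // wrap the raw HDFS stream in the codec's compressing stream
        return new MultipleCompressionLineRecordWriter<>(
                new DataOutputStream(codec.createOutputStream(fileOut)));
    }
}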
4.2 Create Custom Record Writer
The next step is to create a custom record writer class by extending the RecordWriter class. A stripped-down sketch follows; the full implementation used by the example is in section 6.5.
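This minimal sketch omits the configurable separators and null-key handling that the real MultipleCompressionLineRecordWriter (section 6.5) adds; it only shows that the record writer's job is to own the already-compressed stream:
package com.javadeveloperzone;

import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LineRecordWriterSketch<K, V> extends RecordWriter<K, V> {

    private final DataOutputStream out; // already wrapped by a compression codec

    public LineRecordWriterSketch(DataOutputStream out) {
        this.out = out;
    }

    @Override
    public synchronized void write(K key, V value) throws IOException {
        // one "key TAB value" line per record
        out.write((key + "\t" + value + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close(); // flushes and finishes the compressed stream
    }
}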
4.3 Set Custom Output Format as Named Output
The final step is to configure each custom output format class as a MultipleOutputs named output; the name passed as the second argument is the handle the mapper later writes against. Refer to the code snippet below for more details.
MultipleOutputs.addNamedOutput(job, "AHMEDABAD", LzoOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "DELHI", DeflateOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "MUMBAI", BZipOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "OTHER", LzoOutputFormat.class, Text.class, Text.class);
5. Sample Input
1	John Cena,Sr.Software Engineer,TCS,Ahmedabad
2	Peter,Sr.Software Engineer,Infosys,Delhi
3	S. Mathur,Software Engineer,capgemini,Mumbai
4	Ranvir D.,Data scientist,TCS,Ahmedabad
5	Kane Will,Technical Lead,TCS,Ahmedabad
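Because the job (section 6.7) uses KeyValueTextInputFormat, each line is split at its first tab: the leading id becomes the map key, and the comma-separated employee record becomes the value, whose fourth field (the city) drives the routing.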
6. Example
6.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>MultipleOutputsMultipleCompressionExample</groupId>
    <artifactId>MultipleOutputsMultipleCompressionExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>MultipleOutputs with Multiple Compression Method Example</description>
    <build>
        <finalName>MultipleOutputsMultipleCompressionExample</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <useSystemClassLoader>false</useSystemClassLoader>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>
        <!-- hadoop-mapreduce-client-jobclient dependency for local debug -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.1.1</version>
        </dependency>
    </dependencies>
</project>
6.2 BZipOutputFormat
package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;

public class BZipOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        // instantiate the BZip2 codec; its default extension is ".bz2"
        Class<? extends CompressionCodec> codecClass = BZip2Codec.class;
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
        String extension = codec.getDefaultExtension();
        Path file = this.getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        // wrap the raw HDFS stream in the codec's compressing stream
        return new MultipleCompressionLineRecordWriter<>(
                new DataOutputStream(codec.createOutputStream(fileOut)));
    }
}
6.3 DeflateOutputFormat
package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;

public class DeflateOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        // DefaultCodec produces zlib/deflate output; its default extension is ".deflate"
        Class<? extends CompressionCodec> codecClass = DefaultCodec.class;
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
        String extension = codec.getDefaultExtension();
        Path file = this.getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        // wrap the raw HDFS stream in the codec's compressing stream
        return new MultipleCompressionLineRecordWriter<>(
                new DataOutputStream(codec.createOutputStream(fileOut)));
    }
}
6.4 LzoOutputFormat
package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;

public class LzoOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        // note: despite the class name, Hadoop's built-in Lz4Codec implements LZ4
        // (default extension ".lz4"), not LZO; true LZO needs the separate hadoop-lzo codec
        Class<? extends CompressionCodec> codecClass = Lz4Codec.class;
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
        String extension = codec.getDefaultExtension();
        Path file = this.getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        // wrap the raw HDFS stream in the codec's compressing stream
        return new MultipleCompressionLineRecordWriter<>(
                new DataOutputStream(codec.createOutputStream(fileOut)));
    }
}
6.5 MultipleCompressionLineRecordWriter
package com.javadeveloperzone;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class MultipleCompressionLineRecordWriter<K, V> extends RecordWriter<K, V> {

    protected DataOutputStream out;
    private final byte[] recordSeparator;
    private final byte[] fieldSeparator;

    public MultipleCompressionLineRecordWriter(DataOutputStream out,
            String fieldSeparator, String recordSeparator) {
        this.out = out;
        this.fieldSeparator = fieldSeparator.getBytes(StandardCharsets.UTF_8);
        this.recordSeparator = recordSeparator.getBytes(StandardCharsets.UTF_8);
    }

    public MultipleCompressionLineRecordWriter(DataOutputStream out) {
        this(out, "\t", "\n"); // default: TAB between key and value, newline between records
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            // write Text byte-for-byte, avoiding an extra String copy
            Text to = (Text) o;
            this.out.write(to.getBytes(), 0, to.getLength());
        } else {
            this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (!nullKey || !nullValue) {
            if (!nullKey) {
                this.writeObject(key);
            }
            if (!nullKey && !nullValue) {
                this.out.write(this.fieldSeparator);
            }
            if (!nullValue) {
                this.writeObject(value);
            }
            this.out.write(recordSeparator); // write custom record separator instead of NEW_LINE
        }
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        this.out.close();
    }
}
6.6 MultipleOutputsMapper
package com.javadeveloperzone;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

import static com.javadeveloperzone.MultipleOutputsMultipleCompressionDriver.*;

public class MultipleOutputsMapper extends Mapper<Text, Text, Text, Text> {

    MultipleOutputs<Text, Text> multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<>(context);
    }

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // value is "name,designation,company,city"; route on the city field
        String[] empData = value.toString().split(",");
        if (empData[3].equalsIgnoreCase(AHMEDABAD)) {
            multipleOutputs.write(AHMEDABAD, key, value, "AHMEDABAD/AHMEDABAD");
        } else if (empData[3].equalsIgnoreCase(DELHI)) {
            multipleOutputs.write(DELHI, key, value, "DELHI/DELHI");
        } else if (empData[3].equalsIgnoreCase(MUMBAI)) {
            multipleOutputs.write(MUMBAI, key, value, "MUMBAI/MUMBAI");
        } else {
            multipleOutputs.write(OTHER, key, value, "OTHER/OTHER");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close(); // flush and close all named outputs
    }
}
6.7 MultipleOutputsMultipleCompressionDriver
package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipleOutputsMultipleCompressionDriver extends Configured implements Tool {

    public static final String OTHER = "OTHER";
    public static final String MUMBAI = "MUMBAI";
    public static final String DELHI = "DELHI";
    public static final String AHMEDABAD = "AHMEDABAD";

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new MultipleOutputsMultipleCompressionDriver(), args);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Please provide two arguments :");
            System.out.println("[ 1 ] Input dir path");
            System.out.println("[ 2 ] Output dir path");
            return -1;
        }

        Configuration c = new Configuration();
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        Path input = new Path(files[0]);
        Path output = new Path(files[1]);

        Job job = Job.getInstance(c); // pass the parsed configuration to the job
        job.setJarByClass(MultipleOutputsMultipleCompressionDriver.class);
        job.setMapperClass(MultipleOutputsMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // KeyValueTextInputFormat splits each input line at the first TAB
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // default output format; the real records all go through MultipleOutputs
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(0); // map-only job

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        // one named output per city, each with its own compression format
        MultipleOutputs.addNamedOutput(job, "AHMEDABAD", LzoOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "DELHI", DeflateOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "MUMBAI", BZipOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "OTHER", LzoOutputFormat.class, Text.class, Text.class);

        boolean success = job.waitForCompletion(true);
        return (success ? 0 : 1);
    }
}
7. Steps To Run
7.1 Build & Prepare Jar
Run the Maven clean and install goals. If everything goes well, a MultipleOutputsMultipleCompressionExample.jar file will be created under the target folder.
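Assuming Maven is installed and on the PATH, run this from the project root:
mvn clean install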
7.2 Copy Data to HDFS
Here are the commands to create a new directory and copy a local file into HDFS for further processing.
hadoop fs -mkdir /user/data/multiplecompressioninput
hadoop fs -put MultipleCompressionInputFile.txt /user/data/multiplecompressioninput
7.3 Run
Now it's time to run our MultipleOutputs with different compression formats example. Go to your MultipleOutputsMultipleCompressionExample.jar location and run the command below.
hadoop jar MultipleOutputsMultipleCompressionExample.jar com.javadeveloperzone.MultipleOutputsMultipleCompressionDriver /user/data/multiplecompressioninput /user/data/output
8. Output
Once we run the above command, the MapReduce job starts, and the output is written to HDFS in a separate folder per city with LZ4 (labelled LZO in this example), Deflate, and BZip2 compression.
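Exact part numbers vary from run to run, but given the named outputs configured above, the HDFS layout should look roughly like this (note the codecs' default extensions):
/user/data/output/AHMEDABAD/AHMEDABAD-m-00000.lz4
/user/data/output/DELHI/DELHI-m-00000.deflate
/user/data/output/MUMBAI/MUMBAI-m-00000.bz2
/user/data/output/OTHER/OTHER-m-00000.lz4
/user/data/output/_SUCCESS
Since every record goes through MultipleOutputs, the job's default TextOutputFormat part files at the output root will be empty.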
8.1 HDFS Output
8.1.1 LZO Compression
8.1.2 Deflate (Default) Compression
8.1.3 BZip2 Compression
8.2 Local Debug Output
9. Conclusion
In this article, we discussed Hadoop compression formats with an example: how to create custom Hadoop output format and record writer classes, and how to set a different compression format for each named output of a single MapReduce job.
10. Source Code
Multiple Outputs Multiple Compression Example
You can also check our Git repository for Hadoop MultipleOutputs With Different Compression Format and other useful examples.