1. Overview

Sometimes a Hadoop job needs to write data to multiple output locations, each with a different compression method. Hadoop provides the MultipleOutputs class to write the output of a job to different locations, and by pairing it with custom output format classes we can use a different compression format for each output within a single job. In this article, we discuss Hadoop MultipleOutputs with different compression formats, with a complete example.

2. Development Environment

Hadoop: 3.1.1

Java: Oracle JDK 1.8

IDE: IntelliJ Idea 2018.3

3. Hadoop Compression Formats

The Hadoop framework supports many compression formats for both input and output data. A compression format, or codec (coder-decoder), is a set of compiled, ready-to-use Java libraries that we can invoke programmatically to perform data compression and decompression in a MapReduce job. Each codec implements an algorithm for compression and decompression and has different characteristics.

Here is the list of supported compression formats (basic single-codec usage is shown in the snippet after the list):

  1. Gzip
  2. Snappy
  3. LZO
  4. Bzip2
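For context, when a job has only a single output, compression is normally enabled through the standard FileOutputFormat helpers, for example with Gzip (assuming a configured Job instance named job):

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

These settings apply one codec to the entire job output, which is exactly the limitation the MultipleOutputs approach below works around.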

4. Steps to Create MultipleOutputs with Multiple Compression Formats

The MultipleOutputs class provides the facility to write Hadoop map/reduce output to more than one folder. Basically, we use MultipleOutputs when we want to write outputs in addition to the job's default output, or to write job output to different user-specified files.

4.1 Create Custom Output Writer

The first step is to create a custom output format class by extending the FileOutputFormat class and overriding its getRecordWriter method.
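As a minimal sketch of the pattern, here is a Gzip variant; this GzipOutputFormat class is for illustration only and is not part of the example job, but it mirrors the real implementations in section 6 (imports are the same as in section 6.2 below):

public class GzipOutputFormat<K, V> extends FileOutputFormat<K, V> {
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException {
        Configuration conf = job.getConfiguration();
        // pick the codec that this output format should use
        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
        // name the output file with the codec's default extension (".gz" here)
        Path file = getDefaultWorkFile(job, codec.getDefaultExtension());
        FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
        // hand a codec-wrapped stream to the custom record writer
        return new MultipleCompressionLineRecordWriter<>(new DataOutputStream(codec.createOutputStream(fileOut)));
    }
}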

4.2 Create Custom Record Writer

The next step is to create a custom record writer class by extending the RecordWriter class.
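The record writer's only job is to serialize key/value pairs to the compressed stream handed to it by the output format. A simplified sketch is below (SimpleLineRecordWriter is a hypothetical stripped-down version; the full class, with configurable separators and null handling, appears in section 6.5):

public class SimpleLineRecordWriter<K, V> extends RecordWriter<K, V> {
    private final DataOutputStream out; // already wrapped in a compression stream

    public SimpleLineRecordWriter(DataOutputStream out) {
        this.out = out;
    }

    @Override
    public void write(K key, V value) throws IOException {
        // write "key<TAB>value<NEWLINE>" into the compressed stream
        out.write((key + "\t" + value + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}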

4.3 Set Custom Output Formats as Named Outputs

The final step is to configure the custom output format classes as MultipleOutputs named outputs. Refer to the code snippet below for details.

MultipleOutputs.addNamedOutput(job,"AHMEDABAD", LzoOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"DELHI", DeflateOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"MUMBAI", BZipOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"OTHER", LzoOutputFormat.class,Text.class,Text.class);

5. Sample Input

1	John Cena,Sr.Software Engineer,TCS,Ahmedabad
2	Peter,Sr.Software Engineer,Infosys,Delhi
3	S. Mathur,Software Engineer,capgemini,Mumbai
4	Ranvir D.,Data Scientist,TCS,Ahmedabad
5	Kane Will,Technical Lead,TCS,Ahmedabad

6. Example

6.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>MultipleOutputsMultipleCompressionExample</groupId>
    <artifactId>MultipleOutputsMultipleCompressionExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>MultipleOutputs with Multiple Compression Method Example</description>
    <build>
        <finalName>MultipleOutputsMultipleCompressionExample</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <useSystemClassLoader>false</useSystemClassLoader>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>
        <!--
        hadoop-mapreduce-client-jobclient dependency for local debug
        -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.1.1</version>
        </dependency>
    </dependencies>
</project>

6.2 BZipOutputFormat

package com.javadeveloperzone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.DataOutputStream;
import java.io.IOException;

public class BZipOutputFormat<K,V> extends FileOutputFormat<K, V> {
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        //compress output logic
        Class<? extends CompressionCodec> codecClass = BZip2Codec.class;
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        String extension = codec.getDefaultExtension();
        Path file = this.getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        return new MultipleCompressionLineRecordWriter<>(new DataOutputStream(codec.createOutputStream(fileOut)));
    }

}

6.3 DeflateOutputFormat

package com.javadeveloperzone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.DataOutputStream;
import java.io.IOException;
public class DeflateOutputFormat<K,V> extends FileOutputFormat<K, V> {
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        //compress output logic
        Class<? extends CompressionCodec> codecClass = DefaultCodec.class;
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        String extension = codec.getDefaultExtension();
        Path file = this.getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        return new MultipleCompressionLineRecordWriter<>(new DataOutputStream(codec.createOutputStream(fileOut)));
    }
}

6.4 LzoOutputFormat

package com.javadeveloperzone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.DataOutputStream;
import java.io.IOException;
public class LzoOutputFormat<K,V> extends FileOutputFormat<K, V> {
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        // Hadoop does not bundle a real LZO codec (the GPL-licensed LzoCodec ships
        // separately as hadoop-lzo), so this "Lzo" output format actually uses the
        // built-in Lz4Codec.
        Class<? extends CompressionCodec> codecClass = Lz4Codec.class;
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        String extension = codec.getDefaultExtension();
        Path file = this.getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        return new MultipleCompressionLineRecordWriter<>(new DataOutputStream(codec.createOutputStream(fileOut)));
    }
}

6.5 MultipleCompressionLineRecordWriter

package com.javadeveloperzone;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class MultipleCompressionLineRecordWriter<K, V> extends RecordWriter<K, V> {
    protected DataOutputStream out;
    private final byte[] recordSeparator;
    private final byte[] fieldSeparator;
    public MultipleCompressionLineRecordWriter(DataOutputStream out, String fieldSeparator, String recordSeparator) {
        this.out = out;
        this.fieldSeparator = fieldSeparator.getBytes(StandardCharsets.UTF_8);
        this.recordSeparator = recordSeparator.getBytes(StandardCharsets.UTF_8);
    }
    public MultipleCompressionLineRecordWriter(DataOutputStream out) {
        this(out, "\t", "\n");
    }
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            this.out.write(to.getBytes(), 0, to.getLength());
        } else {
            this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
        }
    }
    @Override
    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (!nullKey || !nullValue) {
            if (!nullKey) {
                this.writeObject(key);
            }
            if (!nullKey && !nullValue) {
                this.out.write(this.fieldSeparator);
            }
            if (!nullValue) {
                this.writeObject(value);
            }
            this.out.write(recordSeparator); // write custom record separator instead of NEW_LINE
        }
    }
    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException {
        this.out.close();
    }
}

6.6 MultipleOutputsMapper

package com.javadeveloperzone;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.IOException;
import static com.javadeveloperzone.MultipleOutputsMultipleCompressionDriver.*;
public class MultipleOutputsMapper extends Mapper<Text,Text,Text,Text> {
    MultipleOutputs<Text, Text> multipleOutputs;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<>(context);
    }
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // value format: name,designation,company,city - the city is the fourth field
        String[] empData = value.toString().split(",");
        if(empData[3].equalsIgnoreCase(AHMEDABAD)){
            multipleOutputs.write(AHMEDABAD,key,value, "AHMEDABAD/AHMEDABAD");
        }else if (empData[3].equalsIgnoreCase(DELHI)){
            multipleOutputs.write(DELHI,key,value,"DELHI/DELHI");
        }else if(empData[3].equalsIgnoreCase(MUMBAI)){
            multipleOutputs.write(MUMBAI,key,value,"MUMBAI/MUMBAI");
        }else {
            multipleOutputs.write(OTHER,key,value,"OTHER/OTHER");
        }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}

6.7 MultipleOutputsMultipleCompressionDriver

package com.javadeveloperzone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipleOutputsMultipleCompressionDriver extends Configured implements Tool {

    public static final String OTHER = "OTHER";
    public static final String MUMBAI = "MUMBAI";
    public static final String DELHI = "DELHI";
    public static final String AHMEDABAD = "AHMEDABAD";
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new MultipleOutputsMultipleCompressionDriver(), args);
        System.exit(exitCode);
    }
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Please provide two arguments :");
            System.out.println("[ 1 ] Input dir path");
            System.out.println("[ 2 ] Output dir path");
            return -1;
        }
        Configuration conf = getConf();
        String[] files = new GenericOptionsParser(conf, args).getRemainingArgs();
        Path input = new Path(files[0]);
        Path output = new Path(files[1]);
        Job job = Job.getInstance(conf); // pass the parsed configuration to the job so -D options take effect
        job.setJarByClass(MultipleOutputsMultipleCompressionDriver.class);
        job.setMapperClass(MultipleOutputsMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        MultipleOutputs.addNamedOutput(job,"AHMEDABAD", LzoOutputFormat.class,Text.class,Text.class);
        MultipleOutputs.addNamedOutput(job,"DELHI", DeflateOutputFormat.class,Text.class,Text.class);
        MultipleOutputs.addNamedOutput(job,"MUMBAI", BZipOutputFormat.class,Text.class,Text.class);
        MultipleOutputs.addNamedOutput(job,"OTHER", LzoOutputFormat.class,Text.class,Text.class);
        boolean success = job.waitForCompletion(true);
        return (success?0:1);
    }
}
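One caveat worth noting: since the driver sets TextOutputFormat as the default output format, the job will also create default part files (empty here, because the mapper writes only through MultipleOutputs). A common companion to MultipleOutputs that suppresses these empty files is LazyOutputFormat, which creates the default output file only when a record is actually written to it:

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

This single line would replace the job.setOutputFormatClass(TextOutputFormat.class) call above.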

7. Steps To Run

7.1 Build & Prepare Jar

clean & install maven goals. If everything is going well, MultipleOutputsMultipleCompressionExample.jar file will be created under a target folder.

7.2 Copy Data to HDFS

Here are the commands to create a new directory and copy a local file into HDFS for further processing.

hadoop fs -mkdir /user/data/multiplecompressioninput
hadoop fs -put MultipleCompressionInputFile.txt /user/data/multiplecompressioninput

7.3 Run

Now it's time to run our MultipleOutputs with different compression methods example.

Go to your MultipleOutputsMultipleCompressionExample.jar location and run the command below.

hadoop jar MultipleOutputsMultipleCompressionExample.jar com.javadeveloperzone.MultipleOutputsMultipleCompressionDriver /user/data/multiplecompressioninput /user/data/output

8. Output

Once we run the above command, the MapReduce job starts and the output is written to HDFS in city-wise separate folders with LZ4, Deflate, and BZip2 compression.
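Assuming default part-file numbering for a map-only job, the HDFS output layout should look roughly like this (the extensions come from each codec's getDefaultExtension(); exact part numbers depend on the number of map tasks):

/user/data/output/AHMEDABAD/AHMEDABAD-m-00000.lz4
/user/data/output/DELHI/DELHI-m-00000.deflate
/user/data/output/MUMBAI/MUMBAI-m-00000.bz2
/user/data/output/OTHER/OTHER-m-00000.lz4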

8.1 HDFS Output

8.1.1 LZ4 Compression

[Screenshot: LZ4-compressed output files in HDFS]

8.1.2 Deflate (Default) Compression

[Screenshot: Deflate-compressed output files in HDFS]

8.1.3 BZip2 Compression

[Screenshot: BZip2-compressed output files in HDFS]

8.2 Local Debug Output

[Screenshot: local debug output]

9. Conclusion

In this article, we discussed Hadoop compression formats with an example: how to create custom Hadoop output format and record writer classes, and how to set a different compression format for each output within a single MapReduce job.

10. Source Code

Multiple Outputs Multiple Compression Example

You can also check our Git repository for Hadoop MultipleOutputs With Different Compression Format and other useful examples.
