1. Overview

Hadoop provides several Writable classes such as IntWritable, Text, LongWritable, and ObjectWritable. It also lets developers extend its APIs to meet application-specific needs. In this example, we discuss how to create a custom Writable class and use it in a MapReduce program.

2. Development Environment

Hadoop: 3.1.1

Java: Oracle JDK 1.8

IDE: IntelliJ Idea 2018.3

3. Steps for creating custom value writable data types

Let’s go through the steps to create a custom Writable data type that can be used as a MapReduce value class. A class used as a MapReduce key must additionally implement WritableComparable so that keys can be sorted. In this example, we create a custom Writable data type that is used as a MapReduce value class.

3.1 Implement Writable Interface

A custom class must implement the Writable interface to be used as a MapReduce job's value class.

3.2 write method

Override the write method and add logic to write the value of every field to the output. For a list or other collection, first write the size of the collection and then write each of its elements.
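The size-first rule can be tried out without a cluster. The following is a minimal sketch that uses only java.io, assuming plain String elements written with writeUTF as a stand-in for Hadoop's Text; the class name SizePrefixedListSketch and its helper methods are hypothetical and not part of the example project.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in class, not part of the example project.
class SizePrefixedListSketch {

    // Writes the element count first, then each element, so that a reader
    // knows how many entries to expect before it starts reading them.
    static void writeNames(DataOutput out, List<String> names) throws IOException {
        out.writeInt(names.size());
        for (String name : names) {
            out.writeUTF(name);
        }
    }

    // Convenience helper: serializes the list into a byte array.
    static byte[] toBytes(List<String> names) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            writeNames(out, names);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(Arrays.asList("John", "Jinni", "Ronan"));
        System.out.println("serialized length: " + bytes.length);
    }
}
```

Because the count comes first, an empty list still produces four bytes (the int 0), which lets the reading side distinguish "no members" from a truncated stream.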

3.3 readFields method

The readFields method reads the value of every field from the input stream. It must read the data members in the same order in which the write method wrote them.
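To see why the order matters, here is a small round-trip sketch built on java.io only. It assumes int and String fields in place of IntWritable and Text, and the class name FamilyRecordSketch is hypothetical. The write and readFields methods touch the fields in the same sequence, so the copy comes back identical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that mirrors the write/readFields contract.
class FamilyRecordSketch {
    int familyId;
    int totalAge;
    List<String> members = new ArrayList<>();

    void write(DataOutput out) throws IOException {
        out.writeInt(familyId);        // 1st: family ID
        out.writeInt(totalAge);        // 2nd: total age
        out.writeInt(members.size());  // 3rd: list size
        for (String member : members) {
            out.writeUTF(member);      // 4th: list elements
        }
    }

    void readFields(DataInput in) throws IOException {
        familyId = in.readInt();            // 1st: family ID
        totalAge = in.readInt();            // 2nd: total age
        int size = in.readInt();            // 3rd: list size
        members = new ArrayList<>(size);    // discard state from any earlier use
        for (int i = 0; i < size; i++) {
            members.add(in.readUTF());      // 4th: list elements
        }
    }

    // Serializes the source record and reads it back into a fresh instance.
    static FamilyRecordSketch roundTrip(FamilyRecordSketch source) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            source.write(new DataOutputStream(buffer));
            FamilyRecordSketch copy = new FamilyRecordSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If readFields read totalAge before familyId, the two values would silently swap, because the stream carries no field names, only bytes in sequence.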

3.4 Add Default Constructor

The final step is to add a default (no-argument) constructor, which Hadoop needs in order to create an instance of the custom data type during deserialization.
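Hadoop instantiates Writable classes reflectively and then calls readFields on the empty instance, which is why the no-argument constructor is required. The sketch below imitates that step with plain Java reflection; it is an illustration under that assumption rather than Hadoop's actual code, and the names NoArgConstructorSketch, WithDefault, and WithoutDefault are hypothetical.

```java
import java.lang.reflect.Constructor;

// Hypothetical illustration of reflective instantiation.
class NoArgConstructorSketch {

    // Has a no-argument constructor, so a framework can create it blindly.
    static class WithDefault {
        int familyId;

        WithDefault() {
        }

        WithDefault(int familyId) {
            this.familyId = familyId;
        }
    }

    // Only has a parameterized constructor.
    static class WithoutDefault {
        final int familyId;

        WithoutDefault(int familyId) {
            this.familyId = familyId;
        }
    }

    // Mimics a framework that knows only the class and must build an empty
    // instance before filling it from a stream.
    static boolean canInstantiate(Class<?> type) {
        try {
            Constructor<?> constructor = type.getDeclaredConstructor();
            constructor.setAccessible(true);
            constructor.newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // No no-argument constructor, or construction failed.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("WithDefault: " + canInstantiate(WithDefault.class));
        System.out.println("WithoutDefault: " + canInstantiate(WithoutDefault.class));
    }
}
```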

4. Sample Input File

1	John,35,1
2	Jinni,32,1
3	Ronan,3,1
4	Alex,40,2
5	Maria,36,2
6	Shira,4,2
7	Hugo,2,2
8	Erik,26,3
9	Robert,42,4
10	Anna,40,4
11	Antonio,6,4
12	Marco,4,4
13	Daniel,31,5
14	Milena,30,5
15	Brayden,2,5
16	Sergey,28,6
17	Elen,29,6
18	Eduard,32,7
19	Levon,34,8
20	Mark,30,9

Each entry in the family.txt file has the following format.

<Sequence>\t<MemberName>,<Age>,<FamilyID>
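The driver in this example uses KeyValueTextInputFormat, which by default splits each line at the first tab into a key and a value. As a rough sketch of how one line breaks down into its four fields, assuming well-formed input, a hypothetical FamilyLineSketch class could parse it like this:

```java
// Hypothetical helper that splits one family.txt line into its fields.
class FamilyLineSketch {
    final String sequence;
    final String memberName;
    final int age;
    final int familyId;

    private FamilyLineSketch(String sequence, String memberName, int age, int familyId) {
        this.sequence = sequence;
        this.memberName = memberName;
        this.age = age;
        this.familyId = familyId;
    }

    static FamilyLineSketch parse(String line) {
        // Everything before the first tab is the sequence number (the map key).
        int tab = line.indexOf('\t');
        if (tab < 0) {
            throw new IllegalArgumentException("Missing tab separator: " + line);
        }
        String sequence = line.substring(0, tab);

        // The rest (the map value) is "<MemberName>,<Age>,<FamilyID>".
        String[] fields = line.substring(tab + 1).split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException("Expected 3 comma-separated fields: " + line);
        }
        return new FamilyLineSketch(
                sequence,
                fields[0].trim(),
                Integer.parseInt(fields[1].trim()),
                Integer.parseInt(fields[2].trim()));
    }

    public static void main(String[] args) {
        FamilyLineSketch record = parse("4\tAlex,40,2");
        System.out.println(record.memberName + " is in family " + record.familyId);
    }
}
```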

5. Example

In this example, we create a custom FamilyWritable data type to aggregate the members of each family and calculate their average age. We have kept the example as simple as possible to make it easy to follow.

5.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>HadoopCustomWritableExample</groupId>
    <artifactId>HadoopCustomWritableExample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <description>Hadoop Custom Value Writable Data Type Example</description>
    <build>
        <finalName>HadoopCustomValueWritableDataTypeExample</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <useSystemClassLoader>false</useSystemClassLoader>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>

        <!--
        hadoop-mapreduce-client-jobclient dependency for local debug
        -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.1.1</version>
        </dependency>

    </dependencies>
</project>

5.2 FamilyWritable

package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class FamilyWritable implements Writable {

    private IntWritable familyId;
    private IntWritable totalAge;
    private List<Text> familyMemberList;

    //default constructor for (de)serialization
    public FamilyWritable() {
        familyId = new IntWritable(0);
        familyMemberList = new ArrayList<Text>();
        totalAge = new IntWritable(0);
    }

    public void write(DataOutput dataOutput) throws IOException {
        familyId.write(dataOutput); //write familyId
        totalAge.write(dataOutput); //write totalAge
        dataOutput.writeInt(familyMemberList.size());  //write size of list

        for(int index=0;index<familyMemberList.size();index++){
            familyMemberList.get(index).write(dataOutput); //write all the value of list
        }
    }


    public void readFields(DataInput dataInput) throws IOException {
        familyId.readFields(dataInput); //read familyId
        totalAge.readFields(dataInput); //read totalAge
        int size = dataInput.readInt(); //read size of list
        familyMemberList = new ArrayList<Text>(size);
        for(int index=0;index<size;index++){ //read all the values of list
            Text text = new Text();
            text.readFields(dataInput);
            familyMemberList.add(text);
        }
    }


    public IntWritable getTotalAge() {
        return totalAge;
    }

    public void setTotalAge(IntWritable totalAge) {
        this.totalAge = totalAge;
    }

    public IntWritable getFamilyId() {
        return familyId;
    }

    public void setFamilyId(IntWritable familyId) {
        this.familyId = familyId;
    }

    public List<Text> getFamilyMemberList() {
        return familyMemberList;
    }

    public void setFamilyMemberList(List<Text> familyMemberList) {
        this.familyMemberList = familyMemberList;
    }

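    public FamilyWritable(IntWritable familyId, List<Text> familyMemberList) {
        this.familyId = familyId;
        this.familyMemberList = familyMemberList;
        // initialize totalAge as well; otherwise write() and addFamilyMember() throw a NullPointerException
        this.totalAge = new IntWritable(0);
    }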

    public void addFamilyMember(Text familyMember,int age){
        this.familyMemberList.add(familyMember);
        this.totalAge.set(this.totalAge.get()+age);
    }

    public void addTotalAge(IntWritable totalAge) {
        this.totalAge.set(this.totalAge.get()+totalAge.get());
    }
    
    @Override
    public String toString() {
        //average age, family member 1, family member 2... family member n
        return (float)totalAge.get()/ familyMemberList.size()+","+familyMemberList.toString()
                .replace("[","")
                .replace("]","");
    }
}

 

5.3 FamilyMapper

package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FamilyMapper extends Mapper<Text,Text,IntWritable,FamilyWritable> {

    IntWritable intKey;
    FamilyWritable familyWritable;

    @Override
    protected void setup(Context context){
        intKey = new IntWritable(0);
    }

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {

        // value holds "<MemberName>,<Age>,<FamilyID>"; the sequence number arrives as the key
        String[] familyData = value.toString().split(",");
        intKey.set(Integer.parseInt(familyData[2].trim()));
        familyWritable = new FamilyWritable();
        familyWritable.setFamilyId(intKey);
        familyWritable.addFamilyMember(new Text(familyData[0]), Integer.parseInt(familyData[1].trim()));
        // let IOException/InterruptedException propagate so that a failed write fails the task
        context.write(intKey, familyWritable);
    }

}

5.4 FamilyReducer

package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FamilyReducer extends Reducer<IntWritable,FamilyWritable,IntWritable,FamilyWritable> {


    @Override
    protected void reduce(IntWritable key, Iterable<FamilyWritable> values, Context context) throws IOException, InterruptedException {

        // Hadoop reuses the same FamilyWritable instance while iterating over values,
        // but readFields() builds a fresh member list each time, so addAll() below is safe.
        FamilyWritable aggregatedFamily = new FamilyWritable();
        for (FamilyWritable familyWritable : values) {
            aggregatedFamily.getFamilyMemberList().addAll(familyWritable.getFamilyMemberList());
            aggregatedFamily.addTotalAge(familyWritable.getTotalAge());
        }
        aggregatedFamily.setFamilyId(key);
        context.write(key, aggregatedFamily);
    }
}
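Taken together, the mapper and reducer above amount to a group-by on the family ID followed by an average. The following in-memory sketch reproduces that logic without Hadoop so it can be checked quickly; it assumes the map values are already in "<MemberName>,<Age>,<FamilyID>" form, and the names FamilyAggregationSketch, Family, and aggregate are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of the map and reduce steps.
class FamilyAggregationSketch {

    // Plays the role of FamilyWritable: a running total plus the member names.
    static class Family {
        int totalAge;
        final List<String> members = new ArrayList<>();

        float averageAge() {
            return (float) totalAge / members.size();
        }
    }

    // Groups the values by family ID (the "map" key) and accumulates
    // members and ages per family (the "reduce" step).
    static Map<Integer, Family> aggregate(List<String> values) {
        Map<Integer, Family> byFamily = new TreeMap<>();
        for (String value : values) {
            String[] fields = value.split(",");
            int familyId = Integer.parseInt(fields[2].trim());
            Family family = byFamily.computeIfAbsent(familyId, id -> new Family());
            family.members.add(fields[0].trim());
            family.totalAge += Integer.parseInt(fields[1].trim());
        }
        return byFamily;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("John,35,1", "Jinni,32,1", "Ronan,3,1", "Erik,26,3");
        aggregate(values).forEach((id, family) ->
                System.out.println(id + "\t" + family.averageAge() + "," + String.join(", ", family.members)));
    }
}
```

Unlike this sketch, a real job does not guarantee the order in which values reach the reducer, so the member names within a family may appear in a different order from the input file.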

 

5.5 HadoopCustomWritableDriver

package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class HadoopCustomWritableDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {

        int exitCode = ToolRunner.run(new Configuration(),
                new HadoopCustomWritableDriver(), args);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.out.println("Please provide two arguments :");
            System.out.println("[ 1 ] Input dir path");
            System.out.println("[ 2 ] Output dir path");
            return -1;
        }

        // ToolRunner has already applied the generic options (-D, -fs, ...) to getConf()
        // and passes only the remaining arguments to run()
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        Configuration conf = getConf();

        /*
         * Uncomment the three lines below to debug the MapReduce job locally.
         * "file:///" selects the local file system ("local" is a deprecated alias for it).
         */
//        conf.set("fs.defaultFS", "file:///");
//        conf.set("mapreduce.job.maps","1");
//        conf.set("mapreduce.job.reduces","1");

        Job job=Job.getInstance(conf,"Hadoop Custom Writable Value Example");
        job.setJarByClass(HadoopCustomWritableDriver.class);
        job.setMapperClass(FamilyMapper.class);
        job.setReducerClass(FamilyReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(FamilyWritable.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(FamilyWritable.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        boolean success = job.waitForCompletion(true);
        return (success?0:1);
    }
}

6. Build & Run

The Hadoop custom value Writable is ready. Compile the project and build the jar file, copy the sample input file to HDFS, and run the command below.

hadoop jar HadoopCustomValueWritableDataTypeExample.jar com.javadeveloperzone.HadoopCustomWritableDriver /user/data/valuewritableinput /user/data/valuewritableoutput

7. Output

7.1 HDFS Output

1	23.333334,Jinni, Ronan, John
2	20.5,Alex, Maria, Shira, Hugo
3	26.0,Erik
4	23.0,Anna, Marco, Antonio, Robert
5	21.0,Daniel, Milena, Brayden
6	28.5,Sergey, Elen
7	32.0,Eduard
8	34.0,Levon
9	30.0,Mark

7.2 Local Debug Output

We also ran this example on a local machine for debugging purposes. Here is the output:

Custom Writable Value Local Output

8. Conclusion

In this example, we discussed the steps to create a custom Writable and developed FamilyWritable, which we used to aggregate family members and calculate their average age.

9. Source Code

Create Hadoop Custom Writable Value Example

You can also check our Git repository for Hadoop Create Custom Value Writable Example and other useful examples.
