

1. Overview
Hadoop provides various built-in Writable classes such as IntWritable, Text, LongWritable, and ObjectWritable. It also gives developers the flexibility to extend these APIs and build their own data types to meet application requirements. In this example, we will discuss how to create a custom Writable class and use it as the value type in a MapReduce program.
2. Development Environment
Hadoop: 3.1.1
Java: Oracle JDK 1.8
IDE: IntelliJ Idea 2018.3
3. Steps for creating custom value writable data types
Let’s discuss the steps to create our own custom Writable data type, which can be used as a MapReduce key class or value class (a key class additionally has to implement WritableComparable). In this example we will create a custom Writable data type that is used as a MapReduce value class.
3.1 Implement Writable Interface
A custom Writable class must implement the Writable interface in order to be used as a MapReduce job's value class.
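A minimal skeleton looks like the following (the class name CustomValueWritable is just a placeholder); the two methods are filled in over the next steps.

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CustomValueWritable implements Writable {

    //fields to be serialized go here

    public void write(DataOutput dataOutput) throws IOException {
        //write all field values, see step 3.2
    }

    public void readFields(DataInput dataInput) throws IOException {
        //read all field values in the same order, see step 3.3
    }
}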
3.2 write method
Override the write method and add logic to write all the field values. In the case of a list or other collection, first write the size of the collection and then write each of its values.
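For example, the write method of the FamilyWritable class shown later in section 5.2 writes the scalar fields first, then the size of the member list, and then each element of the list:

public void write(DataOutput dataOutput) throws IOException {
    familyId.write(dataOutput);                     //write familyId
    totalAge.write(dataOutput);                     //write totalAge
    dataOutput.writeInt(familyMemberList.size());   //write size of list
    for (int index = 0; index < familyMemberList.size(); index++) {
        familyMemberList.get(index).write(dataOutput);  //write each value of the list
    }
}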
3.3 readFields method
The readFields method reads all the field values from the input stream. We must read the data members in exactly the same order in which they were written.
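Again taking the FamilyWritable from section 5.2 as the example, readFields mirrors write: scalar fields first, then the list size, then each element of the list:

public void readFields(DataInput dataInput) throws IOException {
    familyId.readFields(dataInput);   //read familyId
    totalAge.readFields(dataInput);   //read totalAge
    int size = dataInput.readInt();   //read size of list
    familyMemberList = new ArrayList<Text>(size);
    for (int index = 0; index < size; index++) {   //read all the values of the list
        Text text = new Text();
        text.readFields(dataInput);
        familyMemberList.add(text);
    }
}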
3.4 Add Default Constructor
The final step is to add a default (no-argument) constructor, which is required because Hadoop instantiates the custom data type via reflection before calling readFields during deserialization.
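The default constructor of the FamilyWritable used in this example simply initializes every field:

//default constructor for (de)serialization
public FamilyWritable() {
    familyId = new IntWritable(0);
    totalAge = new IntWritable(0);
    familyMemberList = new ArrayList<Text>();
}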
4. Sample Input File
1	John,35,1
2	Jinni,32,1
3	Ronan,3,1
4	Alex,40,2
5	Maria,36,2
6	Shira,4,2
7	Hugo,2,2
8	Erik,26,3
9	Robert,42,4
10	Anna,40,4
11	Antonio,6,4
12	Marco,4,4
13	Daniel,31,5
14	Milena,30,5
15	Brayden,2,5
16	Sergey,28,6
17	Elen,29,6
18	Eduard,32,7
19	Levon,34,8
20	Mark,30,9
The entries in the family.txt file are in the following format:
<Sequence>\t<MemberName>,<Age>,<FamilyID>
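Because the driver (section 5.5) uses KeyValueTextInputFormat, each line is split at the tab character: the sequence number becomes the mapper's input key and the rest of the line becomes its input value, which the mapper then splits on commas. For the first record this looks roughly as follows (the local variable names are only illustrative):

//key   = "1"            (text before the tab)
//value = "John,35,1"    (text after the tab)
String[] familyData = value.toString().split(",");
String memberName = familyData[0];                    //"John"
int age = Integer.parseInt(familyData[1]);            //35
int familyId = Integer.parseInt(familyData[2]);       //1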
5. Example
In this example, we have created a custom FamilyWritable data type to aggregate all the family members and calculate their average age. We have tried to keep this example as simple as possible for better understanding.
5.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>HadoopCustomWritableExample</groupId>
    <artifactId>HadoopCustomWritableExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>Hadoop Custom Value Writable Data Type Example</description>
    <build>
        <finalName>HadoopCustomValueWritableDataTypeExample</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <useSystemClassLoader>false</useSystemClassLoader>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>
        <!-- hadoop-mapreduce-client-jobclient dependency for local debug -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.1.1</version>
        </dependency>
    </dependencies>
</project>
5.2 FamilyWritable
package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class FamilyWritable implements Writable {

    private IntWritable familyId;
    private IntWritable totalAge;
    private List<Text> familyMemberList;

    //default constructor for (de)serialization
    public FamilyWritable() {
        familyId = new IntWritable(0);
        familyMemberList = new ArrayList<Text>();
        totalAge = new IntWritable(0);
    }

    public FamilyWritable(IntWritable familyId, List<Text> familyMemberList) {
        this.familyId = familyId;
        this.familyMemberList = familyMemberList;
    }

    public void write(DataOutput dataOutput) throws IOException {
        familyId.write(dataOutput);                     //write familyId
        totalAge.write(dataOutput);                     //write totalAge
        dataOutput.writeInt(familyMemberList.size());   //write size of list
        for (int index = 0; index < familyMemberList.size(); index++) {
            familyMemberList.get(index).write(dataOutput);  //write each value of the list
        }
    }

    public void readFields(DataInput dataInput) throws IOException {
        familyId.readFields(dataInput);   //read familyId
        totalAge.readFields(dataInput);   //read totalAge
        int size = dataInput.readInt();   //read size of list
        familyMemberList = new ArrayList<Text>(size);
        for (int index = 0; index < size; index++) {   //read all the values of the list
            Text text = new Text();
            text.readFields(dataInput);
            familyMemberList.add(text);
        }
    }

    public IntWritable getTotalAge() {
        return totalAge;
    }

    public void setTotalAge(IntWritable totalAge) {
        this.totalAge = totalAge;
    }

    public IntWritable getFamilyId() {
        return familyId;
    }

    public void setFamilyId(IntWritable familyId) {
        this.familyId = familyId;
    }

    public List<Text> getFamilyMemberList() {
        return familyMemberList;
    }

    public void setFamilyMemberList(List<Text> familyMemberList) {
        this.familyMemberList = familyMemberList;
    }

    public void addFamilyMember(Text familyMember, int age) {
        this.familyMemberList.add(familyMember);
        this.totalAge.set(this.totalAge.get() + age);
    }

    public void addTotalAge(IntWritable totalAge) {
        this.totalAge.set(this.totalAge.get() + totalAge.get());
    }

    @Override
    public String toString() {
        //average age, family member 1, family member 2... family member n
        return (float) totalAge.get() / familyMemberList.size() + "," + familyMemberList.toString()
                .replace("[", "")
                .replace("]", "");
    }
}
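The write/readFields pair can be sanity-checked outside a MapReduce job with a round trip through in-memory streams. The following is a small standalone sketch, not part of the original example:

package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FamilyWritableRoundTrip {

    public static void main(String[] args) throws IOException {
        FamilyWritable original = new FamilyWritable();
        original.setFamilyId(new IntWritable(1));
        original.addFamilyMember(new Text("John"), 35);
        original.addFamilyMember(new Text("Jinni"), 32);

        //serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        //deserialize with readFields() and print via toString()
        FamilyWritable copy = new FamilyWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy);  //expected: 33.5,John, Jinni
    }
}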
5.3 FamilyMapper
package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FamilyMapper extends Mapper<Text, Text, IntWritable, FamilyWritable> {

    IntWritable intKey;
    FamilyWritable familyWritable;

    @Override
    protected void setup(Context context) {
        intKey = new IntWritable(0);
    }

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        String familyData[] = value.toString().split(",");
        intKey.set(Integer.parseInt(familyData[2]));
        familyWritable = new FamilyWritable();
        familyWritable.setFamilyId(intKey);
        familyWritable.addFamilyMember(new Text(familyData[0]), Integer.parseInt(familyData[1]));
        try {
            context.write(intKey, familyWritable);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
5.4 FamilyReducer
package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FamilyReducer extends Reducer<IntWritable, FamilyWritable, IntWritable, FamilyWritable> {

    @Override
    protected void reduce(IntWritable key, Iterable<FamilyWritable> values, Context context)
            throws IOException, InterruptedException {
        FamilyWritable aggregateFamilyMembers = new FamilyWritable();
        for (FamilyWritable familyWritable : values) {
            aggregateFamilyMembers.getFamilyMemberList().addAll(familyWritable.getFamilyMemberList());
            aggregateFamilyMembers.addTotalAge(familyWritable.getTotalAge());
        }
        aggregateFamilyMembers.setFamilyId(key);
        context.write(key, aggregateFamilyMembers);
    }
}
5.5 HadoopCustomWritableDriver
package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HadoopCustomWritableDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new HadoopCustomWritableDriver(), args);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Please provide two arguments :");
            System.out.println("[ 1 ] Input dir path");
            System.out.println("[ 2 ] Output dir path");
            return -1;
        }

        Configuration c = new Configuration();
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        Path input = new Path(files[0]);
        Path output = new Path(files[1]);

        Configuration conf = new Configuration();
        /*
         * Uncomment the three lines below to enable local debugging of the map reduce job
         */
        //conf.set("fs.defaultFS", "local");
        //conf.set("mapreduce.job.maps", "1");
        //conf.set("mapreduce.job.reduces", "1");

        Job job = Job.getInstance(conf, "Hadoop Custom Writable Value Example");
        job.setJarByClass(HadoopCustomWritableDriver.class);
        job.setMapperClass(FamilyMapper.class);
        job.setReducerClass(FamilyReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(FamilyWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(FamilyWritable.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        boolean success = job.waitForCompletion(true);
        return (success ? 0 : 1);
    }
}
6. Build & Run
The Hadoop custom value Writable example is now ready. Compile the project and create the jar file, copy the sample input file to HDFS, and run the command below.
hadoop jar HadoopCustomValueWritableDataTypeExample.jar com.javadeveloperzone.HadoopCustomWritableDriver /user/data/valuewritableinput /user/data/valuewritableoutput
7. Output
7.1 HDFS Output
1	23.333334,Jinni, Ronan, John
2	20.5,Alex, Maria, Shira, Hugo
3	26.0,Erik
4	23.0,Anna, Marco, Antonio, Robert
5	21.0,Daniel, Milena, Brayden
6	28.5,Sergey, Elen
7	32.0,Eduard
8	34.0,Levon
9	30.0,Mark
7.2 Local Debug Output
We have also run this example on a local machine for debugging purposes. Here is the output:

Custom Writable Value Local Output
8. Conclusion
In this example, we have discussed the steps to create a custom Writable and developed the FamilyWritable class, which we used to aggregate family members and calculate the average age of each family.
9. Source Code
Create Hadoop Custom Writable Value Example
You can also check our Git repository for Hadoop Create Custom Value Writable Example and other useful examples.