1. Overview

Hadoop provides various Writable classes such as IntWritable, Text, LongWritable and ObjectWritable. It also lets developers extend its APIs when the built-in types do not fit their requirements. In this example, we will discuss how to create a custom writable class and use it in a MapReduce program. In our previous post we discussed how to use a custom writable class as a MapReduce value class. In this article, we will focus on a custom writable that can be used as the MapReduce key class as well as the value class.

2. Development Environment

Hadoop: 3.1.1

Java: Oracle JDK 1.8

IDE: IntelliJ Idea 2018.3

3. Steps for creating custom Key writable data types

Let's go through the steps for creating our own writable data type that can be used as a MapReduce key class or value class. In this example we will create a single custom writable that can serve as both the MapReduce key class and the value class.

3.1 Implement WritableComparable Interface

A custom writable must implement the WritableComparable interface to be used as a MapReduce key class. Because WritableComparable extends Writable, the same class can also be used as a value class.

3.2 write method

Override the write method and write the value of every field to the DataOutput. For a list or other collection, first write the number of elements and then write each element.
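GeoLocationWritable in section 5.4 has no collection field, so here is a minimal sketch of the size-prefix pattern for a list. The class name TaxiIdListWritable is hypothetical, and to keep the sketch runnable without Hadoop on the classpath it uses only java.io; a real class would add implements Writable (from org.apache.hadoop.io), whose write and readFields methods have these same signatures.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type holding a list of taxi IDs. Only java.io is used so the
// sketch runs without Hadoop; a real class would add "implements Writable".
class TaxiIdListWritable {
    private final List<Integer> taxiIds = new ArrayList<>();

    List<Integer> getTaxiIds() {
        return taxiIds;
    }

    // Write the number of elements first, then each element.
    public void write(DataOutput out) throws IOException {
        out.writeInt(taxiIds.size());
        for (int taxiId : taxiIds) {
            out.writeInt(taxiId);
        }
    }

    // Read the count first, then exactly that many elements. The list is cleared
    // because Hadoop may reuse one instance for many records.
    public void readFields(DataInput in) throws IOException {
        taxiIds.clear();
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            taxiIds.add(in.readInt());
        }
    }

    // Serializes to a byte array and reads it back into a fresh instance.
    static TaxiIdListWritable roundTrip(TaxiIdListWritable original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            TaxiIdListWritable copy = new TaxiIdListWritable();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Without the size prefix, readFields would have no way to know where the list ends and the next field begins.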

3.3 readFields method

The readFields method reads the value of every field back from the DataInput. Fields must be read in exactly the same order in which write wrote them, because the serialized bytes carry no field names.
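The sketch below illustrates why the order matters, using a hypothetical plain-Java class GeoPointRecord with the same three fields as GeoLocationWritable. It is an assumption-laden stand-in: writeUTF replaces Hadoop's Text serialization, so the byte layout differs, but the ordering rule is the same. Reading two doubles in the wrong order raises no error at all; the values are silently swapped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical plain-Java stand-in for GeoLocationWritable. writeUTF replaces Text,
// so the byte layout differs from Hadoop's, but the ordering rule is the same.
class GeoPointRecord {
    double latitude;
    double longitude;
    String locationName = "";

    // Wire order: latitude, longitude, locationName.
    void write(DataOutput out) throws IOException {
        out.writeDouble(latitude);
        out.writeDouble(longitude);
        out.writeUTF(locationName);
    }

    // Correct: reads the fields in the same order write() used.
    void readFields(DataInput in) throws IOException {
        latitude = in.readDouble();
        longitude = in.readDouble();
        locationName = in.readUTF();
    }

    // Deliberately wrong: longitude is read first. Both fields are doubles, so no
    // exception is thrown; the two values are silently swapped.
    void readFieldsWrongOrder(DataInput in) throws IOException {
        longitude = in.readDouble();
        latitude = in.readDouble();
        locationName = in.readUTF();
    }

    static byte[] serialize(GeoPointRecord record) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            record.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static GeoPointRecord deserialize(byte[] bytes, boolean wrongOrder) {
        try {
            DataInput in = new DataInputStream(new ByteArrayInputStream(bytes));
            GeoPointRecord record = new GeoPointRecord();
            if (wrongOrder) {
                record.readFieldsWrongOrder(in);
            } else {
                record.readFields(in);
            }
            return record;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```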

3.4 Add Default Constructor

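Add a no-argument (default) constructor. Hadoop creates key and value instances by reflection through this constructor and then calls readFields to populate them, so the constructor should also initialize any field objects (such as DoubleWritable or Text) that readFields fills in.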
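Hadoop's serialization support for Writable types creates instances through org.apache.hadoop.util.ReflectionUtils.newInstance, which relies on the no-argument constructor. The sketch below imitates that step with plain Java reflection so it runs without Hadoop. SimpleWritable is a hypothetical mirror of the Writable interface, and LocationName and NoDefaultConstructorName are hypothetical example classes; the second one shows what goes wrong when the default constructor is missing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical mirror of org.apache.hadoop.io.Writable so the sketch runs without Hadoop.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Has a no-arg constructor that initializes its field, so an empty instance
// can be created first and filled in by readFields() afterwards.
class LocationName implements SimpleWritable {
    private String value;

    public LocationName() {
        this("");
    }

    public LocationName(String value) {
        this.value = value;
    }

    String get() {
        return value;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        value = in.readUTF();
    }
}

// No no-arg constructor: there is no way to create an empty instance reflectively.
class NoDefaultConstructorName implements SimpleWritable {
    private String value;

    public NoDefaultConstructorName(String value) {
        this.value = value;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        value = in.readUTF();
    }
}

final class ReflectiveDeserializer {
    private ReflectiveDeserializer() {
    }

    // Roughly the framework's approach: instantiate via the no-arg constructor, then readFields().
    static <T extends SimpleWritable> T read(Class<T> type, byte[] bytes) {
        try {
            T instance = type.getDeclaredConstructor().newInstance();
            instance.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
            return instance;
        } catch (ReflectiveOperationException | IOException e) {
            throw new IllegalStateException("cannot deserialize " + type.getSimpleName(), e);
        }
    }

    static byte[] toBytes(SimpleWritable writable) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writable.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```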

3.5 equals and hashCode

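Because our custom writable is used as a MapReduce key, we need to override the equals and hashCode methods. The default HashPartitioner uses hashCode to decide which reducer receives a key, so keys that are equal must produce the same hash code, and both methods should use the same fields that compareTo compares.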
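A minimal plain-Java sketch of the equals/hashCode contract, assuming a hypothetical GeoKey class with the same three fields as GeoLocationWritable. The partitionFor helper reproduces the formula used by Hadoop's default HashPartitioner, (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, to show that equal keys always land on the same reducer.

```java
import java.util.Objects;

// Hypothetical plain-Java key with the same three fields as GeoLocationWritable.
final class GeoKey {
    final double latitude;
    final double longitude;
    final String locationName;

    GeoKey(double latitude, double longitude, String locationName) {
        this.latitude = latitude;
        this.longitude = longitude;
        this.locationName = locationName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof GeoKey)) return false;
        GeoKey that = (GeoKey) o;
        return Double.compare(latitude, that.latitude) == 0
                && Double.compare(longitude, that.longitude) == 0
                && locationName.equals(that.locationName);
    }

    // Built from exactly the fields compared in equals(), so equal keys hash equally.
    @Override
    public int hashCode() {
        return Objects.hash(latitude, longitude, locationName);
    }

    // Same formula as Hadoop's default HashPartitioner: clear the sign bit,
    // then take the remainder modulo the number of reduce tasks.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

If hashCode were left as the identity-based Object.hashCode, two equal keys produced by different mappers could be sent to different reducers and appear twice in the output.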

3.6 compareTo

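Override the compareTo method to define how keys of our custom type are sorted. This method is critical: the framework sorts map output with it and, by default, groups the values of keys that compare as equal into a single reduce call. A mistake here can produce wrong results, for example the same key appearing in several reduce calls or distinct keys being merged into one.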
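The following plain-Java sketch uses the same ordering as GeoLocationWritable.compareTo in section 5.4 (latitude, then longitude, then location name). SortableGeoKey is a hypothetical stand-in, and String replaces Text; for the ASCII location names in the sample, both compare the same way. Sorting the eight sample locations shows the order in which the reducer will see the keys.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java stand-in with the same ordering as
// GeoLocationWritable.compareTo: latitude, then longitude, then name.
final class SortableGeoKey implements Comparable<SortableGeoKey> {
    final double latitude;
    final double longitude;
    final String locationName;

    SortableGeoKey(double latitude, double longitude, String locationName) {
        this.latitude = latitude;
        this.longitude = longitude;
        this.locationName = locationName;
    }

    @Override
    public int compareTo(SortableGeoKey o) {
        int result = Double.compare(latitude, o.latitude);
        if (result == 0) {
            result = Double.compare(longitude, o.longitude);
        }
        if (result == 0) {
            result = locationName.compareTo(o.locationName);
        }
        return result;
    }

    // Sorts the keys as the shuffle would and returns their names in order.
    static List<String> sortedNames(List<SortableGeoKey> keys) {
        List<SortableGeoKey> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        List<String> names = new ArrayList<>();
        for (SortableGeoKey key : sorted) {
            names.add(key.locationName);
        }
        return names;
    }
}
```

Applied to the sample locations, this ordering puts little venice (latitude 9.383452) first and covent garden (latitude 28.440554) last, which is the order of the HDFS output in section 7.1.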

4. Sample Input File

1	14.623801,75.621788,greenwich,1,2
2	14.623801,75.621788,greenwich,2,2
3	14.623801,75.621788,greenwich,3,2
4	14.623801,75.621788,greenwich,4,4
5	14.623801,75.621788,greenwich,10,3
6	9.383452,76.574059,little venice,11,2
7	9.383452,76.574059,little venice,18,1
8	28.440554,74.493011,covent garden,16,1
9	28.440554,74.493011,covent garden,23,2
10	28.440554,74.493011,covent garden,34,3
11	28.440554,74.493011,covent garden,56,1
12	24.882618,72.858894,london bridge,87,1
13	16.779877,74.556374,camden town,67,1
14	16.779877,74.556374,camden town,54,4
15	16.779877,74.556374,camden town,54,4
16	27.085251,88.700928,hampstead,34,3
17	12.715035,77.281296,city of london,23,2
18	12.715035,77.281296,city of london,22,2
19	13.432515,77.727478,notting hill,98,3

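Each line of the passengerRush.txt file has the following format: a tab separates the sequence number from the rest of the record, and commas separate the remaining fields.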

<Sequence>\t<Latitude>,<Longitude>,<Location>,<TaxiID>,<NoOfPassenger>
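A small sketch of how one such line is split, assuming the job's KeyValueTextInputFormat with its default tab separator: everything before the first tab becomes the map key and the rest becomes the map value, which is then split on commas. TaxiRecord is a hypothetical helper used only for illustration; the mapper in section 5.5 performs the comma split itself.

```java
// Hypothetical parser for one line of passengerRush.txt. KeyValueTextInputFormat
// splits each line at the first tab: the sequence number becomes the map key and
// the comma-separated remainder becomes the map value.
final class TaxiRecord {
    final String sequence;
    final double latitude;
    final double longitude;
    final String location;
    final int taxiId;
    final int passengers;

    private TaxiRecord(String sequence, double latitude, double longitude,
                       String location, int taxiId, int passengers) {
        this.sequence = sequence;
        this.latitude = latitude;
        this.longitude = longitude;
        this.location = location;
        this.taxiId = taxiId;
        this.passengers = passengers;
    }

    static TaxiRecord parse(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) {
            throw new IllegalArgumentException("missing tab separator: " + line);
        }
        String[] fields = line.substring(tab + 1).split(",");
        if (fields.length != 5) {
            throw new IllegalArgumentException("expected 5 comma-separated fields: " + line);
        }
        return new TaxiRecord(line.substring(0, tab),
                Double.parseDouble(fields[0]), Double.parseDouble(fields[1]),
                fields[2], Integer.parseInt(fields[3]), Integer.parseInt(fields[4]));
    }
}
```

Location names such as "little venice" contain spaces but no commas, so a plain comma split is enough for this data set.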

5. Example

5.1 Problem Statement

Consider a new project for a taxi agency that wants to analyze passenger rush at different locations in London. Based on the results, the agency can reschedule its taxis to increase revenue.

The agency collects taxi data for the past two months. Our job is to parse this input data and report, for each location, how many passengers used a taxi, so that the busiest locations can be identified.

5.2 Solution

As the sample input shows, each record contains a latitude, a longitude, a location name, a taxi ID and the number of passengers on the trip. We will write a custom writable class for the geolocation part of the record, use it as the map output key, and sum the number of passengers for each location in the reducer.
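Before writing the Hadoop classes, the whole job can be sketched in memory to check the expected result. In this hypothetical PassengerRushSimulation class (no Hadoop involved), each line is "mapped" to a location key and a passenger count, a TreeMap ordered like GeoLocationWritable.compareTo stands in for the shuffle and sort, and the "reduce" step sums the counts. It assumes the input lines have exactly the format shown in section 4.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of the job; PassengerRushSimulation is a hypothetical name.
final class PassengerRushSimulation {

    // Minimal stand-in for GeoLocationWritable: latitude, longitude, name.
    static final class Location {
        final double latitude;
        final double longitude;
        final String name;

        Location(double latitude, double longitude, String name) {
            this.latitude = latitude;
            this.longitude = longitude;
            this.name = name;
        }
    }

    // Same ordering as GeoLocationWritable.compareTo: latitude, longitude, name.
    private static final Comparator<Location> ORDER =
            Comparator.comparingDouble((Location l) -> l.latitude)
                    .thenComparingDouble(l -> l.longitude)
                    .thenComparing(l -> l.name);

    // Returns "name<TAB>total" lines, in the order the reducer would emit them.
    static List<String> run(List<String> lines) {
        // The TreeMap plays the shuffle: keys sorted by ORDER, equal keys grouped.
        Map<Location, Integer> totals = new TreeMap<>(ORDER);
        for (String line : lines) {
            // "map": drop the sequence number, parse the key and the passenger count.
            String[] f = line.substring(line.indexOf('\t') + 1).split(",");
            Location key = new Location(Double.parseDouble(f[0]), Double.parseDouble(f[1]), f[2]);
            // "reduce": sum the passenger counts for each key.
            totals.merge(key, Integer.parseInt(f[4]), Integer::sum);
        }
        List<String> output = new ArrayList<>();
        for (Map.Entry<Location, Integer> e : totals.entrySet()) {
            output.add(e.getKey().name + "\t" + e.getValue());
        }
        return output;
    }
}
```

Run on the nineteen sample lines, this sketch yields the same totals, in the same order, as the HDFS output in section 7.1 (for example greenwich 13 and covent garden 7).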

5.3 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>HadoopCustomKeyWritableExample</groupId>
    <artifactId>HadoopCustomKeyWritableExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>Hadoop Custom Key Writable Data Type,
        London City Passenger Rush Analysis
    </description>
    <build>
        <finalName>HadoopCustomKeyWritableDataTypeExample</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <useSystemClassLoader>false</useSystemClassLoader>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>7</source>
                    <target>7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>

        <!--
        hadoop-mapreduce-client-jobclient dependency for local debug
        -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.1.1</version>
        </dependency>

    </dependencies>

</project>

5.4 GeoLocationWritable

package com.javadeveloperzone;

import org.apache.hadoop.io.*;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class GeoLocationWritable implements WritableComparable<GeoLocationWritable> {

    private DoubleWritable longitude,latitude;
    private Text locationName;

    //default constructor for (de)serialization
    public GeoLocationWritable() {
        longitude = new DoubleWritable(0.0d);
        latitude = new DoubleWritable(0.0d);
        locationName = new Text();
    }

    public GeoLocationWritable(DoubleWritable longitude, DoubleWritable latitude, Text locationName) {
        this.longitude = longitude;
        this.latitude = latitude;
        this.locationName = locationName;
    }

    // Serialize the fields; readFields must read them back in this same order.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        longitude.write(dataOutput);
        latitude.write(dataOutput);
        locationName.write(dataOutput);
    }

    // Deserialize the fields in exactly the order write() used.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        longitude.readFields(dataInput);
        latitude.readFields(dataInput);
        locationName.readFields(dataInput);
    }

    public DoubleWritable getLongitude() {
        return longitude;
    }

    public void setLongitude(DoubleWritable longitude) {
        this.longitude = longitude;
    }

    public DoubleWritable getLatitude() {
        return latitude;
    }

    public void setLatitude(DoubleWritable latitude) {
        this.latitude = latitude;
    }

    // Sort by latitude, then longitude, then location name.
    // Keys that compare as 0 are grouped into the same reduce call.
    @Override
    public int compareTo(GeoLocationWritable o) {
        int result = Double.compare(this.latitude.get(), o.latitude.get());
        if (result == 0) {
            result = Double.compare(this.longitude.get(), o.longitude.get());
        }
        if (result == 0) {
            result = this.locationName.toString().compareTo(o.locationName.toString());
        }
        return result;
    }

    public void setLocationName(Text locationName) {
        this.locationName = locationName;
    }

    public Text getLocationName() {
        return locationName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        GeoLocationWritable that = (GeoLocationWritable) o;
        return Objects.equals(longitude, that.longitude) &&
                Objects.equals(latitude, that.latitude) &&
                Objects.equals(locationName, that.locationName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(longitude, latitude, locationName);
    }
}

5.5 PassengerRushMapper

package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PassengerRushMapper extends Mapper<Text, Text, GeoLocationWritable, IntWritable> {

    // Reused for every record: context.write() serializes key and value immediately,
    // so reusing the objects avoids allocating new ones per input line.
    private final IntWritable passengerCount = new IntWritable();
    private final GeoLocationWritable geoLocation = new GeoLocationWritable();

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // key: sequence number; value: <Latitude>,<Longitude>,<Location>,<TaxiID>,<NoOfPassenger>
        String[] taxiData = value.toString().split(",");
        if (taxiData.length != 5) {
            return; // skip blank or malformed lines
        }
        geoLocation.getLatitude().set(Double.parseDouble(taxiData[0]));
        geoLocation.getLongitude().set(Double.parseDouble(taxiData[1]));
        geoLocation.getLocationName().set(taxiData[2]);
        passengerCount.set(Integer.parseInt(taxiData[4]));

        context.write(geoLocation, passengerCount);
    }

}

5.6 PassengerRushReducer

package com.javadeveloperzone;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PassengerRushReducer extends Reducer<GeoLocationWritable, IntWritable, Text, IntWritable> {

    // Reused output objects; context.write() serializes them immediately.
    private final Text locationName = new Text();
    private final IntWritable totalPassengers = new IntWritable();

    @Override
    protected void reduce(GeoLocationWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the passenger counts of all trips for this location.
        int sum = 0;
        for (IntWritable passengers : values) {
            sum += passengers.get();
        }
        locationName.set(key.getLocationName());
        totalPassengers.set(sum);
        context.write(locationName, totalPassengers);
    }
}

5.7 PassengerRushAnalysisDriver

package com.javadeveloperzone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class PassengerRushAnalysisDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {

        int exitCode = ToolRunner.run(new Configuration(),
                new PassengerRushAnalysisDriver(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {

        // ToolRunner has already removed generic options (-D, -conf, ...) from args
        // and applied them to the Configuration returned by getConf().
        if (args.length != 2) {
            System.err.println("Please provide two arguments :");
            System.err.println("[ 1 ] Input dir path");
            System.err.println("[ 2 ] Output dir path");
            return -1;
        }

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        Configuration conf = getConf();

        /*
         * Uncomment the lines below to run the job locally for debugging
         */
        /*conf.set("fs.defaultFS", "file:///");
        conf.set("mapreduce.framework.name", "local");
        conf.set("mapreduce.job.maps", "1");
        conf.set("mapreduce.job.reduces", "1");
        */

        Job job = Job.getInstance(conf, "Hadoop Custom Key Writable Example");
        job.setJarByClass(PassengerRushAnalysisDriver.class);
        job.setMapperClass(PassengerRushMapper.class);
        job.setReducerClass(PassengerRushReducer.class);
        job.setMapOutputKeyClass(GeoLocationWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // The reducer emits (Text location name, IntWritable total).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        job.setSpeculativeExecution(false);
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}

6. Build & Run

The Hadoop custom key writable is ready. Compile the code and build the jar file (for example with mvn clean package), copy the sample input file to HDFS (for example with hdfs dfs -put) and run the command below.

hadoop jar HadoopCustomKeyWritableDataTypeExample.jar com.javadeveloperzone.PassengerRushAnalysisDriver /user/data/passengerrushinput /user/data/passengerrushoutput

7. Output

7.1 HDFS Output

little venice	3
city of london	4
notting hill	3
greenwich	13
camden town	9
london bridge	1
hampstead	3
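covent garden	7

The locations appear in ascending order of latitude, because latitude is the first field compared in GeoLocationWritable.compareTo.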

7.2 Local Debug Output

We also ran this example on a local machine for debugging. The output of that run was captured in a screenshot:

[Figure: Custom Writable Key Local Output]

8. Conclusion

In this article on creating a custom Hadoop key writable, we saw that the built-in writable classes do not suit every requirement. We can create our own writable classes by implementing the WritableComparable interface and overriding its methods (write, readFields and compareTo), together with equals and hashCode. Hadoop's extensible APIs let us model domain-specific keys, such as geolocations, and build analytics on top of them.

9. Source Code

Hadoop Create Custom Key Writable Example

You can also check our Git repository for Hadoop Custom Key Writable Example and other useful examples.
