1. Overview

In this article we explain how to access a database using Hadoop MapReduce.
In today's ETL world, reading a database and processing millions of records is one of the most common operations.
So, as part of our Hadoop tutorial journey, we are going to show a simple way to read a database table using Hadoop MapReduce.

Consider a hypothetical case where we have an online retail store’s database with a users table that contains millions of rows, and we are interested in reading that table using MapReduce.

For the sake of simplicity we are going to read the table and emit the records to HDFS as key-value pairs.
So this Hadoop MapReduce tutorial serves as a base for reading an RDBMS with Hadoop MapReduce, where our data source is a MySQL database and the sink is HDFS.

2. Development environment

Java : Oracle JDK 1.8
Hadoop : Apache Hadoop 2.6.1
IDE : Eclipse
Build Tool: Maven
Database : MySQL 5.6.33

3. Sample Input

For this tutorial, we have a MySQL table named users. The table contains the following 3 columns:
user_id, user_name, department

Following is the list of rows available in the users table,

user_id   user_name        department
-------   --------------   --------------------
1         prashant khunt   Big Data Development
2         Jayesh Patel     HR Department
3         Pratik S         Java
4         Prakash P        Web Development
5         Jagdish K        Designing

The DDL for the users table looks like,

CREATE TABLE `retail`.`users` (
  `user_id` INT NOT NULL AUTO_INCREMENT,
  `user_name` VARCHAR(45) NOT NULL,
  `department` VARCHAR(45) NOT NULL,
  PRIMARY KEY (`user_id`));
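
If you want to reproduce the sample rows listed above, insert statements along these lines should do (a minimal sketch; user_id is left out because it is AUTO_INCREMENT):

INSERT INTO `retail`.`users` (`user_name`, `department`) VALUES
  ('prashant khunt', 'Big Data Development'),
  ('Jayesh Patel', 'HR Department'),
  ('Pratik S', 'Java'),
  ('Prakash P', 'Web Development'),
  ('Jagdish K', 'Designing');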

4. Solution

Hadoop provides various data types like IntWritable, FloatWritable, DoubleWritable etc.,
but in our case we are going to implement a custom input writable which will enable us to read data rows from a database table.
In order to implement the custom input writable, we have to implement the DBWritable interface.
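
For reference, the DBWritable contract from org.apache.hadoop.mapreduce.lib.db that our class has to satisfy boils down to two methods (shown here only as a sketch; the full implementation follows in section 4.2):

public interface DBWritable {
  // fill a PreparedStatement with the object's fields (used when writing back to a table)
  void write(PreparedStatement statement) throws SQLException;
  // populate the object's fields from the current row of a ResultSet (used when reading a table)
  void readFields(ResultSet resultSet) throws SQLException;
}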

To keep the database reading simple, we just read the users table in a Mapper class and write the details to text files in HDFS.

Reading an RDBMS requires a database connection from Hadoop to MySQL, so we need to place the MySQL JDBC driver jar in Hadoop’s lib folder;
in our case we have copied the mysql-connector-java-5.1.5.jar file to the Hadoop common lib folder.
The jar file has to be copied to all the Hadoop worker nodes (the machines that run the map tasks) as well as to the machine from which we launch the job.
Once you have copied the jar file, you need to restart the cluster.
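
As a rough sketch (the exact lib directory depends on your installation; here HADOOP_HOME is assumed to point at the Hadoop 2.6.1 install), copying the connector jar on each node could look like this:

# assumption: HADOOP_HOME points at the Hadoop installation directory
cp mysql-connector-java-5.1.5.jar $HADOOP_HOME/share/hadoop/common/lib/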

We are going to use the following 3 Java files for this example,

DBDriver.java
DBInputWritable.java
DBMapper.java


4.1 Build File : pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jdz</groupId>
  <artifactId>HadoopDBInputWritable</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>HadoopDBInputWritable</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.6.1</hadoop.version>
    <jdk.version>1.8</jdk.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

4.2 Custom Input Writable: DBInputWritable.java

package com.jdz;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
/*CREATE TABLE `retail`.`users` (
      `user_id` INT NOT NULL AUTO_INCREMENT,
      `user_name` VARCHAR(45) NOT NULL,
      `department` VARCHAR(45) NOT NULL,
      PRIMARY KEY (`user_id`));
*/
public class DBInputWritable implements Writable, DBWritable
{
  private int userId;
  
  private String userName,department;
  
  // used when writing the object back to the database (not exercised in this read-only example)
  public void write(PreparedStatement statement) throws SQLException {
    statement.setInt(1, userId);
    statement.setString(2, userName);
    statement.setString(3, department);
  }
  // populates the fields from the current row supplied by DBInputFormat
  public void readFields(ResultSet resultSet) throws SQLException {
    userId = resultSet.getInt(1);
    userName = resultSet.getString(2);
    department = resultSet.getString(3);
  }
  // the Writable methods are not used by DBInputFormat, so they are left empty
  public void write(DataOutput out) throws IOException {
  }
  public void readFields(DataInput in) throws IOException {
  }
  public int getUserId() {
    return userId;
  }
  public String getUserName() {
    return userName;
  }
  public String getDepartment() {
    return department;
  }
}

4.3 Mapper Code: DBMapper.java

package com.jdz;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DBMapper extends Mapper<LongWritable, DBInputWritable, Text, NullWritable> {

  // the key is the record number assigned by DBInputFormat; the value is one row from the users table
  @Override
  protected void map(LongWritable id, DBInputWritable value, Context ctx) {
    try 
    {
      String userDetails = value.getUserName()+","+value.getDepartment();
      ctx.write(new Text(userDetails), NullWritable.get());
    } catch (IOException ioException) {
      ioException.printStackTrace();
    } catch (InterruptedException interruptedException) {
      interruptedException.printStackTrace();
    }
  }
}

4.4 Driver Code: DBDriver.java

package com.jdz;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DBDriver extends Configured implements Tool {
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", // driver class
        "jdbc:mysql://localhost:3306/retail", // db url
        "root", // user name
        "root"); // password
    Job job = Job.getInstance(conf);
    job.setJarByClass(DBDriver.class);
    job.setMapperClass(DBMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setInputFormatClass(DBInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    DBInputFormat.setInput(job, DBInputWritable.class, "users", // input table name
        null, null, // no WHERE conditions, no ORDER BY
        new String[] { "user_id", "user_name", "department" } // table columns
    );
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
  public static void main(String[] args) {
    try {
      int result = ToolRunner.run(new Configuration(), new DBDriver(), args);
      System.out.println("job status ::" + result);
      
    } catch (Exception exception) {
      exception.printStackTrace();
    }
  }
}
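
One optional tweak that is not part of the listing above: the job has no reduce logic, so you could make it explicitly map-only, which lets the mapper output go straight to HDFS instead of passing through the default identity reducer. A sketch of the extra line in run():

    // optional: run the job map-only, skipping the identity reduce phase
    job.setNumReduceTasks(0);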

5. Build & Run Application

Now build the jar file which we are going to submit to the Hadoop cluster.
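Since the project is Maven based, a plain package build should be enough. As a sketch (the jar produced under target/ is named from the artifactId and version in pom.xml, e.g. HadoopDBInputWritable-0.0.1-SNAPSHOT.jar, so rename it or adjust the run command accordingly):

mvn clean package

Once the jar file is built, we can use the following command to run the job on the Hadoop cluster,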

hadoop jar HadoopDBInputWritable.jar com.jdz.DBDriver /output/dbinputwritable

6. Output

Once the job has completed successfully, you will get output that looks like the following,

Jagdish K,Designing
Jayesh Patel,HR Department
Prakash P,Web Development
Pratik S,Java
prashant khunt,Big Data Development
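
The records are written as text files under the output directory passed on the command line. Assuming the output path used above, you can inspect them with something like:

hadoop fs -cat /output/dbinputwritable/part-*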

7. Source Code

You can download the source code of this Access Database using Hadoop MapReduce example from the git repository; it can serve as boilerplate code for writing more complex Hadoop MapReduce programs in Java.
