1. Overview

In our previous article, we explained how to read a database table using Hadoop MapReduce.
In this article, we explain how to write data to a database using Hadoop.
Reading from and writing to a database is the foundation of building Hadoop ETL tools.

Consider a case where CSV text files in HDFS contain the product details of an online
retail store. The files contain millions of rows, and we want to load those records into a database.

Our HDFS CSV files contain unique product records that we want to load into an RDBMS, so this article explains how to load CSV files into MySQL using Hadoop.

This Hadoop MapReduce tutorial serves as a base for reading text files with Hadoop MapReduce and storing the data in a database table.

2. Development environment

Java : Oracle JDK 1.8
Hadoop : Apache Hadoop 2.6.1
IDE : Eclipse
Build Tool: Maven
Database : MySQL 5.6.33

3. Sample Input

The CSV text files contain records in the following format:
<StockCode>,<Description>,<Quantity>
where StockCode is the unique stock code of a product,
Description is the name/description of the product, and
Quantity is the quantity available for that product.

The following is a list of sample rows from the CSV files:

<StockCode>,<Description>,<Quantity>
85123A,WHITE HANGING HEART T-LIGHT HOLDER,100
71053,WHITE METAL LANTERN,116
84406B,CREAM CUPID HEARTS COAT HANGER,80
84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6123
84029E,RED WOOLLY HOTTIE WHITE HEART.,612
22752,SET 7 BABUSHKA NESTING BOXES,2345
21730,GLASS STAR FROSTED T-LIGHT HOLDER,786
22633,HAND WARMER UNION JACK,216
22632,HAND WARMER RED POLKA DOT,12656
84879,ASSORTED COLOUR BIRD ORNAMENT,322
22745,POPPY'S PLAYHOUSE BEDROOM ,986
22748,POPPY'S PLAYHOUSE KITCHEN,8716
22749,FELTCRAFT PRINCESS CHARLOTTE DOLL,1248
22310,IVORY KNITTED MUG COSY ,5326
84969,BOX OF 6 ASSORTED COLOUR TEASPOONS,2126
22623,BOX OF VINTAGE JIGSAW BLOCKS ,4213
22622,BOX OF VINTAGE ALPHABET BLOCKS,721
21754,HOME BUILDING BLOCK WORD,9823
21755,LOVE BUILDING BLOCK WORD,2653
21777,RECIPE BOX WITH METAL HEART,4635
48187,DOORMAT NEW ENGLAND,7864
22960,JAM MAKING SET WITH JARS,9876
22913,RED COAT RACK PARIS FASHION,2389
22912,YELLOW COAT RACK PARIS FASHION,7342
22914,BLUE COAT RACK PARIS FASHION,6423
21756,BATH BUILDING BLOCK WORD,435
22728,ALARM CLOCK BAKELIKE PINK,23674
22727,ALARM CLOCK BAKELIKE RED ,12234
22726,ALARM CLOCK BAKELIKE GREEN,5132
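
The record format above can be checked with plain Java before any Hadoop code is involved. The following is a minimal sketch, not part of the tutorial's source: ProductLine and its parse method are hypothetical names, and the sketch assumes that descriptions never contain a comma (true for the sample rows, but not guaranteed for real data). It also treats a row whose quantity is not a number, such as the header row, as invalid.

```java
import java.util.Optional;

/** Hypothetical helper (not part of the tutorial's code) that validates one CSV line. */
final class ProductLine {
    final String stockCode;
    final String description;
    final int quantity;

    private ProductLine(String stockCode, String description, int quantity) {
        this.stockCode = stockCode;
        this.description = description;
        this.quantity = quantity;
    }

    /** Returns the parsed record, or empty if the line is blank, a header, or malformed. */
    static Optional<ProductLine> parse(String line) {
        if (line == null || line.trim().isEmpty()) {
            return Optional.empty();
        }
        // Assumption: exactly three comma-separated fields, no commas inside the description
        String[] parts = line.split(",", -1);
        if (parts.length != 3 || parts[0].trim().isEmpty()) {
            return Optional.empty();
        }
        try {
            int quantity = Integer.parseInt(parts[2].trim());
            return Optional.of(new ProductLine(parts[0].trim(), parts[1].trim(), quantity));
        } catch (NumberFormatException e) {
            // Covers the header row, whose third field is "<Quantity>"
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("85123A,WHITE HANGING HEART T-LIGHT HOLDER,100").isPresent());
        System.out.println(parse("<StockCode>,<Description>,<Quantity>").isPresent());
    }
}
```

A check like this could be applied in the mapper so that malformed rows are skipped instead of failing later in the reducer; whether to skip or fail is a design choice that depends on how clean the input is expected to be.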

Create the product table using the following DDL query:

CREATE TABLE `retail`.`product` (
      `stockcode` VARCHAR(45) NOT NULL,
      `description` VARCHAR(45) NOT NULL,
      `quantity` INT,
      PRIMARY KEY (`stockcode`));

4. Solution

Hadoop provides various data types such as IntWritable, FloatWritable, and DoubleWritable,
but in our case we implement a custom output writable that lets us write
the data rows to a database table.
To implement the custom output writable, we have to implement the DBWritable interface.

Writing to an RDBMS requires a database connection from Hadoop to MySQL, so the relevant JDBC driver jar must be placed in Hadoop's lib folder.
In our case, we copied the mysql-connector-java-5.1.5.jar file to the Hadoop common lib folder.
The jar file must be copied to every machine that runs tasks (the Task Tracker machines) and to the machine from which the job is launched.
Once the jar file is copied, restart the cluster.
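
A missing driver jar usually shows up as a ClassNotFoundException at job time. The following is a small sketch of a pre-flight check, assuming the driver class name used later in the driver code (com.mysql.jdbc.Driver). JdbcDriverCheck and isOnClasspath are hypothetical names introduced here for illustration. The check only covers the JVM it runs in, so it says nothing about the other nodes of the cluster.

```java
/** Hypothetical pre-flight check; pass whatever driver class the job is configured with. */
final class JdbcDriverCheck {

    /** Returns true if the named class can be loaded by the current JVM. */
    static boolean isOnClasspath(String driverClassName) {
        try {
            Class.forName(driverClassName);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // The jar is missing from the classpath, or the class could not be linked
            return false;
        }
    }

    public static void main(String[] args) {
        // Default taken from the tutorial's driver configuration
        String driver = args.length > 0 ? args[0] : "com.mysql.jdbc.Driver";
        System.out.println(driver + " available: " + isOnClasspath(driver));
    }
}
```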

We use the following four Java files in this example: DBDriver.java, DBMapper.java, DBOutputWritable.java, and DBReducer.java.

4.1 Build File : pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jdz</groupId>
  <artifactId>HadoopDBOutputWritable</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>HadoopDBOutputWritable</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.6.0</hadoop.version>
    <jdk.version>1.7</jdk.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

4.2 Driver Code : DBDriver.java

package com.jdz;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DBDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {

    // Reuse the configuration prepared by ToolRunner so generic options (-D, -libjars) are honored
    Configuration conf = getConf();

    DBConfiguration.configureDB(conf,
        "com.mysql.jdbc.Driver",              // driver class
        "jdbc:mysql://localhost:3306/retail", // db url
        "root",                               // user name
        "root");                              // password

    Job job = Job.getInstance(conf);
    job.setJarByClass(DBDriver.class);
    job.setMapperClass(DBMapper.class);
    job.setReducerClass(DBReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(DBOutputWritable.class);
    job.setOutputValueClass(NullWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(DBOutputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    DBOutputFormat.setOutput(job,
        "product",                                               // output table name
        new String[] {"stockcode", "description", "quantity"});  // table columns

    // Return the status to ToolRunner instead of exiting here
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int result = ToolRunner.run(new Configuration(), new DBDriver(), args);
    System.out.println("job status ::" + result);
    // Propagate the job status as the process exit code
    System.exit(result);
  }
}
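
The table name and column list passed to DBOutputFormat.setOutput are used to build a parameterized INSERT statement, and the column order determines which parameter index each field must be bound to in DBOutputWritable.write(PreparedStatement). The sketch below illustrates that relationship with a hypothetical helper, buildInsert; it is not Hadoop's own code, and the exact SQL text Hadoop generates may differ in formatting.

```java
import java.util.Collections;

/** Illustration only: builds the kind of parameterized INSERT that a column list implies. */
final class InsertSqlSketch {

    /** Column i (zero-based) corresponds to JDBC parameter index i + 1. */
    static String buildInsert(String table, String... columns) {
        if (columns.length == 0) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String columnList = String.join(",", columns);
        String placeholders = String.join(",", Collections.nCopies(columns.length, "?"));
        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildInsert("product", "stockcode", "description", "quantity"));
    }
}
```

With the columns in this order, stockcode is parameter 1, description is parameter 2, and quantity is parameter 3, which matches the setString and setInt calls in DBOutputWritable.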

4.2 Mapper Code : DBMapper.java

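package com.jdz;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DBMapper extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  protected void map(LongWritable id, Text value, Context context)
      throws IOException, InterruptedException {
    // Split only to extract the stock code; the full line is forwarded as the value
    String[] productValues = value.toString().split(",");

    // Key each record by stock code so that rows for the same product reach the same reduce call.
    // Exceptions are propagated so that a failing record fails the task instead of being hidden.
    context.write(new Text(productValues[0]), value);
  }
}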

4.3 Custom Database Writable : DBOutputWritable.java

package com.jdz;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
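public class DBOutputWritable implements Writable, DBWritable {

  private int quantity;
  private String stockCode;
  private String description;

  // No-argument constructor required so Hadoop can instantiate the class by reflection
  public DBOutputWritable() {}

  public DBOutputWritable(String stockCode, String description, int quantity) {
    this.stockCode = stockCode;
    this.description = description;
    this.quantity = quantity;
  }

  // Binds the fields to the INSERT statement; indexes follow the column order given in the driver
  @Override
  public void write(PreparedStatement statement) throws SQLException {
    statement.setString(1, stockCode);
    statement.setString(2, description);
    statement.setInt(3, quantity);
  }

  // Used only when reading rows from the database; kept for completeness
  @Override
  public void readFields(ResultSet resultSet) throws SQLException {
    this.stockCode = resultSet.getString(1);
    this.description = resultSet.getString(2);
    this.quantity = resultSet.getInt(3);
  }

  // Left empty: in this job the object is handed directly to DBOutputFormat by the reducer
  // and is never serialized between tasks
  @Override
  public void write(DataOutput out) throws IOException {
  }

  @Override
  public void readFields(DataInput in) throws IOException {
  }
}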
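
The two Writable methods above are empty, which works here because the record is only written to the database. If the same record ever had to travel between map and reduce tasks, those methods would need real implementations. The following is a minimal sketch of what that could look like, using only java.io so that it runs without Hadoop; ProductFields and roundTrip are hypothetical names, and the field layout (two UTF strings followed by an int) is an assumption, not the tutorial's format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for the record, showing symmetric write/readFields methods. */
final class ProductFields {
    String stockCode = "";
    String description = "";
    int quantity;

    ProductFields() {}

    ProductFields(String stockCode, String description, int quantity) {
        this.stockCode = stockCode;
        this.description = description;
        this.quantity = quantity;
    }

    /** Writes the fields in a fixed order. */
    void write(DataOutput out) throws IOException {
        out.writeUTF(stockCode);
        out.writeUTF(description);
        out.writeInt(quantity);
    }

    /** Reads the fields back in exactly the order they were written. */
    void readFields(DataInput in) throws IOException {
        stockCode = in.readUTF();
        description = in.readUTF();
        quantity = in.readInt();
    }

    /** Serializes a record to bytes and reads it into a fresh instance. */
    static ProductFields roundTrip(ProductFields original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            ProductFields copy = new ProductFields();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ProductFields copy = roundTrip(new ProductFields("71053", "WHITE METAL LANTERN", 116));
        System.out.println(copy.stockCode + " / " + copy.description + " / " + copy.quantity);
    }
}
```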

4.4 Reducer : DBReducer.java

package com.jdz;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DBReducer extends Reducer<Text, Text, DBOutputWritable, NullWritable> {

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {

    // Keep a single line per stock code, because the table's primary key rejects duplicates.
    // Hadoop reuses the Text instance while iterating, so copy the contents out as a String.
    String finalValue = null;
    for (Text value : values) {
      finalValue = value.toString();
    }

    String[] productValues = finalValue.split(",");

    DBOutputWritable productRecord = new DBOutputWritable(
        productValues[0], productValues[1], Integer.parseInt(productValues[2].trim()));

    // DBOutputFormat writes the key to the table; the value is ignored
    context.write(productRecord, NullWritable.get());
  }
}
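
The reducer above keeps one line per stock code by letting the last value seen overwrite the earlier ones. The same idea can be sketched in plain Java with a map keyed by stock code. LastValueWins and dedupe are hypothetical names, and the sketch uses list order to decide which line is "last"; in a real MapReduce job the order of values within a reduce call is not guaranteed unless the job sorts them, so which duplicate survives there is effectively arbitrary.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustration of "last value per key wins", the rule the reducer applies per stock code. */
final class LastValueWins {

    /** Maps each stock code to the last line seen for it, preserving first-seen key order. */
    static Map<String, String> dedupe(List<String> lines) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String line : lines) {
            // The stock code is everything before the first comma
            String stockCode = line.split(",", 2)[0];
            latest.put(stockCode, line);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "71053,WHITE METAL LANTERN,116",
            "22633,HAND WARMER UNION JACK,216",
            "71053,WHITE METAL LANTERN,200");
        System.out.println(dedupe(lines));
    }
}
```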

4.5 Copy files from local file system to HDFS

Make sure your Hadoop cluster is up and running.
I used the following command to copy the file from the local file system to HDFS.

hdfs dfs -copyFromLocal product.csv /input/javadeveloperzone/dbwritable

5. Build & Run Application

Now build the jar file that we will submit to the Hadoop cluster.
Once the jar is built, use the following command to run the job on the Hadoop cluster. The pom.xml above does not set a main class in the jar manifest, so the driver class is named explicitly.

hadoop jar HadoopDBOutputWritable.jar com.jdz.DBDriver /input/javadeveloperzone/dbwritable

6. Output

Once the job completes successfully, run a SELECT query on the product table that we created in section 3. The table should contain one row per stock code from the input files.

7. Source Code

You can download the source code of this tutorial on how to write data to a database using Hadoop from our git repository.
It can serve as a first step toward writing more complex Hadoop MapReduce programs in Java.

If you want to get hands-on with big data, Hadoop is a good platform to start with. It helps you collect, process, and analyze large data sets efficiently. You can write MapReduce code to process data stored in HDFS (Hadoop Distributed File System), use Sqoop to transfer data between Hadoop and relational databases, and use Hive to query data stored in Hadoop.
