

1. Overview
In our previous article, we explained how to read a database table using Hadoop MapReduce;
in this article we are going to explain how to write data to a database using Hadoop.
Reading from and writing to a database are the foundation of building ETL tools on Hadoop.
Consider a case where csv text files in HDFS contain the product details of an online
retail store and the files contain millions of rows, and we want to load those records into a database.
Our HDFS csv files contain unique product records that we want to load into an RDBMS, so this article explains how to load csv files into MySQL using Hadoop.
It is a Hadoop MapReduce tutorial that serves as a base for reading text files with Hadoop MapReduce and storing the data in a database table.
2. Development environment
Java : Oracle JDK 1.8
Hadoop : Apache Hadoop 2.6.1
IDE : Eclipse
Build Tool: Maven
Database : MySQL 5.6.33
3. Sample Input
The csv text files contain records in the following format,
<StockCode>,<Description>,<Quantity>
where Stock Code is the unique stock code of a product.
Description is the name/description of the product.
Quantity is the quantity available for the specific product.
Following is a list of sample rows available in the csv files,
<StockCode>,<Description>,<Quantity>
85123A,WHITE HANGING HEART T-LIGHT HOLDER,100
71053,WHITE METAL LANTERN,116
84406B,CREAM CUPID HEARTS COAT HANGER,80
84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6123
84029E,RED WOOLLY HOTTIE WHITE HEART.,612
22752,SET 7 BABUSHKA NESTING BOXES,2345
21730,GLASS STAR FROSTED T-LIGHT HOLDER,786
22633,HAND WARMER UNION JACK,216
22632,HAND WARMER RED POLKA DOT,12656
84879,ASSORTED COLOUR BIRD ORNAMENT,322
22745,POPPY'S PLAYHOUSE BEDROOM ,986
22748,POPPY'S PLAYHOUSE KITCHEN,8716
22749,FELTCRAFT PRINCESS CHARLOTTE DOLL,1248
22310,IVORY KNITTED MUG COSY ,5326
84969,BOX OF 6 ASSORTED COLOUR TEASPOONS,2126
22623,BOX OF VINTAGE JIGSAW BLOCKS ,4213
22622,BOX OF VINTAGE ALPHABET BLOCKS,721
21754,HOME BUILDING BLOCK WORD,9823
21755,LOVE BUILDING BLOCK WORD,2653
21777,RECIPE BOX WITH METAL HEART,4635
48187,DOORMAT NEW ENGLAND,7864
22960,JAM MAKING SET WITH JARS,9876
22913,RED COAT RACK PARIS FASHION,2389
22912,YELLOW COAT RACK PARIS FASHION,7342
22914,BLUE COAT RACK PARIS FASHION,6423
21756,BATH BUILDING BLOCK WORD,435
22728,ALARM CLOCK BAKELIKE PINK,23674
22727,ALARM CLOCK BAKELIKE RED ,12234
22726,ALARM CLOCK BAKELIKE GREEN,5132
Create the product table using the following DDL query,
CREATE TABLE `retail`.`product` (
  `stockcode` VARCHAR(45) NOT NULL,
  `description` VARCHAR(45) NOT NULL,
  `quantity` INT,
  PRIMARY KEY (`stockcode`));
4. Solution
Hadoop provides various Writable datatypes like IntWritable, FloatWritable, DoubleWritable etc.,
but in our case we are going to implement a custom output writable (DBOutputWritable) which will enable us to write
the data rows to a database table.
To build this custom output writable, we have to implement the DBWritable interface (together with Writable).
Writing to an RDBMS requires a JDBC connection from Hadoop to MySQL, so we need to place the MySQL JDBC driver jar on Hadoop's classpath;
in our case we have copied the mysql-connector-java-5.1.5.jar file to the Hadoop common lib folder.
The jar file has to be present on every machine that runs tasks (the TaskTracker nodes in Hadoop 1.x, the NodeManager nodes in Hadoop 2.x) as well as on the machine from which we launch the job.
Once you have copied the jar file, you need to restart the cluster. A minimal sketch of the copy step is shown below.
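The exact destination directory depends on how Hadoop is installed; the path below assumes a typical Apache Hadoop 2.x layout with HADOOP_HOME pointing at the installation, so adjust it to your setup and repeat the copy on every node.

# copy the MySQL JDBC driver into Hadoop's common lib folder (illustrative path)
cp mysql-connector-java-5.1.5.jar $HADOOP_HOME/share/hadoop/common/lib/

Alternatively, since the driver class below uses ToolRunner, the connector jar can also be shipped with the job via the -libjars generic option instead of being copied to every node.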
We are going to use the following Maven build file and four Java files for this example,
4.1 Build File : pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jdz</groupId>
  <artifactId>HadoopDBOutputWritable</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>HadoopDBOutputWritable</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.6.0</hadoop.version>
    <jdk.version>1.7</jdk.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
4.2 Driver Code : DBDriver.java
package com.jdz;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DBDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // use the configuration supplied by ToolRunner so generic options (-D, -libjars) are honoured
    Configuration conf = getConf();
    // JDBC connection details used by DBOutputFormat
    DBConfiguration.configureDB(conf,
        "com.mysql.jdbc.Driver",              // driver class
        "jdbc:mysql://localhost:3306/retail", // db url
        "root",                               // user name
        "root");                              // password

    Job job = Job.getInstance(conf);
    job.setJarByClass(DBDriver.class);
    job.setMapperClass(DBMapper.class);
    job.setReducerClass(DBReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(DBOutputWritable.class);
    job.setOutputValueClass(NullWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(DBOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    DBOutputFormat.setOutput(job,
        "product",                                            // output table name
        new String[] {"stockcode", "description", "quantity"} // table columns
    );
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) {
    try {
      int result = ToolRunner.run(new Configuration(), new DBDriver(), args);
      System.out.println("job status ::" + result);
    } catch (Exception exception) {
      exception.printStackTrace();
    }
  }
}
4.3 Mapper Code : DBMapper.java
package com.jdz;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DBMapper extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  protected void map(LongWritable id, Text value, Context context) {
    try {
      // the stock code (first csv column) becomes the key so that
      // duplicate product rows are grouped together in the reducer
      String[] productValues = value.toString().split(",");
      context.write(new Text(productValues[0]), value);
    } catch (Exception exception) {
      exception.printStackTrace();
    }
  }
}
4.4 Custom Database Writable : DBOutputWritable.java
package com.jdz;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBOutputWritable implements Writable, DBWritable {

  private int quantity;
  private String stockCode;
  private String description;

  public DBOutputWritable() {
  }

  public DBOutputWritable(String stockCode, String description, int quantity) {
    this.stockCode = stockCode;
    this.description = description;
    this.quantity = quantity;
  }

  // binds one record to the INSERT statement generated by DBOutputFormat;
  // the parameter order must match the column array passed to DBOutputFormat.setOutput()
  public void write(PreparedStatement statement) throws SQLException {
    statement.setString(1, stockCode);
    statement.setString(2, description);
    statement.setInt(3, quantity);
  }

  // used when reading from a table with DBInputFormat; not exercised in this job
  public void readFields(ResultSet resultSet) throws SQLException {
    this.stockCode = resultSet.getString(1);
    this.description = resultSet.getString(2);
    this.quantity = resultSet.getInt(3);
  }

  // the Writable methods are not needed for DBOutputFormat, so they are left empty
  public void write(DataOutput out) throws IOException {
  }

  public void readFields(DataInput in) throws IOException {
  }
}
4.5 Reducer : DBReducer.java
package com.jdz;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DBReducer extends Reducer<Text, Text, DBOutputWritable, NullWritable> {

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    // keep only the last value seen for each stock code, so duplicate csv rows
    // collapse to a single record and do not violate the primary key on stockcode
    Text finalValue = null;
    for (Text value : values) {
      finalValue = value;
    }
    String[] productValues = finalValue.toString().split(",");
    DBOutputWritable productRecord = new DBOutputWritable(productValues[0],
        productValues[1], Integer.parseInt(productValues[2]));
    context.write(productRecord, NullWritable.get());
  }
}
4.6 Copy files from the local file system to HDFS
Make sure your Hadoop cluster is up and running.
I have used the following command to copy the files from the local file system to HDFS.
hdfs dfs -copyFromLocal product.csv /input/javadeveloperzone/dbwritable
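To confirm that the file is in place before launching the job, you can list the target directory:

hdfs dfs -ls /input/javadeveloperzone/dbwritable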
5. Build & Run Application
Now build the jar file that we are going to submit to the Hadoop cluster.
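With the Maven build file from section 4.1, a typical way to build is shown below (this assumes Maven is installed on the build machine; the jar is produced under the target directory, e.g. target/HadoopDBOutputWritable-0.0.1-SNAPSHOT.jar based on the artifactId and version above):

mvn clean package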
Once the jar file is built, we can use the following command to run the job on the Hadoop cluster. The driver class is given explicitly because the pom.xml above does not set a main class in the jar manifest.
hadoop jar HadoopDBOutputWritable.jar com.jdz.DBDriver /input/javadeveloperzone/dbwritable
6. Output
Once the job completes successfully, you can run a SELECT query on the product table that we created in step 3 to verify that the records from the csv files have been loaded.
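For example, a simple verification query against the product table (the rows and counts you see will depend on your input files):

SELECT stockcode, description, quantity FROM retail.product ORDER BY stockcode;
SELECT COUNT(*) FROM retail.product;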
7. Source Code
You can download the source code of this tutorial on writing data to a database using Hadoop from our git repository;
it can serve as a starting point for writing more complex Hadoop MapReduce programs in Java.
If you're looking to get your hands on big data, Hadoop is a good platform to start with. Hadoop helps you store, process, and analyze large data sets quickly and efficiently. With Hadoop, you can write MapReduce code against data stored in HDFS (Hadoop Distributed File System), move data between HDFS and relational databases with Sqoop, and query data stored in Hadoop with Hive.