

1. Overview
Elasticsearch supports bulk index, delete, and update operations through the Bulk API. In this article, we will discuss how to index documents from a database using the Elasticsearch Transport Java Client Bulk API, using a MySQL database to demonstrate document indexing.
Bulk indexing improves indexing performance because multiple documents can be indexed in a single API call.
2. Development Environment
Elasticsearch: 6.6.0
Java: 1.8.0_65
IDE: IntelliJ IDEA
Build Tool: Maven
3. Steps to Index Document From Database
Now we will discuss, with detailed explanations, how to use the Elasticsearch Transport Client Bulk API to index documents from a MySQL database.
Step 1: Create Table
Create a table in the MySQL database. In this example, we have created a document table as shown below.
CREATE TABLE `document` (
  `docId` int(11) NOT NULL,
  `docType` varchar(255) DEFAULT NULL,
  `docTitle` varchar(255) DEFAULT NULL,
  `docAuthor` varchar(255) DEFAULT NULL,
  `docLanguage` varchar(45) DEFAULT NULL,
  `numberOfPage` int(11) DEFAULT NULL,
  `lastIndexDate` datetime DEFAULT NULL,
  PRIMARY KEY (`docId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Step 2: Insert Records
The next step is to insert records into the newly created table, or you can use an existing table if you already have some data in it for indexing; a sample insert is sketched below.
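For example, a minimal sketch that inserts one sample row over JDBC (it assumes the connection opened in Step 7; the values mirror the document shown in the output in section 4.4, except numberOfPage, which is illustrative):
//hypothetical sample row; lastIndexDate=NOW() makes it qualify for the Step 8 query
PreparedStatement ps = connection.prepareStatement(
        "INSERT INTO document (docId, docType, docTitle, docAuthor, docLanguage, numberOfPage, lastIndexDate) " +
        "VALUES (?, ?, ?, ?, ?, ?, NOW())");
ps.setInt(1, 4);
ps.setString(2, "docx");
ps.setString(3, "Elastic Search Guide");
ps.setString(4, "Elastic Team");
ps.setString(5, "en");
ps.setInt(6, 250); //illustrative page count
ps.executeUpdate();
ps.close();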
Step 3: Create a Maven Project
Step 4: Add the required dependencies to the project.
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
</dependency>
Step 5: Initialize transport client
The TransportClient class is used to communicate with an Elasticsearch cluster; it connects to the cluster using the transport module. Here is sample code to initialize the client.
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_1}"), {PORT})) //1st ES node host and port
        .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_2}"), {PORT})); //2nd ES node host and port
Step 6: Create an Index and Index Type and Prepare JSON
The next step is to create the index and index type in Elasticsearch. The Transport Client automatically creates the index and index type, if they do not exist, when you submit a document for indexing.
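You can also create the index explicitly through the admin client before indexing; a minimal sketch, using the index name from the example in section 4:
//optional: create the index up front instead of relying on auto-creation
if (!client.admin().indices().prepareExists("indexdatabaseexamaple").get().isExists()) {
    client.admin().indices().prepareCreate("indexdatabaseexamaple").get();
}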
Step 7: Open MySQL Database Connection
The next step is to open the MySQL database connection. Here we need to load the driver for the MySQL database and prepare a connection. Refer to the code below for more details.
//Class.forName("com.mysql.jdbc.Driver"); //old driver class name, replaced in Connector/J 8
Class.forName("com.mysql.cj.jdbc.Driver");
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bulkindexing", "root", "jdzone");
Step 8: Execute MySQL Database Query
Prepare a query that returns the records we want to index in Elasticsearch. In our example, we have also added a lastIndexDate column to identify which records are pending. Once indexing is completed, we update the lastIndexDate column to the current date.
SELECT d.docId,d.docType,d.docTitle,d.docAuthor,d.docLanguage,d.numberOfPage FROM document d where lastIndexDate>'2019-02-01 00:00:00'
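The query can then be executed with a plain JDBC Statement, as the full example in section 4 does:
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(query); //query holds the SELECT statement above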
Step 9: Initialize BulkRequestBuilder
The Elasticsearch Bulk API provides the BulkRequestBuilder class for bulk operations. The TransportClient prepareBulk method is used to initialize a BulkRequestBuilder.
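For example:
BulkRequestBuilder bulkRequest = client.prepareBulk(); //prepare bulk request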
Step 10: Prepare JSON Document
As per the Elasticsearch Index API documentation, there are several ways to generate a JSON document; of these options, in our example we use the jsonBuilder helper to construct the document. Refer to the syntax below to construct a JSON document.
XContentBuilder builder = jsonBuilder()
        .startObject()
        .field("{FIELD_NAME_1}", "{FIELD_VALUE_1}")
        .field("{FIELD_NAME_2}", "{FIELD_VALUE_2}")
        .field("{FIELD_NAME_3}", "{FIELD_VALUE_3}")
        .endObject();
Step 11: Iterate ResultSet
Iterate over all the records in the ResultSet, prepare a JSON document for each, and add it to the bulk request using the add method, as sketched below.
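A condensed sketch of this loop, mirroring the full example in section 4 (only two fields shown here):
while (resultSet.next()) {
    XContentBuilder builder = jsonBuilder()
            .startObject()
            .field("docTitle", resultSet.getString("docTitle"))
            .field("docAuthor", resultSet.getString("docAuthor")) //...remaining fields added the same way
            .endObject();
    bulkRequest.add(client.prepareIndex(indexName, indexTypeName, resultSet.getString("docId"))
            .setSource(builder));
}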
Step 12: Execute BulkRequest
After adding a certain number of documents to the bulk request, call the execute method to send all of the documents to Elasticsearch, as shown below.
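For example, executing the request and checking for per-document failures, as the full example does:
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    System.out.println(bulkResponse.buildFailureMessage()); //report which documents failed
}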
4. Example
4.1 Sample Input
(The sample input is the set of rows in the document table created in Step 1; the run below picks up 520 rows, one of which, docId 4, appears in the search output in section 4.4.)
4.2 IndexDocumentDatabase
package com.javadeveloperzone;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.io.IOException;
import java.net.InetAddress;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class IndexDocumentDatabase {
    String indexName, indexTypeName;
    TransportClient client = null;
    DatabaseHelper databaseHelper = null;
    public static final String DOC_LANGUAGE = "docLanguage";
    public static final String DOC_TITLE = "docTitle";
    public static final String DOC_AUTHOR = "docAuthor";
    public static final String DOC_TYPE = "docType";
    public static final String DOC_ID = "docId";

    public static void main(String[] args) {
        IndexDocumentDatabase indexDocumentDatabase = new IndexDocumentDatabase();
        indexDocumentDatabase.index();
    }

    public void index() {
        try {
            initESTransportClient(); //init transport client
            databaseHelper.openMySqlDbConnection(); //open MySQL database connection
            databaseBulkImport(); //fetch data from database and send to Elasticsearch using bulk import
            databaseHelper.updateRecords(); //mark indexed records with the current date
            refreshIndices(); //refresh indices
            search(); //search indexed documents
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            databaseHelper.closeMySqlDbConnection();
            closeTransportClient(); //close transport client
        }
    }

    public IndexDocumentDatabase() {
        indexName = "indexdatabaseexamaple";
        indexTypeName = "indexdatabasemapping";
        databaseHelper = new DatabaseHelper();
    }
    /*
    Method used to init the Elastic Search Transport client;
    returns true if it is successfully initialized, otherwise false
    */
    public boolean initESTransportClient() {
        try {
            client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(
                            new TransportAddress(InetAddress.getByName("localhost"), 9300));
            //add more transport addresses here if you have multiple nodes
            return true;
        } catch (Exception ex) {
            //log.error("Exception occurred while getting Client : " + ex, ex);
            ex.printStackTrace();
            return false;
        }
    }
    public void databaseBulkImport() throws IOException, ExecutionException, InterruptedException, SQLException {
        BulkRequestBuilder bulkRequest = client.prepareBulk(); //prepare bulk request
        int count = 0, noOfBatch = 1;
        int numberOfRecords = 0;
        Connection connection = databaseHelper.getConnection();
        Statement statement = connection.createStatement();
        String query = "SELECT d.docId," +
                "d.docType," +
                "d.docTitle," +
                "d.docAuthor," +
                "d.docLanguage," +
                "d.numberOfPage " +
                "FROM document d where lastIndexDate>'2019-02-01 00:00:00'";
        ResultSet resultSet = statement.executeQuery(query);
        while (resultSet.next()) { //iterate over the result set
            try {
                XContentBuilder xContentBuilder = jsonBuilder()
                        .startObject()
                        .field(DOC_TYPE, resultSet.getString(DOC_TYPE))
                        .field(DOC_AUTHOR, resultSet.getString(DOC_AUTHOR))
                        .field(DOC_TITLE, resultSet.getString(DOC_TITLE))
                        .field(DOC_LANGUAGE, resultSet.getString(DOC_LANGUAGE))
                        .endObject();
                bulkRequest.add(client.prepareIndex(indexName, indexTypeName, resultSet.getString(DOC_ID))
                        .setSource(xContentBuilder));
                numberOfRecords++;
                count++;
                if (count == 50_000) { //flush a batch of 50,000 documents
                    addDocumentToESCluster(bulkRequest, noOfBatch, count);
                    bulkRequest = client.prepareBulk(); //start a fresh bulk request for the next batch
                    noOfBatch++;
                    count = 0;
                }
            } catch (Exception e) {
                e.printStackTrace(); //skip records with bad data
            }
        }
        if (count != 0) { //add remaining documents to ES
            addDocumentToESCluster(bulkRequest, noOfBatch, count);
        }
        resultSet.close();
        statement.close();
        System.out.println("Total Document Indexed : " + numberOfRecords);
    }
    public void addDocumentToESCluster(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {
        if (count == 0) {
            //executing an empty bulk request fails with ActionRequestValidationException: Validation Failed: 1: no requests added;
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing failed for Batch : " + noOfBatch);
            //process failures by iterating through each bulk response item
            int numberOfDocFailed = 0;
            Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
            while (iterator.hasNext()) {
                BulkItemResponse response = iterator.next();
                if (response.isFailed()) {
                    //System.out.println("Failed Id : " + response.getId());
                    numberOfDocFailed++;
                }
            }
            System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed");
            System.out.println(bulkResponse.buildFailureMessage());
        } else {
            System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
        }
    }
    public void refreshIndices() {
        client.admin().indices()
                .prepareRefresh(indexName)
                .get(); //refresh before search, so the latest indexed documents are searchable
    }

    public void search() {
        //match-all search as a basic sanity check
        SearchResponse response = client.prepareSearch(indexName)
                .setTypes(indexTypeName)
                .get();
        System.out.println("Total Hits : " + response.getHits().getTotalHits());
        System.out.println(response);
    }

    public void closeTransportClient() {
        if (client != null) {
            client.close();
        }
    }
}
4.3 DatabaseHelper
package com.javadeveloperzone;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class DatabaseHelper {
    Connection connection;

    public Connection getConnection() {
        return connection;
    }

    /**
     * CREATE TABLE `document` (
     *   `docId` int(11) NOT NULL,
     *   `docType` varchar(255) DEFAULT NULL,
     *   `docTitle` varchar(255) DEFAULT NULL,
     *   `docAuthor` varchar(255) DEFAULT NULL,
     *   `docLanguage` varchar(45) DEFAULT NULL,
     *   `numberOfPage` int(11) DEFAULT NULL,
     *   `lastIndexDate` datetime DEFAULT NULL,
     *   PRIMARY KEY (`docId`)
     * ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
     */
    public void openMySqlDbConnection() throws ClassNotFoundException, SQLException {
        //Class.forName("com.mysql.jdbc.Driver"); //old driver class name, replaced in Connector/J 8
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/bulkindexing", "root", "jdzone");
    }

    public void updateRecords() throws SQLException {
        //mark indexed records with the current date so they are not re-indexed next time
        String query = "update document set lastIndexDate=now() where lastIndexDate>'2019-02-01 00:00:00'";
        Statement statement = connection.createStatement();
        int recordsUpdated = statement.executeUpdate(query);
        System.out.println("Record Updated : " + recordsUpdated);
    }

    public void closeMySqlDbConnection() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
4.4 Output
Bulk Indexing Completed for batch : 1
Total Document Indexed : 520
Record Updated : 520
Total Hits : 520
{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "_clusters": {
    "total": 0,
    "successful": 0,
    "skipped": 0
  },
  "hits": {
    "total": 520,
    "max_score": 1,
    "hits": [
      {
        "_index": "indexdatabaseexamaple",
        "_type": "indexdatabasemapping",
        "_id": "4",
        "_score": 1,
        "_source": {
          "docType": "docx",
          "docAuthor": "Elastic Team",
          "docTitle": "Elastic Search Guide",
          "docLanguage": "en"
        }
      }
    ]
  }
}
5. Conclusion
In this article, we have discussed how to index documents into Elasticsearch from a MySQL database using the Elasticsearch Transport Client Java API. We have walked through the client API step by step, covering the MySQL Java driver, admin operations such as refreshing indices, and a sample match-all query for basic QC.
6. Source Code
You can download the source code of the Elastic Search Index Document From Database example from our git repository.
