1. Overview

Elastic Search supports bulk index, delete, and update operations through its Bulk API. In this article, we discuss how to index documents from a database using the Elastic Search Transport Java Client Bulk API, with MySQL as the example database.

Bulk indexing improves indexing performance because multiple documents can be indexed in a single API call.
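
To make "multiple documents in a single API call" concrete, here is a minimal sketch of the body that the REST `_bulk` endpoint accepts: newline-delimited JSON in which each document is preceded by an action line. The class `BulkBodySketch` and the index and type names are hypothetical and serve only as illustration; the Transport Client used in the rest of this article assembles the equivalent request for you.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the Elastic Search client. It only shows the
// shape of a bulk body: one action line plus one source line per document.
final class BulkBodySketch {

    // Keys are document ids, values are already-serialized JSON documents.
    // Assumption: index, type, and ids contain no characters that need JSON escaping.
    static String build(String index, String type, Map<String, String> jsonById) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> doc : jsonById.entrySet()) {
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type)
                .append("\",\"_id\":\"").append(doc.getKey()).append("\"}}\n");
            // Every line, including the last one, is terminated by a newline.
            body.append(doc.getValue()).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("1", "{\"docTitle\":\"Guide\"}");
        docs.put("2", "{\"docTitle\":\"Manual\"}");
        System.out.print(build("documents", "doc", docs));
    }
}
```

Two documents therefore travel as four lines in one request instead of two separate index calls.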

2. Development Environment

Elastic Search: 6.6.0

Java: 1.8.0_65

IDE: IntelliJ Idea

Build Tool: Maven

3. Steps to Index Document From Database

Now we will walk through, step by step, how to use the Elastic Search Transport Client Bulk API to index documents from a MySQL database.

Step 1: Create Table

Create a table in the MySQL database. In this example, we have created a document table as shown below.

CREATE TABLE `document` (
  `docId` int(11) NOT NULL,
  `docType` varchar(255) DEFAULT NULL,
  `docTitle` varchar(255) DEFAULT NULL,
  `docAuthor` varchar(255) DEFAULT NULL,
  `docLanguage` varchar(45) DEFAULT NULL,
  `numberOfPage` int(11) DEFAULT NULL,
  `lastIndexDate` datetime DEFAULT NULL,
  PRIMARY KEY (`docId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

Step 2: Insert Records

The next step is to insert records into the newly created table. You can also use an existing table if it already contains data that you want to index.
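
If you need test data, one way to load it is a JDBC batch insert. The following is only a sketch under stated assumptions: the class `SampleDataLoader` and the generated row values are made up for illustration, and `insertAll` expects an already opened connection to the database that holds the document table.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

// Hypothetical loader for made-up sample rows; column names follow the document table.
final class SampleDataLoader {

    static final String INSERT_SQL =
            "INSERT INTO document (docId, docType, docTitle, docAuthor, docLanguage, numberOfPage, lastIndexDate) "
            + "VALUES (?, ?, ?, ?, ?, ?, ?)";

    // Generates n illustrative rows; the values are invented, not real data.
    static List<Object[]> sampleRows(int n) {
        List<Object[]> rows = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            rows.add(new Object[] {i, "docx", "Sample Title " + i, "Sample Author", "en", 10 + i,
                    Timestamp.valueOf("2019-02-02 00:00:00")});
        }
        return rows;
    }

    // Sends all rows to MySQL as one JDBC batch and returns the number of statements executed.
    static int insertAll(Connection connection, List<Object[]> rows) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(INSERT_SQL)) {
            for (Object[] row : rows) {
                for (int col = 0; col < row.length; col++) {
                    ps.setObject(col + 1, row[col]); // JDBC parameters are 1-based
                }
                ps.addBatch();
            }
            return ps.executeBatch().length;
        }
    }
}
```

The sample rows use a lastIndexDate of 2019-02-02 so that a query filtering on dates after 2019-02-01 would pick them up.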

Step 3: Create a Maven Project

Step 4: Add the Required Dependencies to the Project

<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
</dependency>

Step 5: Initialize Transport Client

The TransportClient class is used to communicate with an Elastic Search cluster. It connects to the cluster through the transport module. Here is sample code to initialize the client.

TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_1}"), {PORT})) //1st ES Node host and port
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_2}"), {PORT})); //2nd ES Node host and port
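
When the node list comes from configuration rather than from code, it can help to parse it first. The sketch below uses only the JDK; the class `NodeAddressParser`, the comma-separated "host:port" format, and the fallback to port 9300 are assumptions made for illustration. Each resulting address could then be wrapped in a TransportAddress and passed to addTransportAddress, assuming the InetSocketAddress constructor of TransportAddress in the 6.x client.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns "host1:9300, host2" into socket addresses.
// IPv6 literals are not handled in this sketch.
final class NodeAddressParser {

    static final int DEFAULT_TRANSPORT_PORT = 9300; // assumed default when no port is given

    static List<InetSocketAddress> parse(String nodes) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : nodes.split(",")) {
            String node = entry.trim();
            if (node.isEmpty()) {
                continue; // tolerate trailing commas
            }
            int colon = node.lastIndexOf(':');
            String host = colon < 0 ? node : node.substring(0, colon);
            int port = colon < 0 ? DEFAULT_TRANSPORT_PORT
                                 : Integer.parseInt(node.substring(colon + 1));
            // This constructor attempts to resolve the host name.
            result.add(new InetSocketAddress(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("localhost:9300, 127.0.0.1")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```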

Step 6: Create an Index and Index Type and Prepare JSON

The next step is to create the index and index type in Elastic Search. With the default cluster settings, the index and index type are created automatically if they do not exist when you submit a document for indexing through the Transport Client.

Step 7: Open MySQL Database Connection

The next step is to open a connection to the MySQL database. We need to load the MySQL driver and create a connection. Refer to the code below for details.

//Class.forName("com.mysql.jdbc.Driver");
Class.forName("com.mysql.cj.jdbc.Driver");
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bulkindexing","root","jdzone");

Step 8: Execute MySQL Database Query

Prepare a query that returns the rows we want to index in Elastic Search. In our example, we have also added a lastIndexDate column to identify which records are pending. Once indexing is completed, we update the lastIndexDate column to the current date.

SELECT d.docId,d.docType,d.docTitle,d.docAuthor,d.docLanguage,d.numberOfPage FROM document d where lastIndexDate>'2019-02-01 00:00:00'
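
The cutoff date does not have to be hard-coded in the SQL text. As a hedged alternative, the same query can be written with a PreparedStatement parameter; the class `PendingDocumentQuery` and its method are hypothetical names, and the method expects an open connection to the database that holds the document table.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

// Hypothetical variant of the query above with the cutoff date as a bind parameter.
final class PendingDocumentQuery {

    static final String SQL =
            "SELECT d.docId, d.docType, d.docTitle, d.docAuthor, d.docLanguage, d.numberOfPage "
            + "FROM document d WHERE d.lastIndexDate > ?";

    // Returns the ids of all rows whose lastIndexDate is later than the given cutoff.
    static List<Integer> pendingIds(Connection connection, Timestamp since) throws SQLException {
        List<Integer> ids = new ArrayList<>();
        try (PreparedStatement ps = connection.prepareStatement(SQL)) {
            ps.setTimestamp(1, since);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    ids.add(rs.getInt("docId"));
                }
            }
        }
        return ids;
    }
}
```

A call such as `pendingIds(connection, Timestamp.valueOf("2019-02-01 00:00:00"))` would then apply the same cutoff as the literal query, and both the statement and the result set are closed automatically.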

Step 9: Initialize BulkRequestBuilder

The Elastic Search Bulk API provides the BulkRequestBuilder class for bulk operations. The TransportClient prepareBulk method is used to initialize a BulkRequestBuilder.

Step 10: Prepare JSON Document

As per the Elastic Search Index API documentation, there are several ways to generate a JSON document. In our example, we use the JSON builder (XContentBuilder) to construct the document. Refer to the syntax below.

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("{FIELD_NAME_1}", "{FIELD_VALUE_1}")
        .field("{FIELD_NAME_2}", "{FIELD_VALUE_2}")
        .field("{FIELD_NAME_3}", "{FIELD_VALUE_3}")
    .endObject();
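
To show the kind of JSON text such a flat document corresponds to, here is a small hand-written stand-in that uses only the JDK. The class `JsonDocumentSketch` is hypothetical and is not how XContentBuilder works internally; in real code the builder takes care of escaping and number formatting for you.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in that serializes a flat map of string fields to JSON.
final class JsonDocumentSketch {

    // Wraps a string in quotes and escapes the characters JSON requires.
    static String quote(String s) {
        StringBuilder out = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.append('"').toString();
    }

    // Null values (for example NULL database columns) become JSON null.
    static String toJson(Map<String, String> fields) {
        StringBuilder json = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> field : fields.entrySet()) {
            if (!first) {
                json.append(',');
            }
            first = false;
            json.append(quote(field.getKey())).append(':');
            json.append(field.getValue() == null ? "null" : quote(field.getValue()));
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, String> doc = new LinkedHashMap<>();
        doc.put("docType", "docx");
        doc.put("docTitle", "Elastic Search Guide");
        System.out.println(toJson(doc));
    }
}
```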

Step 11: Iterate ResultSet

Iterate over all the records in the ResultSet, prepare a JSON document for each one, and add it to the bulk request using the add method.

Step 12: Execute bulkRequest

After adding a certain number of documents to the bulk request, call the execute method to send all of them to Elastic Search.
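
The "add, then execute after a certain number of documents" pattern from the last two steps can be sketched independently of Elastic Search. In the sketch below, the class `BatchingSketch` and the `flush` callback are hypothetical; in the real program the callback would execute the bulk request. Note that a fresh batch is started after every flush so that no document is sent twice, and that the final partial batch is flushed as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, library-free sketch of size-based batching.
final class BatchingSketch {

    // Passes the items to flush in chunks of at most batchSize
    // and returns the number of batches that were flushed.
    static <T> int inBatches(Iterable<T> items, int batchSize, Consumer<List<T>> flush) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> batch = new ArrayList<>();
        int batches = 0;
        for (T item : items) {
            batch.add(item);
            if (batch.size() == batchSize) {
                flush.accept(batch);
                batch = new ArrayList<>(); // start a new, empty batch
                batches++;
            }
        }
        if (!batch.isEmpty()) { // flush the remaining items
            flush.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            ids.add(i);
        }
        int batches = inBatches(ids, 3, batch -> System.out.println("flushing " + batch));
        System.out.println("batches: " + batches);
    }
}
```

With seven items and a batch size of three, the callback receives batches of three, three, and one item.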

4. Example

4.1 Sample Input

Elastic Search Index Data From Database

4.2 IndexDocumentDatabase

package com.javadeveloperzone;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.IOException;
import java.net.InetAddress;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class IndexDocumentDatabase {

    String indexName,indexTypeName;
    TransportClient client = null;
    DatabaseHelper databaseHelper = null;

    public static final String DOC_LANGUAGE = "docLanguage";
    public static final String DOC_TITLE = "docTitle";
    public static final String DOC_AUTHOR = "docAuthor";
    public static final String DOC_TYPE = "docType";
    public static final String DOC_ID = "docId";

    public static void main(String[] args) {
        IndexDocumentDatabase indexDocumentDatabase = new IndexDocumentDatabase();
        indexDocumentDatabase.index();
    }

    public void index(){

        try {
            initEStransportClinet(); //init transport client

            databaseHelper.openMySqlDbConnection(); //open MySQL database connection
            databaseBulkImport(); //fetch data from database and send to elastic search
            //using bulk import
            //update records
            databaseHelper.updateRecords();

            refreshIndices(); //refresh indices

            search(); //search indexed document
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            databaseHelper.closeMySqlDbConnection();
            closeTransportClient(); //close transport client
        }
    }

    public IndexDocumentDatabase(){
        indexName = "indexdatabaseexamaple";
        indexTypeName = "indexdatabasemapping";
        databaseHelper = new DatabaseHelper();
    }

    /*
    Method used to init the Elastic Search Transport client.
    Returns true if it is successfully initialized, otherwise false.
     */
    public boolean initEStransportClinet()
    {
        try {
            // chain additional addTransportAddress(...) calls here if you have multiple nodes
            client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(
                            new TransportAddress(InetAddress.getByName("localhost"), 9300));

            return true;
        } catch (Exception ex) {
            //log.error("Exception occurred while getting Client : " + ex, ex);
            ex.printStackTrace();
            return false;
        }
    }


    public void databaseBulkImport() throws IOException, ExecutionException, InterruptedException, SQLException {


        BulkRequestBuilder bulkRequest = client.prepareBulk(); //prepare bulk request
        int count=0,noOfBatch=1;
        int numberOfRecords = 0; //start at zero so the reported total matches the rows processed
        Connection connection = databaseHelper.getConnection();
        Statement statement = connection.createStatement();
        String query = "SELECT d.docId," +
                "d.docType," +
                "d.docTitle," +
                "d.docAuthor," +
                "d.docLanguage," +
                "d.numberOfPage " +
                "FROM document d where lastIndexDate>'2019-02-01 00:00:00'";
        ResultSet resultSet =
                statement.executeQuery(query);

        while (resultSet.next()){ //next database row
            try {
                XContentBuilder xContentBuilder = jsonBuilder()
                        .startObject()
                        .field(DOC_TYPE, resultSet.getString(DOC_TYPE))
                        .field(DOC_AUTHOR, resultSet.getString(DOC_AUTHOR))
                        .field(DOC_TITLE, resultSet.getString(DOC_TITLE))
                        .field(DOC_LANGUAGE, resultSet.getString(DOC_LANGUAGE))
                        .endObject();

                bulkRequest.add(client.prepareIndex(indexName, indexTypeName, resultSet.getString(DOC_ID))
                        .setSource(xContentBuilder));
                //count only documents that were actually added to the batch
                count++;
                numberOfRecords++;

                if (count==50_000) { //batch is full, send it
                    addDocumentToESCluser(bulkRequest, noOfBatch, count);
                    bulkRequest = client.prepareBulk(); //start a new, empty batch so documents are not re-sent
                    noOfBatch++;
                    count = 0;
                }
            }catch (Exception e) {
                e.printStackTrace();
                //skip the row if its document could not be built or added
            }
        }
        if(count!=0){ //add remaining documents to ES
            addDocumentToESCluser(bulkRequest,noOfBatch,count);
        }
        resultSet.close();
        statement.close();
        System.out.println("Total Document Indexed : "+numberOfRecords);
    }

    public void addDocumentToESCluser(BulkRequestBuilder bulkRequest,int noOfBatch,int count){

        if(count==0){
            //org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing failed for Batch : "+noOfBatch);

            // process failures by iterating through each bulk response item
            int numberOfDocFailed = 0;
            Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
            while (iterator.hasNext()){
                BulkItemResponse response = iterator.next();
                if(response.isFailed()){
                    //System.out.println("Failed Id : "+response.getId());
                    numberOfDocFailed++;
                }
            }
            System.out.println("Out of "+count+" documents, "+numberOfDocFailed+" documents failed");
            System.out.println(bulkResponse.buildFailureMessage());
        }else{
            System.out.println("Bulk Indexing Completed for batch : "+noOfBatch);
        }
    }


    public void refreshIndices(){
        client.admin().indices()
                .prepareRefresh(indexName)
                .get(); //Refresh before search, so you will get latest indices result
    }

    public void search(){

        SearchResponse response = client.prepareSearch(indexName)
                .setTypes(indexTypeName)
                .get();
        //MatchAllDocQuery
        System.out.println("Total Hits : "+response.getHits().getTotalHits());
        System.out.println(response);
    }

    public void closeTransportClient(){
        if(client!=null){
            client.close();
        }
    }

}

 

4.3 DatabaseHelper

package com.javadeveloperzone;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class DatabaseHelper {

    Connection connection;

    public Connection getConnection() {
        return connection;
    }

    /**
     * CREATE TABLE `document` (
     *   `docId` int(11) NOT NULL,
     *   `docType` varchar(255) DEFAULT NULL,
     *   `docTitle` varchar(255) DEFAULT NULL,
     *   `docAuthor` varchar(255) DEFAULT NULL,
     *   `docLanguage` varchar(45) DEFAULT NULL,
     *   `numberOfPage` int(11) DEFAULT NULL,
     *   `lastIndexDate` datetime DEFAULT NULL,
     *   PRIMARY KEY (`docId`)
     * ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
     * */

    public void openMySqlDbConnection() throws ClassNotFoundException, SQLException {
//        Class.forName("com.mysql.jdbc.Driver");
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/bulkindexing","root","jdzone");

    }

    public void updateRecords() throws SQLException {
        String query = "update document set lastIndexDate=now() where lastIndexDate>'2019-02-01 00:00:00'";
        //try-with-resources closes the statement even if the update fails
        try (Statement statement = connection.createStatement()) {
            int recordsUpdated = statement.executeUpdate(query);
            System.out.println("Record Updated : "+recordsUpdated);
        }
    }
    public void closeMySqlDbConnection() {
        try{
            if(connection!=null){
                connection.close();
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
}
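
The updateRecords method above touches every row that matches the date filter, including rows that were inserted after the SELECT ran or whose documents failed to index. A possible refinement, shown here only as a sketch, is to update just the ids that were actually indexed. The class `IndexedIdsUpdate` and its methods are hypothetical, and `markIndexed` expects an open connection to the same database.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical helper that marks only the given document ids as indexed.
final class IndexedIdsUpdate {

    // Builds an UPDATE statement with one "?" placeholder per id.
    static String updateSql(int idCount) {
        if (idCount <= 0) {
            throw new IllegalArgumentException("idCount must be positive");
        }
        StringBuilder sql = new StringBuilder(
                "UPDATE document SET lastIndexDate = NOW() WHERE docId IN (");
        for (int i = 0; i < idCount; i++) {
            sql.append(i == 0 ? "?" : ", ?");
        }
        return sql.append(')').toString();
    }

    // Returns the number of rows updated; an empty id list updates nothing.
    static int markIndexed(Connection connection, List<Integer> ids) throws SQLException {
        if (ids.isEmpty()) {
            return 0;
        }
        try (PreparedStatement ps = connection.prepareStatement(updateSql(ids.size()))) {
            for (int i = 0; i < ids.size(); i++) {
                ps.setInt(i + 1, ids.get(i)); // JDBC parameters are 1-based
            }
            return ps.executeUpdate();
        }
    }
}
```

For very large batches, the id list would likely need to be split into smaller chunks so that the IN clause stays at a manageable size.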

 

4.4 Output

Bulk Indexing Completed for batch : 1
Total Document Indexed : 520
Record Updated : 520
Total Hits : 520

{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "_clusters": {
    "total": 0,
    "successful": 0,
    "skipped": 0
  },
  "hits": {
    "total": 520,
    "max_score": 1,
    "hits": [
      {
        "_index": "indexdatabaseexamaple",
        "_type": "indexdatabasemapping",
        "_id": "4",
        "_score": 1,
        "_source": {
          "docType": "docx",
          "docAuthor": "Elastic Team",
          "docTitle": "Elastic Search Guide",
          "docLanguage": "en"
        }
      }
    ]
  }
}

 

5. Conclusion

In this article, we discussed how to index documents from a MySQL database into Elastic Search using the Elastic Search Transport Client Java API. We walked step by step through the Elastic Search client API and the MySQL Java driver, an admin operation (refreshing indices), and a sample match-all query as a basic check of the indexed data.

6. Source Code

You can download the source code of Elastic Search Index Document From Database from our git repository.
