

1. Overview
Elastic Search supports bulk index, delete, and update operations through the Bulk API. In this article, we will discuss how to index documents from a database using the Elastic Search Transport Java Client
Bulk API. We will use a MySQL database to demonstrate document indexing.
Bulk indexing helps us improve indexing performance,
as multiple documents can be indexed in a single API call.
2. Development Environment
Elastic Search: 6.6.0
Java: 1.8.0_65
IDE: IntelliJ Idea
Build Tool: Maven
3. Steps to Index Document From Database
Now we will discuss how to use the Elastic Search Transport Client Bulk API, with detailed explanations, to index documents from a MySQL database.
Step 1: Create Table
Create a table in the MySQL
database. In this example, we have created a document
table as below.
CREATE TABLE `document` (
  `docId` int(11) NOT NULL,
  `docType` varchar(255) DEFAULT NULL,
  `docTitle` varchar(255) DEFAULT NULL,
  `docAuthor` varchar(255) DEFAULT NULL,
  `docLanguage` varchar(45) DEFAULT NULL,
  `numberOfPage` int(11) DEFAULT NULL,
  `lastIndexDate` datetime DEFAULT NULL,
  PRIMARY KEY (`docId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Step 2: Insert Records
The next step is to insert records into the newly created table, or you can use an existing table if it already contains data to index.
Step 3: Create a Maven Project
Step 4: Add the required dependencies to the project.
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
</dependency>
Step 5: Initialize transport client
The TransportClient class is used to communicate with an Elastic Search cluster. It connects to the ES cluster using the transport module. Here is sample code to initialize the client.
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_1}"), {PORT}))  //1st ES node host and port
        .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_2}"), {PORT})); //2nd ES node host and port
Step 6: Create an Index and Index Type
The next step is to create an index and index type in Elastic Search. The Transport Client creates the index and index type automatically, if they do not exist, when you submit a document for indexing.
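If you prefer to create the index explicitly before indexing (for example, to control index settings up front), the admin client can be used. This is an optional sketch, not part of the original listing; it assumes the same index name used in the full example in section 4.

```java
// optional: create the index up front instead of relying on auto-creation
CreateIndexResponse createResponse = client.admin().indices()
        .prepareCreate("indexdatabaseexamaple") //index name from the example in section 4
        .get();
System.out.println("Index created : " + createResponse.isAcknowledged());
```

If the index already exists, this call throws a ResourceAlreadyExistsException, so in practice you would guard it with an exists check or a try/catch.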
Step 7: Open MySQL Database Connection
The next step is to open a MySQL database connection. Here we need to load the JDBC driver for the MySQL database and prepare a connection. Refer to the code below for more details.
// Class.forName("com.mysql.jdbc.Driver"); //legacy driver class
Class.forName("com.mysql.cj.jdbc.Driver");
connection = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/bulkindexing", "root", "jdzone");
Step 8: Execute MySQL Database Query
Prepare a query that returns the records we want to index in Elastic Search. In our example, we have also added a lastIndexDate column
to identify which records are pending. Once indexing is completed, we will update the lastIndexDate column to the current date.
SELECT d.docId,d.docType,d.docTitle,d.docAuthor,d.docLanguage,d.numberOfPage FROM document d where lastIndexDate>'2019-02-01 00:00:00'
Step 9: Initialize BulkRequestBuilder
The Elastic Search Bulk API provides the BulkRequestBuilder
class for bulk operations. The TransportClient
prepareBulk
method is used to initialize a BulkRequestBuilder.
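The initialization is a single call on the client. This fragment assumes the client variable created in Step 5; it mirrors the full listing in section 4.2.

```java
// prepare an empty bulk request; index requests for each row are added to it later
BulkRequestBuilder bulkRequest = client.prepareBulk();
```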
Step 10: Prepare JSON Document
As per the Elastic Search Index API documentation, there are several ways to generate a JSON document. Out of these options, in our example we have used JSON Builder
to construct the document. Refer to the syntax below to construct a JSON document.
XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("{FIELD_NAME_1}", "{FIELD_VALUE_1}")
        .field("{FIELD_NAME_2}", "{FIELD_VALUE_2}")
        .field("{FIELD_NAME_3}", "{FIELD_VALUE_3}")
    .endObject();
Step 11: Iterate ResultSet
Iterate over all the records in the ResultSet, prepare a JSON document for each record, and add it to the bulk request using the add
method.
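Putting Steps 10 and 11 together, the loop below builds one JSON document per row and adds it to the bulk request. This is an excerpt-style sketch of the full listing in section 4.2; the resultSet, bulkRequest, indexName, and indexTypeName variables come from the earlier steps.

```java
while (resultSet.next()) {
    // build the JSON document from the current database row
    XContentBuilder xContentBuilder = jsonBuilder()
            .startObject()
                .field("docType", resultSet.getString("docType"))
                .field("docAuthor", resultSet.getString("docAuthor"))
                .field("docTitle", resultSet.getString("docTitle"))
                .field("docLanguage", resultSet.getString("docLanguage"))
            .endObject();
    // use docId as the Elastic Search document id, so re-indexing the same
    // row updates the existing document instead of creating a duplicate
    bulkRequest.add(client
            .prepareIndex(indexName, indexTypeName, resultSet.getString("docId"))
            .setSource(xContentBuilder));
}
```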
Step 12: Execute BulkRequest
After adding a certain number of documents to the bulk request, call the execute
method to send all the documents to Elastic Search.
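The sketch below, mirroring the helper method in section 4.2, sends the batch and checks the response for per-document failures. A bulk response can report success for some documents and failure for others, so checking hasFailures() after every batch is important.

```java
BulkResponse bulkResponse = bulkRequest.execute().actionGet(); //blocks until the batch is processed
if (bulkResponse.hasFailures()) {
    // hasFailures() is true if ANY item in the batch failed;
    // buildFailureMessage() aggregates the individual error messages
    System.out.println(bulkResponse.buildFailureMessage());
} else {
    System.out.println("Bulk indexing completed");
}
```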
4. Example
4.1 Sample Input
4.2 IndexDocumentDatabase
package com.javadeveloperzone;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.IOException;
import java.net.InetAddress;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class IndexDocumentDatabase {

    String indexName, indexTypeName;
    TransportClient client = null;
    DatabaseHelper databaseHelper = null;

    public static final String DOC_LANGUAGE = "docLanguage";
    public static final String DOC_TITLE = "docTitle";
    public static final String DOC_AUTHOR = "docAuthor";
    public static final String DOC_TYPE = "docType";
    public static final String DOC_ID = "docId";

    public static void main(String[] args) {
        IndexDocumentDatabase indexDocumentDatabase = new IndexDocumentDatabase();
        indexDocumentDatabase.index();
    }

    public void index() {
        try {
            initESTransportClient();                //init transport client
            databaseHelper.openMySqlDbConnection(); //open MySQL database connection
            databaseBulkImport();                   //fetch data from database and send to Elastic Search using bulk import
            databaseHelper.updateRecords();         //update records
            refreshIndices();                       //refresh indices
            search();                               //search indexed documents
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            databaseHelper.closeMySqlDbConnection();
            closeTransportClient();                 //close transport client
        }
    }

    public IndexDocumentDatabase() {
        indexName = "indexdatabaseexamaple";
        indexTypeName = "indexdatabasemapping";
        databaseHelper = new DatabaseHelper();
    }

    /*
     * Method used to init the Elastic Search Transport client.
     * Returns true if it is successfully initialized, otherwise false.
     */
    public boolean initESTransportClient() {
        try {
            // add more addTransportAddress calls if you have multiple nodes
            client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(
                            new TransportAddress(InetAddress.getByName("localhost"), 9300));
            return true;
        } catch (Exception ex) {
            //log.error("Exception occurred while getting Client : " + ex, ex);
            ex.printStackTrace();
            return false;
        }
    }

    public void databaseBulkImport() throws IOException, ExecutionException, InterruptedException, SQLException {
        BulkRequestBuilder bulkRequest = client.prepareBulk(); //prepare bulk request
        int count = 0, noOfBatch = 1;
        int numberOfRecords = 0;
        Connection connection = databaseHelper.getConnection();
        Statement statement = connection.createStatement();
        String query = "SELECT d.docId,"
                + "d.docType,"
                + "d.docTitle,"
                + "d.docAuthor,"
                + "d.docLanguage,"
                + "d.numberOfPage "
                + "FROM document d where lastIndexDate>'2019-02-01 00:00:00'";
        ResultSet resultSet = statement.executeQuery(query);
        while (resultSet.next()) { //next record
            try {
                XContentBuilder xContentBuilder = jsonBuilder()
                        .startObject()
                            .field(DOC_TYPE, resultSet.getString(DOC_TYPE))
                            .field(DOC_AUTHOR, resultSet.getString(DOC_AUTHOR))
                            .field(DOC_TITLE, resultSet.getString(DOC_TITLE))
                            .field(DOC_LANGUAGE, resultSet.getString(DOC_LANGUAGE))
                        .endObject();
                bulkRequest.add(client
                        .prepareIndex(indexName, indexTypeName, resultSet.getString(DOC_ID))
                        .setSource(xContentBuilder));
                if (count == 50_000) {
                    addDocumentToESCluster(bulkRequest, noOfBatch, count);
                    bulkRequest = client.prepareBulk(); //start a fresh bulk request, so documents are not re-sent
                    noOfBatch++;
                    count = 0;
                }
            } catch (Exception e) {
                e.printStackTrace(); //skip record if input data is invalid
            }
            numberOfRecords++;
            count++;
        }
        if (count != 0) { //add remaining documents to ES
            addDocumentToESCluster(bulkRequest, noOfBatch, count);
        }
        resultSet.close();
        statement.close();
        System.out.println("Total Document Indexed : " + numberOfRecords);
    }

    public void addDocumentToESCluster(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {
        if (count == 0) {
            //avoids org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing failed for Batch : " + noOfBatch);
            // process failures by iterating through each bulk response item
            int numberOfDocFailed = 0;
            Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
            while (iterator.hasNext()) {
                BulkItemResponse response = iterator.next();
                if (response.isFailed()) {
                    //System.out.println("Failed Id : " + response.getId());
                    numberOfDocFailed++;
                }
            }
            System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed");
            System.out.println(bulkResponse.buildFailureMessage());
        } else {
            System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
        }
    }

    public void refreshIndices() {
        client.admin().indices()
                .prepareRefresh(indexName)
                .get(); //refresh before search, so you will get the latest results
    }

    public void search() {
        SearchResponse response = client.prepareSearch(indexName)
                .setTypes(indexTypeName)
                .get(); //match all documents query
        System.out.println("Total Hits : " + response.getHits().getTotalHits());
        System.out.println(response);
    }

    public void closeTransportClient() {
        if (client != null) {
            client.close();
        }
    }
}
4.3 DatabaseHelper
package com.javadeveloperzone;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class DatabaseHelper {

    Connection connection;

    public Connection getConnection() {
        return connection;
    }

    /**
     * CREATE TABLE `document` (
     *   `docId` int(11) NOT NULL,
     *   `docType` varchar(255) DEFAULT NULL,
     *   `docTitle` varchar(255) DEFAULT NULL,
     *   `docAuthor` varchar(255) DEFAULT NULL,
     *   `docLanguage` varchar(45) DEFAULT NULL,
     *   `numberOfPage` int(11) DEFAULT NULL,
     *   `lastIndexDate` datetime DEFAULT NULL,
     *   PRIMARY KEY (`docId`)
     * ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
     */
    public void openMySqlDbConnection() throws ClassNotFoundException, SQLException {
        // Class.forName("com.mysql.jdbc.Driver"); //legacy driver class
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/bulkindexing", "root", "jdzone");
    }

    public void updateRecords() throws SQLException {
        String query = "update document set lastIndexDate=now() where lastIndexDate>'2019-02-01 00:00:00'";
        Statement statement = connection.createStatement();
        int recordsUpdated = statement.executeUpdate(query);
        System.out.println("Record Updated : " + recordsUpdated);
    }

    public void closeMySqlDbConnection() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
4.4 Output
Bulk Indexing Completed for batch : 1
Total Document Indexed : 520
Record Updated : 520
Total Hits : 520
{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "_clusters": {
    "total": 0,
    "successful": 0,
    "skipped": 0
  },
  "hits": {
    "total": 520,
    "max_score": 1,
    "hits": [
      {
        "_index": "indexdatabaseexamaple",
        "_type": "indexdatabasemapping",
        "_id": "4",
        "_score": 1,
        "_source": {
          "docType": "docx",
          "docAuthor": "Elastic Team",
          "docTitle": "Elastic Search Guide",
          "docLanguage": "en"
        }
      }
    ]
  }
}
5. Conclusion
In this article, we have discussed how to index documents into Elastic Search from a MySQL database using the Elastic Search Transport Client Java API. We have covered a step-by-step description of the Elastic Search client API and the MySQL Java driver, some admin operations such as refreshing indices, and a sample match-all documents query for basic QC.
6. References
Refer to the links below for more details:
7. Source Code
You can download the source code of this Elastic Search Index Document From Database example from our git repository.