1. Overview

Elastic Search support bulk index, delete, update operation using Bulk API. In this article, we will discuss how to do bulk indexing in details. In our previous article we have discussed how to use Elastic Seach Transport Client Java API to index single document same way in this post we will cover bulk indexing functionality.

Bulk indexing helps us to improve indexing performance as we can be indexed multiple documents in a single API call.

2. Development Environment

Elastic Search: 6.2.4

Java: 1.8.0_65

IDE: IntelliJ Idea

Build Tool: Maven

3. Steps to Index Multiple Documents using Transport Client

Now we will discuss how to use Elastic Search Transport client bulk API with details explanations.

Step 1: Create Maven Project

Step 2: Add elastic-search-transport-client dependency in a project.

Elastic Search team provides client APIs to communicate with the elastic search for Java, C# .NET, Python etc… In this article, we will discuss Java client of Elastic Search. Add below dependency in your project.

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.2.4</version>
</dependency>

Step 3: Initialize transport client

TransportClient class is used to communicate with Elastic Search cluster. It connects ES cluster using transport module. Here is the sample code to the initialize client.

TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_1}"), {PORT})); //1st ES Node host and port
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_2"), {PORT}));  //2nd ES Node host and port

Step 4: create an index and Index type and prepare JSON

The next step is to create Index and Index type in Elastic Search. Transport Client automatically index and index type, if it does not exists when you submit any document for indexing.

Step 5: Initialize BulkRequestBuilder

Elastic search bulk API provide BulkRequestBuilder class for bulk operation. TransportClient prepateBulk method is used to initialize BulkRequestBuilder

Step 6: Prepare JSON Document

As per the Elastic Search Index API Documentations there are several ways to generate JSON document, out of these options in our example we have used JSON Builder to construct the document. Refer below syntax to construct JSON Document.

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("{FIELD_NAME_1}", "{FIELD_VALUE_1}")
        .field("{FIELD_NAME_2}", "{FIELD_VALUE_2}")
        .field("{FIELD_NAME_3}", "{FIELD_VALUE_3}")
    .endObject()

Step 7: add to bulkRequest

Iterate over all the lines in csv file, prepate json document and add it to bulk request using addmethod.

Step 8: execute bulkRequest

After adding the certain number of documents in bulk request call execute method to add all the document to Elastic Search.

4. Example

Here is the complete example of import CSV file into ElasticSearch using Elastic Search Transport Client.

Input File

Refer below CSV file which we have used in this example. We have indexed four fields, docTitle, docType,docPage and docModifiedDate using Bulk API.

Id,docType,docTitle,docPage
1,pdf,Elastic Search example,3
2,doc,Java developer zone,1
3,msg,Elastic Search Indexing example,3
4,pdf,Bulk Import Example,6
5,pdf,ES Reference guilde,8
6,msg,Solr reference guide,4
7,msg,Java developer Elastic search blog,5
8,msg,Java developer solr blogs,6
9,doc,submit resume,3
10,xlsx,Elastic Search example,8

Project Structure

Elastic Search Bulk Indexing Example Project Structure

Elastic Search Bulk Indexing Example Project Structure

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ElasticSearchBulkIndexingExample</groupId>
    <artifactId>ElasticSearchBulkIndexingExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>Elastic Search Bulk Indexing Example</description>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.2.4</version>
        </dependency>
    </dependencies>

</project>

ESBulkIndexingExample.java

package com.javadeveloperzone;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Iterator;

import java.util.concurrent.ExecutionException;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * Created by JavaDeveloperZone on 6/16/2018.
 */
public class ESBulkIndexingExample {

    String indexName,indexTypeName;
    TransportClient client = null;

    public static void main(String[] args) {
        ESBulkIndexingExample esExample = new ESBulkIndexingExample();
        try {
            esExample.initEStransportClinet(); //init transport client

            esExample.CSVbulkImport(true); //index multiple  document

            esExample.refreshIndices();

            esExample.search(); //search indexed document
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            esExample.closeTransportClient(); //close transport client
        }
    }

    public ESBulkIndexingExample(){
        indexName = "document";
        indexTypeName = "bulkindexing";
    }
    
    /*
    Method used to init Elastic Search Transprt client,
    Return true if it is succesfully intialized otherwise false
     */
    public boolean initEStransportClinet()
    {
        try {
            client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

            return true;
        } catch (Exception ex) {
            //log.error("Exception occurred while getting Client : " + ex, ex);
            ex.printStackTrace();
            return false;
        }
    }

    public void CSVbulkImport(boolean isHeaderIncluded) throws IOException, ExecutionException, InterruptedException {

        BulkRequestBuilder bulkRequest = client.prepareBulk();

        File file = new File("G:\\study\\Blogs\\Elastic Search\\Bulk Indexing\\Input.csv");
        BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
        String line = null;
        int count=0,noOfBatch=1;
        if(bufferedReader!=null && isHeaderIncluded){
            bufferedReader.readLine();//ignore first line
        }
        while ((line = bufferedReader.readLine())!=null){

            if(line.trim().length()==0){
                continue;
            }
            String data [] = line.split(",");
            if(data.length==5){

                try {
                    XContentBuilder xContentBuilder = jsonBuilder()
                            .startObject()
                            .field("docType", data[1])
                            .field("docTitle", data[2])
                            .field("docPage", data[3])
                            .endObject();

                    bulkRequest.add(client.prepareIndex(indexName, indexTypeName, data[0])
                            .setSource(xContentBuilder));

                    if ((count+1) % 500 == 0) {
                        count = 0;
                        addDocumentToESCluser(bulkRequest, noOfBatch, count);
                        noOfBatch++;
                    }
                }catch (Exception e) {
                    e.printStackTrace();
                    //skip records if wrong date in input file
                }
            }else{
                System.out.println("Invalid data : "+line);
            }
            count++;
        }
        bufferedReader.close();
        addDocumentToESCluser(bulkRequest,noOfBatch,count);
    }

    public void addDocumentToESCluser(BulkRequestBuilder bulkRequest,int noOfBatch,int count){

        if(count==0){
            //org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing failed for Batch : "+noOfBatch);

            // process failures by iterating through each bulk response item
            int numberOfDocFailed = 0;
            Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
            while (iterator.hasNext()){
                BulkItemResponse response = iterator.next();
                if(response.isFailed()){
                    //System.out.println("Failed Id : "+response.getId());
                    numberOfDocFailed++;
                }
            }
            System.out.println("Out of "+count+" documents, "+numberOfDocFailed+" documents failed");
            System.out.println(bulkResponse.buildFailureMessage());
        }else{
            System.out.println("Bulk Indexing Completed for batch : "+noOfBatch);
        }
    }

    public void refreshIndices(){
        client.admin().indices()
                .prepareRefresh(indexName)
                .get(); //Refresh before search, so you will get latest indices result
    }
    
    public void search(){
        
        SearchResponse response = client.prepareSearch(indexName)
                .setTypes(indexTypeName)
                .get();
        //MatchAllDocQuery
        System.out.println("Total Hits : "+response.getHits().getTotalHits());
        System.out.println(response);
    }

    public void closeTransportClient(){
        if(client!=null){
            client.close();
        }
    }
}

 

Output

Bulk Indexing Completed for batch : 1
Total Hits : 10

Query Elastic Search using Google chrome extension

Here we have used google chrome Elastic Search toolbox extension to query elastic search and verify that record indexed properly or not. Refer below screenshot to build the simple query using Elastic Search toolbox.

Elastic Search Bulk Indexing Example Query

Elastic Search Bulk Indexing Example Query

once we execute above query it will return all the documents which docType is pdf as below.

{
  "took": 22,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": 1.2039728,
    "hits": [
      {
        "_index": "document",
        "_type": "bulkindexing",
        "_id": "5",
        "_score": 1.2039728,
        "_source": {
          "docType": "pdf",
          "docTitle": "ES Reference guide",
          "docPage": "3"
        }
      },
      {
        "_index": "document",
        "_type": "bulkindexing",
        "_id": "4",
        "_score": 0.9808292,
        "_source": {
          "docType": "pdf",
          "docTitle": "Elastic Search example",
          "docPage": "3"
        }
      },
      {
        "_index": "document",
        "_type": "bulkindexing",
        "_id": "1",
        "_score": 0.6931472,
        "_source": {
          "docType": "pdf",
          "docTitle": "Elastic Search example",
          "docPage": "3"
        }
      }
    ]
  }
}

5. Conclusion

In this article, we have discussed how to import CSV file in Elastic Search using Bulk API. We have discussed step by step API description for better indexing.

6. References

Refer below links for more details:

7. Source Code

Elastic Search Bulk Indexing Example

Was this post helpful?
Let us know, if you liked the post. Only in this way, we can improve us.
Yes
No

Leave a Reply

Your email address will not be published. Required fields are marked *