

1. Overview
Elastic Search supports bulk index, delete, and update operations through its Bulk API. In this article, we discuss bulk indexing in detail. In our previous article we showed how to use the Elastic Search Transport Client Java API to index a single document; this post covers bulk indexing in the same way.
Bulk indexing improves indexing performance because multiple documents can be indexed in a single API call.
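To make the saving concrete, the sketch below counts how many API calls are needed for a given number of documents, one call per document versus one call per batch. The class name and the figures (10,000 documents, batches of 500) are illustrative assumptions, and the code only does arithmetic; it does not talk to Elastic Search.

```java
// Illustrative only: compares request counts, no Elastic Search calls are made.
class BulkCallCountSketch {

    // Number of bulk calls needed to send docCount documents in batches of batchSize.
    static long bulkCalls(long docCount, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return (docCount + batchSize - 1) / batchSize; // ceiling division
    }

    public static void main(String[] args) {
        long docs = 10_000;  // assumed document count
        int batchSize = 500; // assumed batch size
        System.out.println("Single-document calls: " + docs);
        System.out.println("Bulk calls: " + bulkCalls(docs, batchSize));
    }
}
```

Fewer calls means fewer network round trips, which is where most of the gain comes from.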
2. Development Environment
Elastic Search: 6.2.4
Java: 1.8.0_65
IDE: IntelliJ IDEA
Build Tool: Maven
3. Steps to Index Multiple Documents using Transport Client
Now we will walk through how to use the Elastic Search Transport Client bulk API, with a detailed explanation of each step.
Step 1: Create Maven Project
Step 2: Add the Elastic Search transport client dependency to the project.
The Elastic Search team provides client APIs for Java, C# .NET, Python, and other languages. In this article, we use the Java client. Add the dependency below to your project.
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.2.4</version>
</dependency>
Step 3: Initialize transport client
The TransportClient class is used to communicate with the Elastic Search cluster. It connects to the cluster using the transport module. Here is sample code to initialize the client.
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_1}"), {PORT})) // 1st ES node host and port
        .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_2}"), {PORT})); // 2nd ES node host and port
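When the node list comes from configuration rather than being hard-coded, it first has to be split into host and port pairs. The sketch below is a plain-Java helper for that, with no Elastic Search dependency. The class, the Node type, and the host names are hypothetical; the only fact taken from Elastic Search is that 9300 is the default transport port. Each resulting pair could then be passed to InetAddress.getByName and TransportAddress as in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns "host1:9300,host2" into host/port pairs.
// Note: this simple parser does not handle IPv6 literals.
class NodeListSketch {

    static final int DEFAULT_TRANSPORT_PORT = 9300; // default transport port

    static final class Node {
        final String host;
        final int port;

        Node(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    static List<Node> parse(String nodes) {
        List<Node> result = new ArrayList<>();
        for (String entry : nodes.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // ignore empty entries such as a trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                result.add(new Node(trimmed, DEFAULT_TRANSPORT_PORT));
            } else {
                String host = trimmed.substring(0, colon);
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                result.add(new Node(host, port));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Host names are placeholders for illustration.
        for (Node node : parse("es-node-1:9301, es-node-2")) {
            System.out.println(node);
        }
    }
}
```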
Step 4: Create an index and index type, and prepare JSON
The next step is to create the index and index type in Elastic Search. If they do not exist, they are created automatically when you submit a document for indexing through the Transport Client.
Step 5: Initialize BulkRequestBuilder
The Elastic Search bulk API provides the BulkRequestBuilder class for bulk operations. The TransportClient prepareBulk method is used to initialize a BulkRequestBuilder.
Step 6: Prepare JSON Document
As per the Elastic Search Index API documentation, there are several ways to generate a JSON document. In our example we use the JSON builder to construct the document. Refer to the syntax below.
XContentBuilder builder = jsonBuilder()
        .startObject()
        .field("{FIELD_NAME_1}", "{FIELD_VALUE_1}")
        .field("{FIELD_NAME_2}", "{FIELD_VALUE_2}")
        .field("{FIELD_NAME_3}", "{FIELD_VALUE_3}")
        .endObject();
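To see the shape of the document such a builder produces, here is a plain-Java stand-in that writes a flat JSON object from field names and string values. It is not the XContentBuilder API, only a sketch of the resulting JSON; the class and method names are hypothetical, and the field names are the ones used in the example later in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java stand-in that shows the JSON shape of a flat document.
// It handles string values only and is not a general JSON library.
class JsonDocumentSketch {

    // Escapes characters that are not allowed unescaped inside a JSON string.
    static String escape(String value) {
        StringBuilder sb = new StringBuilder();
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Writes the fields as one JSON object, keeping insertion order.
    static String toJson(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
            first = false;
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, String> doc = new LinkedHashMap<>();
        doc.put("docType", "pdf");
        doc.put("docTitle", "Elastic Search example");
        doc.put("docPage", "3");
        // prints {"docType":"pdf","docTitle":"Elastic Search example","docPage":"3"}
        System.out.println(toJson(doc));
    }
}
```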
Step 7: Add to bulkRequest
Iterate over the lines in the CSV file, prepare a JSON document for each line, and add it to the bulk request using the add method.
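The per-line work before a document reaches the bulk request is splitting and validating the row. Below is a minimal sketch of that step, assuming the four-column layout of the sample input file (Id, docType, docTitle, docPage). The class and method names are hypothetical, and the simple split on commas does not handle quoted fields that contain commas.

```java
import java.util.Optional;

// Hypothetical helper: splits one CSV line and rejects rows with the wrong column count.
class CsvRowSketch {

    static final int EXPECTED_COLUMNS = 4; // Id, docType, docTitle, docPage

    static Optional<String[]> parseRow(String line) {
        if (line == null || line.trim().isEmpty()) {
            return Optional.empty(); // blank lines are skipped
        }
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        if (fields.length != EXPECTED_COLUMNS) {
            return Optional.empty(); // malformed row
        }
        for (int i = 0; i < fields.length; i++) {
            fields[i] = fields[i].trim();
        }
        return Optional.of(fields);
    }

    public static void main(String[] args) {
        Optional<String[]> row = parseRow("1,pdf,Elastic Search example,3");
        if (row.isPresent()) {
            System.out.println("id=" + row.get()[0] + ", docTitle=" + row.get()[2]);
        }
        System.out.println("valid short row? " + parseRow("1,pdf").isPresent());
    }
}
```

Only rows that pass this check would be turned into a JSON document and added to the bulk request.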
Step 8: Execute bulkRequest
After adding a certain number of documents to the bulk request, call the execute method to send all of them to Elastic Search.
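The "execute after a certain number of documents" pattern can be sketched independently of Elastic Search. In the sketch below, the flush callback stands in for executing the bulk request, and starting a fresh list stands in for obtaining a new builder from prepareBulk so that documents already sent are not sent again. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Generic batching sketch: hands items to a flush callback in fixed-size batches.
class BatchingSketch {

    // Returns the number of batches that were flushed.
    static <T> int inBatches(Iterable<T> items, int batchSize, Consumer<List<T>> flush) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> batch = new ArrayList<>();
        int batches = 0;
        for (T item : items) {
            batch.add(item);
            if (batch.size() == batchSize) {
                flush.accept(batch);
                batch = new ArrayList<>(); // start a fresh batch, do not resend old items
                batches++;
            }
        }
        if (!batch.isEmpty()) { // send the remaining partial batch
            flush.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> docs = new ArrayList<>();
        for (int i = 0; i < 1200; i++) {
            docs.add(i);
        }
        // Flushes batches of 500, 500 and 200 documents.
        int batches = inBatches(docs, 500,
                batch -> System.out.println("Flushing " + batch.size() + " documents"));
        System.out.println("Batches: " + batches);
    }
}
```

Checking for an empty final batch matters because executing a bulk request with no actions fails validation.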
4. Example
Here is a complete example that imports a CSV file into Elastic Search using the Elastic Search Transport Client.
Input File
Refer to the CSV file below, which we use in this example. It has four columns: Id, docType, docTitle and docPage. Id is used as the document ID, and docType, docTitle and docPage are indexed as fields using the Bulk API.
Id,docType,docTitle,docPage
1,pdf,Elastic Search example,3
2,doc,Java developer zone,1
3,msg,Elastic Search Indexing example,3
4,pdf,Bulk Import Example,6
5,pdf,ES Reference guide,8
6,msg,Solr reference guide,4
7,msg,Java developer Elastic search blog,5
8,msg,Java developer solr blogs,6
9,doc,submit resume,3
10,xlsx,Elastic Search example,8
Project Structure

Elastic Search Bulk Indexing Example Project Structure
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>ElasticSearchBulkIndexingExample</groupId>
    <artifactId>ElasticSearchBulkIndexingExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>Elastic Search Bulk Indexing Example</description>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.2.4</version>
        </dependency>
    </dependencies>
</project>
ESBulkIndexingExample.java
package com.javadeveloperzone;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * Created by JavaDeveloperZone on 6/16/2018.
 */
public class ESBulkIndexingExample {

    String indexName, indexTypeName;
    TransportClient client = null;

    public static void main(String[] args) {
        ESBulkIndexingExample esExample = new ESBulkIndexingExample();
        try {
            if (!esExample.initEStransportClinet()) { // init transport client
                return;
            }
            esExample.CSVbulkImport(true);            // index multiple documents
            esExample.refreshIndices();
            esExample.search();                       // search indexed documents
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            esExample.closeTransportClient();         // close transport client
        }
    }

    public ESBulkIndexingExample() {
        indexName = "document";
        indexTypeName = "bulkindexing";
    }

    /*
     Initializes the Elastic Search transport client.
     Returns true if it is successfully initialized, otherwise false.
     */
    public boolean initEStransportClinet() {
        try {
            client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            return false;
        }
    }

    public void CSVbulkImport(boolean isHeaderIncluded) throws IOException {
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        File file = new File("G:\\study\\Blogs\\Elastic Search\\Bulk Indexing\\Input.csv");
        int count = 0, noOfBatch = 1;
        // try-with-resources closes the reader even if an exception is thrown
        try (BufferedReader bufferedReader = new BufferedReader(new FileReader(file))) {
            if (isHeaderIncluded) {
                bufferedReader.readLine(); // ignore the header line
            }
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                if (line.trim().length() == 0) {
                    continue;
                }
                String[] data = line.split(",");
                if (data.length != 4) { // expected columns: Id,docType,docTitle,docPage
                    System.out.println("Invalid data : " + line);
                    continue;
                }
                try {
                    XContentBuilder xContentBuilder = jsonBuilder()
                            .startObject()
                            .field("docType", data[1])
                            .field("docTitle", data[2])
                            .field("docPage", data[3])
                            .endObject();
                    bulkRequest.add(client.prepareIndex(indexName, indexTypeName, data[0])
                            .setSource(xContentBuilder));
                    count++;
                    if (count % 500 == 0) { // send a full batch, then start a new one
                        addDocumentToESCluser(bulkRequest, noOfBatch, count);
                        bulkRequest = client.prepareBulk();
                        noOfBatch++;
                        count = 0;
                    }
                } catch (Exception e) {
                    e.printStackTrace(); // skip records with wrong data in the input file
                }
            }
        }
        addDocumentToESCluser(bulkRequest, noOfBatch, count); // send the remaining documents
    }

    public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {
        if (count == 0) {
            // An empty bulk request fails with
            // org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing failed for Batch : " + noOfBatch);
            // process failures by iterating through each bulk response item
            int numberOfDocFailed = 0;
            for (BulkItemResponse response : bulkResponse) {
                if (response.isFailed()) {
                    numberOfDocFailed++;
                }
            }
            System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed");
            System.out.println(bulkResponse.buildFailureMessage());
        } else {
            System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
        }
    }

    public void refreshIndices() {
        client.admin().indices()
                .prepareRefresh(indexName)
                .get(); // refresh before search, so the search sees the latest documents
    }

    public void search() {
        SearchResponse response = client.prepareSearch(indexName)
                .setTypes(indexTypeName)
                .get(); // no query set, so all documents match
        System.out.println("Total Hits : " + response.getHits().getTotalHits());
        System.out.println(response);
    }

    public void closeTransportClient() {
        if (client != null) {
            client.close();
        }
    }
}
Output
Bulk Indexing Completed for batch : 1
Total Hits : 10
Query Elastic Search using Google chrome extension
Here we use the Elastic Search Toolbox extension for Google Chrome to query Elastic Search and verify that the records were indexed properly. Refer to the screenshot below to build a simple query with Elastic Search Toolbox.

Elastic Search Bulk Indexing Example Query
Once we execute the above query, it returns all documents whose docType is pdf, as shown below.
{
  "took": 22,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": 1.2039728,
    "hits": [
      {
        "_index": "document",
        "_type": "bulkindexing",
        "_id": "5",
        "_score": 1.2039728,
        "_source": {
          "docType": "pdf",
          "docTitle": "ES Reference guide",
          "docPage": "3"
        }
      },
      {
        "_index": "document",
        "_type": "bulkindexing",
        "_id": "4",
        "_score": 0.9808292,
        "_source": {
          "docType": "pdf",
          "docTitle": "Elastic Search example",
          "docPage": "3"
        }
      },
      {
        "_index": "document",
        "_type": "bulkindexing",
        "_id": "1",
        "_score": 0.6931472,
        "_source": {
          "docType": "pdf",
          "docTitle": "Elastic Search example",
          "docPage": "3"
        }
      }
    ]
  }
}
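As a cross-check on the hit count, the same docType filter can be applied to the rows of the sample input file in plain Java. This sketch does not query Elastic Search; the class and method names are hypothetical and the rows are copied from the input file shown earlier. The IDs it finds for pdf are the same three IDs that appear in the hits above.

```java
import java.util.ArrayList;
import java.util.List;

// Cross-check sketch: filters the sample CSV rows by docType without Elastic Search.
class DocTypeFilterSketch {

    // Rows of the sample input file (without the header line).
    static final String[] SAMPLE_ROWS = {
        "1,pdf,Elastic Search example,3",
        "2,doc,Java developer zone,1",
        "3,msg,Elastic Search Indexing example,3",
        "4,pdf,Bulk Import Example,6",
        "5,pdf,ES Reference guide,8",
        "6,msg,Solr reference guide,4",
        "7,msg,Java developer Elastic search blog,5",
        "8,msg,Java developer solr blogs,6",
        "9,doc,submit resume,3",
        "10,xlsx,Elastic Search example,8"
    };

    // Returns the Id column of every row whose docType column equals the given value.
    static List<String> idsWithDocType(String docType) {
        List<String> ids = new ArrayList<>();
        for (String row : SAMPLE_ROWS) {
            String[] data = row.split(",");
            if (data.length == 4 && data[1].equals(docType)) {
                ids.add(data[0]);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println("pdf document IDs: " + idsWithDocType("pdf")); // [1, 4, 5]
    }
}
```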
5. Conclusion
In this article, we discussed how to import a CSV file into Elastic Search using the Bulk API, and described the API step by step.
6. References
Refer below links for more details:
7. Source Code
Elastic Search Bulk Indexing Example
You can also download the source code of CSV Bulk Import Example and other useful examples from our git repository.