

1. Overview
Elastic Search supports bulk index, delete, and update operations through the Bulk API. In this article, we will discuss how to index a large JSON file using GSON and the Elastic Search Bulk API. We will use the GSON streaming API to parse the large JSON file efficiently and to avoid OutOfMemory errors.
Bulk indexing helps us improve indexing performance, because multiple documents can be indexed in a single API call.
2. Development Environment
Elastic Search: 6.6.0
Java: 1.8.0_65
IDE: IntelliJ Idea
Build Tool: Maven
3. Steps to Index Large JSON File
Now we will discuss how to use the Elastic Search Transport Client bulk API, with a detailed explanation of each step.
Step 1: Create a Maven Project
Step 2: Add the required dependencies to the project.
The Elastic Search team provides client APIs to communicate with Elastic Search from Java, C#/.NET, Python, and other languages. In this article, we use the Java Transport Client along with the GSON library for its JSON streaming API. Add the dependencies below to your project.
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.6.0</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.2.4</version>
</dependency>
Step 3: Initialize transport client
The TransportClient class is used to communicate with an Elastic Search cluster. It connects to the cluster using the transport module. Here is sample code to initialize the client.
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_1}"), {PORT}))  // 1st ES node host and port
        .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_2}"), {PORT})); // 2nd ES node host and port
Step 4: Create an index and index type
The next step is to create the index and index type in Elastic Search. The Transport Client creates the index and index type automatically, if they do not exist, when you submit a document for indexing.
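If you prefer to create the index and mapping up front instead of relying on auto-creation, a sketch like the one below works with the 6.x Transport Client. The index name and type name match the full example in section 4.2; the shard settings and field types are illustrative assumptions, not recommendations.
// create the "document" index with an explicit mapping for the "bulkindexing" type
if (!client.admin().indices().prepareExists("document").get().isExists()) {
    client.admin().indices().prepareCreate("document")
            .setSettings(Settings.builder()
                    .put("index.number_of_shards", 5)        // illustrative values
                    .put("index.number_of_replicas", 1))
            .addMapping("bulkindexing",
                    "docTitle", "type=text",                 // assumed field types
                    "docAuthor", "type=keyword",
                    "docLanguage", "type=keyword")
            .get();
}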
Step 5: Initialize BulkRequestBuilder
The Elastic Search bulk API provides the BulkRequestBuilder class for bulk operations. The TransportClient prepareBulk method is used to initialize a BulkRequestBuilder, as shown below.
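A minimal initialization, using the client created in Step 3:
// prepare an empty bulk request; index requests are added to it in the next steps
BulkRequestBuilder bulkRequest = client.prepareBulk();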
Step 6: Prepare JSON Document
As per the Elastic Search Index API documentation, there are several ways to generate a JSON document. Of these options, in our example we use the JSON Builder to construct the document. Refer to the syntax below.
XContentBuilder builder = jsonBuilder()
        .startObject()
        .field("{FIELD_NAME_1}", "{FIELD_VALUE_1}")
        .field("{FIELD_NAME_2}", "{FIELD_VALUE_2}")
        .field("{FIELD_NAME_3}", "{FIELD_VALUE_3}")
        .endObject();
Step 7: GSON Streaming API
Use the GSON streaming API to iterate over all the records in the large JSON file, prepare a JSON document for each record, and add it to the bulk request using the add method, as sketched below.
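A condensed sketch of the streaming loop is shown below. Only two fields are mapped here to keep it short, and the file path is a placeholder; the complete loop, including batching and error handling, appears in the full example in section 4.2.
// stream the large JSON array record by record instead of loading it all into memory
JsonReader jsonReader = new JsonReader(new InputStreamReader(
        new FileInputStream(new File("/path/to/sample.json")), StandardCharsets.UTF_8));
Gson gson = new GsonBuilder().create();
jsonReader.beginArray();                                            // the file is one large JSON array
while (jsonReader.hasNext()) {
    Document document = gson.fromJson(jsonReader, Document.class); // read one record
    bulkRequest.add(client.prepareIndex(indexName, indexTypeName)
            .setSource(jsonBuilder()
                    .startObject()
                    .field("docTitle", document.getDocTitle())
                    .field("docAuthor", document.getDocAuthor())
                    .endObject()));
}
jsonReader.endArray();
jsonReader.close();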
Step 8: Execute the bulk request
After adding a certain number of documents to the bulk request, call the execute method to send them all to Elastic Search, as shown below.
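A minimal sketch of sending one batch and checking the response for failures; this mirrors what the addDocumentToESCluser method in the full example does:
BulkResponse bulkResponse = bulkRequest.execute().actionGet();  // send all queued documents
if (bulkResponse.hasFailures()) {
    System.out.println(bulkResponse.buildFailureMessage());     // log per-document failures
}
// start a fresh bulk request for the next batch, otherwise the
// already-indexed documents would be re-sent on the next execute call
bulkRequest = client.prepareBulk();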
4. Example
4.1 Sample Input
I have a 1.5 GB JSON file that contains documents and their related metadata. The file contains around 2.5 million records, and I want to index it in Elastic Search for some analytics. Here are a few records from the input file.
[ { "documentId": "1", "docType": "pdf", "docAuthor": "Java Developer Zone", "docTitle": "Java Blog", "isParent": true, "parentDocId": 0, "docLanguage": [ "en", "fr" ] }, { "documentId": "2", "docType": "pdf", "docAuthor": "Java Developer Zone", "docTitle": "Spring boot Blog", "isParent": true, "parentDocId": 0, "docLanguage": [ "en", "fr" ] }, { "documentId": "5", "docType": "pdf", "docAuthor": "Java Developer Zone", "docTitle": "Solr Blog", "isParent": false, "parentDocId": 1, "docLanguage": [ "fr", "slovak" ] }, { "documentId": "8", "docType": "pdf", "docAuthor": "Java Developer Zone", "docTitle": "Elastic Search Blog", "isParent": false, "parentDocId": 1, "docLanguage": [ "en", "czech" ] } ]
4.2 IndexLargeJsonFileExample
package com.javadeveloperzone;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.*;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class IndexLargeJsonFileExample {

    String indexName, indexTypeName;
    TransportClient client = null;

    public static final String DOC_LANGUAGE = "docLanguage";
    public static final String PARENT_DOC_ID = "parentDocId";
    public static final String DOC_TITLE = "docTitle";
    public static final String IS_PARENT = "isParent";
    public static final String DOC_AUTHOR = "docAuthor";
    public static final String DOC_TYPE = "docType";

    public static void main(String[] args) {
        IndexLargeJsonFileExample esExample = new IndexLargeJsonFileExample();
        try {
            esExample.initEStransportClinet(); // init transport client
            esExample.JsonBulkImport();        // index multiple documents
            esExample.refreshIndices();        // refresh indices
            esExample.search();                // search indexed documents
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            esExample.closeTransportClient();  // close transport client
        }
    }

    public IndexLargeJsonFileExample() {
        indexName = "document";
        indexTypeName = "bulkindexing";
    }

    /*
     * Initializes the Elastic Search Transport client.
     * Returns true if it is successfully initialized, otherwise false.
     */
    public boolean initEStransportClinet() {
        try {
            // add another addTransportAddress(...) call here if you have multiple nodes
            client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(
                            new TransportAddress(InetAddress.getByName("localhost"), 9300));
            return true;
        } catch (Exception ex) {
            //log.error("Exception occurred while getting Client : " + ex, ex);
            ex.printStackTrace();
            return false;
        }
    }

    public void JsonBulkImport() throws IOException, ExecutionException, InterruptedException {
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        File jsonFilePath = new File("H:\\Work\\Data\\sample.json");
        int count = 0, noOfBatch = 1;
        //initialize JsonReader by passing a reader
        JsonReader jsonReader = new JsonReader(
                new InputStreamReader(
                        new FileInputStream(jsonFilePath), StandardCharsets.UTF_8));
        Gson gson = new GsonBuilder().create();
        jsonReader.beginArray(); //start of the JSON array
        int numberOfRecords = 1;
        while (jsonReader.hasNext()) { //next JSON array element
            Document document = gson.fromJson(jsonReader, Document.class);
            try {
                //build the index request for this record
                XContentBuilder xContentBuilder = jsonBuilder()
                        .startObject()
                        .field(DOC_TYPE, document.getDocType())
                        .field(DOC_AUTHOR, document.getDocAuthor())
                        .field(DOC_TITLE, document.getDocTitle())
                        .field(IS_PARENT, document.isParent())
                        .field(PARENT_DOC_ID, document.getParentDocId())
                        .field(DOC_LANGUAGE, document.getDocLanguage())
                        .endObject();
                bulkRequest.add(client.prepareIndex(indexName, indexTypeName,
                        String.valueOf(numberOfRecords))
                        .setSource(xContentBuilder));
                if (count == 50_000) {
                    addDocumentToESCluser(bulkRequest, noOfBatch, count);
                    //start a fresh bulk request so already-sent documents are not re-indexed
                    bulkRequest = client.prepareBulk();
                    noOfBatch++;
                    count = 0;
                }
            } catch (Exception e) {
                e.printStackTrace(); //skip the record if the input file contains bad data
            }
            numberOfRecords++;
            count++;
        }
        jsonReader.endArray();
        jsonReader.close();
        if (count != 0) { //add remaining documents to ES
            addDocumentToESCluser(bulkRequest, noOfBatch, count);
        }
        System.out.println("Total Document Indexed : " + numberOfRecords);
    }

    public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {
        if (count == 0) {
            //executing an empty bulk request throws:
            //org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing failed for Batch : " + noOfBatch);
            // process failures by iterating through each bulk response item
            int numberOfDocFailed = 0;
            Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
            while (iterator.hasNext()) {
                BulkItemResponse response = iterator.next();
                if (response.isFailed()) {
                    //System.out.println("Failed Id : " + response.getId());
                    numberOfDocFailed++;
                }
            }
            System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed");
            System.out.println(bulkResponse.buildFailureMessage());
        } else {
            System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
        }
    }

    public void refreshIndices() {
        client.admin().indices()
                .prepareRefresh(indexName)
                .get(); //refresh before search, so the latest indexed documents are visible
    }

    public void search() {
        SearchResponse response = client.prepareSearch(indexName)
                .setTypes(indexTypeName)
                .get(); //match-all-documents query
        System.out.println("Total Hits : " + response.getHits().getTotalHits());
        System.out.println(response);
    }

    public void closeTransportClient() {
        if (client != null) {
            client.close();
        }
    }
}
4.3 Document
A POJO class used with the GSON streaming API.
package com.javadeveloperzone;

import java.util.List;

public class Document {

    int documentId, parentDocId;
    String docType, docAuthor, docTitle;
    boolean isParent;
    List<String> docLanguage;

    public int getDocumentId() {
        return documentId;
    }

    public void setDocumentId(int documentId) {
        this.documentId = documentId;
    }

    public int getParentDocId() {
        return parentDocId;
    }

    public void setParentDocId(int parentDocId) {
        this.parentDocId = parentDocId;
    }

    public String getDocType() {
        return docType;
    }

    public void setDocType(String docType) {
        this.docType = docType;
    }

    public String getDocAuthor() {
        return docAuthor;
    }

    public void setDocAuthor(String docAuthor) {
        this.docAuthor = docAuthor;
    }

    public String getDocTitle() {
        return docTitle;
    }

    public void setDocTitle(String docTitle) {
        this.docTitle = docTitle;
    }

    public boolean isParent() {
        return isParent;
    }

    public void setParent(boolean parent) {
        isParent = parent;
    }

    public List<String> getDocLanguage() {
        return docLanguage;
    }

    public void setDocLanguage(List<String> docLanguage) {
        this.docLanguage = docLanguage;
    }

    @Override
    public String toString() {
        return "Document{" +
                "documentId=" + documentId +
                ", parentDocId=" + parentDocId +
                ", docType='" + docType + '\'' +
                ", docAuthor='" + docAuthor + '\'' +
                ", docTitle='" + docTitle + '\'' +
                ", isParent=" + isParent +
                ", docLanguage=" + docLanguage +
                '}';
    }
}
4.4 Output
Total Document Indexed : 2567800
Total Hits : 2567800
{
  "took": 730,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "_clusters": {
    "total": 0,
    "successful": 0,
    "skipped": 0
  },
  "hits": {
    "total": 2567800,
    "max_score": 1,
    "hits": [
      {
        "_index": "document",
        "_type": "bulkindexing",
        "_id": "14",
        "_score": 1,
        "_source": {
          "docType": "pdf",
          "docAuthor": "Jason White",
          "docTitle": "Java Blog",
          "isParent": true,
          "parentDocId": 0,
          "docLanguage": [
            "en",
            "fr"
          ]
        }
      }
    ]
  }
}
5. Conclusion
In this article, we discussed how to index a large JSON file in Elastic Search using the Bulk API and the GSON streaming API. We covered the Elastic Search client API step by step, the GSON streaming API, an admin operation (refreshing indices), and a sample match-all-documents query for a basic quality check.
6. References
Refer to the links below for more details:
7. Source Code
Elastic Search Index Large Json File Example
You can also check our Git repository for Elastic Search Index Large JSON File Example and other useful examples.