1. Overview

Elasticsearch supports bulk index, delete, and update operations through its Bulk API. In this article, we will discuss how to index a large JSON file using GSON and the Elasticsearch Bulk API. We use the GSON streaming API to parse the large JSON file record by record, which avoids loading the whole file into memory and running into an OutOfMemoryError (OOM).

Bulk indexing improves indexing performance because multiple documents can be indexed in a single API call.
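To make the "single API call" idea concrete: at the REST level, a bulk request body is newline-delimited JSON in which each index action line is followed by the document source line. The sketch below builds such a body with plain Java strings. The class and method names (BulkBodySketch, buildBulkBody) are hypothetical and not part of any Elasticsearch client, and each source document is assumed to be already serialized on one line. The transport client used later in this article assembles the equivalent request for you, so this is only an illustration of the idea.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: BulkBodySketch is a hypothetical helper, not an Elasticsearch class.
final class BulkBodySketch {

    // Builds a bulk body: one action line plus one source line per document,
    // each terminated by a newline, as the REST _bulk endpoint expects.
    static String buildBulkBody(String index, String type, Map<String, String> sourcesById) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> entry : sourcesById.entrySet()) {
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type)
                .append("\",\"_id\":\"").append(entry.getKey()).append("\"}}\n");
            body.append(entry.getValue()).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        // Two documents, keyed by the id they should be indexed under.
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("1", "{\"docTitle\":\"Java Blog\"}");
        docs.put("2", "{\"docTitle\":\"Spring boot Blog\"}");
        System.out.print(buildBulkBody("document", "bulkindexing", docs));
    }
}
```

Both documents travel in one request body, which is where the saving over one request per document comes from.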

2. Development Environment

Elasticsearch: 6.6.0

Java: 1.8.0_65

IDE: IntelliJ IDEA

Build Tool: Maven

3. Steps to Index Large JSON File

Now we will discuss, step by step, how to use the bulk API of the Elasticsearch transport client.

Step 1: Create a Maven Project

Step 2: Add the required dependencies to the project

The Elasticsearch team provides client APIs for Java, C# .NET, Python, and other languages. In this article, we use the Java client of Elasticsearch and the GSON library for its JSON streaming API. Add the dependencies below to your project.

<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.6.0</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.2.4</version>
</dependency>

Step 3: Initialize transport client

The TransportClient class is used to communicate with an Elasticsearch cluster. It connects to the cluster using the transport module. Here is sample code to initialize the client.

TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_1}"), {PORT})) //1st ES Node host and port
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("{HostName_2}"), {PORT})); //2nd ES Node host and port

Step 4: Create an index and index type

The next step is to create the index and index type in Elasticsearch. With the default settings, if they do not exist yet, they are created automatically when you submit a document for indexing through the transport client.

Step 5: Initialize BulkRequestBuilder

The Elasticsearch bulk API provides the BulkRequestBuilder class for bulk operations. The TransportClient prepareBulk method is used to initialize a BulkRequestBuilder.

Step 6: Prepare JSON Document

As per the Elasticsearch Index API documentation, there are several ways to generate a JSON document. In our example, we use the JSON builder (XContentBuilder) to construct the document. The syntax is shown below.

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("{FIELD_NAME_1}", "{FIELD_VALUE_1}")
        .field("{FIELD_NAME_2}", "{FIELD_VALUE_2}")
        .field("{FIELD_NAME_3}", "{FIELD_VALUE_3}")
    .endObject();

Step 7: GSON Streaming API

Use the GSON streaming API to iterate over all the records in the large JSON file, prepare a JSON document for each record, and add it to the bulk request using the add method.

Step 8: Execute bulkRequest

After adding a certain number of documents to the bulk request, call the execute method to send all of them to Elasticsearch.
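The flush pattern described in this step can be sketched independently of Elasticsearch. The following is a minimal illustration, assuming a hypothetical helper (BatchingSketch.processInBatches) that hands each full batch, and then the final partial batch, to a callback. In the real indexer, the callback would execute the bulk request. Note that a fresh batch is started after every flush so that items already sent are not sent again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative only: BatchingSketch is a hypothetical helper showing the flush pattern.
final class BatchingSketch {

    // Collects items and passes each full batch, plus the final partial batch, to the sink.
    // Returns the number of batches that were flushed.
    static <T> int processInBatches(Iterable<T> items, int batchSize, Consumer<List<T>> sink) {
        List<T> batch = new ArrayList<>(batchSize);
        int batches = 0;
        for (T item : items) {
            batch.add(item);
            if (batch.size() == batchSize) {
                sink.accept(batch);
                batch = new ArrayList<>(batchSize); // start a fresh batch after every flush
                batches++;
            }
        }
        if (!batch.isEmpty()) { // flush the remainder
            sink.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            ids.add(i);
        }
        int batches = processInBatches(ids, 3,
                batch -> System.out.println("flushing " + batch.size() + " documents"));
        System.out.println("batches: " + batches);
    }
}
```

With seven items and a batch size of three, the callback sees batches of 3, 3, and 1 items.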

4. Example

4.1 Sample Input

I have a 1.5 GB JSON file that contains documents and their related metadata. The file contains around 2.5M records, and I want to index it in Elasticsearch for some analytics. Here are a few records from the input file.

[
  {
    "documentId": "1",
    "docType": "pdf",
    "docAuthor": "Java Developer Zone",
    "docTitle": "Java Blog",
    "isParent": true,
    "parentDocId": 0,
    "docLanguage": [
      "en",
      "fr"
    ]
  },
  {
    "documentId": "2",
    "docType": "pdf",
    "docAuthor": "Java Developer Zone",
    "docTitle": "Spring boot Blog",
    "isParent": true,
    "parentDocId": 0,
    "docLanguage": [
      "en",
      "fr"
    ]
  },
  {
    "documentId": "5",
    "docType": "pdf",
    "docAuthor": "Java Developer Zone",
    "docTitle": "Solr Blog",
    "isParent": false,
    "parentDocId": 1,
    "docLanguage": [
      "fr",
      "slovak"
    ]
  },
  {
    "documentId": "8",
    "docType": "pdf",
    "docAuthor": "Java Developer Zone",
    "docTitle": "Elastic Search Blog",
    "isParent": false,
    "parentDocId": 1,
    "docLanguage": [
      "en",
      "czech"
    ]
  }
]

4.2 IndexLargeJsonFileExample

package com.javadeveloperzone;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.io.*;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class IndexLargeJsonFileExample {

    String indexName,indexTypeName;
    TransportClient client = null;
    public static final String DOC_LANGUAGE = "docLanguage";
    public static final String PARENT_DOC_ID = "parentDocId";
    public static final String DOC_TITLE = "docTitle";
    public static final String IS_PARENT = "isParent";
    public static final String DOC_AUTHOR = "docAuthor";
    public static final String DOC_TYPE = "docType";

    public static void main(String[] args) {
        IndexLargeJsonFileExample esExample = new IndexLargeJsonFileExample();
        try {
            esExample.initEStransportClinet(); //init transport client
            esExample.JsonBulkImport(); //index multiple  document
            esExample.refreshIndices(); //refresh indices
            esExample.search(); //search indexed document
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            esExample.closeTransportClient(); //close transport client
        }
    }

    public IndexLargeJsonFileExample(){
        indexName = "document";
        indexTypeName = "bulkindexing";
    }
    /*
    Method used to init the Elastic Search transport client.
    Returns true if it is successfully initialized, otherwise false.
     */
    public boolean initEStransportClinet()
    {
        try {
            // chain more addTransportAddress(...) calls here if you have multiple nodes
            client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(
                            new TransportAddress(InetAddress.getByName("localhost"), 9300));
            return true;
        } catch (Exception ex) {
            //log.error("Exception occurred while getting Client : " + ex, ex);
            ex.printStackTrace();
            return false;
        }
    }

    public void JsonBulkImport() throws IOException, ExecutionException, InterruptedException {
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        File jsonFilePath = new File("H:\\Work\\Data\\sample.json");
        int count=0,noOfBatch=1;
        //initialize jsonReader class by passing reader
        JsonReader jsonReader = new JsonReader(
                new InputStreamReader(
                        new FileInputStream(jsonFilePath), StandardCharsets.UTF_8));
        Gson gson = new GsonBuilder().create();
        jsonReader.beginArray(); //start of json array
        int numberOfRecords = 1;
        while (jsonReader.hasNext()){ //next json array element
            Document document = gson.fromJson(jsonReader, Document.class);
            //do something real
            try {
                XContentBuilder xContentBuilder = jsonBuilder()
                        .startObject()
                        .field(DOC_TYPE, document.getDocType())
                        .field(DOC_AUTHOR, document.getDocAuthor())
                        .field(DOC_TITLE, document.getDocTitle())
                        .field(IS_PARENT, document.isParent())
                        .field(PARENT_DOC_ID, document.getParentDocId())
                        .field(DOC_LANGUAGE, document.getDocLanguage())
                        .endObject();
                bulkRequest.add(client.prepareIndex(indexName, indexTypeName, String.valueOf(numberOfRecords))
                        .setSource(xContentBuilder));
                count++; //count the document just added to the current batch
                if (count==50_000) {
                    addDocumentToESCluser(bulkRequest, noOfBatch, count);
                    bulkRequest = client.prepareBulk(); //start a new batch so already-sent documents are not sent again
                    noOfBatch++;
                    count = 0;
                }
            }catch (Exception e) {
                e.printStackTrace();
                //skip records if wrong data in input file
            }
            numberOfRecords++;
        }
        jsonReader.endArray();
        jsonReader.close(); //release the underlying file handle
        if(count!=0){ //add remaining documents to ES
            addDocumentToESCluser(bulkRequest,noOfBatch,count);
        }
        System.out.println("Total Document Indexed : "+(numberOfRecords-1)); //numberOfRecords starts at 1
    }
    public void addDocumentToESCluser(BulkRequestBuilder bulkRequest,int noOfBatch,int count){
        if(count==0){
            //org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing failed for Batch : "+noOfBatch);
            // process failures by iterating through each bulk response item
            int numberOfDocFailed = 0;
            Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
            while (iterator.hasNext()){
                BulkItemResponse response = iterator.next();
                if(response.isFailed()){
                    //System.out.println("Failed Id : "+response.getId());
                    numberOfDocFailed++;
                }
            }
            System.out.println("Out of "+count+" documents, "+numberOfDocFailed+" documents failed");
            System.out.println(bulkResponse.buildFailureMessage());
        }else{
            System.out.println("Bulk Indexing Completed for batch : "+noOfBatch);
        }
    }

    public void refreshIndices(){
        client.admin().indices()
                .prepareRefresh(indexName)
                .get(); //Refresh before search, so you will get latest indices result
    }
    public void search(){
        SearchResponse response = client.prepareSearch(indexName)
                .setTypes(indexTypeName)
                .get();
        //MatchAllDocQuery
        System.out.println("Total Hits : "+response.getHits().getTotalHits());
        System.out.println(response);
    }
    public void closeTransportClient(){
        if(client!=null){
            client.close();
        }
    }
}

4.3 Document

A POJO class used with the GSON streaming API to map each JSON record.

package com.javadeveloperzone;
import java.util.List;
public class Document {
    int documentId,parentDocId;
    String docType,docAuthor,docTitle;
    boolean isParent;
    List<String> docLanguage;
    public int getDocumentId() {
        return documentId;
    }
    @Override
    public String toString() {
        return "Document{" +
                "documentId=" + documentId +
                ", parentDocId=" + parentDocId +
                ", docType='" + docType + '\'' +
                ", docAuthor='" + docAuthor + '\'' +
                ", docTitle='" + docTitle + '\'' +
                ", isParent=" + isParent +
                ", docLanguage=" + docLanguage +
                '}';
    }
    public void setDocumentId(int documentId) {
        this.documentId = documentId;
    }
    public int getParentDocId() {
        return parentDocId;
    }
    public void setParentDocId(int parentDocId) {
        this.parentDocId = parentDocId;
    }
    public String getDocType() {
        return docType;
    }
    public void setDocType(String docType) {
        this.docType = docType;
    }
    public String getDocAuthor() {
        return docAuthor;
    }
    public void setDocAuthor(String docAuthor) {
        this.docAuthor = docAuthor;
    }
    public String getDocTitle() {
        return docTitle;
    }
    public void setDocTitle(String docTitle) {
        this.docTitle = docTitle;
    }
    public boolean isParent() {
        return isParent;
    }
    public void setParent(boolean parent) {
        isParent = parent;
    }
    public List<String> getDocLanguage() {
        return docLanguage;
    }
    public void setDocLanguage(List<String> docLanguage) {
        this.docLanguage = docLanguage;
    }
}

4.4 Output

Total Document Indexed : 2567800
Total Hits : 2567800
{
  "took": 730,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "_clusters": {
    "total": 0,
    "successful": 0,
    "skipped": 0
  },
  "hits": {
    "total": 2567800,
    "max_score": 1,
    "hits": [
      {
        "_index": "document",
        "_type": "bulkindexing",
        "_id": "14",
        "_score": 1,
        "_source": {
          "docType": "pdf",
          "docAuthor": "Jason White",
          "docTitle": "Java Blog",
          "isParent": true,
          "parentDocId": 0,
          "docLanguage": [
            "en",
            "fr"
          ]
        }
      }
    ]
  }
}
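As a quick sanity check on the batching, the number of bulk requests needed for a given document count can be worked out with ceiling division. The sketch below uses the 2,567,800 total hits reported in the output and the 50,000-document batch size from the example code; the class and method names (BatchCountSketch, batchCount, lastBatchSize) are hypothetical and exist only for this illustration.

```java
// Illustrative only: BatchCountSketch is a hypothetical helper for the arithmetic.
final class BatchCountSketch {

    // Number of bulk requests needed to send totalDocs documents, batchSize at a time.
    static long batchCount(long totalDocs, int batchSize) {
        return (totalDocs + batchSize - 1) / batchSize; // ceiling division
    }

    // Size of the final batch, which may be smaller than batchSize.
    static long lastBatchSize(long totalDocs, int batchSize) {
        long remainder = totalDocs % batchSize;
        return remainder == 0 ? Math.min(totalDocs, batchSize) : remainder;
    }

    public static void main(String[] args) {
        long totalDocs = 2_567_800L; // total hits from the sample output
        int batchSize = 50_000;      // batch size used in the example code
        System.out.println("batches: " + batchCount(totalDocs, batchSize));
        System.out.println("last batch size: " + lastBatchSize(totalDocs, batchSize));
    }
}
```

For these numbers, that is 52 bulk requests: 51 full batches and a final batch of 17,800 documents.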

5. Conclusion

In this article, we discussed how to index a large JSON file in Elasticsearch using the Bulk API and the GSON streaming API. We went step by step through the Elasticsearch client API and the GSON streaming API, along with an admin operation (refreshing indices) and a sample match-all query as a basic quality check.

6. Source Code

Elastic Search Index Large Json File Example

You can also check our Git repository for Elastic Search Index Large JSON File Example and other useful examples.
