1. Overview

In our previous article, we walked through an Apache Spark Java example, WordCount. In this article, we are going to visit another Apache Spark Java example – Spark Filter.

2. Development environment

Java: Oracle JDK 1.8
Spark: Apache Spark 2.0.0-bin-hadoop2.6
IDE: Eclipse
Build Tool: Gradle 4.4.1

3. Sample Input

To experience the power of Apache Spark as a general-purpose processing engine, the input data should be massive. For this tutorial, however, we use a small input file so the example stays easy to follow: a standard log file from an application.

Sample Input:

2018-06-15 09:50:05.432 INFO [main] o.s.j.e.a.AnnotationMBeanExporter [MBeanExporter.java:431] - Registering beans for JMX exposure on startup
2018-06-15 09:50:05.452 INFO [main] o.s.c.s.DefaultLifecycleProcessor [DefaultLifecycleProcessor.java:343] - Starting beans in phase 2147483647
2018-06-15 09:50:05.452 INFO [main] s.d.s.w.p.DocumentationPluginsBootstrapper [DocumentationPluginsBootstrapper.java:147] - Context refreshed
2018-06-15 09:50:05.551 INFO [main] s.d.s.w.p.DocumentationPluginsBootstrapper [DocumentationPluginsBootstrapper.java:150] - Found 1 custom documentation plugin(s)
2018-06-15 09:50:06.040 INFO [main] s.d.s.w.s.ApiListingReferenceScanner [ApiListingReferenceScanner.java:41] - Scanning for api listing references
2018-06-15 09:50:07.747 INFO [main] s.d.s.w.r.o.CachingOperationNameGenerator [CachingOperationNameGenerator.java:40] - Generating unique operation named: loadInvestorAccountsUsingGET_1
2018-06-15 09:50:07.951 INFO [main] o.a.c.h.Http11NioProtocol [DirectJDKLog.java:179] - Initializing ProtocolHandler ["http-nio-5000"]
2018-06-15 09:50:07.984 INFO [main] o.a.c.h.Http11NioProtocol [DirectJDKLog.java:179] - Starting ProtocolHandler ["http-nio-5000"]
2018-06-15 09:50:08.016 INFO [main] o.a.t.u.n.NioSelectorPool [DirectJDKLog.java:179] - Using a shared selector for servlet write/read
2018-06-15 09:50:08.059 INFO [main] o.s.b.c.e.t.TomcatEmbeddedServletContainer [TomcatEmbeddedServletContainer.java:201] - Tomcat started on port(s): 5000 (http)
2018-06-15 09:50:08.065 INFO [main] c.b.i.InvestigatorMain [StartupInfoLogger.java:57] - Started InvestigatorMain in 21.298 seconds (JVM running for 25.551)
2018-06-15 09:50:42.150 WARN [http-nio-5000-exec-10] c.l.p.e.ElasticSearchIndexingException [ElasticSearchIndexingException.java:15] - Indexing failure
2018-06-15 09:51:17.777 WARN [http-nio-5000-exec-1] c.l.p.e.ElasticSearchIndexingException [ElasticSearchIndexingException.java:15] - Indexing failure
2018-06-16 08:13:13.581 INFO [Test worker] o.s.b.t.c.SpringBootTestContextBootstrapper [AbstractTestContextBootstrapper.java:319] - Neither xxx
2018-06-16 10:03:24.758 ERROR [http-nio-5000-exec-8] c.l.p.s.InvestorService [InvestorService.java:127] - Investor indexing failure had happened due to the data doesnot exist in [5ee01ad0-15fd-11e8-9cfb-9e4322fea8f0] xxx
2018-06-16 10:03:24.758 ERROR [http-nio-5000-exec-8] c.l.p.s.InvestorService [InvestorService.java:127] - Investor indexing failure had happened due to the data doesnot exist in [5ee01ad0-15fd-11e8-9cfb-9e4322fea8f0] xxx
2018-06-16 10:03:24.758 ERROR [http-nio-5000-exec-8] c.l.p.s.InvestorService [InvestorService.java:127] - Investor indexing failure had happened due to the data doesnot exist in [5ee01ad0-15fd-11e8-9cfb-9e4322fea8f0] xxx
2018-06-16 10:03:24.758 ERROR [http-nio-5000-exec-8] c.l.p.s.InvestorService [InvestorService.java:127] - Investor indexing failure had happened due to the data doesnot xxx
2018-06-16 10:03:24.758 ERROR [http-nio-5000-exec-8] c.l.p.s.InvestorService [InvestorService.java:127] - Investor indexing failure had happened due to the data doesnot xxx
2018-06-16 10:03:24.758 ERROR [http-nio-5000-exec-8] c.l.p.s.InvestorService [InvestorService.java:127] - Investor indexing failure had happened due to the data doesnot xxx
2018-06-16 10:03:24.758 ERROR [http-nio-5000-exec-8] c.l.p.s.InvestorService [InvestorService.java:127] - Investor indexing failure had happened due to the data doesnot xxx
2018-06-16 10:03:24.758 ERROR [http-nio-5000-exec-8] c.l.p.s.InvestorService [InvestorService.java:127] - Investor indexing failure had happened due to the data does xxx

4. Solution

We create a custom Spark filter by implementing the Function interface from the Spark Java API (org.apache.spark.api.java.function.Function).
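For reference, the shape of that interface is a single call method. The sketch below paraphrases the Spark Java API; the type parameter names are illustrative:

public interface Function<T, R> extends java.io.Serializable {
  R call(T t) throws Exception;
}

Our filter takes a log line and returns a Boolean, so we implement Function<String, Boolean>, as shown in the full listing below.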

Using Spark Core

We use the following single Java source file for this Apache Spark Java example – Spark Filter.

Java source file

SparkFilterExample.java

4.1 Build File : build.gradle

apply plugin: 'java'

description """Apache Spark Java example - Spark Filter"""

sourceCompatibility = 1.8
targetCompatibility = 1.8

/* Declare where to find the project's dependencies. */
repositories {
    jcenter()
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
}

4.2 Java Code: SparkFilterExample.java

package com.javadeveloperzone.spark.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

public class SparkFilterExample {

  /*
   Function<T, R>          R call(T)
   Takes one input and returns one output; used with operations
   like map and filter.

   Function2<T1, T2, R>    R call(T1, T2)
   Takes two inputs and returns one output; used with operations
   like aggregate or fold.

   FlatMapFunction<T, R>   Iterator<R> call(T)
   Takes one input and returns zero or more outputs; used with
   operations like flatMap. (Since Spark 2.0, call returns an
   Iterator rather than an Iterable.)
  */
  
  static class ContainsFunction implements Function<String,Boolean>{

    
    private static final long serialVersionUID = 1L;
    
    private String query;
    
    public ContainsFunction(String query){
      this.query = query;
    }
    
    @Override
    public Boolean call(String input) throws Exception {
      return input.contains(query);
    }
    
  }
  
  public static void main(String[] args) {
    
    SparkConf sparkConf = new SparkConf().setAppName("Apache Spark Java example - Spark Filter");
    
    /*Setting Master for running it from IDE.*/
    sparkConf.setMaster("local[2]");

    JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
    
    /*Reading the input text file passed as the first argument (args[0])*/
    JavaRDD<String> inputRdd = sparkContext.textFile(args[0]);
    
    /*Creating Info and Error RDD*/
    JavaRDD<String> infoRdd = inputRdd.filter(new ContainsFunction("INFO"));
    
    JavaRDD<String> errorRdd = inputRdd.filter(new ContainsFunction("ERROR"));
    
    /*Java 8 lambda expression to create an inline filter function.
      The sample log uses the level "WARN", so we filter on that.*/
    JavaRDD<String> warningRdd = inputRdd.filter(s -> s.contains("WARN"));
    
    /*Union of the Info and Warning RDD contents*/
    JavaRDD<String> infoWarningRdd = infoRdd.union(warningRdd);
    
    /*Saving the combined info/warning RDD to the output path passed as the second argument (args[1])*/
    infoWarningRdd.saveAsTextFile(args[1]);
    
    /*Saving the error RDD to the output path passed as the third argument (args[2])*/
    errorRdd.saveAsTextFile(args[2]);
    
    /*Closing the Spark context; close() also stops it, so a single call is enough*/
    sparkContext.close();
    
  }
}
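The comment block at the top of the class also mentions Function2 and FlatMapFunction. As a minimal sketch (not part of the example above; it assumes the same inputRdd and an extra import of java.util.Arrays), their lambda equivalents look like this:

/*FlatMapFunction<String, String>: one log line in, zero or more words out.
  Since Spark 2.0, call() returns an Iterator rather than an Iterable.*/
JavaRDD<String> words = inputRdd.flatMap(
    line -> Arrays.asList(line.split(" ")).iterator());

/*map uses Function<String, Integer>, reduce uses
  Function2<Integer, Integer, Integer>: total characters across all lines*/
long totalChars = inputRdd.map(String::length).reduce((a, b) -> a + b);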

5. Build & Run Spark Filter Example

We need to pass three arguments to run the program:
The first argument is the input file path.
The second argument is the output path for the combined info and warning logs.
The third argument is the output path for the error logs.
The output paths (folders) must not already exist; Spark creates them for us.
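As a rough sketch, a Gradle build followed by spark-submit could look like this (the jar name and all paths here are assumptions for illustration, not taken from this project's actual build output):

gradle build
spark-submit \
  --class com.javadeveloperzone.spark.java.SparkFilterExample \
  build/libs/spark-filter-example.jar \
  /data/input/application.log \
  /data/output/info-warning \
  /data/output/error

The master is already set to local[2] in the code, so no --master flag is needed for a local run.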

6. Output (Portion)

Once the job completes successfully, you will get output that looks like the following:

2018-06-15 09:50:05.432 INFO  [main] o.s.j.e.a.AnnotationMBeanExporter [MBeanExporter.java:431] - Registering beans for JMX exposure on startup
2018-06-15 09:50:05.452 INFO  [main] o.s.c.s.DefaultLifecycleProcessor [DefaultLifecycleProcessor.java:343] - Starting beans in phase 2147483647
2018-06-15 09:50:05.452 INFO  [main] s.d.s.w.p.DocumentationPluginsBootstrapper [DocumentationPluginsBootstrapper.java:147] - Context refreshed
2018-06-15 09:50:05.551 INFO  [main] s.d.s.w.p.DocumentationPluginsBootstrapper [DocumentationPluginsBootstrapper.java:150] - Found 1 custom documentation plugin(s)
2018-06-15 09:50:06.040 INFO  [main] s.d.s.w.s.ApiListingReferenceScanner [ApiListingReferenceScanner.java:41] - Scanning for api listing references
2018-06-15 09:50:07.747 INFO  [main] s.d.s.w.r.o.CachingOperationNameGenerator [CachingOperationNameGenerator.java:40] - Generating unique operation named: loadInvestorAccountsUsingGET_1
2018-06-15 09:50:07.951 INFO  [main] o.a.c.h.Http11NioProtocol [DirectJDKLog.java:179] - Initializing ProtocolHandler ["http-nio-5000"]
2018-06-15 09:50:07.984 INFO  [main] o.a.c.h.Http11NioProtocol [DirectJDKLog.java:179] - Starting ProtocolHandler ["http-nio-5000"]
2018-06-15 09:50:08.016 INFO  [main] o.a.t.u.n.NioSelectorPool [DirectJDKLog.java:179] - Using a shared selector for servlet write/read
2018-06-15 09:50:08.059 INFO  [main] o.s.b.c.e.t.TomcatEmbeddedServletContainer [TomcatEmbeddedServletContainer.java:201] - Tomcat started on port(s): 5000 (http)
2018-06-15 09:50:08.065 INFO  [main] c.b.i.InvestigatorMain [StartupInfoLogger.java:57] - Started InvestigatorMain in 21.298 seconds (JVM running for 25.551)

7. Source Code

You can download the source code of this Apache Spark Java example – Spark Filter from our git repository; it should give you a basic understanding of how to write Apache Spark programs in Java.

 
