1. Overview

In our Apache Spark tutorial series, we have learnt how to create Spark RDDs using Java and how to apply Spark Transformations. In this article, we explain Spark Actions.
Spark Actions are the other type of RDD operation: they return a final value to the driver program or write data to an external storage system.

You can find all the Java files of this article at our Git repository.

1.1 count()

count() is an action that returns the number of elements in the RDD. It is often used to get an idea of the RDD's size before performing further operations on it.

JavaRDD<String> likes = javaSparkContext.parallelize(Arrays.asList("Java"));

JavaRDD<String> learn = javaSparkContext.parallelize(Arrays.asList("Spark","Scala"));

JavaRDD<String> likeToLearn = likes.union(learn);

//Learning::2
System.out.println("Learning::"+learn.count());

1.2 collect()

We can use the collect action to retrieve the entire RDD at the driver. This is useful when your program filters an RDD down to a very small size and you’d like to deal with it locally.

Note:

Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn’t be used on large datasets.

Reusing the above example, we can print the elements of the likeToLearn RDD to the system console using the collect action:

List<String> result = likeToLearn.collect();

//Prints I like [Java, Spark, Scala]
System.out.println("I like "+result.toString());

Moving forward, we want to continue the example by adding a few other skills to our skill set.

We can use the snippet below to add new skills to our learn RDD.

String[]  newlyLearningSkills = {"Elastic Search","Spring Boot"};

JavaRDD<String> learningSkills = learn.union(javaSparkContext.parallelize(Arrays.asList(newlyLearningSkills)));

//learningSkills::[Spark, Scala, Elastic Search, Spring Boot]
System.out.print("learningSkills::"+learningSkills.collect().toString());

Here, we use union to combine the new skills with the learn RDD. RDDs are immutable, so learn itself is unchanged and the result is a new RDD, learningSkills.

1.3 take(n)

We can use the take action to retrieve a small number of elements from the RDD at the driver program. We can then iterate over them locally to print information at the driver.

Continuing our example, we have added new skills and now have the learningSkills RDD.

Now, if we call the take action on the learningSkills RDD like this,

List<String> learning4Skills =  learningSkills.take(4);

//Learning 4 Skills::[Spark, Scala, Elastic Search, Spring Boot]
System.out.println("Learning 4 Skills::"+learning4Skills.toString());

We will get the first 4 elements of the learningSkills RDD as a List at the driver.

1.4 top(n)

The top action returns the n largest elements of the RDD, in descending order. To get the top 2 learning skills from our learningSkills RDD, we can call top(2):

List<String> learningTop2Skills =  learningSkills.top(2);

//Learning top 2 Skills::[Spring Boot, Spark]
System.out.println("Learning top 2 Skills::"+learningTop2Skills.toString());

As a result we get a List (not a new RDD), learningTop2Skills, and we can print the top 2 learning skills as shown in the code snippet.

Note: Here, we have not defined any ordering, so top uses the natural ordering of the elements, which for String is lexicographic. To specify a custom Comparator with the top action, we can use the overload

java.util.List<T> top(int num, java.util.Comparator<T> comp)
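As a rough illustration of what a custom ordering changes, the following plain-Java sketch mimics the result of top(n, comparator) on a local list. It is not Spark code: TopSketch and topN are hypothetical names, and the sketch sorts the whole list in one place, whereas Spark works partition by partition. With Spark itself, the comparator typically also has to be serializable, since it is shipped to the executors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of top(n, comparator); TopSketch and topN are hypothetical names.
final class TopSketch {

    // Returns the n largest elements according to comp, largest first.
    static <T> List<T> topN(List<T> items, int n, Comparator<T> comp) {
        List<T> sorted = new ArrayList<>(items);
        sorted.sort(comp.reversed());
        int limit = Math.max(0, Math.min(n, sorted.size()));
        return new ArrayList<>(sorted.subList(0, limit));
    }

    public static void main(String[] args) {
        List<String> skills = Arrays.asList("Spark", "Scala", "Elastic Search", "Spring Boot");

        // Natural ordering, as in the top(2) call above.
        // Prints [Spring Boot, Spark]
        System.out.println(topN(skills, 2, Comparator.<String>naturalOrder()));

        // Custom ordering: longest skill name first.
        // Prints [Elastic Search, Spring Boot]
        System.out.println(topN(skills, 2, Comparator.comparingInt(String::length)));
    }
}
```

The only thing that differs between the two calls is the comparator, which is the same role the second argument of top plays.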

1.5 countByValue()

The countByValue action returns the number of occurrences of each element in the given RDD.

So if we call the countByValue action on the learningSkills RDD, it returns a Map<String,Long> in which each distinct element is a key and the value is its count.

Map<String,Long> skillCountMap= learningSkills.countByValue();

for(Map.Entry<String,Long> entry: skillCountMap.entrySet()){
    System.out.println("key::"+entry.getKey()+"\t"+"value:"+entry.getValue());
}

Output

key::Scala	value:1
key::Spark	value:1
key::Elastic Search	value:1
key::Spring Boot	value:1
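In the output above every count is 1 because learningSkills has no repeated elements. The following plain-Java sketch (not Spark code; CountByValueSketch is a hypothetical name) builds the same kind of value-to-count map for a local list that does contain duplicates.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java model of countByValue; the class name is hypothetical.
final class CountByValueSketch {

    // Counts how often each distinct value occurs.
    // A TreeMap keeps the keys sorted so the printed order is stable.
    static Map<String, Long> countByValue(List<String> items) {
        return items.stream()
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> skills = Arrays.asList("Spark", "Scala", "Spark", "Java", "Spark");
        // Prints {Java=1, Scala=1, Spark=3}
        System.out.println(countByValue(skills));
    }
}
```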

1.6 reduce()

The reduce action takes a function that accepts two elements as input and returns one element as output. The output must be of the same type as the inputs. A simple example of such a function is addition: we can use it to add up the elements of an RDD or to count words. The function passed to reduce should be commutative and associative, because Spark applies it within each partition and then across the partial results.

So in our case, let's take a list of integers and add them using the reduce action as shown below.

The result will be the sum of all the integers, i.e. 21.

JavaRDD<Integer> intRdd =  javaSparkContext.parallelize(Arrays.asList(1,2,3,4,5,6));

Integer result = intRdd.reduce((v1, v2) -> v1+v2);

System.out.println("result::"+result);
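To see why the operation should be commutative and associative, the plain-Java sketch below models a partitioned reduce: each partition is reduced on its own and the partial results are then reduced again. It is only a model under stated assumptions (ReduceSketch and reducePartitions are hypothetical names, partitions are plain lists, and every partition is non-empty), not how Spark is implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of a partitioned reduce; names are hypothetical.
final class ReduceSketch {

    // Reduces each partition on its own, then reduces the partial results.
    // Assumes there is at least one partition and none of them is empty.
    static int reducePartitions(List<List<Integer>> partitions, BinaryOperator<Integer> op) {
        return partitions.stream()
                .map(p -> p.stream().reduce(op).get())
                .reduce(op)
                .get();
    }

    public static void main(String[] args) {
        List<List<Integer>> onePartition = Arrays.asList(Arrays.asList(1, 2, 3, 4, 5, 6));
        List<List<Integer>> twoPartitions = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));

        // Addition is commutative and associative: prints 21 for both layouts.
        System.out.println(reducePartitions(onePartition, Integer::sum));
        System.out.println(reducePartitions(twoPartitions, Integer::sum));

        // Subtraction is neither: prints -19 with one partition and 3 with two.
        System.out.println(reducePartitions(onePartition, (a, b) -> a - b));
        System.out.println(reducePartitions(twoPartitions, (a, b) -> a - b));
    }
}
```

With addition the partition layout does not matter, while with subtraction the result changes as soon as the data is split differently.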

 

1.7 fold()

The fold action is similar to the reduce action, except that it also takes a “zero value” as input. The zero value is used as the initial value for each partition, and again when the per-partition results are combined.

Note: The zero value should be the identity element of the operation, i.e. 0 for sum, 1 for multiplication, an empty list for concatenation, etc.

Key Difference: reduce() throws an exception for an empty collection, whereas fold() is defined for an empty collection and returns the zero value.

Return type: The return type of fold() is the same as the element type of the RDD we are operating on.

ProTip: You can minimize object creation in fold() by modifying and returning the first of the two parameters in place. However, you should not modify the second parameter.

JavaRDD<Integer> intRdd =  javaSparkContext.parallelize(Arrays.asList(1,2,3,4,5,6));

Integer foldResult = intRdd.fold(0,((v1, v2) -> (v1+v2)));

System.out.println("Fold result::"+foldResult);
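The requirement that the zero value be an identity element can be made concrete with a small plain-Java model of a partitioned fold. The sketch assumes the zero value is applied once per partition and once more when the partial results are merged; FoldSketch and foldPartitions are hypothetical names, and partitions are modelled as plain lists.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of a partitioned fold; names are hypothetical.
final class FoldSketch {

    // Folds each partition starting from zero, then merges the partial
    // results into an accumulator that also starts from zero.
    static int foldPartitions(List<List<Integer>> partitions, int zero, BinaryOperator<Integer> op) {
        int result = zero;
        for (List<Integer> partition : partitions) {
            int partial = zero;
            for (int value : partition) {
                partial = op.apply(partial, value);
            }
            result = op.apply(result, partial);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> twoPartitions = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));

        // Identity zero value: prints 21.
        System.out.println(foldPartitions(twoPartitions, 0, Integer::sum));

        // Non-identity zero value is added three times (2 partitions + 1 merge): prints 51.
        System.out.println(foldPartitions(twoPartitions, 10, Integer::sum));

        // No data at all: the result is just the zero value, prints 0.
        System.out.println(foldPartitions(Collections.emptyList(), 0, Integer::sum));
    }
}
```

A zero value that is not the identity makes the result depend on the number of partitions, which is why 0 is used for the sum above.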

1.8 aggregate()

The fold() and reduce() actions work well when the result has the same type as the elements of the RDD, but often we want to return a different type.

For example, when computing a running average, we need to keep track of both the sum so far and the number of elements, which requires us to return a pair. We could work around this by first using map() to transform every element into a pair of the element and the number 1, which is the type we want to return, so that the reduce() function can work on pairs.

The aggregate() function frees us from the constraint of having the return be the same type as the RDD we are working on.

Input: With the aggregate() action, as with fold(), we have to supply:

  • An initial zero value of the type we want to return.
  • A function to combine the elements from our RDD with the accumulator.
  • A second function to merge two accumulators, given that each node accumulates its own results locally.

We can use aggregate() to compute the average of an RDD, avoiding a map() before the fold().

To better explain the aggregate action, let's consider an example where we calculate a moving average of integers, here simply the mean of all the elements seen so far.

The following code calculates the moving average of the integers from 1 to 10.

You can find the complete example of the aggregate action at our Git repository.
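The snippet that follows uses an AverageCount class that this article does not show. As a rough sketch, assuming it only needs the two fields and the avg() method that the snippet touches, it could look like the class below. The field types and the Serializable marker are assumptions; the version in the repository may differ. Serializable is included because Spark typically has to serialize the zero value and the partial results to move them between the driver and the executors.

```java
import java.io.Serializable;

// Hypothetical helper for the aggregate example; the article's own class may differ.
class AverageCount implements Serializable {
    private static final long serialVersionUID = 1L;

    public int total; // sum of the elements seen so far
    public int num;   // number of elements seen so far

    public AverageCount(int total, int num) {
        this.total = total;
        this.num = num;
    }

    // Mean of the elements seen so far; yields NaN when no element has been added.
    public double avg() {
        return total / (double) num;
    }
}
```

The cast to double in avg() avoids integer division, so the integers 1 to 10 (total 55, num 10) give 5.5 rather than 5.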

JavaRDD<Integer> intRDD = javaSparkContext.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));

// Adds one RDD element to an accumulator (applied within a partition).
Function2<AverageCount, Integer, AverageCount> addAndCount =
    new Function2<AverageCount, Integer, AverageCount>() {
        @Override
        public AverageCount call(AverageCount a, Integer x) {
            a.total += x;
            a.num += 1;
            return a;
        }
    };

// Merges two accumulators (applied across partitions).
Function2<AverageCount, AverageCount, AverageCount> combine =
    new Function2<AverageCount, AverageCount, AverageCount>() {
        @Override
        public AverageCount call(AverageCount a, AverageCount b) {
            a.total += b.total;
            a.num += b.num;
            return a;
        }
    };

// Zero value: no elements seen yet.
AverageCount initial = new AverageCount(0, 0);
AverageCount currentMovingAverage = intRDD.aggregate(initial, addAndCount, combine);
System.out.println("Moving Average:"+currentMovingAverage.avg());

Now, if you run this code, you will get a moving average of 5.5. Let's add another 3 values, i.e. 11, 12 and 13, using the following snippet.

JavaRDD<Integer> anotherIntRDD = javaSparkContext.parallelize(Arrays.asList(11,12,13));

JavaRDD<Integer> resultantRDD = intRDD.union(anotherIntRDD);

AverageCount newMovingAverage = resultantRDD.aggregate(initial, addAndCount, combine);

System.out.println("Changed Moving Average:"+newMovingAverage.avg());

Now if you run the program, you will get the changed moving average i.e 7.

1.9 foreach()

Sometimes we want to apply an operation to each element of an RDD without returning any value to the driver. In this case, the foreach() function is useful. A good example of this would be posting JSON to a web server or inserting records into a database. In either case, the foreach() action lets us perform computations on each element in the RDD without bringing it back locally.

In our case, we simply print each element of our previously derived RDD, i.e. the learningSkills RDD.

We can use the following line to print all the elements of the learningSkills RDD. Note that the function runs on the executors, so on a cluster the output appears in the executors' stdout rather than on the driver console.

learningSkills.foreach(element -> System.out.println(element));

1.10 saveAsTextFile()

Outputting text files is also quite simple. We have already demonstrated the saveAsTextFile() method in our example on passing functions to Spark.
This action takes a path and writes the contents of the RDD to it.
The path is treated as a directory, and Spark writes multiple files underneath that directory. This allows Spark to write the output from multiple nodes.
With this method we don’t get to control which files end up with which segments of our data, but there are other output formats that do allow this.

We used the following code snippet to save the RDDs to text files.

adultMinorCountsRDD.saveAsTextFile(args[1]);

aggregatedFamilyPairsRDD.saveAsTextFile(args[2]);
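Because the output path is a directory of part files rather than a single file, reading the result back outside Spark means reading every part file in it. The sketch below does this with plain java.nio, assuming uncompressed output on a local filesystem and the usual part-00000, part-00001, ... file naming. PartFileReader and readAllParts are hypothetical names, and main simulates an output directory instead of running a Spark job.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Reads a saveAsTextFile-style output directory; names are hypothetical.
final class PartFileReader {

    // Returns the lines of every part-* file in outputDir, in file-name order.
    // Marker files such as _SUCCESS are skipped by the prefix filter.
    static List<String> readAllParts(Path outputDir) throws IOException {
        List<Path> parts;
        try (Stream<Path> files = Files.list(outputDir)) {
            parts = files
                    .filter(p -> p.getFileName().toString().startsWith("part-"))
                    .sorted()
                    .collect(Collectors.toList());
        }
        List<String> lines = new ArrayList<>();
        for (Path part : parts) {
            lines.addAll(Files.readAllLines(part));
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Simulated output directory with two part files and a marker file.
        Path dir = Files.createTempDirectory("spark-output");
        Files.write(dir.resolve("part-00000"), Arrays.asList("Spark", "Scala"));
        Files.write(dir.resolve("part-00001"), Arrays.asList("Elastic Search", "Spring Boot"));
        Files.write(dir.resolve("_SUCCESS"), new byte[0]);

        // Prints [Spark, Scala, Elastic Search, Spring Boot]
        System.out.println(readAllParts(dir));
    }
}
```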

2. API & References

We have used the Spark API for Java in this article. You can find the complete John Party problem solution at our Git repository.

3. Conclusion

Apache Spark computes a result only when it encounters a Spark Action. This lazy evaluation avoids unnecessary computation and makes the system more efficient.
If you have any questions about Spark Actions, feel free to share them with us. We will be happy to answer them.

 
