1. Overview

In one of our previous articles we explained Spark RDDs with an example; in this article we are going to explain Spark transformations.
A Spark transformation is an operation on an RDD that returns a new RDD as its result. Transformations are evaluated lazily: the resulting RDD is computed only when it is used in an action (a short sketch follows the list below).
Many transformations are element-wise, meaning they work on one element at a time.
Based on the dependencies between the parent and child RDDs, we can classify transformations into two categories.

  • Narrow Transformation
  • Wide Transformation
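Before looking at each category, here is a minimal sketch of lazy evaluation, assuming a JavaSparkContext named sparkContext (as in the examples that follow) and an illustrative input path of our own choosing. The map() call only records the transformation; nothing is read or computed until the count() action runs.

// Nothing is read or mapped yet: textFile() and map() only build the lineage.
JavaRDD<String> lines = sparkContext.textFile("data/guests.csv"); // illustrative path
JavaRDD<Integer> lengths = lines.map(line -> line.length());      // still lazy

// Only when an action such as count() is called does Spark actually
// read the file and apply the map() function.
long total = lengths.count();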

Narrow Transformation

In a narrow transformation, each partition of the parent RDD is used by at most one partition of the child RDD.
So, RDD operations like map(), filter() and union(), which operate on a single parent partition, are referred to as narrow operations.
Narrow operations don't require data to be shuffled across the partitions.

[Figure: Spark Narrow Transformations]

map()

The map() method converts each element of the source RDD into a single element of the result RDD by applying a function. You can find a complete example of the map function in Spark using Java at our Git repository.

/*Reading guest records from the input file passed as the first argument*/
JavaRDD<String> guestsRDD = sparkContext.textFile(args[0]);

/*Mapping each guest record to "Adult" or "Minor" based on the age field (third comma-separated column)*/
JavaRDD<String> adultMinorRDD = guestsRDD.map(new Function<String, String>() {
    @Override
    public String call(String input) {
        int age = Integer.parseInt(input.split(",")[2]);
        return (age >= 18 ? "Adult" : "Minor");
    }
});

Protip: Whenever you have heavyweight initialization that should be done once for many RDD elements rather than once per element, and that initialization (such as creating objects from a third-party library) cannot be serialized so that Spark can transmit it across the cluster to the worker nodes, use mapPartitions() instead of map(). mapPartitions() allows the initialization to be done once per worker task/thread/partition instead of once per RDD data element.
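Here is a minimal sketch of that pattern, reusing guestsRDD from the map example above; ExpensiveParser is a hypothetical, non-serializable class that is costly to construct, and it is created once per partition rather than once per element.

JavaRDD<String> parsedRDD = guestsRDD.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
    @Override
    public Iterator<String> call(Iterator<String> partition) throws Exception {
        // Heavyweight, non-serializable initialization done once per partition
        // (ExpensiveParser is a hypothetical class used only for illustration).
        ExpensiveParser parser = new ExpensiveParser();
        List<String> results = new ArrayList<>();
        while (partition.hasNext()) {
            results.add(parser.parse(partition.next()));
        }
        return results.iterator();
    }
});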

filter()

The filter() transformation returns a new RDD containing only the elements that satisfy the specified predicate. You can read our Apache Spark Java example – Spark Filter for more details on applying the filter function in Spark using Java.

We have to pass a Function<String, Boolean> object to the filter transformation; it should return true for every element we want to keep.

/*Passing text file as first argument*/
JavaRDD<String> inputRdd = sparkContext.textFile(args[0]);

/*Keeping only the lines that contain "INFO"*/
JavaRDD<String> infoRdd = inputRdd.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String input) {
        return input.contains("INFO");
    }
});

Note:
In the above example, keep in mind that the filter() operation does not mutate the existing RDD. Instead, it creates a new RDD and returns a reference to that newly created RDD.
We have also explained a detailed example here.

flatMap()

The flatMap() transformation is similar to map(), but it may return a list of elements (zero or more) for a single input element of the source RDD.

/*Reading input file whose path was specified as args[0]*/
JavaRDD<String> textFile = sparkContext.textFile(args[0]);

/*Creating RDD of words from each line of input file*/
JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String line) throws Exception {
        return Arrays.asList(line.split(" ")).iterator();
    }
});

For a complete flatMap example, read our previous article, Spark WordCount Example.

Note:
map() and flatMap() are similar in that both take one element (here, one line) from the input RDD and apply a function to it.
The key difference is that map() returns exactly one element per input, while flatMap() can return a list of elements, which is flattened into the result RDD.
The simplest use case of flatMap() is extracting words from lines, as the sketch below shows.
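A minimal sketch of that difference, assuming a JavaSparkContext named javaSparkContext (created the same way as in the union example below):

JavaRDD<String> lines = javaSparkContext.parallelize(Arrays.asList("hello world", "spark java"));

// map(): exactly one output element per input line (here, an array of words per line).
JavaRDD<String[]> wordArrays = lines.map(line -> line.split(" "));
System.out.println(wordArrays.count());    // 2 (one array per line)

// flatMap(): the per-line word lists are flattened into individual elements.
JavaRDD<String> flatWords = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
System.out.println(flatWords.collect());   // [hello, world, spark, java]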

union()

The union() transformation is a little different from the previous ones, as it operates on two RDDs. union() returns an RDD consisting of the data from both source RDDs. You can find a union example in Spark at our Git repository.

public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local[1]");
    sparkConf.setAppName("Union Example");

    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

    JavaRDD<String> likes = javaSparkContext.parallelize(Arrays.asList("Java"));
    JavaRDD<String> learn = javaSparkContext.parallelize(Arrays.asList("Spark", "Scala"));

    /*union() combines the elements of both RDDs into a single RDD*/
    JavaRDD<String> likeToLearn = likes.union(learn);

    System.out.println("I like " + likeToLearn.collect().toString());

    javaSparkContext.stop();
    javaSparkContext.close();
}

Note: If duplicates are present in the input RDDs, the resulting RDD will also contain duplicate elements; union() does not remove them.

mapToPair()

In our classic example of John's party problem, we explained the use of mapToPair() in detail.
mapToPair() accepts a PairFunction and returns a JavaPairRDD as a result.
We can pass our own custom PairFunction when working with mapToPair().

/*Below code generates a (word, 1) pair for each word,
*similar to the Mapper in Hadoop MapReduce*/
JavaPairRDD<String, Integer> pairs = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
});

 

Wide Transformations

In a wide transformation, the data required to compute a single child partition may live in many partitions of the parent RDD, so the data has to be shuffled across the cluster. Operations like groupByKey() and reduceByKey() are wide transformations.

[Figure: Spark Wide Transformations]

reduceByKey()

The Spark RDD reduceByKey() function merges the values for each key using an associative reduce function.

reduceByKey() is a specialization of aggregateByKey(). aggregateByKey() takes two functions: one that is applied within each partition (sequentially) and one that is applied among the results of the partitions (in parallel). reduceByKey() uses the same associative function in both cases: to do a sequential computation within each partition and then to combine those partial results into a final result, as we have illustrated here (a short aggregateByKey() sketch follows the reduceByKey example below).

In Spark, data is distributed into partitions. Consider an RDD with 4 partitions: first, the function is applied locally within each partition, sequentially inside the partition, while all 4 partitions run in parallel. Then the results of those local computations are aggregated by applying the same function again, which finally produces a single result.

Associativity

One requirement for the reduceByKey() function is that it must be associative. To build some intuition on how reduceByKey() works, let's first see how an associative function helps us in a parallel computation.

Associativity lets us use the same function both sequentially and in parallel. For example, because addition is associative, (1 + 2) + 3 and 1 + (2 + 3) give the same result, so partial sums computed on different partitions can safely be combined. reduceByKey() uses that property to compute a result out of an RDD, which is a distributed collection consisting of partitions.

SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[1]");
sparkConf.setAppName("Spark Reduce By Key Example");

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

/*Creating an RDD of the numbers 1..20 spread across 4 partitions*/
List<Integer> range = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
JavaRDD<Integer> intRange = javaSparkContext.parallelize(range, 4);

/*Pairing every number with the same key, so reduceByKey() will sum all of them*/
JavaPairRDD<String, Integer> intPairedRDD = intRange.mapToPair(new PairFunction<Integer, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Integer value) throws Exception {
        return new Tuple2<>("key", value);
    }
});

JavaPairRDD<String, Integer> result = intPairedRDD.reduceByKey((v1, v2) -> (v1 + v2));

List<Tuple2<String, Integer>> collectedResult = result.collect();

//Prints the collected result
System.out.print(collectedResult);

//Iterating through all the elements; here there is only one element,
//the loop below is for understanding purposes.
for (Tuple2<String, Integer> element : collectedResult) {
    System.out.println("key::" + element._1() + " val::" + element._2());
}

javaSparkContext.stop();
javaSparkContext.close();

You can find the reduceByKey example in Spark at our Git repository.
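To make the aggregateByKey() relationship mentioned above concrete, here is a minimal sketch of our own (not from the repository) that computes the same per-key sum on intPairedRDD, with an explicit zero value, a function applied inside each partition, and a function that merges the partial results of the partitions:

/*Same result as reduceByKey() above, spelled out with aggregateByKey()*/
JavaPairRDD<String, Integer> aggregated = intPairedRDD.aggregateByKey(
        0,                                   // zero value for each key
        (partial, value) -> partial + value, // applied sequentially inside each partition
        (p1, p2) -> p1 + p2);                // applied across the partition results

System.out.println(aggregated.collect());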

Protip: Deeper, truer if you want

All that being said, this is a simplified version of what happens, as some optimizations are applied here. The operation is associative, so the Spark engine will perform these reductions locally within each partition first (often termed map-side reduce) and then once more after the shuffle. This saves network traffic: instead of sending all the data and then performing the operation, Spark reduces each partition's data as much as it can and sends only that reduction over the wire.

 
