Transformations and Actions:
We have two kinds of operations on RDDs: transformations and actions. A transformation creates a new dataset from an existing one; the results are computed and returned to the user, or stored to external storage, only when an action is triggered.
For example, map is a transformation that passes each element of the dataset through a function and creates a new RDD of the results.
An action returns a value to the driver program after running a computation on the dataset; reduce, for example, returns the final result of aggregating all the elements of the RDD. In Spark, transformations are lazy in nature: they are not executed until an action is triggered, so we do not get their results right away. This laziness lets Spark run more efficiently, because the whole chain of transformations can be planned and executed as a single job.
For example, the following program converts the input data into lowercase lines:
import org.apache.spark.rdd.RDD

val text = sc.textFile(inputPath)
val lower: RDD[String] = text.map(_.toLowerCase())
lower.foreach(println(_))
Here map() is a transformation; the text is converted to lowercase only when the foreach action is triggered. At that point a single job is started that reads the input file, converts it to lowercase, and writes the result to the console.
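To see the laziness for yourself, you can inspect an RDD's lineage before any action has run; nothing is computed at this point. A minimal sketch for the spark-shell (toDebugString simply prints the planned chain of RDDs):

val text = sc.textFile(inputPath)       // no job yet: just records the data source
val lower = text.map(_.toLowerCase())   // still no job: only the lineage grows
println(lower.toDebugString)            // prints the plan, not the data
lower.foreach(println(_))               // the first action: now a job actually runs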
As a rule of thumb, if the return type of an operation is an RDD, it is a transformation; otherwise, it is an action. There are further transformations and actions in PairRDDFunctions for working with RDDs of key-value pairs.
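A quick sketch of this rule of thumb (the data here is invented for illustration):

val nums = sc.parallelize(1 to 5)
val doubled = nums.map(_ * 2)   // returns RDD[Int] -> transformation (lazy)
val total = doubled.count()     // returns Long     -> action (runs a job)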
There are transformations for mapping, grouping, aggregating, repartitioning, sampling, and joining RDDs, and for treating RDDs as sets; a few of them are sketched below.
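The sketch below touches one transformation from several of these categories; the sample data and partition count are invented for illustration:

val a = sc.parallelize(Seq(("x", 1), ("y", 2), ("x", 3)))
val b = sc.parallelize(Seq(("x", 10), ("z", 30)))

a.mapValues(_ * 2)      // mapping:        RDD[(String, Int)]
a.groupByKey()          // grouping:       RDD[(String, Iterable[Int])]
a.join(b)               // joining:        RDD[(String, (Int, Int))], key "x" only
a.sample(false, 0.5)    // sampling:       roughly half the elements, no replacement
a.repartition(4)        // repartitioning: redistributes the data into 4 partitions
a.union(b)              // set operation:  concatenation of the two RDDs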
For example, the following spark-shell session uses the map transformation followed by the reduce action to compute the total length of the lines in a file:

scala> val filelength = inputFile.map(s => s.length)
filelength: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at map at <console>:26

scala> val totallength = filelength.reduce((a, b) => a + b)
totallength: Int = 100
The main operations on key-value pairs are 'shuffle' operations, such as grouping or aggregating the elements by key.
For example:

val lines = sc.textFile("india.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
Here the reduceByKey operation counts how many times each line of text occurs in the file: for each key, it repeatedly applies the binary function to the values until a single value per key is produced.
We can also write:

import org.apache.spark.rdd.RDD

val pairs: RDD[(String, Int)] =
  sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
val sums: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
// '===' here is ScalaTest's equality operator (see below); with plain Scala, use == instead
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))
Here the values for key 'a' are aggregated using the addition function (_ + _) as (3 + 1) + 5 = 9; no aggregation is needed for 'b', since it has only a single value. The function used must be commutative and associative, because Spark may combine the values in any order, both within and across partitions. The triple-equals operator ('===') comes from ScalaTest and provides more informative failure messages than the regular '==' operator.
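Because any commutative and associative function will do, we can swap in a different one. A small sketch using the same pairs RDD:

val maxes: RDD[(String, Int)] = pairs.reduceByKey(math.max(_, _))
// For key "a" the values 3, 1, 5 combine in some order, e.g. max(max(3, 1), 5) = 5;
// since max is commutative and associative, the order does not affect the result.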
Returning to the line-count example, we could also use counts.sortByKey() to sort the pairs alphabetically by key, and finally counts.collect() to bring them back to the driver program as an array of objects.
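As a sketch, if india.txt happened to contain the three lines "delhi", "mumbai", "delhi" (invented contents for illustration), we would get:

counts.sortByKey().collect()   // Array((delhi,2), (mumbai,1)), sorted by key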
We have another operator, foldByKey, which performs the same per-key aggregation on the pairs RDD but additionally takes a zero value:
val sums: RDD[(String, Int)] = pairs.foldByKey(0)(_ + _)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))
Here we supply 0 as the zero value for integer addition; the zero value differs for other types and operations. This time the values are aggregated as (((0 + 3) + 1) + 5) = 9 for 'a', and 0 + 7 = 7 for 'b'.
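For instance, if the operation were multiplication instead of addition, the zero value would have to be 1, the multiplicative identity. A sketch using the same pairs RDD:

val products: RDD[(String, Int)] = pairs.foldByKey(1)(_ * _)
// For "a": (((1 * 3) * 1) * 5) = 15; for "b": 1 * 7 = 7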
Both foldByKey() and reduceByKey() perform the same kind of aggregation and give the same result here; the only difference is that foldByKey() requires a zero value, while reduceByKey() does not.