When we persist an RDD, then each and every node stores its partitions and computes them in memory and reuses them in other actions of that dataset. We can persist RDD using persist() or cache() methods.
Here cache is fault-tolerant which means if any partition of RDD is lost then it is recomputed using the transformations that originally created it. For good results we can cache the dataset into memory.
scala> abc.cache() res1: abc.type = MappedRDD at map at <console>:18
Here we can see that log shows that the RDD number is 4. In spark by calling cache it does not cache the RDD into memory straightaway. It first marks the RDD with a flag indicating that it should be cached when the job is run. Here job means applying any action on it.
scala> abc.reduceByKey((a, b) => Math.max(a, b)).foreach(println(_)) Added rdd_4_0 in memory Added rdd_4_1 in memory
Now we can see that the partitions are kept in memory. The log lines show it has 2 partitions 0 and 1.
Now if we run another job or another action on it, then the dataset will be loaded from the memory. ‘locally’ means loaded from memory.
scala> abc.reduceByKey((a, b) => Math.min(a, b)).foreach(println(_)) Found block rdd_4_0 locally Found block rdd_4_1 locally
These cached RDD can be only by jobs in the same application. To share these datasets between the applications, then they must be written to external storage using one of the saveAs() methods (saveAsTextFile(), saveAsHadoopFile())in the first application, then loaded using the corresponding method in SparkContext (textFile(),hadoopFile(), etc.) in the second application.
Now let us discuss some of the storage levels for persistence. These storage levels are set by passing a StorageLevel object. The default storage level for cache() method is ‘Memory_Only’. Some of the storage levels are,
|MEMORY_ONLY||Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level.|
|MEMORY_AND_DISK||Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.|
(Java and Scala)
|Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast_serializer, but more CPU-intensive to read.|
(Java and Scala)
|Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed.|
|DISK_ONLY||Store the RDD partitions only on disk.|
|MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.||Same as the levels above, but replicate each partition on two cluster nodes.|
|OFF_HEAP (experimental)||Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.|
We can print the elements of RDD using rdd.foreach(println) or rdd.map(println). We can use these methods for generating the output on a single machine. In cluster mode we can use collect() method to first bring the RDD to the driver node and then print it using rdd.collect().foreach(println). This will consume the whole memory because collect() method fetches the entire RDD to a single machine. If we require to print only a few elements then we can use take() method as rdd.take(100).foreach(println).