RDD Creation:
Now let us go ahead and create RDDs.
The first method is using parallelized collections. Here an RDD is created by calling the SparkContext parallelize method on an existing collection in our program. When we call this method, the elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example,
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> distData.reduce((a, b) => a + b)
res0: Int = 15
So now we have a distributed dataset called ‘distData’ which can be operated on in parallel. We can use distData.reduce((a, b) => a + b) to add up the elements of the array.
We can also divide the collection into a number of partitions; Spark will run one task per partition. We can do this by passing the partition count explicitly, for example sc.parallelize(data, 10). The default can also be set through the spark.default.parallelism property. Its value differs between local and cluster mode: on a local machine it defaults to the number of cores in the machine, while on a cluster it defaults to the total number of cores across all executor nodes.
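To make the partitioning concrete, here is a minimal sketch (assuming the Spark shell, where sc is already available) that requests 10 partitions for the same small collection and checks how many were actually created:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data, 10)   // second argument is the number of partitions
distData.getNumPartitions                 // returns 10, so Spark will schedule 10 tasks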
Now let us look at the second method of creating an RDD, which is by using an external dataset. Spark can create RDDs from external storage systems like HDFS, HBase, Amazon S3, Cassandra, etc. It supports text files, SequenceFiles, and any other Hadoop InputFormat.
scala> val distFile = sc.textFile("india.txt") distFile: org.apache.spark.rdd.RDD[String] = india.txt MapPartitionsRDD[2] at textFile at <console>:24
Here we are just creating a reference to the external dataset. We can also write this with an explicit type annotation,
scala> val text: RDD[String] = sc.textFile("india.txt")
Here Spark uses TextInputFormat from the old MapReduce API to read the file. By default there is one Spark partition per HDFS block. This can be changed by requesting a minimum number of partitions,
sc.textFile(inputPath, 10)
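As a quick check, here is a minimal sketch (assuming the Spark shell and that the india.txt file from the earlier example is readable) that requests at least 10 partitions and verifies the result:

val distFile = sc.textFile("india.txt", 10)   // second argument is the minimum number of partitions
distFile.getNumPartitions                     // at least 10; may be higher depending on how the file splits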
We also have other methods like ‘wholeTextFiles’, which reads each text file as a whole rather than splitting it into lines; every file found under the path becomes a single record. Since each file is loaded into memory in one piece, we should only use it for small files. The return type is an RDD of string pairs, where the first string is the file path and the second is the file contents.
val files: RDD[(String, String)] = sc.wholeTextFiles(inputPath)
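As a usage sketch (assuming the Spark shell and a directory of small text files at inputPath, a placeholder path), we can pattern match on the pair to get the path and contents, for example to count lines per file:

val files = sc.wholeTextFiles(inputPath)
val linesPerFile = files.map { case (path, contents) => (path, contents.split("\n").length) }
linesPerFile.collect().foreach(println)   // prints a (path, lineCount) pair for each file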
We also have the SequenceFile input format, where we specify the key and value Writable types.
sc.sequenceFile[IntWritable, Text](inputPath)
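For completeness, here is a minimal sketch (assuming the Spark shell and a SequenceFile of IntWritable/Text records at inputPath, a placeholder path). The Writable classes come from Hadoop, and converting them to plain Scala types makes the records easier to work with:

import org.apache.hadoop.io.{IntWritable, Text}

val seq = sc.sequenceFile[IntWritable, Text](inputPath)
val pairs = seq.map { case (k, v) => (k.get, v.toString) }   // convert the Writables to Int and String
pairs.take(5).foreach(println)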
The last way of creating an RDD is by transforming an existing RDD into a new one. Every transformation, such as map or filter, produces a new RDD, so we can build up multiple RDDs from a single starting point.
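As a short sketch (assuming the Spark shell and the distData RDD created earlier), each transformation below yields a new RDD while leaving the original untouched:

val distData = sc.parallelize(Array(1, 2, 3, 4, 5))
val doubled = distData.map(_ * 2)          // new RDD: 2, 4, 6, 8, 10
val evens = doubled.filter(_ % 4 == 0)     // another new RDD: 4, 8
evens.collect()                            // returns Array(4, 8)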