Spark - Shared Variables

Shared Variables:

When Spark runs an operation, the function passed to it works on the variables used in that function. By default, a separate copy of each of those variables is shipped to every task on the slave nodes, and updates made to those copies are never sent back to the driver program. This is inefficient, because the amount of data transferred across the cluster can be huge.

So Spark provides shared variables, which help reduce this data transfer. There are two types of shared variables: broadcast variables and accumulators.

Broadcast variables:

A broadcast variable, as the name suggests, is broadcast from the master node to its slaves. Broadcast variables avoid network bottlenecks by giving every slave node a local, read-only copy of the data, so tasks can access it quickly instead of fetching it over the network. Broadcast variables are frequently used in mapping operations, for example to join an RDD against a small lookup table.

Broadcast variables let us cache a read-only variable on each machine. This is efficient because we don't need to ship a copy of the variable with every task. Spark also distributes broadcast variables using efficient broadcast algorithms to reduce communication cost.

Suppose we have a large dataset. Instead of transferring a copy of that dataset with every task, we can use a broadcast variable: the data is copied to each node once, and every task on that node shares the same copy. To do this, we first create a broadcast variable using SparkContext.broadcast.

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value

res0: Array[Int] = Array(1, 2, 3)

We can use the value method to access the shared value.

As the name suggests, broadcast variables travel one way, from the driver to the tasks. There is no way to update a broadcast variable on a worker and have the update propagate back to the driver.
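For example, a small lookup table can be broadcast once and then used inside a map operation by every task. The following is a minimal sketch; the countryCodes map, the users RDD and their contents are made up purely for illustration.

val countryCodes = Map("IN" -> "India", "US" -> "United States", "FR" -> "France")
val bcCodes = sc.broadcast(countryCodes)   // shipped to each node once

val users = sc.parallelize(Seq(("alice", "IN"), ("bob", "US"), ("carol", "FR")))
val withCountry = users.map { case (name, code) =>
  // each task reads the broadcast value locally instead of pulling the map over the network
  (name, bcCodes.value.getOrElse(code, "Unknown"))
}

withCountry.collect().foreach(println)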

Accumulators:

Accumulators are shared variables that tasks can only add to, and they are typically used to count or sum something. Workers update them in parallel during execution, and the results are merged and shared back to the driver program running on the master node. They are similar to counters in MapReduce.

From a task's point of view an accumulator is write-only: the slave nodes can add to it, but they cannot read its value; only the driver can. First we need to create an accumulator variable using the accumulator() method on SparkContext (this is the original API; since Spark 2.0 it is deprecated in favour of the longAccumulator/doubleAccumulator methods shown further below).

import org.apache.spark.Accumulator

val count: Accumulator[Int] = sc.accumulator(0)

val result = sc.parallelize(Array(1, 2, 3))
  .map(i => { count += 1; i })   // each task adds to the accumulator
  .reduce((x, y) => x + y)

assert(count.value == 3)   // the merged count, read on the driver
assert(result == 6)

Spark natively supports accumulators of numeric types, and programmers can add support for new types.
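For instance, a custom accumulator can be written by extending AccumulatorV2 (available since Spark 2.0). The class below is only a sketch that collects the set of distinct strings seen across tasks; the class name and the accumulator name "Distinct error codes" are illustrative, not part of Spark.

import org.apache.spark.util.AccumulatorV2

class StringSetAccumulator extends AccumulatorV2[String, Set[String]] {
  private var items: Set[String] = Set.empty
  override def isZero: Boolean = items.isEmpty
  override def copy(): StringSetAccumulator = {
    val acc = new StringSetAccumulator
    acc.items = items
    acc
  }
  override def reset(): Unit = { items = Set.empty }
  override def add(v: String): Unit = { items += v }
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = { items ++= other.value }
  override def value: Set[String] = items
}

val errors = new StringSetAccumulator
sc.register(errors, "Distinct error codes")   // register so Spark can track and merge it
sc.parallelize(Seq("E1", "E2", "E1")).foreach(errors.add)
println(errors.value)   // Set(E1, E2), read on the driver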

We can create named or unnamed accumulators; a named accumulator is displayed in the Spark web UI for the stage that modifies it.

We can create a numeric accumulator using SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate values of type Long or Double, respectively.

Tasks running on a cluster can then add to it using the add method. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.

scala> val accum = sc.longAccumulator("My Accumulator")

accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

scala> accum.value

res2: Long = 10
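A double accumulator works the same way. Here is a small sketch with made-up values:

val sumAcc = sc.doubleAccumulator("Running Sum")
sc.parallelize(Seq(1.5, 2.5, 3.0)).foreach(x => sumAcc.add(x))
sumAcc.value   // 7.0, readable only on the driver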