
PairRDDs
So far we have seen basic RDDs whose elements are words, numbers, or lines of text. We'll now discuss PairRDDs, which are essentially datasets of key/value pairs. People who have used MapReduce will be familiar with the concept of key/value pairs and their benefits during aggregation, joining, sorting, counting, and other ETL operations. The beauty of key/value pairs is that you can operate on the data belonging to each key in parallel, for operations such as aggregation or joining. The simplest example could be retail store sales with StoreId as the key and the sales amount as the value. This lets you perform advanced analytics per StoreId, with the data for different stores processed in parallel.
Creating PairRDDs
The first step in understanding PairRDDs is to understand how they are created. As we have seen previously, data is not necessarily available in key/value form upon ingestion, so we need to transform it into a PairRDD using traditional RDD transformation functions.
Converting a data set into key/value pairs requires careful consideration. The most important of those considerations include answering the following key questions:
- What should be the key?
- What should be the value?
The answer lies in the business problem that you are trying to solve. For example, if you had a text file with StoreId (1,000 different stores) and sales per month (in thousands of dollars), the key would be the StoreId, and the value would be the sales. We are going to work through a comma-separated file, which has the StoreId and the relevant sales values per month. The file storesales.csv is available for download from the book's website.
Let's look at the structure of the file, before we make any decisions on a future course of action:

Figure 2.24: File structure of storesales.csv
The structure of the file indicates that we have a three-digit StoreId, followed by a comma and the value of store sales. Our objective is to calculate the total sales per store, to understand how each store is doing. As discussed earlier, the first step is to convert this into a PairRDD, which can be done using the map() function (Python and Scala) or the mapToPair() function (Java). Let's have a quick look at the examples in Scala, Python, and Java:

Figure 2.25: Total sales per store using PairRDDs (Scala)
The code in Python looks similar, as we can use the map() function to transform the data into a PairRDD:

Figure 2.26: Total sales per store using PairRDDs (Python)
In Java, however, we have to use the mapToPair() function to transform it into a PairRDD:

Figure 2.27: Total sales per store using PairRDDs (Java)
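The two steps described above (turn each line into a key/value pair, then add up the values per key) can be sketched in plain Python without a Spark cluster. This is only a model of the idea, not the code from the figures: the sample lines are hypothetical and are not the real contents of storesales.csv, and to_pair is an illustrative helper name. In PySpark, the corresponding calls would be map() followed by reduceByKey().

```python
from collections import defaultdict

# Hypothetical sample lines in the "StoreId,sales" layout described above;
# these are NOT the real contents of storesales.csv.
lines = ["101,23.5", "102,19.0", "101,26.5", "103,14.0", "102,1.0"]


def to_pair(line):
    """Split one CSV line into a (key, value) pair: (StoreId, sales)."""
    store_id, sales = line.split(",")
    return (store_id.strip(), float(sales))


# Models rdd.map(to_pair): every line becomes a (StoreId, sales) pair.
pairs = [to_pair(line) for line in lines]

# Models reduceByKey(lambda a, b: a + b): add up the sales for each StoreId.
totals = defaultdict(float)
for store_id, sales in pairs:
    totals[store_id] += sales

total_sales = dict(totals)
print(sorted(total_sales.items()))
```

The choice of key (StoreId) and value (sales) is made once, in to_pair; everything after that only needs to know that it is working with pairs.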
PairRDD transformations
Spark not only provides the construct of PairRDDs, but it also provides special transformations that are only applicable to PairRDDs in addition to the ones available to the standard RDDs. As the name PairRDD indicates, a pair of two elements (a,b) makes up an element; hence, you would see that the operations are applicable to pairs/tuples.
Note
Tuples are data structures consisting of multiple parts. In relational databases, a tuple refers to an ordered set of data that constitutes a record.
You can usually tell which operations are intended for PairRDDs from their names, which contain the word key or value:

We'll run the examples with sales data from regions within the United Kingdom using the following sample input data:
{(London, 23.4),(Manchester,19.8),(Leeds,14.7),(London,26.6)}
reduceByKey(func)
We have already seen examples of reduceByKey(func), which combines the values for the same key; in our case, the store sales for each StoreId key. The function passed as an argument has to be associative and commutative; it is applied to the values of the source RDD and creates a new RDD with the resulting value for each key. Since the data for a given key can be spread across various executors, this operation might require shuffling of data. The following examples demonstrate reduceByKey() with Scala, Python, and Java:

Figure 2.28: Reduce by key (Scala)
The code in Python looks very similar to Scala:

Figure 2.29: Reduce by key (Python)
As seen in previous examples, the code in Java is a bit long-winded, as you have to create a list of Tuple2 objects before running the reduceByKey() transformation:

Figure 2.30: Reduce by key (Java)
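To see why the function passed to reduceByKey() must give the same answer no matter how the values are grouped, here is a plain-Python model of the operation using the sample sales data. It is a sketch of the semantics only and does not use Spark: reduce_by_key is a hypothetical helper, and the split of the data into two partitions is an assumption made for illustration.

```python
def reduce_by_key(partitions, func):
    """Model of reduceByKey: combine per partition first, then merge the partials."""
    # Step 1 (map side): combine the values for each key inside every partition.
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)

    # Step 2 (after the shuffle): merge the partial results for each key.
    result = {}
    for local in partials:
        for key, value in local.items():
            result[key] = func(result[key], value) if key in result else value
    return result


# Sample input data, split into two partitions (the split is an assumption).
partitions = [
    [("London", 23.4), ("Manchester", 19.8)],
    [("Leeds", 14.7), ("London", 26.6)],
]

totals = reduce_by_key(partitions, lambda a, b: a + b)
print(totals)

# A different partitioning yields the same keys and (up to rounding) the same sums.
single_partition = [[pair for part in partitions for pair in part]]
totals_single = reduce_by_key(single_partition, lambda a, b: a + b)
```

Because addition is associative and commutative, it does not matter that the two London values start out in different partitions: each partition produces a partial result, and the partial results are merged with the same function.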
groupByKey()
As the name indicates, groupByKey() groups the values that share the same key. For brevity's sake, we leave it to you to take the dataset that you loaded for reduceByKey() and apply groupByKey() to it to see the result.
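As a rough guide to what to expect, the following plain-Python sketch models the grouping on the sample sales data. It does not use Spark, and group_by_key is a hypothetical helper; in Spark the grouped values come back as an iterable per key rather than a Python list.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Model of groupByKey: collect every value that shares a key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


# The sample input data used in this section.
store_sales = [("London", 23.4), ("Manchester", 19.8), ("Leeds", 14.7), ("London", 26.6)]

grouped = group_by_key(store_sales)
for city, values in grouped.items():
    print(city, values)
```

Note that nothing is aggregated here: every individual value is kept, which is what makes grouping more expensive than reducing when all you want is a sum.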
reduceByKey vs. groupByKey - Performance Implications
reduceByKey() and groupByKey() can be used to get similar results, but their execution plans are different and can have a huge impact on performance. Behind the scenes, both of them use combineByKey with different combine/merge implementations, which is the key differentiator. The groupByKey() operation can be very expensive, especially if you are grouping in order to perform an aggregation (such as a sum or average) over each key. In such a scenario, using aggregateByKey() or reduceByKey() will provide much better performance. The key differences are:
- reduceByKey(): Offers a map-side combine, which means that less data is shuffled than with groupByKey().
- groupByKey(): Must be able to hold all the key/value pairs for any single key in memory. If a key has too many values (quite common with a popular key, for example in stock trades), it can potentially result in an OutOfMemoryError.
- groupByKey() calls combineByKey() with the mapSideCombine parameter set to false, which results in far more shuffling and thus hurts performance.
- In summary, you should avoid groupByKey() for aggregations. For example, in the following Scala example, the same aggregation that is attempted with groupByKey() can be achieved with reduceByKey().
Example 2.19: groupByKey() and reduceByKey() in Scala:
// Input data
val storeSales = sc.parallelize(Array(("London", 23.4), ("Manchester", 19.8), ("Leeds", 14.7), ("London", 26.6)))

// groupByKey: collect all the values per key, then sum them
storeSales.groupByKey().map(location => (location._1, location._2.sum)).collect()
// Sample result
// res2: Array[(String, Double)] = Array((Manchester,19.8), (London,50.0), (Leeds,14.7))

// reduceByKey: sum the values per key directly
storeSales.reduceByKey(_ + _).collect()
// Sample result
// res1: Array[(String, Double)] = Array((Manchester,19.8), (London,50.0), (Leeds,14.7))
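Both calls return the same totals, so the difference lies in how many records cross the shuffle. The following plain-Python sketch counts them for the sample data under a simplified model: without a map-side combine every pair is shuffled, while with one each partition sends at most one record per key. The function names and the split into two partitions are assumptions made for illustration; real Spark shuffles involve more detail than this count captures.

```python
def shuffled_records_group_by_key(partitions):
    """No map-side combine: every (key, value) pair crosses the shuffle."""
    return sum(len(part) for part in partitions)


def shuffled_records_reduce_by_key(partitions):
    """Map-side combine: each partition sends at most one record per key."""
    return sum(len({key for key, _ in part}) for part in partitions)


# Sample input data; placing both London records in one partition is an assumption.
partitions = [
    [("London", 23.4), ("London", 26.6)],
    [("Manchester", 19.8), ("Leeds", 14.7)],
]

group_count = shuffled_records_group_by_key(partitions)
reduce_count = shuffled_records_reduce_by_key(partitions)
print("groupByKey shuffles", group_count, "records")
print("reduceByKey shuffles", reduce_count, "records")
```

With four records the saving is a single record, but the gap grows with the number of values per key in each partition, which is exactly the situation in which groupByKey() becomes expensive.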
combineByKey(func)
To better understand combineByKey(), you'll need to understand the combiner functionality from Hadoop MapReduce. A combiner is basically a map-side reduce function that reduces the amount of data shuffled. combineByKey() takes three different arguments:
- createCombiner: This is the first argument of the combineByKey() function. As the name indicates, it creates the combiner (accumulator) that will be used later. It is called the first time a key is found in a partition.
- mergeValue: This is called when the key already has an accumulator in the partition.
- mergeCombiners: This is called when more than one partition has an accumulator for the same key.
Let us consider the example of a dataset, with the following data:
("k1",10),("k2",5),("k1",6),("k3",4),("k2",1),("k3",4)
For simplicity's sake, we have kept the data to three keys (k1, k2, k3) with two values each. Our objective is to calculate the average value for each key. Let's see the Python example first, as it is probably the simplest one:

Figure 2.31: CombineByKey (Python)
In the first step, we parallelize the data across the cluster. Now let's go through each argument of combineByKey() and see what it does, to explain the semantics of the operation:
- lambda val: (val, 1): As mentioned previously, this creates the combiner, and it is called the first time a key is encountered in each partition. It takes the value from the (key, value) pair in the input dataset and converts it to another pair, (value, 1). So, for example, when it comes across ("k1", 10), it picks the value 10 and converts it to the pair (10, 1). This is basically the initialization of the combiner. Whenever it comes across a key that is new to a partition, it repeats the same process. The magic is hidden in the name of the operation, which is combineByKey.
- lambda valcntpair, val: (valcntpair[0] + val, valcntpair[1] + 1): This is the merge value step, which comes into play whenever we come across another value for a key that already has a combiner within the partition. When that happens, it takes valcntpair (short for value-count pair), which was created in the create combiner step, and merges the new value into it. In our case, the logic is very simple: add the value to the existing aggregated value, and increment the counter by 1 to indicate that we have found another value for the key.
- lambda valcntpair, valcntpairnxt: ((valcntpair[0] + valcntpairnxt[0]), (valcntpair[1] + valcntpairnxt[1])): As mentioned previously, this is the merge combiner step; it is needed because you might have combiners for the same key in multiple partitions. Essentially, this is the global summation of all the values and their counts. The end result is the total value and the final count for each key.
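The three steps above can be traced in plain Python, without Spark, by applying the same three lambdas to data that has been split into partitions by hand. This is a sketch of the semantics only: combine_by_key is a hypothetical helper, and the two-partition split is an assumption chosen so that k2 ends up with a combiner in each partition.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Model of combineByKey over hand-made partitions."""
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key not in combiners:
                # First time this key is seen in the partition.
                combiners[key] = create_combiner(value)
            else:
                # The key already has a combiner in this partition.
                combiners[key] = merge_value(combiners[key], value)
        per_partition.append(combiners)

    merged = {}
    for combiners in per_partition:
        for key, combiner in combiners.items():
            if key in merged:
                # Another partition already produced a combiner for this key.
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged


# The sample data, split into two partitions (the split is an assumption).
partitions = [
    [("k1", 10), ("k2", 5), ("k1", 6)],
    [("k3", 4), ("k2", 1), ("k3", 4)],
]

sum_count = combine_by_key(
    partitions,
    lambda val: (val, 1),
    lambda valcntpair, val: (valcntpair[0] + val, valcntpair[1] + 1),
    lambda valcntpair, valcntpairnxt: (valcntpair[0] + valcntpairnxt[0],
                                       valcntpair[1] + valcntpairnxt[1]),
)

# Divide each total by its count to get the average per key.
averages = {key: total / count for key, (total, count) in sum_count.items()}
print(sum_count)
print(averages)
```

With this split, k1 and k3 are handled entirely by the create combiner and merge value steps inside one partition, while k2 needs the merge combiner step because its two values sit in different partitions.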
For those of you who prefer Scala, here's the same example in Scala:
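Example 2.20: combineByKey() example in Scala:
val sampleData = sc.parallelize(Array(("k1", 10), ("k2", 5), ("k1", 6), ("k3", 4), ("k2", 1), ("k3", 4)))

val sumCount = sampleData.combineByKey(
  // createCombiner: first value seen for a key in a partition
  value => (value, 1),
  // mergeValue: fold another value into the partition's (sum, count) pair
  (valcntpair: (Int, Int), value) => (valcntpair._1 + value, valcntpair._2 + 1),
  // mergeCombiners: add up the (sum, count) pairs from different partitions
  (valcntpair: (Int, Int), valcntpairnxt: (Int, Int)) =>
    (valcntpair._1 + valcntpairnxt._1, valcntpair._2 + valcntpairnxt._2))
sumCount.take(3)

// toDouble avoids integer division when computing the average
val avgByKey = sumCount.map { case (label, value) => (label, value._1.toDouble / value._2) }
avgByKey.take(3)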
Tip
We have now looked at how to compute an average across a set of key/value pairs. We provided a good dataset earlier in this chapter in the storesales.csv file. As an exercise, can you implement the average sales per store in your favorite programming language?
There is nothing better than going through the examples with your own dataset to grasp a better understanding of the subject.
Transformations on two PairRDDs
So far, the transformations that we have seen operate on a single PairRDD. However, it is important to look at the transformations that apply to more than one PairRDD, as in real-life scenarios you would be working with multiple datasets, which can be facts or dimensions, and joining them on their keys to get a final dataset.
The major two-PairRDD transformations are similar to the pseudo set operations that we saw earlier. We have three join operations (join, leftOuterJoin, rightOuterJoin) in addition to a subtraction and a cogrouping operation:
