
Getting Unique Combinations Of Products with Apache Spark.

This Apache Spark snippet looks at users who have rated a series of products and pulls out the unique combinations of products rated by each user, as a first step toward building a recommendation system. To get there we will walk through a multi-part MapReduce-style algorithm.

Some Terminology:

  • (k,v) denotes a tuple with key k and value v. It can have any number of members.
  • [] denotes a collection like a List or an Array.
  • The ratings are 1-5 stars.

RDD Transformation steps using MapReduce:

  1. (user, product, rating)
  2. (product, [ (user, rating) ])
  3. (user, (product, rating, # product ratings) )
  4. (user, (product1, product2) )

For step 4, though, we do not want both (user, (product1, product2)) and (user, (product2, product1)): the two orderings describe the same combination.
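As a quick sketch of the dedup rule we will apply later, a case-insensitive comparison of the two titles keeps exactly one of the two orderings (keepPair is just an illustrative name; the predicate itself comes from the filter step further down):

    // Keep a pair only when the first title sorts strictly after the
    // second (case-insensitive), so each combination survives exactly once.
    val keepPair = (game1: String, game2: String) =>
      game1.compareToIgnoreCase(game2) > 0

    keepPair("Left4Dead", "Dragon Age Origins")  // true  -> kept
    keepPair("Dragon Age Origins", "Left4Dead")  // false -> dropped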

We will start with a tab-delimited data set similar to this user_game_ratings.tab, where each line has the format (userId, game, rating) and game is the product:

1    Fallout 4   5  
1    Left4Dead   5  
1    Dragon Age Origins  5  
2    Metro: Last Light   4  
2    Left4Dead   5  
2    Dragon Age Origins  3  
3    Metro: Last Light   2  
4    Mass Effect 5  
5    Mass Effect 4  
3    Tera    1  
5    Ori and the Blind Forest    1  

Transforming the RDD using groupBy()

Our desired output is (game, [ (user, rating) ]): a scala.Tuple2 whose second element is a collection of (user, rating) tuples for that game title.

    //userId,game,rating
    val user_game_ratings = sc.textFile("/user/hue/user_game_ratings.tab")

    val game_x_user_rating = user_game_ratings.map(f=>{
      val line = f.split("\t")
      //emit (game,(user,rating)); the groupBy below collects these per game
      (line(1),(line(0),line(2).toInt))
    }).groupBy(_._1)

Since we group on the first element, the game title line(1), the (user, rating) tuples end up in a collection keyed by each game title. Note that groupBy keeps the whole (game, (user, rating)) tuple as each element of that collection, which matters in the next step.
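To sanity-check the grouping you can collect and print it; with our sample data the entries should look roughly like the comment below (Spark usually renders the grouped values as a CompactBuffer, and notice the full (game, (user, rating)) tuples inside):

    game_x_user_rating.collect().foreach(println)
    // e.g. (Left4Dead,CompactBuffer((Left4Dead,(1,5)), (Left4Dead,(2,5))))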

FlatMap() RDD into Users and their Game ratings

What we want now is (user, (game, rating, # game ratings) ).

    val user_x_game_rating_numRaters = game_x_user_rating.flatMap(g=>{
      val numberOfRaters = g._2.size
      val game = g._1
      val userRatings=g._2

      val result = userRatings.map(ur=>{
        //ur._2=(String,Int) = (user,rating)
        (ur._2._1,(game,ur._2._2,numberOfRaters))
      })

      result
    })

Recall that g is the grouped pair, so g._1 is the game title and g._2 is an Iterable whose elements are the full (game, (user, rating)) tuples. Calling size on that Iterable gives us the number of ratings for the game, and because each element still carries its (user, rating) pair in second position, the inner map reads ur._2 to pull it out.
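If the positional accessors get hard to follow, here is an equivalent sketch of the same flatMap using pattern matching to name the pieces explicitly (the trailing 2 in the val name is just to keep this sketch separate; it produces the same output):

    val user_x_game_rating_numRaters2 = game_x_user_rating.flatMap {
      case (game, entries) =>
        val numberOfRaters = entries.size
        //each entry is still the full (game,(user,rating)) tuple
        entries.map { case (_, (user, rating)) =>
          (user, (game, rating, numberOfRaters))
        }
    }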

Using a Self join() and filter() to get all Game Combinations

In the Java API, the join() operation can only be performed on a JavaPairRDD. Scala will let you perform the join on an RDD of tuples directly, but you'll have to pull in import org.apache.spark.SparkContext._ so the pair-RDD operations become available.
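As a minimal sketch of the shape the pair-RDD join produces (joined is just an illustrative name):

    //joining RDD[(K,V)] with RDD[(K,W)] yields RDD[(K,(V,W))], so here
    //joined is an RDD[(String, ((String,Int,Int), (String,Int,Int)))]
    val joined = user_x_game_rating_numRaters.join(user_x_game_rating_numRaters)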

So the self join of user_x_game_rating_numRaters with itself gets us (user, ((game, rating, number of raters), (game, rating, number of raters))), including both orderings of every pair:

      (user 1, (game 1, game 2))
      (user 1, (game 2, game 1))
...

Notice though that for all intents and purposes order does not matter, so the two orderings are the same thing, and the join also pairs every game with itself. Passing the result through a filter with a case-insensitive comparison on the titles keeps exactly one ordering of each pair, and it drops the self-pairs too, since game1.compareToIgnoreCase(game2) > 0 is false when the titles are equal.

    val user_x_game_ratings = user_x_game_rating_numRaters.join(user_x_game_rating_numRaters).filter(f=>{
      //f._2 = ((String,Int,Int),(String,Int,Int)) = game1 and game2
      val game1=f._2._1._1
      val game2=f._2._2._1
      game1.compareToIgnoreCase(game2) > 0
    })

Now we can access all the different combinations via the second tuple. To confirm, run the Spark program and add a collect().foreach(println) to the end of user_x_game_ratings. collect() pulls every element of the RDD back to the driver program, where we can iterate over them and print them out.
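The confirmation step is just the line below; that's fine for our small sample, but remember that collect() materializes the whole RDD in the driver's memory:

    user_x_game_ratings.collect().foreach(println)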

(2,((Dragon Age Origins,3,2),(Metro: Last Light,4,2)))
(2,((Dragon Age Origins,3,2),(Left4Dead,5,2)))
(2,((Left4Dead,5,2),(Metro: Last Light,4,2)))
(5,((Mass Effect,4,2),(Ori and the Blind Forest,1,1)))
(3,((Metro: Last Light,2,2),(Tera,1,1)))
(1,((Fallout 4,5,1),(Left4Dead,5,2)))
(1,((Dragon Age Origins,5,2),(Fallout 4,5,1)))
(1,((Dragon Age Origins,5,2),(Left4Dead,5,2)))
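As a design note, the self join computes every ordered pair and then filters half of them (plus the self-pairs) away. A hypothetical alternative, assuming each user's rating list fits in memory, is to group by user and enumerate the pairs directly with combinations; the names here are illustrative:

    // Group each user's (game,rating,numRaters) triples together, sort by
    // title, and emit each unordered pair of games exactly once per user.
    val user_x_game_pairs = user_x_game_rating_numRaters.groupByKey().flatMap {
      case (user, games) =>
        games.toSeq
          .sortBy(_._1.toLowerCase)
          .combinations(2)
          .map { case Seq(g1, g2) => (user, (g1, g2)) }
    }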

Here's the whole thing in one Object:

    import org.apache.spark._
    import org.apache.spark.SparkContext._

    object SimilarGames {
      def main(args: Array[String]) {
        val sparkConfig = new SparkConf().setAppName("Similar Games")
        val sc = new SparkContext(sparkConfig)

        //userId,game,rating
        val user_game_ratings = sc.textFile("/user/hue/user_game_ratings.tab")

        val game_x_user_rating = user_game_ratings.map(f=>{
          val line = f.split("\t")
          //emit (game,(user,rating)); the groupBy below collects these per game
          (line(1),(line(0),line(2).toInt))
        }).groupBy(_._1)

        //(user,(game,rating,number of raters))
        val user_x_game_rating_numRaters = game_x_user_rating.flatMap(g=>{
          val numberOfRaters = g._2.size
          val game = g._1
          val userRatings = g._2

          val result = userRatings.map(ur=>{
            //ur._2 = (String,Int) = (user,rating)
            (ur._2._1,(game,ur._2._2,numberOfRaters))
          })

          result
        })

        /*
          Perform a self join to get:
          (user,((game,rating,number of raters),(game,rating,number of raters)))
          This generates duplicate combinations, e.g.
          (user 1, (game 1, game 2)) and
          (user 1, (game 2, game 1)) are the same thing,
          so we filter on a comparison of the titles to keep one ordering.
        */
        val user_x_game_ratings = user_x_game_rating_numRaters.join(user_x_game_rating_numRaters).filter(f=>{
          //f._2 = ((String,Int,Int),(String,Int,Int)) = game1 and game2
          val game1 = f._2._1._1
          val game2 = f._2._2._1
          game1.compareToIgnoreCase(game2) > 0
        })

        user_x_game_ratings.collect().foreach(println)
      }
    }