
Predicting Movie Ratings with Apache Spark and Hortonworks

Today's goal is to predict a movie's rating from its synopsis using machine learning, in an environment that could scale out to hundreds or even thousands of nodes. As the title suggests, I'll be doing it on Apache Spark using MLlib, written in Scala. I wanted to approach this from start to finish instead of just giving a chunk of code, because piecing it together from a lot of Googling and tinkering was actually quite a bit of work. Hopefully this helps the next person venturing into the "Big Data" world.

The Goal

To implement the Naive Bayes supervised learning algorithm to predict the rating of a movie based on its synopsis.

Let's take this text file as an example of our movies naive_bayes_movie_classification.txt:

1;The movie has a lot of violence, sex, drugs and alcohol.  
1;A corrupt cop with an alcoholic problem doesn't hesitate to get violent to get the job done  
2;This is a fairy tale movie about princes, princesses and a wonderful adventure.  
2;Will prince charming ever find his princess?  

It's a semicolon delimited file. The first field is the rating and the second field is the synopsis.

1 = M for 'Mature' or Adult. Something children shouldn't see.

2 = PG for 'Parental Guidance' or kid friendly.

What we want is for the system to learn from this and, given another synopsis, tell us whether it's a 1 or a 2. We could have more labels, but I wanted to keep the example small.

Requirements

Before going any further there are a few things you'll need.

  1. Scala installed on the computer

  2. SBT installed with the assembly plugin. Assembly lets you package your program and its dependencies into one fat jar. Without it, Spark will try to load third-party classes (like Lucene below) at runtime and fail because they aren't on the cluster's classpath. In my previous post, I go through doing this and setting up my initial environment; a minimal plugins file is also sketched after this list.

  3. Hortonworks sandbox image. I chose Hortonworks because they provide a VirtualBox image with HDFS, Spark etc. already configured, instead of me downloading a Linux VM and setting it all up myself. That and they gave me free lunch during a Sydney seminar :D.

  4. An SSH client. Though I just used Git Bash.
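
For reference, here's a minimal project/plugins.sbt that pulls in sbt-assembly. The plugin version shown is an assumption, so check the sbt-assembly releases for one that matches your SBT install:

// project/plugins.sbt
// Assumed plugin version; use whichever sbt-assembly release matches your SBT version
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")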

Firing Up the Sandbox Image

Before firing up the VM, I went into settings and gave it a shared folder. This shared folder is the location of the code I'll be working with. This saves a lot of hassle of moving files around between the host and the VM.

Hortonworks comes with a ton of stuff installed and configured which you can play with. They have nice tutorials and a web front for their sandbox when you start it up. You can see a lot of the exposed ports and web portals through the port forwarding settings of the VM:

Hortonworks sandbox port forwarding settings

Also, I don't really want to type in 127.0.0.1 each time so I edited C:\Windows\System32\drivers\etc\hosts and put in this entry:

    127.0.0.1       hadoop

That way my host machine can access the services via http://hadoop:[port]/.

SBT File Library Dependencies

name := "NaiveBayes_Document_Classifier"

version := "1.0"

scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.3.1" % "provided"

libraryDependencies += "org.apache.lucene" % "lucene-analyzers-common" % "5.1.0"

It's important that "provided" is on the Spark dependencies, otherwise running sbt assembly will actually error out. Spark core and MLlib are already part of the Spark runtime on the cluster, so they don't need to be packaged into the jar we submit. Also note the %% operator, which picks the artifact matching the scalaVersion above.

Using Lucene As A Word Stemmer

In a recent post, I talked about using Lucene for better searching. We'll use its EnglishAnalyzer to take our synopsis and stem it. We don't want our learning algorithm to treat words as different when they should be considered the same. For example, these words can be considered to have the same weight or meaning:

greet, greeting, greets, greeted

The root word is greet, so when we stem our synopsis we'll only keep the root words. We'll also tokenize the text into a Seq of Strings. Additionally, the EnglishAnalyzer removes stop words (common words that provide no real value or meaning to the document).

In our Stemmer.scala file:

import org.apache.lucene.analysis.en.EnglishAnalyzer  
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute  
import scala.collection.mutable.ArrayBuffer

object Stemmer {

  // Adopted from
  // https://chimpler.wordpress.com/2014/06/11/classifiying-documents-using-naive-bayes-on-apache-spark-mllib/

  def tokenize(content: String): Seq[String] = {
    val analyzer = new EnglishAnalyzer()
    val tokenStream = analyzer.tokenStream("contents", content)
    // CharTermAttribute is what we're extracting
    val term = tokenStream.addAttribute(classOf[CharTermAttribute])

    tokenStream.reset() // must be called by the consumer before consumption to clean the stream

    val result = ArrayBuffer.empty[String]

    while (tokenStream.incrementToken()) {
      val termValue = term.toString
      // skip tokens that contain digits or periods
      if (!(termValue matches ".*[\\d\\.].*")) {
        result += termValue
      }
    }
    tokenStream.end()
    tokenStream.close()
    result
  }
}
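
As a quick sanity check of the stemmer, here's a hypothetical REPL session (with the Lucene analyzers jar on the classpath). The exact tokens depend on Lucene's English stemming rules, but the stop words get dropped and the remaining words are cut down to their roots:

// Hypothetical example; actual output depends on Lucene's stemming rules
println(Stemmer.tokenize("The movie has a lot of violence, sex, drugs and alcohol."))
// roughly: ArrayBuffer(movi, ha, lot, violenc, sex, drug, alcohol)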

Naive Bayes

When I looked up how to use Naive Bayes in Spark, the example provided already had the vectors of numbers calculated. Training the model from those was easy enough. But how do I build that set of vectors myself from the movie ratings file?

As it turns out, there is something in MLlib that can help us with that too!
I made the comments below very verbose to describe what's happening. There are actually two parts here. The first is getting our data from HDFS and vectorizing it, in other words extracting the features our model will train on. Then we train the model.

The second part is extracting a test sample with only synopses and using our model from the first part to predict the rating for each synopsis.
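
Before the full program, here's a rough sketch (not part of the original code) of what the vectorization step does. HashingTF hashes each token to an index and counts occurrences, producing a sparse term-frequency vector:

import org.apache.spark.mllib.feature.HashingTF

val tf = new HashingTF() // defaults to 2^20 features
// hypothetical tokens after stemming; "drug" appears twice
val vec = tf.transform(Seq("violenc", "drug", "drug"))
// vec is a sparse vector with 1.0 at the hash index of "violenc"
// and 2.0 at the hash index of "drug"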
MovieRatingClassifier.scala:

import org.apache.spark.mllib.classification.NaiveBayes  
import org.apache.spark.mllib.regression.LabeledPoint  
import org.apache.spark.{SparkContext, SparkConf}  
import org.apache.spark.mllib.feature.{IDF, HashingTF}

object MovieRatingClassifier {  
  def main(args: Array[String]) {

      val sparkConfig = new SparkConf().setAppName("Movie Rating Classifier")
      val sc = new SparkContext(sparkConfig)

      /*
        This loads the data from HDFS.
        HDFS is a distributed file storage system so this technically 
        could be a very large multi terabyte file
      */      
      val dataFile = sc.textFile("/user/hue/naive_bayes_movie_classification.txt")

      /*
        HashingTF and IDF are helpers in MLlib that help us vectorize our
        synopses instead of doing it manually
      */
      val hashingTF = new HashingTF()

      /*
        Our ultimate goal is to get our data into a collection of type LabeledPoint.
        The MLlib implementation uses LabeledPoints to train the Naive Bayes model.
        First we parse the file for ratings and vectorize the synopses
       */

      val ratings=dataFile.map{x=>
        x.split(";")
        match {
          case Array(rating,synopsis) =>
            rating.toDouble
        }
      }

      val synopsis_frequency_vector=dataFile.map{x=>
        x.split(";")
        match {
          case Array(rating,synopsis) =>
            val stemmed=Stemmer.tokenize(synopsis)
            hashingTF.transform(stemmed)
        }
      }

      synopsis_frequency_vector.cache()

      /*
       http://en.wikipedia.org/wiki/Tf%E2%80%93idf
       https://spark.apache.org/docs/1.3.0/mllib-feature-extraction.html
      */
      val idf = new IDF().fit(synopsis_frequency_vector)
      val tfidf=idf.transform(synopsis_frequency_vector)

      /*produces (rating,vector) tuples*/
      val zipped=ratings.zip(tfidf)

      /*Now we transform them into LabeledPoints*/
      val labeledPoints = zipped.map{case (label,vector)=> LabeledPoint(label,vector)}

      val model = NaiveBayes.train(labeledPoints)

      /*--- Model is trained now we get it to classify our test file with only synopsis ---*/
      val testDataFile = sc.textFile("/user/hue/test.txt")

      /*We only have synopses now. The rating is what we want to predict.*/
      val testVectors=testDataFile.map{x=>
        val stemmed=Stemmer.tokenize(x)
        hashingTF.transform(stemmed)
      }
      testVectors.cache()

      val tfidf_test = idf.transform(testVectors)

      val result = model.predict(tfidf_test)

      result.collect.foreach(x=>println("Predicted rating for the movie is: "+x))

    }
}

Compile the program with sbt compile and then package it into a jar with sbt assembly.

This will produce /target/scala-2.11/yourfile-assembly.jar

Uploading Data To HDFS

Hortonworks has a nice web interface we can use to upload files. Had we been running a multi-node cluster, HDFS would partition the file and spread it across the nodes. But this is just a single-node cluster to run our test, and the file is nowhere near large enough, so we'll just use the web interface.

HDFS is entirely its own thing. We can also ssh into the machine and load a local file into HDFS from the command line (a one-liner is sketched below). For this tutorial we won't need to, but it's definitely worth looking into for a better understanding of how to store very large data sets.
Storing File on HDFS
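
For reference, loading a file from the sandbox's local filesystem into HDFS from the shell looks something like this (the file name and /user/hue directory are the ones used in this example):

hadoop fs -put naive_bayes_movie_classification.txt /user/hue/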

I also uploaded a test.txt file, which contains the synopses only.
test.txt:

Violence, anger, sex  
The drug cartel was extremely violent and looking for more trouble  
The alcoholic prince playing cop was angry and violent towards the sexy, drugged up princess.  
The princely programmer completed his classifier and charmed everyone around him  

That third synopsis is pretty tricky eh. Hopefully our classifier knows enough to rate that as a 1 for M!

Running the Prediction on Spark

Finally, we can run our program and see how we go with the prediction.
First, log in to the VM. As the port forwarding image from earlier shows, ssh is on port 2222:

ssh root@hadoop -p 2222

The default password is "hadoop"; it just happens to be the same as the hostname I gave the VM.

Now we need to navigate to the shared folder we set up earlier and go to the directory with the jar we assembled. The VM runs Red Hat Linux, so the shared folder should be under /media:

cd /media/sf_projects/[Your Project Dir]/target/scala-2.11/

Now to submit the job to Spark and have it run:

spark-submit --class "MovieRatingClassifier" --master local[4] yourfile-assembly.jar

This runs Spark locally with 4 worker threads. Under the hood it uses Akka, which is really sweet. You'll have to consult the Spark documentation for all the different run options.

After running, this is the result:

Spark movie prediction results

[Predicted 1 for M]

  • Violence, anger, sex
  • The drug cartel was extremely violent and looking for more trouble
  • The alcoholic prince playing cop was angry and violent towards the sexy, drugged up princess. *phew

[Predicted 2 for PG]

  • The princely programmer completed his classifier and charmed everyone around him

If you recall, we called the predict method and then .collect on the result. If we hadn't collected, the results could show up out of order because of the parallel processing. I did keyword-stuff these examples a bit to keep the post small, but it should get better with more data.
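
If you want each prediction printed next to its synopsis regardless of ordering, one option is to zip the test RDD with the predictions before collecting. This is just a sketch reusing the variables from MovieRatingClassifier.scala above:

// Pair every synopsis with its predicted rating; zip works here because
// tfidf_test was derived from testDataFile by maps that preserve partitioning
val withPredictions = testDataFile.zip(model.predict(tfidf_test))
withPredictions.collect.foreach { case (synopsis, rating) =>
  println("Predicted rating " + rating + " for: " + synopsis)
}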

This was a ton of fun and I hope you enjoyed it too. Thanks for reading!
