
Running A Count With MapReduce in Apache Spark.

Apache Spark Snippet - Counts

This is the first in a series of snippets on Apache Spark programs. In a previous post I ran a machine learning algorithm through Spark, and I'll be following a similar setup here using the Hortonworks Sandbox. In the future I'll do some snippets on AWS' Elastic MapReduce. These snippets use small datasets just for learning; normally you would only reach for this on very large files that simply don't fit on one server.

Channels with Programs Airing for more than 24 Hours

We have a bunch of .tab files containing channel listing information, and the aim is to produce, for each channel, a count of the programs that run for longer than 24 hours.

The general steps are:

  1. Upload tab delimited files to HDFS
  2. Write the MapReduce program in Scala
  3. Submit the job to Apache Spark
  4. Collect and save the results back to HDFS

Uploading the Files

Recall that the VM has a web interface to HDFS through which we can upload the files:

[Screenshot: channel listing files saved to HDFS]
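
If you'd rather skip the web UI, the same upload can be done from a shell on the sandbox with the standard HDFS commands (the target directory below matches the path used later in the program; adjust it to your own layout):

$> hdfs dfs -mkdir -p /user/hue/BT8626
$> hdfs dfs -put *.tab /user/hue/BT8626/
$> hdfs dfs -ls /user/hue/BT8626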

MapReduce in Scala

What we want to do in the Scala program is take all the input files and check whether the value in column index 23 is greater than 24. We do so with a try/catch in case the conversion to Int doesn't go well. Ideally the files would be cleaned up beforehand.

The important thing to note is that Spark chunks the files into multiple partitions so that many mappers can run in parallel (the dataFile.map(...) call in the program below); it isn't necessarily one node per file. You can get a record count if you open up spark-shell and just run:

scala> val dataFile = sc.textFile("/user/hue/BT8626/*.tab")
scala> dataFile.count()
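
To see how many partitions (and therefore map tasks) Spark will actually use for these files, a quick check in spark-shell should do it; this is just a sanity check, not part of the job itself:

scala> dataFile.partitions.size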

FindXColMins.scala:

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by ttruong on 12/11/2015.
  */
object FindXColMins {
  def main(args: Array[String]): Unit = {
    val sparkConfig = new SparkConf().setAppName("Find X Column in Minutes")
    val sc = new SparkContext(sparkConfig)

    val dataFile = sc.textFile("/user/hue/BT8626/*.tab")

    // Map each line to (channel name, 1) when the running time exceeds 24 hours,
    // otherwise (channel name, 0). If the tab files are opened in Excel, the
    // running time is in column X, which happens to be index 23.
    val channels_with_x_greater_than_24 = dataFile.map { f =>
      val line = f.split("\t")
      try {
        val col_x = line(23).toInt
        if (col_x > 24) (line(0), 1) else (line(0), 0)
      } catch {
        // Lines whose column doesn't convert to an Int get bucketed under "Exception"
        case _: Exception => ("Exception", 1)
      }
    }

    // Sum the 1s per channel, pull everything into a single partition
    // (the data set is small), sort by channel name and write back to HDFS.
    channels_with_x_greater_than_24
      .reduceByKey(_ + _)
      .coalesce(1, shuffle = true)
      .sortBy(k => k._1)
      .saveAsTextFile("/user/hue/BT8626_output")
  }
}

line(0) happens to be the channel name and line(23) is the running time in hours.

Internally, when the map stage runs, each node should end up with something like this in its partition:

(ESPN,1)
(ESPN,1)
(FOX SPORTS,1)
(Showcase,1)
...

We assigned each channel a value of 1 whenever a program has a running time of more than 24 hours. When we run reduceByKey(), it groups the pairs by channel name and sums up their values, which are 1 in this case. This gives us the count per channel.
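
You can see reduceByKey() doing exactly this on a toy collection in spark-shell (the channel names here are just the ones from the example above, not real data):

scala> val toy = sc.parallelize(Seq(("ESPN", 1), ("ESPN", 1), ("FOX SPORTS", 1), ("Showcase", 1)))
scala> toy.reduceByKey(_ + _).collect()
// Array((ESPN,2), (FOX SPORTS,1), (Showcase,1)) -- order may vary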

Finally we coalesce everything down to a single partition (because the data set is small), sort it by channel name and then write it back out to HDFS.

Run the Spark Job

Compile and package the program into a fat jar:

$> sbt compile
$> sbt assembly
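
Note that sbt assembly needs the sbt-assembly plugin, and Spark itself should be marked as provided so it doesn't get bundled into the fat jar. A minimal build definition might look like the following; the versions are assumptions matching Spark 1.3.1 and Scala 2.10, so adjust them to your setup.

// build.sbt (sketch)
name := "FindXColMins"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"

// project/assembly.sbt (sketch)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")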

Like last time, I have a projects folder shared with the VM. SSH into the VM, navigate to the folder containing the jar, and then submit the job to Spark.

$> spark-submit --class "FindXColMins" --master local[4] YourJar.jar
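
local[4] runs the job with four threads on the node you're SSH'd into. To run it across the sandbox's YARN cluster instead, the same command with a different master should work (an untested variation, not part of the original run):

$> spark-submit --class "FindXColMins" --master yarn-client YourJar.jar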

Final Results on HDFS

[Output: channels and the number of programs airing for more than 24 hours]

Note: If you try to read from an S3 bucket, this packaged version of Hadoop does not ship the jar that provides the s3n scheme. This article uses HDP 2.3 Technical Preview, Spark 1.3.1 and Hadoop 2.7.1.
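
If you do need S3 access, one possible workaround (untested here, and the jar locations are placeholders you'd need to fill in for your install) is to pass the hadoop-aws jar and its AWS SDK dependency to spark-submit and supply the credentials as Hadoop configuration:

$> spark-submit --class "FindXColMins" --master local[4] \
     --jars /path/to/hadoop-aws-2.7.1.jar,/path/to/aws-java-sdk-1.7.4.jar \
     --conf spark.hadoop.fs.s3n.awsAccessKeyId=YOUR_KEY \
     --conf spark.hadoop.fs.s3n.awsSecretAccessKey=YOUR_SECRET \
     YourJar.jar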
