Hi, I recently took the online Master Big Data Expert course from formacionhadoop.com, 150 hours, which gave me the basics of Hadoop, Spark and NoSQL databases. It is a good course, and I recommend it to anyone who wants to learn the basics of big data technology. I had already taken Andrew Ng's Machine Learning course on Coursera.

Well, after these classes I wanted to create my own version of a recommendation system, having learned from other people's versions. I learned a lot from them, but I missed one thing: those versions do not use a message broker to feed the machine learning algorithm as soon as a customer takes an action in the store, like buying or rating an article. Once I saw that clearly, I decided to code my own solution. Besides, the best way to learn something is to get your hands dirty with code.

The idea is this: somebody buys or rates something in a store. Say you are on Amazon and you like something, so you give it a rating. That rating goes to an asynchronous message broker, and the backend receives it together with your user id and the product id. As soon as it arrives, the system calculates some recommendations for you, so the next time you visit the store it can show you something cool.
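
As a rough sketch of what travels through the broker, the rating event can be modelled as a small case class serialized to JSON. The field names userId, productId and rating are the ones the streaming code reads later in this post; the RatingMessage object, its toJson helper and the sample ids are assumptions made for illustration and are not part of the project.

```scala
// Minimal sketch: the rating event that the store would publish to the broker.
final case class AmazonRating(userId: String, productId: String, rating: Double)

// Hypothetical helper object, not taken from the project.
object RatingMessage {
  // Escape backslashes and double quotes so the ids stay valid inside a JSON string.
  private def escape(s: String): String =
    s.replace("\\", "\\\\").replace("\"", "\\\"")

  // Build the JSON document by hand to keep the sketch free of dependencies.
  def toJson(r: AmazonRating): String =
    s"""{"userId":"${escape(r.userId)}","productId":"${escape(r.productId)}","rating":${r.rating}}"""
}

// Usage: this string would be the value of the message sent to the topic.
val sample = RatingMessage.toJson(AmazonRating("user-1", "product-9", 4.5))
println(sample)
```

A real producer would more likely use a JSON library; the hand-written string is only there to make the message shape visible.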

That is the basis. I coded the system with Apache Kafka as the asynchronous message broker because it is open source and scales pretty well by adding more instances to the cluster; there are probably other options, like any other Java message system. Kafka needs a consumer on the other side, and here that is Apache Spark: Spark Streaming takes the data coming from outside, feeds the machine learning algorithm, and writes the results to a NoSQL or even a SQL database. The final step is to connect the front end to this database using WebSockets, in order to show the results as soon as possible.

In detail, Kafka is connected to Spark using the direct approach: the Spark Streaming process lives forever, waiting for new entries from Apache Kafka. The main difference from the other approach, the receiver-based one, is that there are no receivers; Spark itself asks Kafka for the new offsets on every batch.

Let's see the code:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    // reuse sc here: building the StreamingContext from sparkConf would try to start a second SparkContext
    val ssc = new StreamingContext(sc, Seconds(2))
    //this checkpointdir should be in a conf file, for now it is hardcoded!
    val streamingCheckpointDir = "/Users/aironman/my-recommendation-spark-engine/checkpoint"

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    println("Initialized Streaming Spark Context and kafka connector...")

    //create recommendation module
    println("Creating rating recommender module...")
    val ratingFile = "hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv"
    val recommender = new Recommender(sc, ratingFile)
    println("Initialized rating recommender module...")

    //This is the most interesting part, receive the json message from kafka broker and feed the recommender system
    messages.foreachRDD(rdd => {
      try {
        val count = rdd.count()
        if (count > 0) {
          val json = rdd.map(_._2) // keep the message value, drop the kafka key
          val dataFrame = sqlContext.read.json(json) //converts json to DF
          val myRow = dataFrame.select(dataFrame("userId"), dataFrame("productId"), dataFrame("rating")).take(count.toInt)
          println("myRow is: " + myRow.mkString(", "))
          //case class AmazonRating(userId: String, productId: String, rating: Double)
          // only the first rating of the batch is used for now
          val myAmazonRating = AmazonRating(myRow(0).getString(0), myRow(0).getString(1), myRow(0).getDouble(2))
          println("myAmazonRating is: " + myAmazonRating.toString)
          val arrayAmazonRating = Array(myAmazonRating)
          //this method needs Seq[AmazonRating]
          recommender.predict(arrayAmazonRating)
        }
      } catch {
        case e: IllegalArgumentException => println("illegal arg. exception")
        case e: IllegalStateException    => println("illegal state exception")
        case e: ClassCastException       => println("ClassCastException")
        case e: Exception                => println("Generic Exception")
      }
      println("Finished taking data from kafka topic...")
    })

    // nothing is consumed until the streaming context is started
    ssc.start()
    ssc.awaitTermination()

This is the predict method that receives those ratings:

    import org.apache.spark.mllib.recommendation.ALS
    import org.joda.time.{DateTime, Seconds}

    def predict(ratings: Seq[AmazonRating]) = {
      // train model: the new ratings are merged with the ones already loaded (sparkRatings)
      val myRatings = ratings.map(toSparkRating)
      val myRatingRDD = sc.parallelize(myRatings)

      val startAls = DateTime.now
      // ALS.train arguments: ratings, rank = 10, iterations = 20, lambda = 0.01
      val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)

      // candidates are all the product indexes that are not in my history
      val myProducts = myRatings.map(_.product).toSet
      val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))

      // get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations
      val myUserId = userDict.getIndex(MyUsername)
      val recommendations = model.predict(candidates.map((myUserId, _))).collect
      val endAls = DateTime.now
      val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
      val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds

      println(s"ALS Time: $alsTime seconds")

      // return the top recommendations to the caller
      result
    }
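
The candidate selection and ranking at the end of predict can be illustrated without Spark. The sketch below is plain Scala over in-memory collections: the TopN object, its recommend function, the score parameter and the sample scores are all hypothetical, and score simply stands in for whatever the trained model would predict for a product.

```scala
// Hypothetical stand-alone illustration of "filter out my products, rank, keep the best n".
object TopN {
  def recommend(numProducts: Int,
                rated: Set[Int],
                score: Int => Double,
                n: Int): Seq[(Int, Double)] =
    (0 until numProducts)
      .filterNot(rated.contains)               // candidates: products not in the user's history
      .map(p => (p, score(p)))                 // predicted rating for every candidate
      .sortBy { case (_, rating) => -rating }  // higher rating first
      .take(n)                                 // keep only the first n
}

// Usage with made-up scores: product 1 is already rated, so it is never recommended.
val scores = Map(0 -> 3.0, 1 -> 4.8, 2 -> 4.1, 3 -> 2.0)
val top2 = TopN.recommend(4, Set(1), scores, 2)
println(top2)
```

As in predict, the already rated products are removed before scoring, so the model is never asked about them.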

As you can see in the predict method, the current code only uses the ALS algorithm, a collaborative filtering algorithm provided by Apache Spark MLlib. A really good version would use at least three algorithms of this kind, but it is enough for now; I will update the project soon. You can learn more by following the fantastic guide from Databricks. Also, in the future I will finish the front end and store the results in a NoSQL or SQL database, maybe MongoDB or Elasticsearch.

The project is located on GitHub; feel free to download it and play with it.

This is the way to run the code within a Cloudera cluster. The project has sbt-pack support to generate unix commands, but I found a lot of problems trying to run it programmatically, so the recommended way to run it within a Cloudera cluster is with YARN.

The project is compiled with Scala and sbt, so download the project, go to its folder, and run the following commands in a terminal:



pack //to generate unix commands, but not necessary

Then upload the jar to your cluster and use this command. That is all.


//yarn mode
spark-submit --class example.spark.AmazonKafkaConnectorWithMongo --master yarn --deploy-mode cluster --driver-memory 1024Mb --executor-memory 1G --executor-cores 1 /home/cloudera/awesome-recommendation-engine/target/scala-2.10/my-recommendation-spark-engine_2.10-1.0-SNAPSHOT.jar amazonRatingsTopic

Have fun and be nice!


