About how to work with RDDs using scala

I am taking a big data course with formacionhadoop.com (specifically, the online Master Big Data Expert, 150 hours), and I am writing this post so that in the future I can remember how to work with RDDs using Scala, a functional language for the JVM.

RDD stands for Resilient Distributed Dataset: basically, a collection of data distributed across nodes, with the ability to recover from failures.

An RDD is backed by a graph, a very special one: Spark records the lineage of transformations that produces each RDD as a Directed Acyclic Graph (DAG), and that lineage is what it replays to recover lost data. Go to the Wikipedia link at the end of this post to read a bit more about DAGs.
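
You can actually inspect that DAG from the shell with toDebugString. A minimal sketch (the path is just a placeholder, not a file from this post):

val counts = sc.textFile("hdfs:///some/path/file.txt")  // placeholder path
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
println(counts.toDebugString)  // prints the chain of parent RDDs, i.e. the DAG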

I prefer to run Spark jobs from the spark-shell, so let's start!

First of all, you will need a Spark cluster. Do you have one? 🙂 If the answer is no, you can download an image with Spark preinstalled in pseudo-distributed mode, which means your host machine will run a complete cluster by itself, with NameNode, DataNodes, worker nodes and so on and so forth… Go to this link and download your version.
There are Docker, VMware, KVM and VirtualBox images.

Another way to work is to download Spark to your local machine and run the spark-shell.
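
For example, assuming the Spark bin directory is on your PATH, something like this starts a local "cluster" using all available cores:

$ spark-shell --master local[*]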

Are you ready? Cool, let's get started by uploading some files to your data lake, I mean, to HDFS, the Hadoop Distributed File System.

I am working with data provided by formacionhadoop.com, and at this moment I don't know if I can share it with you, but imagine you have to work with csv, txt, xml or json files…

I am going to upload a set of files that belong to the immortal Cervantes to my data lake with this command. In my case, cervantes is a folder which contains three subfolders: novela, poesia and teatro.

[cloudera@quickstart Downloads]$ hdfs dfs -put cervantes

Now, I check that the files are actually there:

[cloudera@quickstart Downloads]$ hdfs dfs -ls /user/cloudera/cervantes
Found 4 items
-rw-r--r-- 1 cloudera cloudera 6148 2016-05-02 16:07 /user/cloudera/cervantes/.DS_Store
drwxr-xr-x - cloudera cloudera 0 2016-05-02 16:07 /user/cloudera/cervantes/novela
drwxr-xr-x - cloudera cloudera 0 2016-05-02 16:07 /user/cloudera/cervantes/poesia
drwxr-xr-x - cloudera cloudera 0 2016-05-02 16:07 /user/cloudera/cervantes/teatro

OK, we have files; let's run some commands in the spark-shell…

[cloudera@quickstart Downloads]$ spark-shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/05/02 16:10:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
16/05/02 16:10:22 INFO spark.SecurityManager: Changing view acls to: cloudera
16/05/02 16:10:22 INFO spark.SecurityManager: Changing modify acls to: cloudera
16/05/02 16:10:22 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
16/05/02 16:10:22 INFO spark.HttpServer: Starting HTTP Server
16/05/02 16:10:22 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/05/02 16:10:22 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:54113
16/05/02 16:10:22 INFO util.Utils: Successfully started service 'HTTP class server' on port 54113.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.

16/05/02 16:10:34 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
16/05/02 16:10:35 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
scala> val myCervantesFile = sc.textFile("/user/cloudera/cervantes/novela/quijote.txt").flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_).saveAsTextFile("output-quijote")
16/05/02 16:15:37 INFO storage.MemoryStore: ensureFreeSpace(277083) called with curMem=323981, maxMem=280248975

16/05/02 16:15:40 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/05/02 16:15:40 INFO output.FileOutputCommitter: Saved output of task ‘attempt_201605021615_0001_m_000000_1’ to hdfs://quickstart.cloudera:8020/user/cloudera/output-quijote/_temporary/0/task_201605021615_0001_m_000000
16/05/02 16:15:40 INFO spark.SparkHadoopWriter: attempt_201605021615_0001_m_000000_1: Committed
16/05/02 16:15:40 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 1828 bytes result sent to driver
16/05/02 16:15:40 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at <console>:21) finished in 1.051 s
16/05/02 16:15:40 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 1055 ms on localhost (1/1)
16/05/02 16:15:40 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/05/02 16:15:40 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at <console>:21, took 2.561276 s
myCervantesFile: Unit = ()

scala>

OK, it is a typical word count of quijote.txt; the output is located in /user/cloudera/output-quijote, as you can see…

[cloudera@quickstart Downloads]$ hdfs dfs -cat /user/cloudera/output-quijote/part-00000 | more
(alforjas,,1)
(viniese,4)
(invierno,1)
(despeña,1)
(guilla,1)
(abusos,1)
(puños,1)
(perdoneis,1)
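
By the way, that is why the shell printed myCervantesFile: Unit = () above: saveAsTextFile is an action that returns Unit. If you want to keep the RDD around for further work, split the chain. A sketch (output-quijote-2 is a made-up directory, since saving into an existing one fails):

val wordCounts = sc.textFile("/user/cloudera/cervantes/novela/quijote.txt")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCounts.saveAsTextFile("output-quijote-2")  // hypothetical output dir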

//ok, now, how many lines does this file have? let's figure it out… (5537)

scala> val myCervantesFile = sc.textFile("/user/cloudera/cervantes/novela/quijote.txt").count
16/05/02 16:21:34 INFO storage.MemoryStore: ensureFreeSpace(96516) called with curMem=215763, maxMem=280248975

myCervantesFile: Long = 5537

scala> 16/05/02 16:21:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 78 ms on localhost (1/1)
16/05/02 16:21:34 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
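
Note that count() counts the elements of the RDD, and each element here is a line. To count words instead, you would flatMap first. A sketch:

val totalWords = sc.textFile("/user/cloudera/cervantes/novela/quijote.txt")
  .flatMap(line => line.split(" "))
  .count()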

//Now, running the collect action, which returns the file's lines as an Array[String]…

scala> val myCervantesFile = sc.textFile("/user/cloudera/cervantes/novela/quijote.txt").collect()
16/05/02 16:23:44 INFO storage.MemoryStore: ensureFreeSpace(191295) called with curMem=0, maxMem=280248975

16/05/02 16:23:46 INFO scheduler.DAGScheduler: Job 0 finished: collect at <console>:21, took 0.373456 s
myCervantesFile: Array[String] = Array(EL INGENIOSO HIDALGO DON QUIJOTE DE LA MANCHA, "", Miguel de Cervantes Saavedra, "", " Capítulo primero", "", " Que trata de la condición y ejercicio del famoso hidalgo D.", "Quijote de la ", " Mancha", "", " En un lugar de la Mancha, de cuyo nombre no quiero", acordarme, no ha mucho tiempo que vivía un hidalgo de los de, lanza en astillero, adarga antigua, rocín flaco y galgo, corredor. Una olla de algo más vaca que carnero, salpicón las, más noches, duelos y quebrantos los sábados, lentejas los, viernes, algún palomino de añadidura los domingos, consumían las, tres partes de su hacienda. El resto della concluían sayo de, velarte, calzas de velludo para las fiestas con sus pantuflos de, lo mismo, los días de entre semana se honraba…
scala>
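
A word of caution: collect() ships the entire dataset to the driver, which can blow up its memory on big files. When you only want a peek, take(n) is safer:

sc.textFile("/user/cloudera/cervantes/novela/quijote.txt").take(10).foreach(println)
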
Now, I am going to work with web log files, which have this format:
[cloudera@quickstart Downloads]$ hdfs dfs -cat /user/cloudera/weblogs/2014-02-04.log | tail -10
184.159.249.194 - 175 [04/Feb/2014:00:02:31 +0100] "GET /KBDOC-00241.html HTTP/1.0" 200 683 "http://www.loudacre.com" "Loudacre CSR Browser"
184.159.249.194 - 175 [04/Feb/2014:00:02:31 +0100] "GET /theme.css HTTP/1.0" 200 11495 "http://www.loudacre.com" "Loudacre CSR Browser"
249.38.156.33 - 2490 [04/Feb/2014:00:01:41 +0100] "GET /KBDOC-00285.html HTTP/1.0" 200 10618 "http://www.loudacre.com" "Loudacre Mobile Browser iFruit 1"
249.38.156.33 - 2490 [04/Feb/2014:00:01:41 +0100] "GET /theme.css HTTP/1.0" 200 16703 "http://www.loudacre.com" "Loudacre Mobile Browser iFruit 1"
178.90.177.183 - 80837 [04/Feb/2014:00:01:24 +0100] "GET /KBDOC-00013.html HTTP/1.0" 200 7914 "http://www.loudacre.com" "Loudacre Mobile Browser MeeToo 3.0"
178.90.177.183 - 80837 [04/Feb/2014:00:01:24 +0100] "GET /theme.css HTTP/1.0" 200 11500 "http://www.loudacre.com" "Loudacre Mobile Browser MeeToo 3.0"
180.208.224.172 - 194 [04/Feb/2014:00:01:07 +0100] "GET /KBDOC-00113.html HTTP/1.0" 200 8977 "http://www.loudacre.com" "Loudacre CSR Browser"
180.208.224.172 - 194 [04/Feb/2014:00:01:07 +0100] "GET /theme.css HTTP/1.0" 200 9564 "http://www.loudacre.com" "Loudacre CSR Browser"
226.221.167.76 - 105387 [04/Feb/2014:00:00:44 +0100] "GET /KBDOC-00170.html HTTP/1.0" 200 7361 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F41L"
226.221.167.76 - 105387 [04/Feb/2014:00:00:44 +0100] "GET /theme.css HTTP/1.0" 200 13388 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F41L"

Here the first field is the IP, the second field is "-", the third field represents a userId, and so on and so forth…

Now, creating a RDD from a single log file:

scala> val webLogFile = sc.textFile("/user/cloudera/weblogs/2014-02-04.log")
16/05/02 16:38:21 INFO storage.MemoryStore: ensureFreeSpace(96516) called with curMem=210739, maxMem=280248975
16/05/02 16:38:21 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 94.3 KB, free 267.0 MB)
16/05/02 16:38:21 INFO storage.MemoryStore: ensureFreeSpace(21083) called with curMem=307255, maxMem=280248975
16/05/02 16:38:21 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 20.6 KB, free 267.0 MB)
16/05/02 16:38:21 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:35426 (size: 20.6 KB, free: 267.2 MB)
16/05/02 16:38:21 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
16/05/02 16:38:21 INFO spark.SparkContext: Created broadcast 2 from textFile at <console>:21
webLogFile: org.apache.spark.rdd.RDD[String] = /user/cloudera/weblogs/2014-02-04.log MapPartitionsRDD[3] at textFile at <console>:21

Now, creating a new RDD from the last one; in this case, filtering the lines that contain .jpg and taking 5 of them:

scala> val jpgFiles = webLogFile.filter(line=>line.contains(".jpg"))
jpgFiles: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:23
scala> jpgFiles.take(5)
16/05/02 16:41:19 INFO mapred.FileInputFormat: Total input paths to process : 1

16/05/02 16:41:19 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 49 ms on localhost (1/1)
res0: Array[String] = Array(65.103.192.86 - 88816 [04/Feb/2014:23:58:34 +0100] "GET /titanic_2400.jpg HTTP/1.0" 200 2935 "http://www.loudacre.com" "Loudacre Mobile Browser iFruit 3A", 102.100.4.122 - 40439 [04/Feb/2014:23:58:20 +0100] "GET /ronin_s1.jpg HTTP/1.0" 200 13346 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F41L", 215.166.217.166 - 40984 [04/Feb/2014:23:54:39 +0100] "GET /sorrento_f21l.jpg HTTP/1.0" 200 11038 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F11L", 79.242.61.21 - 46925 [04/Feb/2014:23:50:52 +0100] "GET /ifruit_4.jpg HTTP/1.0" 200 5951 "http://www.loudacre.com" "Loudacre Mobile Browser MeeToo 2.0", 73.233.61.30 - 75640 [04/Feb/2014:23:42:48 +0100] "GET /ifruit_3.jpg HTTP/1.0" 200 14506 "http://www.loudacre.com" "Loudacre Mobile Bro…
scala> 16/05/02 16:41:19 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
//how many jpg entries are in this file? (408)
scala> jpgFiles.count()
16/05/02 16:43:54 INFO spark.SparkContext: Starting job: count at <console>:26

16/05/02 16:43:54 INFO scheduler.DAGScheduler: Job 2 finished: count at <console>:26, took 0.081684 s
res1: Long = 408
scala> 16/05/02 16:43:54 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 71 ms on localhost (1/1)
16/05/02 16:43:54 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
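
Since webLogFile (and the RDDs derived from it) is hit by several actions in this session, each action re-reads the file from HDFS. Caching it can avoid that, memory permitting. A sketch:

webLogFile.cache()  // mark it for in-memory persistence
jpgFiles.count()    // the first action computes and caches the parent RDD
jpgFiles.count()    // later actions reuse the cached blocks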

Now, I am going to show how to extract only the IPs; remember, the first field…

scala> webLogFile.map(line=>line.split(" ")).map(word=>word(0)).foreach(println)
223.71.22.212
145.160.69.65
145.160.69.65
118.190.236.173
118.190.236.173
147.123.27.79
147.123.27.79
147.123.27.79
147.123.27.79
68.187.208.51
68.187.208.51
196.229.69.187
196.229.69.187
20.241.124.96
20.241.124.96
184.159.249.194
184.159.249.194
249.38.156.33
249.38.156.33
178.90.177.183
178.90.177.183
180.208.224.172
180.208.224.172
226.221.167.76
226.221.167.76
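
One caveat: foreach(println) prints on whichever process runs the task, so on a real cluster the output lands in the executor logs, not in your shell. It works here because this VM runs everything locally. To print on the driver, bring a sample back first:

webLogFile.map(line => line.split(" ")(0)).take(10).foreach(println)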

Now, I am going to show IP and userId pairs, field zero and field two:

scala> webLogFile.map(line=>line.split(" ")).map(word=>(word(0),word(2))).foreach(println)
(145.160.69.65,60901)
(118.190.236.173,155)
(118.190.236.173,155)
(147.123.27.79,98897)
(147.123.27.79,98897)
(147.123.27.79,98897)
(147.123.27.79,98897)
(68.187.208.51,134)
(68.187.208.51,134)
(196.229.69.187,31704)
(196.229.69.187,31704)
(20.241.124.96,11851)
(20.241.124.96,11851)
(184.159.249.194,175)
(184.159.249.194,175)
(249.38.156.33,2490)
(249.38.156.33,2490)
(178.90.177.183,80837)
(178.90.177.183,80837)
(180.208.224.172,194)
(180.208.224.172,194)
(226.221.167.76,105387)
(226.221.167.76,105387)

Do you want more fields? 🙂 Just keep adding indexes to the tuple; for example, this also grabs the last field of each line (word(word.length-1)):

scala> webLogFile.map(line=>line.split(" ")).map(word=>(word(0),word(2),word(word.length-1))).foreach(println)
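
If you find yourself carrying many positional fields, a small case class is tidier than a growing tuple. A sketch (the field names and the choice of f(6), the requested resource, are mine, not from the course material):

case class Hit(ip: String, userId: String, resource: String)

val hits = webLogFile.map(line => line.split(" "))
  .map(f => Hit(f(0), f(2), f(6)))  // in this log format, f(6) is the requested path
hits.take(3).foreach(println)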

//now, I am going to load every file located within /user/cloudera/weblogs, filter the lines that contain .html, and keep only the userId (line(2)) and ip (line(0))

scala> val htmlLogs = sc.textFile("/user/cloudera/weblogs/*").filter(line=>line.contains(".html")).map(line=>line.split(" ")).map(line=>(line(2),line(0)))
16/05/02 16:59:05 INFO storage.MemoryStore: ensureFreeSpace(277083) called with curMem=420894, maxMem=280248975

htmlLogs: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[22] at map at <console>:21

//now I am going to print them following this pattern: userId/ip

scala> htmlLogs.take(5).foreach(line => println(line._1 + "/" + line._2))
16/05/02 16:59:55 INFO mapred.FileInputFormat: Total input paths to process : 68
16/05/02 16:59:55 INFO spark.SparkContext: Starting job: take at <console>:24
16/05/02 16:59:55 INFO scheduler.DAGScheduler: Got job 8 (take at <console>:24) with 1 output partitions (allowLocal=true)

128/116.180.70.237
94/218.193.16.244
131/198.122.118.164
85365/103.17.173.248
16261/233.19.62.103

cool, huh?
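
As a side note, the same print reads a bit nicer with a pattern match and string interpolation (available since Scala 2.10):

htmlLogs.take(5).foreach { case (userId, ip) => println(s"$userId/$ip") }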

//now, I am going to filter the lines that contain .html and create key/value pairs with this format:
key is the userId, located at field line(2)
value is the complete line…

A warning before running it: webLogFiles is an RDD[String] (I assume it was created like htmlLogs above, with sc.textFile("/user/cloudera/weblogs/*")), and calling line(2) on a String returns the character at index 2, not the third space-separated field. That is why the shell infers RDD[(Char, String)] below and the keys are single digits taken from the IPs; see the corrected sketch after the output.

scala> val userCounts = webLogFiles.filter(lines=>lines.contains(".html")).keyBy(line=>line(2))
userCounts: org.apache.spark.rdd.RDD[(Char, String)] = MapPartitionsRDD[38] at keyBy at <console>:23
scala> userCounts.take(5).foreach(println)
16/05/02 17:15:57 INFO spark.SparkContext: Starting job: take at <console>:26

16/05/02 17:15:57 INFO scheduler.DAGScheduler: Job 13 finished: take at <console>:26, took 0.032779 s
(6,116.180.70.237 - 128 [15/Sep/2013:23:59:53 +0100] "GET /KBDOC-00031.html HTTP/1.0" 200 1388 "http://www.loudacre.com" "Loudacre CSR Browser")
(8,218.193.16.244 - 94 [15/Sep/2013:23:58:45 +0100] "GET /KBDOC-00273.html HTTP/1.0" 200 5325 "http://www.loudacre.com" "Loudacre CSR Browser")
(8,198.122.118.164 - 131 [15/Sep/2013:23:58:02 +0100] "GET /KBDOC-00117.html HTTP/1.0" 200 15818 "http://www.loudacre.com" "Loudacre CSR Browser")
(3,103.17.173.248 - 85365 [15/Sep/2013:23:56:07 +0100] "GET /KBDOC-00128.html HTTP/1.0" 200 14255 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F24L")
(3,233.19.62.103 - 16261 [15/Sep/2013:23:55:57 +0100] "GET /titanic_1100_sales.html HTTP/1.0" 200 8924 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F10L")
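
And here is a corrected sketch that really keys by the userId field, splitting the line inside keyBy:

val userCounts = sc.textFile("/user/cloudera/weblogs/*")
  .filter(line => line.contains(".html"))
  .keyBy(line => line.split(" ")(2))  // RDD[(String, String)]: userId -> full line
userCounts.take(5).foreach(println)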

To be continued…
Interesting links

https://en.wikipedia.org/wiki/Directed_acyclic_graph
