About how to work with RDDs using Scala

I am doing a big data course with formacionhadoop.com (specifically, the online Master Big Data Expert, 150 hours), and I am writing this post so that in the future I can remember how to work with RDDs using Scala code, a functional programming language for the JVM.

RDD stands for Resilient Distributed Dataset: basically, a collection of data distributed across the nodes of a cluster, with fault-tolerance (failover recovery) capabilities.

The lineage of transformations applied to RDDs forms a very special kind of graph, a Directed Acyclic Graph (DAG); see the Wikipedia article linked at the end of this post to read a bit more about it.
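As a quick illustration (a minimal sketch you can paste into the spark-shell once it is up, so sc already exists): transformations such as filter and map only add nodes to the lineage, an action such as count actually runs the job, and toDebugString prints the resulting DAG.

// Transformations are lazy: they only build the lineage graph.
val numbers = sc.parallelize(1 to 100)    // base RDD from a local collection
val evens   = numbers.filter(_ % 2 == 0)  // transformation, nothing executed yet
val doubled = evens.map(_ * 2)            // another transformation

println(doubled.toDebugString)            // inspect the lineage (the DAG)
println(doubled.count())                  // action: triggers the job -> 50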

I prefer to work in the spark-shell, running Spark jobs interactively, so let's start!

First of all, you will need a Spark cluster. Do you have one? 🙂 If the answer is no, you can download an image with Spark preinstalled in pseudo-distributed mode, which means that your host machine will run a complete cluster on its own, with NameNodes, DataNodes, worker nodes and so on and so forth. Go to this link and download your version.
There are Docker, VMware, KVM and VirtualBox images.

Another option is to download Spark to your local machine and run the spark-shell there.

Are you ready? Cool, let's get started by uploading some files to your data lake, I mean, to HDFS, the Hadoop Distributed File System.

I am working with data provided by formacionhadoop.com, and right now I don't know if I can share it with you, but imagine that you have to work with CSV, TXT, XML or JSON files…

I am going to upload a set of files by the immortal Cervantes to my data lake.

In my case, cervantes is a folder containing three subfolders, novela, poesia and teatro. The upload command is:

[cloudera@quickstart Downloads]$ hdfs dfs -put cervantes

Now I check that the files are actually there:

[cloudera@quickstart Downloads]$ hdfs dfs -ls /user/cloudera/cervantes
Found 4 items
-rw-r--r-- 1 cloudera cloudera 6148 2016-05-02 16:07 /user/cloudera/cervantes/.DS_Store
drwxr-xr-x - cloudera cloudera 0 2016-05-02 16:07 /user/cloudera/cervantes/novela
drwxr-xr-x - cloudera cloudera 0 2016-05-02 16:07 /user/cloudera/cervantes/poesia
drwxr-xr-x - cloudera cloudera 0 2016-05-02 16:07 /user/cloudera/cervantes/teatro

OK, we have the files; let's run some commands in the spark-shell…

[cloudera@quickstart Downloads]$ spark-shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/05/02 16:10:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
16/05/02 16:10:22 INFO spark.SecurityManager: Changing view acls to: cloudera
16/05/02 16:10:22 INFO spark.SecurityManager: Changing modify acls to: cloudera
16/05/02 16:10:22 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
16/05/02 16:10:22 INFO spark.HttpServer: Starting HTTP Server
16/05/02 16:10:22 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/05/02 16:10:22 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:54113
16/05/02 16:10:22 INFO util.Utils: Successfully started service 'HTTP class server' on port 54113.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.

16/05/02 16:10:34 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
16/05/02 16:10:35 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
scala> val myCervantesFile = sc.textFile("/user/cloudera/cervantes/novela/quijote.txt").flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_).saveAsTextFile("output-quijote")
16/05/02 16:15:37 INFO storage.MemoryStore: ensureFreeSpace(277083) called with curMem=323981, maxMem=280248975

16/05/02 16:15:40 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/05/02 16:15:40 INFO output.FileOutputCommitter: Saved output of task ‘attempt_201605021615_0001_m_000000_1’ to hdfs://quickstart.cloudera:8020/user/cloudera/output-quijote/_temporary/0/task_201605021615_0001_m_000000
16/05/02 16:15:40 INFO spark.SparkHadoopWriter: attempt_201605021615_0001_m_000000_1: Committed
16/05/02 16:15:40 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 1828 bytes result sent to driver
16/05/02 16:15:40 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at <console>:21) finished in 1.051 s
16/05/02 16:15:40 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 1055 ms on localhost (1/1)
16/05/02 16:15:40 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/05/02 16:15:40 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at <console>:21, took 2.561276 s
myCervantesFile: Unit = ()

scala>

OK, this is a typical word count of quijote.txt; the output is located in /user/cloudera/output-quijote, as you can see below. Note that saveAsTextFile returns Unit, which is why myCervantesFile ends up as (). A step-by-step version of the same job is sketched after the output.

[cloudera@quickstart Downloads]$ hdfs dfs -cat /user/cloudera/output-quijote/part-00000 | more
(alforjas,,1)
(viniese,4)
(invierno,1)
(despeña,1)
(guilla,1)
(abusos,1)
(puños,1)
(perdoneis,1)
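For readability, here is the same word count split into named stages (a sketch; same input path as above, but writing to a different, hypothetical output directory so it does not collide with the first run):

// Step-by-step version of the word count above
val lines  = sc.textFile("/user/cloudera/cervantes/novela/quijote.txt")
val words  = lines.flatMap(line => line.split(" "))   // one element per word
val pairs  = words.map(word => (word, 1))             // (word, 1) tuples
val counts = pairs.reduceByKey(_ + _)                 // sum the 1s per word

counts.saveAsTextFile("output-quijote-steps")         // hypothetical output dir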

//OK, now, how many lines does this file have? Let's figure it out… (5537)

scala> val myCervantesFile = sc.textFile("/user/cloudera/cervantes/novela/quijote.txt").count
16/05/02 16:21:34 INFO storage.MemoryStore: ensureFreeSpace(96516) called with curMem=215763, maxMem=280248975

myCervantesFile: Long = 5537

scala> 16/05/02 16:21:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 78 ms on localhost (1/1)
16/05/02 16:21:34 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
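Keep in mind that count on the RDD returned by textFile counts lines, not words. If you actually want the number of words, you would split the lines first (a sketch, untested on this exact file):

// Number of lines in the file
val lineCount = sc.textFile("/user/cloudera/cervantes/novela/quijote.txt").count()

// Number of words: split each line into words before counting
val wordCount = sc.textFile("/user/cloudera/cervantes/novela/quijote.txt")
  .flatMap(line => line.split(" "))
  .count()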

//Now, running the collect action, which returns an Array of Strings (careful: collect brings the whole dataset back to the driver)…

scala> val myCervantesFile = sc.textFile("/user/cloudera/cervantes/novela/quijote.txt").collect()
16/05/02 16:23:44 INFO storage.MemoryStore: ensureFreeSpace(191295) called with curMem=0, maxMem=280248975

16/05/02 16:23:46 INFO scheduler.DAGScheduler: Job 0 finished: collect at <console>:21, took 0.373456 s
myCervantesFile: Array[String] = Array(EL INGENIOSO HIDALGO DON QUIJOTE DE LA MANCHA, "", Miguel de Cervantes Saavedra, "", " Capítulo primero", "", " Que trata de la condición y ejercicio del famoso hidalgo D.", "Quijote de la ", " Mancha", "", " En un lugar de la Mancha, de cuyo nombre no quiero", acordarme, no ha mucho tiempo que vivía un hidalgo de los de, lanza en astillero, adarga antigua, rocín flaco y galgo, corredor. Una olla de algo más vaca que carnero, salpicón las, más noches, duelos y quebrantos los sábados, lentejas los, viernes, algún palomino de añadidura los domingos, consumían las, tres partes de su hacienda. El resto della concluían sayo de, velarte, calzas de velludo para las fiestas con sus pantuflos de, lo mismo, los días de entre semana se honraba…
scala>

Now I am going to work with web log files, which have this format:
[cloudera@quickstart Downloads]$ hdfs dfs -cat /user/cloudera/weblogs/2014-02-04.log | tail -10
184.159.249.194 - 175 [04/Feb/2014:00:02:31 +0100] "GET /KBDOC-00241.html HTTP/1.0" 200 683 "http://www.loudacre.com" "Loudacre CSR Browser"
184.159.249.194 - 175 [04/Feb/2014:00:02:31 +0100] "GET /theme.css HTTP/1.0" 200 11495 "http://www.loudacre.com" "Loudacre CSR Browser"
249.38.156.33 - 2490 [04/Feb/2014:00:01:41 +0100] "GET /KBDOC-00285.html HTTP/1.0" 200 10618 "http://www.loudacre.com" "Loudacre Mobile Browser iFruit 1"
249.38.156.33 - 2490 [04/Feb/2014:00:01:41 +0100] "GET /theme.css HTTP/1.0" 200 16703 "http://www.loudacre.com" "Loudacre Mobile Browser iFruit 1"
178.90.177.183 - 80837 [04/Feb/2014:00:01:24 +0100] "GET /KBDOC-00013.html HTTP/1.0" 200 7914 "http://www.loudacre.com" "Loudacre Mobile Browser MeeToo 3.0"
178.90.177.183 - 80837 [04/Feb/2014:00:01:24 +0100] "GET /theme.css HTTP/1.0" 200 11500 "http://www.loudacre.com" "Loudacre Mobile Browser MeeToo 3.0"
180.208.224.172 - 194 [04/Feb/2014:00:01:07 +0100] "GET /KBDOC-00113.html HTTP/1.0" 200 8977 "http://www.loudacre.com" "Loudacre CSR Browser"
180.208.224.172 - 194 [04/Feb/2014:00:01:07 +0100] "GET /theme.css HTTP/1.0" 200 9564 "http://www.loudacre.com" "Loudacre CSR Browser"
226.221.167.76 - 105387 [04/Feb/2014:00:00:44 +0100] "GET /KBDOC-00170.html HTTP/1.0" 200 7361 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F41L"
226.221.167.76 - 105387 [04/Feb/2014:00:00:44 +0100] "GET /theme.css HTTP/1.0" 200 13388 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F41L"

Where the first field is the IP address, the second field is always "-", the third field represents a userId, and so on and so forth…
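If you prefer named fields over positional indexes, a small parse function keeps the later examples readable. This is just a sketch: LogEntry and parseLine are my own helpers, not part of Spark, and they assume well-formed, space-separated lines like the ones above.

// Hypothetical helper: pull the interesting fields out of a log line
case class LogEntry(ip: String, userId: String, resource: String)

def parseLine(line: String): LogEntry = {
  val fields = line.split(" ")
  // fields(0) = ip, fields(2) = userId, fields(6) = requested resource
  LogEntry(fields(0), fields(2), fields(6))
}

val entries = sc.textFile("/user/cloudera/weblogs/2014-02-04.log").map(parseLine)
entries.take(3).foreach(println)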

Now, creating an RDD from a single log file:

scala> val webLogFile = sc.textFile("/user/cloudera/weblogs/2014-02-04.log")
16/05/02 16:38:21 INFO storage.MemoryStore: ensureFreeSpace(96516) called with curMem=210739, maxMem=280248975
16/05/02 16:38:21 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 94.3 KB, free 267.0 MB)
16/05/02 16:38:21 INFO storage.MemoryStore: ensureFreeSpace(21083) called with curMem=307255, maxMem=280248975
16/05/02 16:38:21 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 20.6 KB, free 267.0 MB)
16/05/02 16:38:21 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:35426 (size: 20.6 KB, free: 267.2 MB)
16/05/02 16:38:21 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
16/05/02 16:38:21 INFO spark.SparkContext: Created broadcast 2 from textFile at <console>:21
webLogFile: org.apache.spark.rdd.RDD[String] = /user/cloudera/weblogs/2014-02-04.log MapPartitionsRDD[3] at textFile at <console>:21

Now, creating a new RDD from the previous one, in this case filtering the lines that contain .jpg and taking 5 of them:

scala> val jpgFiles = webLogFile.filter(line=>line.contains(".jpg"))
jpgFiles: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:23
scala> jpgFiles.take(5)
16/05/02 16:41:19 INFO mapred.FileInputFormat: Total input paths to process : 1

16/05/02 16:41:19 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 49 ms on localhost (1/1)
res0: Array[String] = Array(65.103.192.86 - 88816 [04/Feb/2014:23:58:34 +0100] "GET /titanic_2400.jpg HTTP/1.0" 200 2935 "http://www.loudacre.com" "Loudacre Mobile Browser iFruit 3A", 102.100.4.122 - 40439 [04/Feb/2014:23:58:20 +0100] "GET /ronin_s1.jpg HTTP/1.0" 200 13346 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F41L", 215.166.217.166 - 40984 [04/Feb/2014:23:54:39 +0100] "GET /sorrento_f21l.jpg HTTP/1.0" 200 11038 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F11L", 79.242.61.21 - 46925 [04/Feb/2014:23:50:52 +0100] "GET /ifruit_4.jpg HTTP/1.0" 200 5951 "http://www.loudacre.com" "Loudacre Mobile Browser MeeToo 2.0", 73.233.61.30 - 75640 [04/Feb/2014:23:42:48 +0100] "GET /ifruit_3.jpg HTTP/1.0" 200 14506 "http://www.loudacre.com" "Loudacre Mobile Bro…
scala> 16/05/02 16:41:19 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
//how many .jpg entries are there in this file? (408)
scala> jpgFiles.count()
16/05/02 16:43:54 INFO spark.SparkContext: Starting job: count at <console>:26

16/05/02 16:43:54 INFO scheduler.DAGScheduler: Job 2 finished: count at <console>:26, took 0.081684 s
res1: Long = 408
scala> 16/05/02 16:43:54 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 71 ms on localhost (1/1)
16/05/02 16:43:54 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
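Since jpgFiles is used by two actions (take and count), the file is actually read from HDFS twice. Caching the filtered RDD avoids that, memory permitting (a sketch):

// Keep the filtered RDD in memory so later actions reuse it instead of re-reading the file
val jpgFiles = webLogFile.filter(line => line.contains(".jpg")).cache()

jpgFiles.take(5)   // first action: reads the file and populates the cache
jpgFiles.count()   // second action: served from the cached partitions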

Now I am going to show how to extract only the IPs, remember, the first field…

scala> webLogFile.map(line=>line.split(” “)).map(word=>word(0)).foreach(println)
223.71.22.212
145.160.69.65
145.160.69.65
118.190.236.173
118.190.236.173
147.123.27.79
147.123.27.79
147.123.27.79
147.123.27.79
68.187.208.51
68.187.208.51
196.229.69.187
196.229.69.187
20.241.124.96
20.241.124.96
184.159.249.194
184.159.249.194
249.38.156.33
249.38.156.33
178.90.177.183
178.90.177.183
180.208.224.172
180.208.224.172
226.221.167.76
226.221.167.76
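One caveat: foreach(println) only prints to your shell here because the quickstart VM runs everything locally; on a real cluster the println runs on the executors and the output never reaches the driver. Collecting a small sample back to the driver is the safer pattern (a sketch):

// Bring a small sample back to the driver before printing
webLogFile.map(line => line.split(" ")(0))   // keep only the IP (first field)
  .take(20)                                  // Array[String] on the driver
  .foreach(println)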

Now I am going to show IPs and userIds, field zero and field two:

scala> webLogFile.map(line=>line.split(” “)).map(word=>(word(0),word(2))).foreach(println)
(145.160.69.65,60901)
(118.190.236.173,155)
(118.190.236.173,155)
(147.123.27.79,98897)
(147.123.27.79,98897)
(147.123.27.79,98897)
(147.123.27.79,98897)
(68.187.208.51,134)
(68.187.208.51,134)
(196.229.69.187,31704)
(196.229.69.187,31704)
(20.241.124.96,11851)
(20.241.124.96,11851)
(184.159.249.194,175)
(184.159.249.194,175)
(249.38.156.33,2490)
(249.38.156.33,2490)
(178.90.177.183,80837)
(178.90.177.183,80837)
(180.208.224.172,194)
(180.208.224.172,194)
(226.221.167.76,105387)
(226.221.167.76,105387)

Do you want more fields? 🙂 Just keep adding indexes to the tuple; for example, the requested resource is field 6 and the HTTP status code is field 8:

scala> webLogFile.map(line=>line.split(" ")).map(fields=>(fields(0),fields(2),fields(6),fields(8))).foreach(println)

//now I am going to load every file located under /user/cloudera/weblogs, filter the lines that contain .html and keep only the userId (field 2) and the IP (field 0)

scala> val htmlLogs = sc.textFile("/user/cloudera/weblogs/*").filter(line=>line.contains(".html")).map(line=>line.split(" ")).map(line=>(line(2),line(0)))
16/05/02 16:59:05 INFO storage.MemoryStore: ensureFreeSpace(277083) called with curMem=420894, maxMem=280248975

htmlLogs: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[22] at map at <console>:21

//now I am going to print them following this pattern: userId/ip

scala> htmlLogs.take(5).foreach(line => println(line._1 + "/" + line._2))
16/05/02 16:59:55 INFO mapred.FileInputFormat: Total input paths to process : 68
16/05/02 16:59:55 INFO spark.SparkContext: Starting job: take at <console>:24
16/05/02 16:59:55 INFO scheduler.DAGScheduler: Got job 8 (take at <console>:24) with 1 output partitions (allowLocal=true)

128/116.180.70.237
94/218.193.16.244
131/198.122.118.164
85365/103.17.173.248
16261/233.19.62.103

cool, huh?
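The same output can be written a bit more idiomatically with a pattern match and string interpolation, which saves the ._1/._2 accessors (a sketch):

// Destructure each (userId, ip) pair instead of using ._1 and ._2
htmlLogs.take(5).foreach { case (userId, ip) => println(s"$userId/$ip") }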

//now I am going to filter the lines that contain .html and create key/value pairs where the key is meant to be the userId (field 2 of each line) and the value is the complete line…

scala> val userCounts = webLogFiles.filter(lines=>lines.contains(".html")).keyBy(line=>line(2))
userCounts: org.apache.spark.rdd.RDD[(Char, String)] = MapPartitionsRDD[38] at keyBy at <console>:23
scala> userCounts.take(5).foreach(println)
16/05/02 17:15:57 INFO spark.SparkContext: Starting job: take at <console>:26

16/05/02 17:15:57 INFO scheduler.DAGScheduler: Job 13 finished: take at <console>:26, took 0.032779 s
(6,116.180.70.237 - 128 [15/Sep/2013:23:59:53 +0100] "GET /KBDOC-00031.html HTTP/1.0" 200 1388 "http://www.loudacre.com" "Loudacre CSR Browser")
(8,218.193.16.244 - 94 [15/Sep/2013:23:58:45 +0100] "GET /KBDOC-00273.html HTTP/1.0" 200 5325 "http://www.loudacre.com" "Loudacre CSR Browser")
(8,198.122.118.164 - 131 [15/Sep/2013:23:58:02 +0100] "GET /KBDOC-00117.html HTTP/1.0" 200 15818 "http://www.loudacre.com" "Loudacre CSR Browser")
(3,103.17.173.248 - 85365 [15/Sep/2013:23:56:07 +0100] "GET /KBDOC-00128.html HTTP/1.0" 200 14255 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F24L")
(3,233.19.62.103 - 16261 [15/Sep/2013:23:55:57 +0100] "GET /titanic_1100_sales.html HTTP/1.0" 200 8924 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F10L")
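Looking at the output, the keys are single characters (6, 8, 3…), not userIds: line(2) indexes into the String and returns its third character, which is also why the RDD type is RDD[(Char, String)]. To key by the real userId, the line has to be split first (a sketch; I also define webLogFiles here, which the transcript above assumes already exists):

// webLogFiles: all the log files, as in the htmlLogs example above
val webLogFiles = sc.textFile("/user/cloudera/weblogs/*")

// keyBy with the userId (third space-separated field), keeping the full line as the value
val userCounts = webLogFiles
  .filter(line => line.contains(".html"))
  .keyBy(line => line.split(" ")(2))   // RDD[(String, String)]

userCounts.take(5).foreach(println)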

To be continued…
Interesting links

https://en.wikipedia.org/wiki/Directed_acyclic_graph
