My notes on Scala and Spark

I'm using a VMware image to run a pseudo-distributed HDFS together with Apache Spark, specifically one from Cloudera: cloudera-quickstart-vm-5.4.2.0. You can download the image from http://www.cloudera.com/content/cloudera/en/downloads/quickstart_vms/cdh-5-4-x.html

Once you have the VMware image downloaded and running, and can reach HDFS through the Hue tool, you can load your own test files. These files can be as large as you like; just remember that you are learning, so loading files of many GB or TB onto your small laptop is probably not a good idea. Keep in mind, though, that the whole point of HDFS is to spread large files across the disks and main memory of a cluster.
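If you would rather skip Hue, here is a minimal sketch of uploading a file from the spark-shell with the Hadoop FileSystem API. The local path is made up for the example; the NameNode address is the one used throughout these notes:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
conf.set("fs.defaultFS", "hdfs://192.168.30.147:8020")  // NameNode of the quickstart VM
val fs = FileSystem.get(conf)
// copy a local file into the HDFS directory used in the examples below
fs.copyFromLocalFile(new Path("/home/cloudera/nyctaxi.csv"), new Path("/tmp/labdata/sparkdata/nyctaxi.csv"))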

In my browser I can navigate to this URL to view the file, which weighs in at about 800 MB:

http://192.168.30.147:8888/filebrowser/view/tmp/labdata/sparkdata/nyctaxi.csv

Open the Scala shell in a terminal of the VMware image and type these commands:

scala> val text_file = sc.textFile("hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/nyctaxi.csv")

15/09/14 17:35:01 INFO storage.MemoryStore: ensureFreeSpace(277083) called with curMem=626819, maxMem=280248975
15/09/14 17:35:01 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 270.6 KB, free 266.4 MB)
15/09/14 17:35:02 INFO storage.MemoryStore: ensureFreeSpace(21083) called with curMem=903902, maxMem=280248975
15/09/14 17:35:02 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 20.6 KB, free 266.4 MB)
15/09/14 17:35:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:43300 (size: 20.6 KB, free: 267.2 MB)
15/09/14 17:35:02 INFO storage.BlockManagerMaster: Updated info of block broadcast_4_piece0
15/09/14 17:35:02 INFO spark.SparkContext: Created broadcast 4 from textFile at <console>:21
text_file: org.apache.spark.rdd.RDD[String] = hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/nyctaxi.csv MapPartitionsRDD[10] at textFile at <console>:21

scala> val errors = text_file.filter(line => line.contains("ERROR"))
errors: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:23

scala> errors.count
15/09/14 17:36:23 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/14 17:36:24 INFO spark.SparkContext: Starting job: count at <console>:26
15/09/14 17:36:24 INFO scheduler.DAGScheduler: Got job 1 (count at <console>:26) with 7 output partitions (allowLocal=false)
15/09/14 17:36:24 INFO scheduler.DAGScheduler: Final stage: Stage 1(count at <console>:26)
15/09/14 17:36:24 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/09/14 17:36:24 INFO scheduler.DAGScheduler: Missing parents: List()
15/09/14 17:36:24 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[11] at filter at <console>:23), which has no missing parents
15/09/14 17:36:24 INFO storage.MemoryStore: ensureFreeSpace(2888) called with curMem=924985, maxMem=280248975
15/09/14 17:36:24 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 2.8 KB, free 266.4 MB)
15/09/14 17:36:24 INFO storage.MemoryStore: ensureFreeSpace(1785) called with curMem=927873, maxMem=280248975
15/09/14 17:36:24 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 1785.0 B, free 266.4 MB)
15/09/14 17:36:24 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:43300 (size: 1785.0 B, free: 267.2 MB)
15/09/14 17:36:24 INFO storage.BlockManagerMaster: Updated info of block broadcast_5_piece0
15/09/14 17:36:24 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:839
15/09/14 17:36:24 INFO scheduler.DAGScheduler: Submitting 7 missing tasks from Stage 1 (MapPartitionsRDD[11] at filter at <console>:23)
15/09/14 17:36:24 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 7 tasks
15/09/14 17:36:24 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 7, localhost, ANY, 1324 bytes)
15/09/14 17:36:24 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 7)
15/09/14 17:36:24 INFO rdd.HadoopRDD: Input split: hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/nyctaxi.csv:0+134217728
15/09/14 17:36:24 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 7). 1830 bytes result sent to driver
15/09/14 17:36:24 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 8, localhost, ANY, 1324 bytes)
15/09/14 17:36:24 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 7) in 776 ms on localhost (1/7)
15/09/14 17:36:24 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 8)
15/09/14 17:36:24 INFO rdd.HadoopRDD: Input split: hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/nyctaxi.csv:134217728+134217728
15/09/14 17:36:25 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 8). 1830 bytes result sent to driver
15/09/14 17:36:25 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 9, localhost, ANY, 1324 bytes)
15/09/14 17:36:25 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 8) in 721 ms on localhost (2/7)
15/09/14 17:36:25 INFO executor.Executor: Running task 2.0 in stage 1.0 (TID 9)
15/09/14 17:36:25 INFO rdd.HadoopRDD: Input split: hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/nyctaxi.csv:268435456+134217728
15/09/14 17:36:26 INFO executor.Executor: Finished task 2.0 in stage 1.0 (TID 9). 1830 bytes result sent to driver
15/09/14 17:36:26 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 1.0 (TID 10, localhost, ANY, 1324 bytes)
15/09/14 17:36:26 INFO executor.Executor: Running task 3.0 in stage 1.0 (TID 10)
15/09/14 17:36:26 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 9) in 732 ms on localhost (3/7)
15/09/14 17:36:26 INFO rdd.HadoopRDD: Input split: hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/nyctaxi.csv:402653184+134217728
15/09/14 17:36:26 INFO executor.Executor: Finished task 3.0 in stage 1.0 (TID 10). 1830 bytes result sent to driver
15/09/14 17:36:26 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 1.0 (TID 11, localhost, ANY, 1324 bytes)
15/09/14 17:36:26 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 1.0 (TID 10) in 706 ms on localhost (4/7)
15/09/14 17:36:26 INFO executor.Executor: Running task 4.0 in stage 1.0 (TID 11)
15/09/14 17:36:26 INFO rdd.HadoopRDD: Input split: hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/nyctaxi.csv:536870912+134217728
15/09/14 17:36:27 INFO executor.Executor: Finished task 4.0 in stage 1.0 (TID 11). 1830 bytes result sent to driver
15/09/14 17:36:27 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 1.0 (TID 12, localhost, ANY, 1324 bytes)
15/09/14 17:36:27 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 11) in 673 ms on localhost (5/7)
15/09/14 17:36:27 INFO executor.Executor: Running task 5.0 in stage 1.0 (TID 12)
15/09/14 17:36:27 INFO rdd.HadoopRDD: Input split: hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/nyctaxi.csv:671088640+134217728
15/09/14 17:36:28 INFO executor.Executor: Finished task 5.0 in stage 1.0 (TID 12). 1830 bytes result sent to driver
15/09/14 17:36:28 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 1.0 (TID 13, localhost, ANY, 1324 bytes)
15/09/14 17:36:28 INFO scheduler.TaskSetManager: Finished task 5.0 in stage 1.0 (TID 12) in 711 ms on localhost (6/7)
15/09/14 17:36:28 INFO executor.Executor: Running task 6.0 in stage 1.0 (TID 13)
15/09/14 17:36:28 INFO rdd.HadoopRDD: Input split: hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/nyctaxi.csv:805306368+44633003
15/09/14 17:36:28 INFO executor.Executor: Finished task 6.0 in stage 1.0 (TID 13). 1830 bytes result sent to driver
15/09/14 17:36:28 INFO scheduler.DAGScheduler: Stage 1 (count at <console>:26) finished in 4.578 s
15/09/14 17:36:28 INFO scheduler.DAGScheduler: Job 1 finished: count at <console>:26, took 4.594608 s
res3: Long = 0

scala> 15/09/14 17:36:28 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0 (TID 13) in 272 ms on localhost (7/7)
15/09/14 17:36:28 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

What have we done?

  1. Loaded a file from HDFS with: scala> val text_file = sc.textFile("hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/nyctaxi.csv")
  2. Filtered the file looking for the word ERROR with: scala> val errors = text_file.filter(line => line.contains("ERROR"))
  3. Counted the number of matches with: scala> errors.count. Incidentally, I get zero, which makes sense: nyctaxi.csv holds taxi-trip records, not log lines, so nothing contains ERROR.

This is the simplest thing you can do with Spark: search a text file stored in HDFS for a pattern and count the number of occurrences.
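If you also want to see what matched rather than just count it, a quick follow-up using the standard RDD API (the sample size of five is arbitrary):

errors.cache()                   // keep the filtered RDD in memory for repeated actions
errors.take(5).foreach(println)  // bring up to five matching lines back to the driver and print them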

Another example: counting the number of occurrences of each word in a file by building a set of (String, Int) pairs, and then saving the result as a text file in HDFS. For this example I'll load a different, smaller file.

The file has this content:

1,BarackObama,Barack Obama
2,ladygaga,Goddess of Love
3,jeresig,John Resig
4,justinbieber,Justin Bieber
6,matei_zaharia,Matei Zaharia
7,odersky,Martin Odersky
8,anonsys

We load the file into an immutable value, users_text_file:

scala> val users_text_file = sc.textFile("hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/users.txt")

15/09/14 18:21:27 INFO storage.MemoryStore: ensureFreeSpace(277083) called with curMem=1398548, maxMem=280248975
15/09/14 18:21:27 INFO storage.MemoryStore: Block broadcast_19 stored as values in memory (estimated size 270.6 KB, free 265.7 MB)
15/09/14 18:21:27 INFO storage.MemoryStore: ensureFreeSpace(21083) called with curMem=1675631, maxMem=280248975
15/09/14 18:21:27 INFO storage.MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 20.6 KB, free 265.6 MB)
15/09/14 18:21:27 INFO storage.BlockManagerInfo: Added broadcast_19_piece0 in memory on localhost:43300 (size: 20.6 KB, free: 267.1 MB)
15/09/14 18:21:27 INFO storage.BlockManagerMaster: Updated info of block broadcast_19_piece0
15/09/14 18:21:27 INFO spark.SparkContext: Created broadcast 19 from textFile at <console>:21
users_text_file: org.apache.spark.rdd.RDD[String] = hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/users.txt MapPartitionsRDD[24] at textFile at <console>:21

With this instruction I take each line of the file and split it into individual words, separating on spaces (the split call); then I map each word to the value 1 with map(word => (word, 1)); and finally I sum the occurrences of each word with reduceByKey(_ + _):

scala> val counts_users = users_text_file.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_)
15/09/14 18:22:42 INFO mapred.FileInputFormat: Total input paths to process : 1
counts_users: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[27] at reduceByKey at <console>:23

Finally, I save the computed result to HDFS. We could also store it in an Impala table or query it through the SQL engine designed to work with Spark; as of today, the options I know of are Impala and SparkSQL.
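As a sketch of that SparkSQL route (assuming the sqlContext that the spark-shell in CDH 5.4 creates for you; the table and column names here are my own, not part of the original session):

import sqlContext.implicits._

val counts_df = counts_users.toDF("word", "times")  // the (String, Int) pairs become a two-column DataFrame
counts_df.registerTempTable("word_counts")          // make the DataFrame visible to SQL queries
sqlContext.sql("SELECT word, times FROM word_counts ORDER BY times DESC").show()

Here, though, I simply save it as a text file: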

scala> counts_users.saveAsTextFile("hdfs://192.168.30.147/tmp/counts_users.txt")
15/09/14 18:23:52 INFO storage.BlockManager: Removing broadcast 18
15/09/14 18:23:52 INFO storage.BlockManager: Removing block broadcast_18_piece0
15/09/14 18:23:52 INFO storage.MemoryStore: Block broadcast_18_piece0 of size 45725 dropped from memory (free 278597986)
15/09/14 18:23:52 INFO storage.BlockManagerInfo: Removed broadcast_18_piece0 on localhost:43300 in memory (size: 44.7 KB, free: 267.2 MB)
15/09/14 18:23:52 INFO storage.BlockManagerMaster: Updated info of block broadcast_18_piece0
15/09/14 18:23:52 INFO storage.BlockManager: Removing block broadcast_18
15/09/14 18:23:52 INFO storage.MemoryStore: Block broadcast_18 of size 133552 dropped from memory (free 278731538)
15/09/14 18:23:52 INFO spark.ContextCleaner: Cleaned broadcast 18
15/09/14 18:23:52 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/09/14 18:23:52 INFO spark.SparkContext: Starting job: saveAsTextFile at <console>:26
15/09/14 18:23:52 INFO scheduler.DAGScheduler: Registering RDD 26 (map at <console>:23)
15/09/14 18:23:52 INFO scheduler.DAGScheduler: Got job 13 (saveAsTextFile at <console>:26) with 1 output partitions (allowLocal=false)
15/09/14 18:23:52 INFO scheduler.DAGScheduler: Final stage: Stage 17(saveAsTextFile at <console>:26)
15/09/14 18:23:52 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 16)
15/09/14 18:23:52 INFO scheduler.DAGScheduler: Missing parents: List(Stage 16)
15/09/14 18:23:52 INFO scheduler.DAGScheduler: Submitting Stage 16 (MapPartitionsRDD[26] at map at <console>:23), which has no missing parents
15/09/14 18:23:52 INFO storage.MemoryStore: ensureFreeSpace(3784) called with curMem=1517437, maxMem=280248975
15/09/14 18:23:52 INFO storage.MemoryStore: Block broadcast_20 stored as values in memory (estimated size 3.7 KB, free 265.8 MB)
15/09/14 18:23:52 INFO storage.MemoryStore: ensureFreeSpace(2216) called with curMem=1521221, maxMem=280248975
15/09/14 18:23:52 INFO storage.MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 2.2 KB, free 265.8 MB)
15/09/14 18:23:52 INFO storage.BlockManagerInfo: Added broadcast_20_piece0 in memory on localhost:43300 (size: 2.2 KB, free: 267.1 MB)
15/09/14 18:23:52 INFO storage.BlockManagerMaster: Updated info of block broadcast_20_piece0
15/09/14 18:23:52 INFO spark.SparkContext: Created broadcast 20 from broadcast at DAGScheduler.scala:839
15/09/14 18:23:52 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 16 (MapPartitionsRDD[26] at map at <console>:23)
15/09/14 18:23:52 INFO scheduler.TaskSchedulerImpl: Adding task set 16.0 with 1 tasks
15/09/14 18:23:52 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 16.0 (TID 74, localhost, ANY, 1311 bytes)
15/09/14 18:23:52 INFO executor.Executor: Running task 0.0 in stage 16.0 (TID 74)
15/09/14 18:23:52 INFO rdd.HadoopRDD: Input split: hdfs://192.168.30.147:8020/tmp/labdata/sparkdata/users.txt:0+169
15/09/14 18:23:52 INFO executor.Executor: Finished task 0.0 in stage 16.0 (TID 74). 2003 bytes result sent to driver
15/09/14 18:23:52 INFO scheduler.DAGScheduler: Stage 16 (map at <console>:23) finished in 0.045 s
15/09/14 18:23:52 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/09/14 18:23:52 INFO scheduler.DAGScheduler: running: Set()
15/09/14 18:23:52 INFO scheduler.DAGScheduler: waiting: Set(Stage 17)
15/09/14 18:23:52 INFO scheduler.DAGScheduler: failed: Set()
15/09/14 18:23:52 INFO scheduler.DAGScheduler: Missing parents for Stage 17: List()
15/09/14 18:23:52 INFO scheduler.DAGScheduler: Submitting Stage 17 (MapPartitionsRDD[28] at saveAsTextFile at <console>:26), which is now runnable
15/09/14 18:23:52 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 16.0 (TID 74) in 45 ms on localhost (1/1)
15/09/14 18:23:52 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 16.0, whose tasks have all completed, from pool
15/09/14 18:23:52 INFO storage.MemoryStore: ensureFreeSpace(133264) called with curMem=1523437, maxMem=280248975
15/09/14 18:23:52 INFO storage.MemoryStore: Block broadcast_21 stored as values in memory (estimated size 130.1 KB, free 265.7 MB)
15/09/14 18:23:52 INFO storage.MemoryStore: ensureFreeSpace(45519) called with curMem=1656701, maxMem=280248975
15/09/14 18:23:52 INFO storage.MemoryStore: Block broadcast_21_piece0 stored as bytes in memory (estimated size 44.5 KB, free 265.6 MB)
15/09/14 18:23:52 INFO storage.BlockManagerInfo: Added broadcast_21_piece0 in memory on localhost:43300 (size: 44.5 KB, free: 267.1 MB)
15/09/14 18:23:52 INFO storage.BlockManagerMaster: Updated info of block broadcast_21_piece0
15/09/14 18:23:52 INFO spark.SparkContext: Created broadcast 21 from broadcast at DAGScheduler.scala:839
15/09/14 18:23:52 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 17 (MapPartitionsRDD[28] at saveAsTextFile at <console>:26)
15/09/14 18:23:52 INFO scheduler.TaskSchedulerImpl: Adding task set 17.0 with 1 tasks
15/09/14 18:23:52 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 17.0 (TID 75, localhost, PROCESS_LOCAL, 1056 bytes)
15/09/14 18:23:52 INFO executor.Executor: Running task 0.0 in stage 17.0 (TID 75)
15/09/14 18:23:53 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
15/09/14 18:23:53 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/09/14 18:23:53 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/09/14 18:23:53 INFO output.FileOutputCommitter: Saved output of task ‘attempt_201509141823_0017_m_000000_75’ to hdfs://192.168.30.147/tmp/counts_users.txt/_temporary/0/task_201509141823_0017_m_000000
15/09/14 18:23:53 INFO spark.SparkHadoopWriter: attempt_201509141823_0017_m_000000_75: Committed
15/09/14 18:23:53 INFO executor.Executor: Finished task 0.0 in stage 17.0 (TID 75). 1828 bytes result sent to driver
15/09/14 18:23:53 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 17.0 (TID 75) in 192 ms on localhost (1/1)
15/09/14 18:23:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 17.0, whose tasks have all completed, from pool
15/09/14 18:23:53 INFO scheduler.DAGScheduler: Stage 17 (saveAsTextFile at <console>:26) finished in 0.189 s
15/09/14 18:23:53 INFO scheduler.DAGScheduler: Job 13 finished: saveAsTextFile at <c

The file was saved to http://192.168.30.147:8888/filebrowser/view/tmp/counts_users.txt/part-00000 and has the following content:

(7,odersky,Martin,1)
(Zaharia,1)
(Resig,1)
(8,anonsys,1)
(2,ladygaga,Goddess,1)
(3,jeresig,John,1)
(6,matei_zaharia,Matei,1)
(Love,1)
(Bieber,1)
(4,justinbieber,Justin,1)
(of,1)
(Odersky,1)
(Obama,1)
(1,BarackObama,Barack,1)

What does this file mean, if we said we were building (String, Int) pairs? The String is each token of the file after splitting every line on spaces, e.g. 7,odersky,Martin, and the Int is the number of times that token occurs in the file. Since the lines are comma-separated, splitting on spaces leaves whole comma-joined fragments as single "words".
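If what you actually wanted was one count per comma-separated field, a minimal variation (my own tweak, not part of the original session) is to split on commas as well as spaces; String.split takes a regular expression, so "[ ,]" breaks each line at every space or comma:

scala> val counts_fields = users_text_file.flatMap(line => line.split("[ ,]")).map(word => (word, 1)).reduceByKey(_ + _)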

Now we are going to estimate Pi, using the trick of throwing darts at a circle.

Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle: we pick random points in the unit square ((0, 0) to (1, 1)) and count how many fall inside the unit circle. That fraction should approach π / 4, so we use it to derive the estimate.

scala> val count = sc.parallelize(1 to 100000).map{i =>
| val x = Math.random()
| val y = Math.random()
| if (x*x + y*y < 1) 1 else 0
| }.reduce(_+_)
15/09/14 18:47:20 INFO spark.SparkContext: Starting job: reduce at <console>:25
15/09/14 18:47:20 INFO scheduler.DAGScheduler: Got job 16 (reduce at <console>:25) with 1 output partitions (allowLocal=false)
15/09/14 18:47:20 INFO scheduler.DAGScheduler: Final stage: Stage 20(reduce at <console>:25)
15/09/14 18:47:20 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/09/14 18:47:20 INFO scheduler.DAGScheduler: Missing parents: List()
15/09/14 18:47:20 INFO scheduler.DAGScheduler: Submitting Stage 20 (MapPartitionsRDD[34] at map at <console>:21), which has no missing parents
15/09/14 18:47:20 INFO storage.MemoryStore: ensureFreeSpace(1888) called with curMem=1520508, maxMem=280248975
15/09/14 18:47:20 INFO storage.MemoryStore: Block broadcast_24 stored as values in memory (estimated size 1888.0 B, free 265.8 MB)
15/09/14 18:47:20 INFO storage.MemoryStore: ensureFreeSpace(1179) called with curMem=1522396, maxMem=280248975
15/09/14 18:47:20 INFO storage.MemoryStore: Block broadcast_24_piece0 stored as bytes in memory (estimated size 1179.0 B, free 265.8 MB)
15/09/14 18:47:20 INFO storage.BlockManagerInfo: Added broadcast_24_piece0 in memory on localhost:43300 (size: 1179.0 B, free: 267.1 MB)
15/09/14 18:47:20 INFO storage.BlockManagerMaster: Updated info of block broadcast_24_piece0
15/09/14 18:47:20 INFO spark.SparkContext: Created broadcast 24 from broadcast at DAGScheduler.scala:839
15/09/14 18:47:20 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 20 (MapPartitionsRDD[34] at map at <console>:21)
15/09/14 18:47:20 INFO scheduler.TaskSchedulerImpl: Adding task set 20.0 with 1 tasks
15/09/14 18:47:20 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 20.0 (TID 78, localhost, PROCESS_LOCAL, 1317 bytes)
15/09/14 18:47:20 INFO executor.Executor: Running task 0.0 in stage 20.0 (TID 78)
15/09/14 18:47:20 INFO executor.Executor: Finished task 0.0 in stage 20.0 (TID 78). 736 bytes result sent to driver
15/09/14 18:47:20 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 20.0 (TID 78) in 62 ms on localhost (1/1)
15/09/14 18:47:20 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 20.0, whose tasks have all completed, from pool
15/09/14 18:47:20 INFO scheduler.DAGScheduler: Stage 20 (reduce at <console>:25) finished in 0.061 s
15/09/14 18:47:20 INFO scheduler.DAGScheduler: Job 16 finished: reduce at <console>:25, took 0.071917 s
count: Int = 78547

scala> println("Pi is approximately " + 4.0 * count / 100000)

Pi is approximately 3.14188

Cool, right? The larger the sample count you use (100000 here), the more precise the estimate of Pi will be.
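To make that easy to play with, here is a small helper of my own (a sketch, not from the Spark examples; n is the sample count and the second argument to parallelize sets the number of partitions):

def estimatePi(n: Int, partitions: Int = 4): Double = {
  val hits = sc.parallelize(1 to n, partitions).map { _ =>
    val x = Math.random()
    val y = Math.random()
    if (x * x + y * y < 1) 1 else 0  // 1 when the dart lands inside the unit circle
  }.reduce(_ + _)
  4.0 * hits / n
}

estimatePi(1000000)  // more samples, tighter estimate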


Sources:

https://spark.apache.org/examples.html

https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples

http://scala-exercises.47deg.com/index.html

http://slides.com/gaijinco/scala-first-steps#/

http://slides.com/gaijinco/oop-solid-scala-1#/2

http://docs.scala-lang.org/es/tutorials/scala-for-java-programmers.html
