Another session with Apache Spark, with Frank Kane.

Install Apache Spark.

brew install apache-spark

aironman@MacBook-Pro-de-Alonso ~> cd /usr/local/Cellar/apache-spark/2.4.4/libexec/conf/

aironman@MacBook-Pro-de-Alonso /u/l/C/a/2/l/conf> mv log4j.properties.template log4j.properties

Change the default INFO log level to ERROR in log4j.properties.
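In practice that means editing the root logger line. This is how it looks in the Spark template (the exact line may differ slightly between versions):

    # before
    log4j.rootCategory=INFO, console
    # after
    log4j.rootCategory=ERROR, console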

aironman@MacBook-Pro-de-Alonso /u/l/C/a/2/l/conf> spark-shell
# how many lines does this file have?
scala> sc.textFile("testSpark.txt").count
res0: Long = 1

Download this set of files. http://files.grouplens.org/datasets/movielens/ml-100k.zip

New Scala project, new Scala file…

build.sbt

name := "spark-scala"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"

RatingsCounter.scala

    import org.apache.spark._
    import org.apache.spark.SparkContext._
    import org.apache.log4j._

    /** Count up how many of each star rating exists in the MovieLens 100K data set. */
    object RatingsCounter {

      /** Our main function where the action happens */
      def main(args: Array[String]) {

        // Set the log level to only print errors
        Logger.getLogger("org").setLevel(Level.ERROR)

        // Create a SparkContext using every core of the local machine, named RatingsCounter
        val sc = new SparkContext("local[*]", "RatingsCounter")

        // Load up each line of the ratings data into an RDD
        val lines = sc.textFile("../ml-100k/u.data")

        // Convert each line to a string, split it out by tabs, and extract the third field.
        // (The file format is userID, movieID, rating, timestamp)
        // userID watched this movieID and gave it this rating at this timestamp
        // 291  118 2   874833878
        // 308  1   4   887736532
        // 95   546 2   879196566
        // ...
        val ratings = lines.map(x => x.toString().split("\t")(2))

        // Count up how many times each value (rating) occurs
        val results = ratings.countByValue()

        // Sort the resulting map of (rating, count) tuples
        val sortedResults = results.toSeq.sortBy(_._1)

        // Print each result on its own line.
        sortedResults.foreach(println)

        /* Sample output:
        Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
        (1,6110)
        (2,11370)
        (3,27145)
        (4,34174)
        (5,21201)

        Process finished with exit code 0
        */
      }
    }

#id, name, age, num-friends
1,Alonso,42,15
2,Jose,38,150
3,Angel,42,2
4,Maria,38,1
5,Antonio,39,0
...

def parseLine(line: String) = {
    val fields = line.split(",")
    val age = fields(2).toInt
    val numFriends = fields(3).toInt
    (age, numFriends)
}

val lines = sc.textFile("fakeFriends.csv")

val rdd = lines.map(parseLine)

42,15
38,150
42,2
38,1
39,0
    ...

val totalByAges = rdd.mapValues(x=>(x,1)).reduceByKey((x,y) => (x._1 + y._1,x._2 +y._2))

#rdd.mapValues(x=>(x,1))
(42,(15,1))
(38,(150,1))
(42,(2,1))
(38,(1,1))
(39,(0,1))

reduceByKey combines two values at a time. Given (15,1),(150,1),(2,1),(1,1),(0,1), it works on the tuples that share the same key, in this case 42: (42,(15,1)) and (42,(2,1)).
There, x._1 is 15, y._1 is 2, x._2 is 1 and y._2 is 1.

reduceByKey((x,y) => (x._1 + y._1,x._2 +y._2)) 
(42,(17,2))
(38,(151,2))
(39,(0,1)) # => he only has himself as a friend. 🙂
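A minimal, self-contained sketch of that same aggregation, assuming a local SparkContext sc (as in the spark-shell) and the toy (age, numFriends) pairs above:

    val pairs = sc.parallelize(Seq((42, 15), (38, 150), (42, 2), (38, 1), (39, 0)))

    // attach a counter of 1 to each value, then sum friends and counters per key (age)
    val totals = pairs.mapValues(x => (x, 1))
                      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

    totals.collect().foreach(println)   // (42,(17,2)), (38,(151,2)), (39,(0,1)) in some order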


import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._

/** Compute the average number of friends by age in a social network. */
object FriendsByAge {

  /** A function that splits a line of input into (age, numFriends) tuples. */
  def parseLine(line: String) = {
      // Split by commas
      val fields = line.split(",")
      // Extract the age and numFriends fields, and convert to integers
      val age = fields(2).toInt
      val numFriends = fields(3).toInt
      // Create a tuple that is our result.
      (age, numFriends)
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "FriendsByAge")

    // Load each line of the source data into an RDD
    val lines = sc.textFile("/Users/aironman/Downloads/SparkScala/fakefriends.csv")

    // Use our parseLines function to convert to (age, numFriends) tuples
    val rdd = lines.map(parseLine)

    // Lots going on here...
    // We are starting with an RDD of form (age, numFriends) where age is the KEY and numFriends is the VALUE
    // We use mapValues to convert each numFriends value to a tuple of (numFriends, 1)
    // Then we use reduceByKey to sum up the total numFriends and total instances for each age, by
    // adding together all the numFriends values and 1's respectively.
    val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))

    /*
    scala> rdd.mapValues(x=>(x,1)).takeSample(false,100).foreach(println)
    (45,(455,1))
    (38,(380,1))
    (56,(371,1))
    (65,(208,1))
    (18,(418,1))
    (58,(348,1))
    (28,(311,1))
    (40,(220,1))
    (44,(353,1))
    (59,(284,1))
    ...
    (55,(284,1))
    (66,(383,1))
    (39,(106,1))
    (40,(254,1))
    (38,(203,1))
    (68,(293,1))
    (29,(173,1))
    (29,(128,1))
    (19,(272,1))
    (60,(246,1))
    (67,(153,1))
    (45,(340,1))
    (59,(284,1))
    (54,(72,1))
    (58,(174,1))
    (45,(147,1))
    (30,(184,1))
    (57,(229,1))
    (38,(2,1))
    (24,(492,1))
    (52,(77,1))
    (33,(294,1))
    (67,(149,1))
    (56,(313,1))
    (31,(340,1))
    (43,(428,1))
    (31,(172,1))
    (28,(34,1))
    (18,(326,1))
    (28,(304,1))
    (18,(24,1))
    ...

    scala> rdd.mapValues(x=>(x,1)).reduceByKey((x,y) => (x._1 + y._1,x._2 + y._2)).takeSample(true,10).foreach(println)
    (60,(1419,7))
    (48,(2814,10))
    (40,(4264,17))
    (58,(1282,11))
    (52,(3747,11))
    (47,(2099,9))
    (34,(1473,6))
    (56,(1840,6))
    (46,(2908,13))
    (52,(3747,11))
    ...

    This means that, for example, there are 7 users aged 60, and between them they have a total of 1419 friends.
    */

    // So now we have tuples of (age, (totalFriends, totalInstances))
    // To compute the average we divide totalFriends / totalInstances for each age.
    val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)

    /*
    So, for the users who are 60 years old, the average number of friends is 1419/7.
    */
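    // Note: since totalFriends and totalInstances are both Ints, the division above is integer
    // division (1419 / 7 == 202). If a fractional average were wanted (not what the course code
    // does), the values could be converted first, e.g.:
    // val averagesByAgeDouble = totalsByAge.mapValues(x => x._1.toDouble / x._2)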

    // Collect the results from the RDD (This kicks off computing the DAG and actually executes the job)
    val results = averagesByAge.collect()

    // Sort and print the final results.
    results.sorted.foreach(println)
  }

}

fakeFriends.csv

0,Will,33,385
1,Jean-Luc,26,2
2,Hugh,55,221
3,Deanna,40,465
4,Quark,68,21
5,Weyoun,59,318
6,Gowron,37,220
7,Will,54,307
...


scala> results.sorted.foreach(println)
(18,343)
(19,213)
(20,165)
(21,350)
(22,206)
(23,246)
(24,233)
(25,197)
(26,242)
(27,228)
(28,209)
(29,215)
(30,235)
(31,267)
(32,207)
(33,325)
(34,245)
(35,211)
(36,246)
(37,249)
(38,193)
(39,169)
(40,250)
(41,268)
(42,303)
(43,230)
(44,282)
(45,309)
(46,223)
(47,233)
(48,281)
(49,184)
(50,254)
(51,302)
(52,340)
(53,222)
(54,278)
(55,295)
(56,306)
(57,258)
(58,116)
(59,220)
(60,202)
(61,256)
(62,220)
(63,384)
(64,281)
(65,298)
(66,276)
(67,214)
(68,269)

Let's say that I have this (Python) code:

df_source = spark_session.read.format('jdbc').....
df_reference = sql_context.read.parquet('/path/to/reference.parquet')

df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
            .cache()

df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
                    .select(lit('Insert').alias('_action'), *df_source_hashed) \
                    .dropDuplicates() \
                    .cache()
inserts_count = df_inserts.count()

df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
                        .select(lit('Update').alias('_action'), *df_source_hashed) \
                        .where(col('a.hashkey') != col('b.hashkey')) \
                        .dropDuplicates() \
                        .cache()
updates_count = df_updates.count()

df_output = df_inserts.union(df_updates)

df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')

How can I improve performance?

  1. Avoid unnecessary counts, if you can.
  2. Do df_output = df_inserts.union(df_updates).cache()

Then, if you have to, do the counts on the cached result.

  3. The repartition(1) should be replaced by coalesce(1). The former shuffles all the data, while the latter reads in the existing partitions and does not shuffle them again. Repartitioning to a single partition is discouraged, unless you can guarantee the data fit into one worker's memory.
  4. You can compute Insert and Update in one go, so that you don't have to join with df_reference twice:

        df_actions = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="left") \
                        .withColumn('_action', when(col('b.hashkey').isNull(), 'Insert')
                                               .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
                        .select(col('_action'), *df_source_hashed) \
                        .dropDuplicates() \
                        .cache()

     Since df_actions is cached, you can count inserts and updates quickly with only that one join:

        inserts_count = df_actions.where(col('_action') == 'Insert').count()
        updates_count = df_actions.where(col('_action') == 'Update').count()

     And you can get rid of the union:

        df_output = df_actions.where(col('_action').isNotNull())

If you have to write that output to parquet anyway, then you can get the count quickly from the parquet file if it is partitioned by the _action column
(Spark then only looks into parquet’s metadata to get the count, it does not read any row):

        df_output.coalesce(1).write.partitionBy('_action').format('parquet').mode('overwrite').save('/path/to/output.parquet')
        df_output = sql_context.read.parquet('/path/to/output.parquet')
        inserts_count = df_output.where(col('_action') == 'Insert').count()
        updates_count = df_output.where(col('_action') == 'Update').count()

Let's say I have a file (1800.csv) like this:

ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
EZE00100082,18000101,TMAX,-86,,,E,
EZE00100082,18000101,TMIN,-135,,,E,
ITE00100554,18000102,TMAX,-60,,I,E,
ITE00100554,18000102,TMIN,-125,,,E,
...

def parseLine(line:String) ={
    val fields = line.split(",")
    val stationId = fields(0)
    val entryType = fields(2)
    val temperature = fields(3).toFloat * 0.1f *(9.0f / 5.0f) + 32f
    (stationId,entryType,temperature)
}
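The raw values are in tenths of degrees Celsius, so the line above converts them to Fahrenheit. A quick worked check of that formula (my own arithmetic, consistent with the minimum printed further down):

    // -75 tenths of a degree C  ->  -7.5 C  ->  -7.5 * 9/5 + 32 = 18.5 F
    val check = -75 * 0.1f * (9.0f / 5.0f) + 32f   // 18.5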

val lines = sc.textFile("/Users/aironman/gitProjects/alonsoir.github.io/SparkScala/1800.csv")

val parsedLines = lines.map(parseLine)

parsedLines.foreach(println)
...
(GM000010962,PRCP,37.76)
(EZE00100082,TMAX,64.399994)
(EZE00100082,TMIN,55.04)
(ITE00100554,TMAX,75.380005)
(ITE00100554,TMIN,65.3)
(GM000010962,PRCP,32.0)
(EZE00100082,TMAX,66.740005)
...


# parsedLines are tuples with stationId,entryType,temperature

val minTemps = parsedLines.filter(x=> x._2 == "TMIN")

minTemps.foreach(println)
...
(ITE00100554,TMIN,36.5)
(EZE00100082,TMIN,33.26)
(ITE00100554,TMIN,34.88)
(EZE00100082,TMIN,31.46)
(ITE00100554,TMIN,37.58)
(EZE00100082,TMIN,34.52)
(ITE00100554,TMIN,36.5)
(EZE00100082,TMIN,31.28)

val stationTemps = minTemps.map(x=>(x._1,x._3.toFloat))

stationTemps.foreach(println)
...
(EZE00100082,56.66)
(ITE00100554,69.619995)
(EZE00100082,59.9)
(ITE00100554,70.34)
(EZE00100082,55.04)
(ITE00100554,65.3)

val minTempByStation = stationTemps.reduceByKey((x,y)=>min(x,y))

# Here I have an error. scala> val minTempByStation = stationTemps.reduceByKey((x,y)=>min(x,y))
# <console>:25: error: overloaded method value min with alternatives:
#   (columnName: String)org.apache.spark.sql.Column <and>
#   (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
#  cannot be applied to (Float, Float)
#        val minTempByStation = stationTemps.reduceByKey((x,y)=>min(x,y))

# SOLVED: I had to import the right min. In the spark-shell, a bare min resolves to org.apache.spark.sql.functions.min, so scala.math.min has to be imported. Personally I hate this kind of Scala error because the message is not clear; it could at least hint that the wrong min is in scope...
import scala.math.min
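An alternative that avoids the ambiguity without the extra import is to fully qualify the call (just a sketch of the same thing):

    val minTempByStation = stationTemps.reduceByKey((x, y) => scala.math.min(x, y))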
val results = minTempByStation.collect

for (result <- results.sorted) {
       val station = result._1
       val temp = result._2
       val formattedTemp = f"$temp%.2f F"
       println(s"$station minimum temperature: $formattedTemp") 
}

EZE00100082 minimum temperature: 16,52 F
ITE00100554 minimum temperature: 18,50 F



import scala.math.max

val maxTempByStation = stationTemps.reduceByKey((x,y)=>max(x,y))

val results = maxTempByStation.collect

for (result <- results.sorted){
    val station = result._1
    val temp = result._2
    val formattedTemp = f"$temp%.2f F"
    println(s"$station maximum temperature: $formattedTemp")
}

EZE00100082 max temperature: 90,14 F
ITE00100554 max temperature: 90,14 F

# Having this file:
aironman@MacBook-Pro-de-Alonso ~> cat testSpark.txt
this is a text file

# counting occurrences of each distinct line in the file...
scala> sc.textFile("/Users/aironman/testSpark.txt").map(line=>(line,1)).reduceByKey(_+_).foreach(println)
(this is a text file,1)

# typical word count sample
sc.textFile("/Users/aironman/testSpark.txt").flatMap(x=>x.split(" ")).map(line=>(line,1)).reduceByKey(_+_).foreach(println)
(a,1)
(this,1)
(is,1)
(text,1)
(file,1)


sc.textFile("/Users/aironman/testSpark.txt").flatMap(x=>x.split(" ")).map(line=>(line,1)).reduceByKey(_+_).collect.sorted.foreach(println)
(a,1)
(file,1)
(is,1)
(text,1)
(this,1)

scala> sc.textFile("/Users/aironman/testSpark.txt").flatMap(x=>x.split(" ")).map(line=>(line,1)).reduceByKey(_+_).toDF.registerTempTable("TEMP")

spark is the SparkSession provided by the spark-shell

scala> spark.table("TEMP").show
20/02/24 16:51:18 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
20/02/24 16:51:18 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
20/02/24 16:51:18 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+----+---+
|  _1| _2|
+----+---+
|this|  1|
|  is|  1|
|file|  1|
|   a|  1|
|text|  1|
+----+---+
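Note that registerTempTable is deprecated since Spark 2.0; the same thing with the newer API and named columns would look roughly like this (assuming the same spark-shell session):

    val counts = sc.textFile("/Users/aironman/testSpark.txt")
      .flatMap(_.split(" "))
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .toDF("word", "count")

    counts.createOrReplaceTempView("TEMP")
    spark.table("TEMP").show()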

# counting words from a book
val book = sc.textFile("/Users/aironman/Downloads/SparkScala/book.txt")
book: org.apache.spark.rdd.RDD[String] = /Users/aironman/Downloads/SparkScala/book.txt MapPartitionsRDD[118] at textFile at <console>:26

book.flatMap(word=>word.split(" ")).map(l=>(l,1)).reduceByKey(_+_).foreach(println)
(convince,2)
(raises,2)
(Let,1)
("Flexibility,1)
(salesperson.,1)
...

or

scala> book.flatMap(word=>word.split(" ")).countByValue.foreach(println)
(convince,2)
(raises,2)
(Let,1)
("Flexibility,1)
(salesperson.,1)
...

reduceByKey is a transformation that runs on the workers and keeps the result distributed as an RDD; countByValue is an action that collects the counts to the driver as a Map, so it only makes sense when the number of distinct values is small.

https://stackoverflow.com/questions/52915466/when-to-use-countbyvalue-and-when-to-use-map-reducebykey/52915892
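A quick sketch of that difference in code, assuming the same spark-shell session and the book RDD above:

    val words = book.flatMap(_.split(" "))

    // transformation: runs on the executors, the result stays distributed
    val distributedCounts: org.apache.spark.rdd.RDD[(String, Int)] = words.map(w => (w, 1)).reduceByKey(_ + _)

    // action: the counts are collected to the driver as a scala.collection.Map
    val driverCounts: scala.collection.Map[String, Long] = words.countByValue()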

book.map(word=>(word,1)).sortByKey()


(If you really have found a millions-of-dollars-per-year opportunity and you can’t protect it, it’s probably a matter of time before a larger company will identify that same opportunity. In this case, you may be better off going down the riskier “growth business” path instead of a lifestyle business, with the objective of gaining traction in the marketplace before others do, and getting acquired by that larger company down the road. That’s all a big gamble, however.,1)
(If you scroll down, more detailed information is available about the product� but above the fold, things have been kept as simple and visual as possible. A large video dominates the page, which auto-plays when loaded. It’s designed to capture your attention before you click away.,1)
(If you sell products online, Google Analytics is a free, sophisticated tool that lets you track trends with your customers, and where those customers came from. Go get an account, and integrate it with your website and any other websites that sell your products on your behalf.,1)

scala> val wordCount= sc.textFile("/Users/aironman/Downloads/SparkScala/twitter.txt").flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((x,y)=>x+y).map(x=>(x._2,x._1)).sortByKey().foreach(println)
(1,384953089-Kw81gqpF1eja3ch8Y9qXqUfm6X6ReCKUiHOvORlS)
(1,consumerSecret)
(1,HSQxluLbHezug8GgdPfX8zqYPkFPdfX20i42GJCGWGPuX)
(1,6Xchc9BczeXBqNr7foRO8mok7LP2SkqTTegjtPwgPEciEmbXp7)
(1,ofclzVJAz2530GKhJOB5GXOUj)
(1,accessTokenSecret)
(1,accessToken)
(1,consumerKey)
wordCount: Unit = ()
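Note that foreach returns Unit, which is why the shell prints wordCount: Unit = (). To keep the sorted RDD around instead, one could drop the foreach (a sketch of the same pipeline):

    val wordCount = sc.textFile("/Users/aironman/Downloads/SparkScala/twitter.txt")
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey((x, y) => x + y)
      .map(x => (x._2, x._1))
      .sortByKey()

    wordCount.collect().foreach(println)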

# Sample about how to use broadcast variables.

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.io.Source
import java.nio.charset.CodingErrorAction
import scala.io.Codec

/** Find the movies with the most ratings. */
object PopularMoviesNicer {

  /** Load up a Map of movie IDs to movie names. */
  def loadMovieNames() : Map[Int, String] = {

    // Handle character encoding issues:
    implicit val codec = Codec("UTF-8")
    codec.onMalformedInput(CodingErrorAction.REPLACE)
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

    // Create a Map of Ints to Strings, and populate it from u.item.
    var movieNames:Map[Int, String] = Map()

     val lines = Source.fromFile("../ml-100k/u.item").getLines()
     for (line <- lines) {
       var fields = line.split('|')
       if (fields.length > 1) {
        movieNames += (fields(0).toInt -> fields(1))
       }
     }

     return movieNames
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

     // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "PopularMoviesNicer")  

    // Create a broadcast variable of our ID -> movie name map
    var nameDict = sc.broadcast(loadMovieNames)

    // Read in each rating line
    val lines = sc.textFile("../ml-100k/u.data")

    // Map to (movieID, 1) tuples
    val movies = lines.map(x => (x.split("\t")(1).toInt, 1))

    // Count up all the 1's for each movie
    val movieCounts = movies.reduceByKey( (x, y) => x + y )

    // Flip (movieID, count) to (count, movieID)
    val flipped = movieCounts.map( x => (x._2, x._1) )

    // Sort
    val sortedMovies = flipped.sortByKey()

    // Fold in the movie names from the broadcast variable
    val sortedMoviesWithNames = sortedMovies.map( x  => (nameDict.value(x._2), x._1) )

    // Collect and print results
    val results = sortedMoviesWithNames.collect()

    results.foreach(println)
  }

}


import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

    val simpleData = Seq(("James","Sales",3000),
      ("Michael","Sales",4600),
      ("Robert","Sales",4100),
      ("Maria","Finance",3000),
      ("Raman","Finance",3000),
      ("Scott","Finance",3300),
      ("Jen","Finance",3900),
      ("Jeff","Marketing",3000),
      ("Kumar","Marketing",2000)
    )
val df = simpleData.toDF("employee_name","department","salary")
df.show()

//Get the first row from a group.
val w2 = Window.partitionBy("department").orderBy(col("salary"))
df.withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row").show()

//Retrieve Highest salary
val w3 = Window.partitionBy("department").orderBy(col("salary").desc)
df.withColumn("row",row_number.over(w3)).where($"row" === 1).drop("row").show()

//Maximum, Minimum, Average, total salary for each window group
val w4 = Window.partitionBy("department")
val aggDF = df.withColumn("row", row_number.over(w3))
  .withColumn("avg", avg(col("salary")).over(w4))
  .withColumn("sum", sum(col("salary")).over(w4))
  .withColumn("min", min(col("salary")).over(w4))
  .withColumn("max", max(col("salary")).over(w4))
  .where(col("row") === 1)
  .select("department", "avg", "sum", "min", "max")
aggDF.show()

//Retrieve Lowest salary
val w5 = Window.partitionBy("department").orderBy(col("salary").asc)
df.withColumn("row",row_number.over(w5)).where($"row" === 1).drop("row").show()

# Using broadcast...

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.io.Source
import java.nio.charset.CodingErrorAction
import scala.io.Codec
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.SparkSession


/** Find the movies with the most ratings. */
object PopularMoviesNicer {

  /** Load up a Map of movie IDs to movie names. */
  def loadMovieNames() : Map[Int, String] = {

    // Handle character encoding issues:
    implicit val codec = Codec("UTF-8")
    codec.onMalformedInput(CodingErrorAction.REPLACE)
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

    // Create a Map of Ints to Strings, and populate it from u.item.
    var movieNames:Map[Int, String] = Map()

     val lines = Source.fromFile("../ml-100k/u.item").getLines()
     for (line <- lines) {
       var fields = line.split('|')
       if (fields.length > 1) {
        movieNames += (fields(0).toInt -> fields(1))
       }
     }

     return movieNames
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

     // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "PopularMoviesNicer")  

    // Create a broadcast variable of our ID -> movie name map
    var nameDict = sc.broadcast(loadMovieNames)

    // Read in each rating line
    val lines = sc.textFile("../ml-100k/u.data")

    // Map to (movieID, 1) tuples
    val movies = lines.map(x => (x.split("\t")(1).toInt, 1))

    // Count up all the 1's for each movie
    val movieCounts = movies.reduceByKey( (x, y) => x + y )

    // Flip (movieID, count) to (count, movieID)
    val flipped = movieCounts.map( x => (x._2, x._1) )

    // Sort
    val sortedMovies = flipped.sortByKey()

    // Fold in the movie names from the broadcast variable
    val sortedMoviesWithNames = sortedMovies.map( x  => (nameDict.value(x._2), x._1) )

    // scala> sortedMoviesWithNames
    // res13: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[54] at map at <console>:46
    // I want to save the RDD to the file system; the first thing is to create a DataFrame from it, then I can save it in Parquet format.
    // We need a SparkSession for the DataFrame API (getOrCreate reuses the SparkContext created above).
    val spark = SparkSession.builder().getOrCreate()

    val sortedMoviesWithNamesDF = spark.createDataFrame(sortedMoviesWithNames)

    val sortedMoviesWithNamesDFWithSchema = spark.createDataFrame(sortedMoviesWithNames).toDF("name", "count")

    // I can work with one or the other; I pick the first one...
    sortedMoviesWithNamesDF.createTempView("MOVIES")

    spark.sql("SELECT * FROM MOVIES").show(false)

    sortedMoviesWithNamesDF.write.parquet("movies.parquet")
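    // Note: to read the Parquet file back later, something like this should work (not part of the original sample):
    // val moviesBack = spark.read.parquet("movies.parquet")
    // moviesBack.show(false)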


    // Collect and print results
    val results = sortedMoviesWithNames.collect()

    results.foreach(println)
  }

}

links

https://github.com/PacktPublishing?utf8=✓&q=&type=&language=java

https://learning.oreilly.com/live-training/courses/stream-processing-with-apache-spark/0636920360490/

https://training.databricks.com/visualapi.pdf => pure gold!

https://spark.apache.org/docs/latest/sql-programming-guide.html

https://spark.apache.org/docs/latest/rdd-programming-guide.html

https://es.wikipedia.org/wiki/Evaluación_perezosa

https://sparkbyexamples.com/spark/spark-read-write-dataframe-parquet-example/

https://learning.oreilly.com/videos/apache-spark-with/9781787129849/9781787129849-video4_3?autoplay=false

https://www.arcgis.com/apps/opsdashboard/index.html#/bda7594740fd40299423467b48e9ecf6

https://piotrminkowski.com/2018/05/04/reactive-microservices-with-spring-webflux-and-spring-cloud/

https://dzone.com/articles/5-amazing-examples-of-artificial-intelligence-in-a
