Es mejor usar repartition() en un Dataframe o partitionBy() en un RDD antes de ejecutar una operacion larga y costosa.

operaciones como join(), cogroup(), groupWith(), join(),leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), lookup() pueden ganar mucho si acertamos en el particionamiento.

val moviePairs = ratings.as("ratings1")
      .join(ratings.as("ratings2"), $"ratings1.userId" === $"ratings2.userId" && $"ratings1.movieId" < $"ratings2.movieId")
      .select($"ratings1.movieId".alias("movie1"),
        $"ratings2.movieId".alias("movie2"),
        $"ratings1.rating".alias("rating1"),
        $"ratings2.rating".alias("rating2")
      )
      .repartition(100)
      .as[MoviePairs]


Hay que jugar con ese valor (100), pues va a depender de, al menos, tantas particiones que puedan ser manejadas por los executors, los nodos del cluster. Cada particion por defecto pesa 128MB. Un executor podrá mantener en memoria tantas particiones como memoria RAM disponible tengan. Ese 100 implica que se van a crear 100 particiones con los datos, por lo que, se distribuirán entre los executors disponibles

La verdad, parece un valor que el framework debería precalcular en tiempo de ejecución para aplicar antes de ejecutar el join. 

De hecho, lo hace, en la versión 3.0.X, se introdujo el Adaptive Query execution.

spark-submit --class playground.Playground --master spark:172.19.0.2:7077 --deploy-mode cluster --total-executor-cores 1  target/scala-2.12/spark-essentials_2.12-0.1.jar

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s