Es mejor usar repartition() en un Dataframe o partitionBy() en un RDD antes de ejecutar una operacion larga y costosa.
operaciones como join(), cogroup(), groupWith(), join(),leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), lookup() pueden ganar mucho si acertamos en el particionamiento.
val moviePairs = ratings.as("ratings1")
.join(ratings.as("ratings2"), $"ratings1.userId" === $"ratings2.userId" && $"ratings1.movieId" < $"ratings2.movieId")
.select($"ratings1.movieId".alias("movie1"),
$"ratings2.movieId".alias("movie2"),
$"ratings1.rating".alias("rating1"),
$"ratings2.rating".alias("rating2")
)
.repartition(100)
.as[MoviePairs]
Hay que jugar con ese valor (100), pues va a depender de, al menos, tantas particiones que puedan ser manejadas por los executors, los nodos del cluster. Cada particion por defecto pesa 128MB. Un executor podrá mantener en memoria tantas particiones como memoria RAM disponible tengan. Ese 100 implica que se van a crear 100 particiones con los datos, por lo que, se distribuirán entre los executors disponibles
La verdad, parece un valor que el framework debería precalcular en tiempo de ejecución para aplicar antes de ejecutar el join.
De hecho, lo hace, en la versión 3.0.X, se introdujo el Adaptive Query execution.
spark-submit --class playground.Playground --master spark:172.19.0.2:7077 --deploy-mode cluster --total-executor-cores 1 target/scala-2.12/spark-essentials_2.12-0.1.jar
Like this:
Like Loading...
Related