It is better to use repartition() on a DataFrame or partitionBy() on an RDD before executing a long and expensive operation.
Operations like join(), cogroup(), groupWith(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup() can gain a lot if we get the partitioning right.
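As a minimal sketch of the RDD side, a pair RDD can be pre-partitioned with a HashPartitioner so that later key-based operations reuse that partitioning instead of shuffling again. The ratingsByUser RDD, its contents, and the partition count below are illustrative assumptions, not part of the original example.

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Hypothetical pair RDD of (userId, rating); the data is illustrative.
val ratingsByUser: RDD[(Int, Double)] = spark.sparkContext
  .parallelize(Seq((1, 4.5), (2, 3.0), (1, 5.0)))

// Partition once up front and persist, so the shuffle is paid only once.
val partitioned = ratingsByUser
  .partitionBy(new HashPartitioner(100))
  .persist()

// Key-based operations on the partitioned RDD (reduceByKey, join, ...)
// can now reuse the existing partitioning.
val avgByUser = partitioned
  .mapValues(r => (r, 1))
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
  .mapValues { case (sum, count) => sum / count }

On the DataFrame side, the original example applies repartition() to the join result: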
// Self-join ratings on userId to form every pair of movies rated by the
// same user; the movieId ordering avoids duplicate and self pairs.
val moviePairs = ratings.as("ratings1")
  .join(ratings.as("ratings2"),
    $"ratings1.userId" === $"ratings2.userId" && $"ratings1.movieId" < $"ratings2.movieId")
  .select(
    $"ratings1.movieId".alias("movie1"),
    $"ratings2.movieId".alias("movie2"),
    $"ratings1.rating".alias("rating1"),
    $"ratings2.rating".alias("rating2")
  )
  .repartition(100) // explicitly set the number of partitions of the result
  .as[MoviePairs]
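For context, the snippet assumes a ratings DataFrame and a MoviePairs case class along these lines; the field names match the aliased columns above, but the loading code and file path are assumptions.

import org.apache.spark.sql.SparkSession

// Assumed target type for .as[MoviePairs]; field names must match the
// selected column aliases.
case class MoviePairs(movie1: Int, movie2: Int, rating1: Double, rating2: Double)

val spark = SparkSession.builder()
  .appName("MoviePairs")
  .getOrCreate()
import spark.implicits._

// Hypothetical source: a CSV with userId, movieId and rating columns.
val ratings = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/ratings.csv")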
You have to tune this value (100): it depends, at a minimum, on how many partitions the executors on the cluster nodes can handle. When reading files, Spark targets partitions of about 128 MB by default. An executor can only hold in memory as many partitions as its available RAM allows. Passing 100 means the data will be split into 100 partitions, which are then distributed among the available executors.
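As a minimal sketch, assuming the spark session and the moviePairs Dataset defined above, the current partition count can be inspected and the default number of shuffle partitions adjusted:

// How many partitions the Dataset currently has.
println(moviePairs.rdd.getNumPartitions)

// Number of partitions produced by shuffles (joins, aggregations) when
// no explicit repartition() is applied; Spark's default is 200.
spark.conf.set("spark.sql.shuffle.partitions", "100")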
This looks like a value the framework should compute at runtime before executing the join, and in fact it does: Adaptive Query Execution (AQE) was introduced in version 3.0.x.
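A minimal sketch of enabling AQE, assuming Spark 3.0 or later (newer versions enable it by default); with it on, Spark can coalesce small shuffle partitions after the join at runtime:

// Re-optimize the physical plan at runtime using shuffle statistics.
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Merge small shuffle partitions into fewer, reasonably sized ones.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")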
spark-submit --class playground.Playground --master spark://172.19.0.2:7077 --deploy-mode cluster --total-executor-cores 1 target/scala-2.12/spark-essentials_2.12-0.1.jar