It is better to call repartition() on a DataFrame, or partitionBy() on an RDD, before executing a long and expensive operation.

Operations like join(), cogroup(), groupWith(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup() can gain a lot if we get the partitioning right, as the sketch below illustrates.
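As a minimal sketch (the RDD names and contents are hypothetical), pre-partitioning two pair RDDs with the same HashPartitioner lets the subsequent join() reuse that layout instead of reshuffling both sides:

import org.apache.spark.HashPartitioner

// Hypothetical pair RDDs keyed by userId.
val ratingsByUser = sc.parallelize(Seq((1, 4.0), (2, 3.5), (1, 5.0)))
val tagsByUser    = sc.parallelize(Seq((1, "drama"), (2, "comedy")))

// Hash both RDDs into the same 8 partitions and cache them,
// so records with the same key land on the same executor.
val partitioner = new HashPartitioner(8)
val ratingsPart = ratingsByUser.partitionBy(partitioner).persist()
val tagsPart    = tagsByUser.partitionBy(partitioner).persist()

// This join can proceed without a full shuffle, because both
// inputs already share the same partitioner.
val joined = ratingsPart.join(tagsPart)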

val moviePairs = ratings.as("ratings1")
.join(ratings.as("ratings2"), $"ratings1.userId" === $"ratings2.userId" && $"ratings1.movieId" < $"ratings2.movieId")
.select($"ratings1.movieId".alias("movie1"),
$"ratings2.movieId".alias("movie2"),
$"ratings1.rating".alias("rating1"),
$"ratings2.rating".alias("rating2")
)
.repartition(100)
.as[MoviePairs]

You have to experiment with this value (100), because the right number depends, at least, on how many partitions the executors on the cluster nodes can handle. By default a partition weighs 128 MB, and an executor can keep in memory only as many partitions as its available RAM allows. Passing 100 means the data will be split into 100 partitions, which are then distributed among the available executors.
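A quick way to sanity-check the choice is to inspect how many partitions the Dataset actually has, and to tune how many partitions Spark produces after a shuffle (a minimal sketch, reusing the moviePairs Dataset from above):

// Number of partitions the Dataset currently has.
println(moviePairs.rdd.getNumPartitions)

// Joins and aggregations shuffle into spark.sql.shuffle.partitions
// partitions (200 by default); it can be tuned per session.
spark.conf.set("spark.sql.shuffle.partitions", "100")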

Actually, this looks like a value the framework should compute at runtime and apply before executing the join. In fact, it does: in version 3.0.x, Adaptive Query Execution (AQE) was introduced, which can re-optimize the number of shuffle partitions while the query runs.
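A minimal sketch of enabling it when building the session (the property names are the standard AQE settings; the app name is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MoviePairs")
  // Let Spark re-plan at runtime (enabled by default since 3.2.0).
  .config("spark.sql.adaptive.enabled", "true")
  // Merge small shuffle partitions after each stage instead of
  // relying on a hand-tuned repartition(100).
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .getOrCreate()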

The job can then be submitted to the cluster:

spark-submit --class playground.Playground --master spark://172.19.0.2:7077 --deploy-mode cluster --total-executor-cores 1 target/scala-2.12/spark-essentials_2.12-0.1.jar
