It is better to call repartition() on a DataFrame, or partitionBy() on an RDD, before running a long and expensive operation.
Operations such as join(), cogroup(), groupWith(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey() and lookup() can gain a lot if we get the partitioning right.
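For the RDD case, here is a minimal sketch of the idea, assuming a local Spark 3.x session. The data, the object name and the choice of 8 partitions are made up for illustration and are not taken from the script used below.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

object PartitionByExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PartitionByExample")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical (userId, movieId) and (userId, name) pairs, invented for this sketch.
    val ratings = sc.parallelize(Seq((1, 50), (1, 172), (2, 50), (3, 181)))
    val users = sc.parallelize(Seq((1, "ana"), (2, "luis"), (3, "eva")))

    // Assumption: 8 partitions is an arbitrary value, tune it for your machine.
    val partitioner = new HashPartitioner(8)

    // Partition both sides with the same partitioner and cache the result,
    // so the shuffle is paid once instead of on every later keyed operation.
    val ratingsByUser = ratings.partitionBy(partitioner).persist()
    val usersById = users.partitionBy(partitioner).persist()

    // Both inputs share a partitioner, so the join can reuse that layout.
    val joined = ratingsByUser.join(usersById)

    println(s"Partitioner of the joined RDD: ${joined.partitioner}")
    joined.collect().foreach(println)

    spark.stop()
  }
}
```

The persist() call matters: without it, the partitioned RDD may be recomputed, and reshuffled, each time it is used.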
val moviePairs = ratings.as("ratings1")
  .join(ratings.as("ratings2"),
    $"ratings1.userId" === $"ratings2.userId" && $"ratings1.movieId" < $"ratings2.movieId")
  .select($"ratings1.movieId".alias("movie1"),
    $"ratings2.movieId".alias("movie2"),
    $"ratings1.rating".alias("rating1"),
    $"ratings2.rating".alias("rating2")
  )
  .repartition(100)
  .as[MoviePairs]
You have to experiment with that value (100). As a starting point you want at least as many partitions as you have cores, or as executors that fit in the available memory.
I am going to run a Scala script.
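One way to avoid hard-coding the number is to derive it from the parallelism Spark reports and then check the result. This is only a sketch under assumptions: the sample rows, the object name and the multiplier of 2 are hypothetical, and the right multiplier has to be found by measuring.

```scala
import org.apache.spark.sql.SparkSession

object PartitionCountExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PartitionCountExample")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical ratings, invented for this sketch.
    val ratings = Seq((1, 50, 5.0), (1, 172, 4.0), (2, 50, 3.0))
      .toDF("userId", "movieId", "rating")

    // In local mode this is normally the number of cores given to Spark.
    val cores = spark.sparkContext.defaultParallelism

    // Assumption: the multiplier is arbitrary, adjust it after timing real runs.
    val multiplier = 2
    val targetPartitions = cores * multiplier

    println(s"Partitions before: ${ratings.rdd.getNumPartitions}")

    // Repartition by the join key so rows with the same userId end up together.
    val repartitioned = ratings.repartition(targetPartitions, $"userId")

    println(s"Partitions after: ${repartitioned.rdd.getNumPartitions}")

    spark.stop()
  }
}
```

Printing the partition count before and after makes it easy to confirm that the value you are timing is the one actually applied.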
aironman@MacBook-Pro-de-Alonso TestWithFrankCrane % spark-submit --version
21/03/04 14:09:53 WARN Utils: Your hostname, MacBook-Pro-de-Alonso.local resolves to a loopback address: 127.0.0.1; using 192.168.1.37 instead (on interface en0)
21/03/04 14:09:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/Cellar/apache-spark/3.0.1/libexec/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.9
Branch HEAD
Compiled by user ubuntu on 2020-08-28T08:58:35Z
Revision 2b147c4cd50da32fe2b4167f97c8142102a0510d
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.
aironman@MacBook-Pro-de-Alonso TestWithFrankCrane % java -version
openjdk version "1.8.0_282"
OpenJDK Runtime Environment Corretto-8.282.08.1 (build 1.8.0_282-b08)
OpenJDK 64-Bit Server VM Corretto-8.282.08.1 (build 25.282-b08, mixed mode)
Without repartition, running locally:
aironman@MacBook-Pro-de-Alonso TestWithFrankCrane % spark-submit --class com.sundogsoftware.spark.MovieSimilaritiesDataset target/sample-1.0-SNAPSHOT.jar 50
...
Loading movie names...
Top 10 similar movies for Star Wars (1977)
Empire Strikes Back, The (1980) score: 0.9895522078385338 strength: 345
Return of the Jedi (1983) score: 0.9857230861253026 strength: 480
Raiders of the Lost Ark (1981) score: 0.981760098872619 strength: 380
20,000 Leagues Under the Sea (1954) score: 0.9789385605497993 strength: 68
12 Angry Men (1957) score: 0.9776576120448436 strength: 109
Close Shave, A (1995) score: 0.9775948291054827 strength: 92
African Queen, The (1951) score: 0.9764692222674887 strength: 138
Sting, The (1973) score: 0.9751512937740359 strength: 204
Wrong Trousers, The (1993) score: 0.9748681355460885 strength: 103
Wallace & Gromit: The Best of Aardman Animation (1996) score: 0.9741816128302572 strength: 58
Tiempo transcurrido: 10 seconds
Using repartition(8). My machine has 8 physical cores, 16 virtual.
aironman@MacBook-Pro-de-Alonso TestWithFrankCrane % spark-submit --class com.sundogsoftware.spark.MovieSimilaritiesDataset target/sample-1.0-SNAPSHOT.jar 50
...
Loading movie names...
Top 10 similar movies for Star Wars (1977)
Empire Strikes Back, The (1980) score: 0.9895522078385338 strength: 345
Return of the Jedi (1983) score: 0.9857230861253026 strength: 480
Raiders of the Lost Ark (1981) score: 0.981760098872619 strength: 380
20,000 Leagues Under the Sea (1954) score: 0.9789385605497993 strength: 68
12 Angry Men (1957) score: 0.9776576120448436 strength: 109
Close Shave, A (1995) score: 0.9775948291054827 strength: 92
African Queen, The (1951) score: 0.9764692222674887 strength: 138
Sting, The (1973) score: 0.9751512937740359 strength: 204
Wrong Trousers, The (1993) score: 0.9748681355460885 strength: 103
Wallace & Gromit: The Best of Aardman Animation (1996) score: 0.9741816128302572 strength: 58
Tiempo transcurrido: 22 seconds
In this local test the explicit repartition(8) made the job slower (22 seconds versus 10). Honestly, this looks like a value the framework should work out at run time and apply before executing the join.
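Spark 3.x does have a feature that goes in this direction: Adaptive Query Execution, which can coalesce shuffle partitions at run time based on the data it sees. It is disabled by default in 3.0.x (it became the default in 3.2.0), so on the version used here it has to be switched on. The sketch below only shows how to enable it; I have not measured whether it helps for this particular job, and the object name is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object AdaptiveExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("AdaptiveExample")
      .master("local[*]")
      // Turn on Adaptive Query Execution (off by default in Spark 3.0.x).
      .config("spark.sql.adaptive.enabled", "true")
      // Let Spark merge small shuffle partitions at run time.
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .getOrCreate()

    // Confirm the settings that the session actually picked up.
    println(spark.conf.get("spark.sql.adaptive.enabled"))
    println(spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled"))

    spark.stop()
  }
}
```

The same settings can be passed to spark-submit with --conf, which avoids rebuilding the jar while comparing timings.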