It usually pays to call repartition() on a DataFrame or partitionBy() on an RDD before running a long, expensive operation.

Operations such as join(), cogroup(), groupWith(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey() and lookup() can gain a lot if we get the partitioning right.
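To see why key-based operations benefit, here is a minimal plain-Scala sketch (no Spark needed) of hash partitioning. It mirrors the rule Spark's HashPartitioner uses, a non-negative hashCode modulo the number of partitions. The object and function names (HashPartitionSketch, partitionFor, partitionBy, joinPartitioned) are made up for this illustration and are not Spark APIs.

```scala
object HashPartitionSketch {
  // Non-negative (hashCode mod numPartitions): equal keys always land in the same partition.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Group key/value pairs by target partition, roughly what partitionBy does to an RDD.
  def partitionBy[K, V](data: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    data.groupBy { case (k, _) => partitionFor(k, numPartitions) }

  // Join two datasets partition by partition. Because both sides use the same rule,
  // matching keys are already co-located and no data has to move between partitions.
  def joinPartitioned[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)], n: Int): Seq[(K, (A, B))] = {
    val l = partitionBy(left, n)
    val r = partitionBy(right, n)
    (0 until n).flatMap { p =>
      for {
        (k1, a) <- l.getOrElse(p, Nil)
        (k2, b) <- r.getOrElse(p, Nil)
        if k1 == k2
      } yield (k1, (a, b))
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up data, for illustration only.
    val ratings = Seq(1 -> "movie 10", 2 -> "movie 20", 5 -> "movie 30")
    val users = Seq(1 -> "Ana", 2 -> "Luis", 7 -> "Eva")
    joinPartitioned(ratings, users, 4).foreach(println)
  }
}
```

In real Spark the saving comes from skipping the shuffle: when both sides already share a partitioner, each task can join its own partition locally.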

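// Self-join on userId; the movieId inequality keeps each pair of movies only once per user.
// Assumes the session implicits are in scope (e.g. import spark.implicits._) for $"..." and .as[MoviePairs].
val moviePairs = ratings.as("ratings1")
  .join(ratings.as("ratings2"),
    $"ratings1.userId" === $"ratings2.userId" && $"ratings1.movieId" < $"ratings2.movieId")
  .select(
    $"ratings1.movieId".alias("movie1"),
    $"ratings2.movieId".alias("movie2"),
    $"ratings1.rating".alias("rating1"),
    $"ratings2.rating".alias("rating2")
  )
  .repartition(100) // reshuffles the join result, not the inputs to the join
  .as[MoviePairs]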
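Note that the snippet above calls repartition after the join. If the goal is to partition before the expensive operation, a variant could repartition the input by the join key first. This is only a sketch: the object name, the moviePairs helper, the sample rows and the partition count of 8 are all assumptions for illustration, and whether it is actually faster has to be measured.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RepartitionBeforeJoinSketch {
  // Repartition by the join key first, then self-join the repartitioned data.
  def moviePairs(ratings: DataFrame, numPartitions: Int): DataFrame = {
    val byUser = ratings.repartition(numPartitions, col("userId"))
    byUser.as("r1")
      .join(byUser.as("r2"),
        col("r1.userId") === col("r2.userId") && col("r1.movieId") < col("r2.movieId"))
      .select(
        col("r1.movieId").alias("movie1"),
        col("r2.movieId").alias("movie2"),
        col("r1.rating").alias("rating1"),
        col("r2.rating").alias("rating2"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RepartitionBeforeJoinSketch") // hypothetical app name
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Tiny made-up ratings table, for illustration only.
    val ratings = Seq((1, 10, 4.0), (1, 20, 5.0), (2, 10, 3.0), (2, 30, 2.0))
      .toDF("userId", "movieId", "rating")

    val pairs = moviePairs(ratings, 8)
    pairs.explain() // prints the physical plan, so you can check which exchanges Spark plans
    pairs.show()
    spark.stop()
  }
}
```

On a table this small Spark may well choose a broadcast join and make the repartition pointless, so treat the plan printed by explain() as the source of truth.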

You have to experiment with that value (100), because the right number depends on your hardware: you want at least as many partitions as you have cores, or as executors that fit in the available memory.
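As a starting point for that experiment, the sketch below derives a candidate partition count from the number of cores. The factor of 2 follows the general guideline in the Spark tuning guide of 2 to 3 tasks per CPU core; the function name suggestedPartitions and the tasksPerCore parameter are made up for this illustration.

```scala
object PartitionCountSketch {
  // Starting point only: cores times a small factor. Always confirm by measuring.
  def suggestedPartitions(cores: Int, tasksPerCore: Int = 2): Int = {
    require(cores > 0 && tasksPerCore > 0, "cores and tasksPerCore must be positive")
    cores * tasksPerCore
  }

  def main(args: Array[String]): Unit = {
    // Logical cores visible to the JVM on this machine.
    val cores = Runtime.getRuntime.availableProcessors()
    println(s"$cores logical cores: try repartition(${suggestedPartitions(cores)}) first, then measure")
  }
}
```

On a cluster the relevant figure is the total number of cores across executors, not the cores of the driver machine.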

I'm going to run a Scala job. These are the Spark and Java versions on my machine:

aironman@MacBook-Pro-de-Alonso TestWithFrankCrane % spark-submit --version
21/03/04 14:09:53 WARN Utils: Your hostname, MacBook-Pro-de-Alonso.local resolves to a loopback address: 127.0.0.1; using 192.168.1.37 instead (on interface en0)
21/03/04 14:09:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/Cellar/apache-spark/3.0.1/libexec/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/
                        
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.9
Branch HEAD
Compiled by user ubuntu on 2020-08-28T08:58:35Z
Revision 2b147c4cd50da32fe2b4167f97c8142102a0510d
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.
aironman@MacBook-Pro-de-Alonso TestWithFrankCrane % java -version 
openjdk version "1.8.0_282"
OpenJDK Runtime Environment Corretto-8.282.08.1 (build 1.8.0_282-b08)
OpenJDK 64-Bit Server VM Corretto-8.282.08.1 (build 25.282-b08, mixed mode)

Without repartition, running locally:

aironman@MacBook-Pro-de-Alonso TestWithFrankCrane % spark-submit --class com.sundogsoftware.spark.MovieSimilaritiesDataset target/sample-1.0-SNAPSHOT.jar 50
...
Loading movie names...

Top 10 similar movies for Star Wars (1977)
Empire Strikes Back, The (1980)	score: 0.9895522078385338	strength: 345
Return of the Jedi (1983)	score: 0.9857230861253026	strength: 480
Raiders of the Lost Ark (1981)	score: 0.981760098872619	strength: 380
20,000 Leagues Under the Sea (1954)	score: 0.9789385605497993	strength: 68
12 Angry Men (1957)	score: 0.9776576120448436	strength: 109
Close Shave, A (1995)	score: 0.9775948291054827	strength: 92
African Queen, The (1951)	score: 0.9764692222674887	strength: 138
Sting, The (1973)	score: 0.9751512937740359	strength: 204
Wrong Trousers, The (1993)	score: 0.9748681355460885	strength: 103
Wallace & Gromit: The Best of Aardman Animation (1996)	score: 0.9741816128302572	strength: 58
Tiempo transcurrido: 10 seconds

With repartition(8). My machine has 8 physical cores (16 logical).

aironman@MacBook-Pro-de-Alonso TestWithFrankCrane % spark-submit --class com.sundogsoftware.spark.MovieSimilaritiesDataset target/sample-1.0-SNAPSHOT.jar 50
...
Loading movie names...

Top 10 similar movies for Star Wars (1977)
Empire Strikes Back, The (1980)	score: 0.9895522078385338	strength: 345
Return of the Jedi (1983)	score: 0.9857230861253026	strength: 480
Raiders of the Lost Ark (1981)	score: 0.981760098872619	strength: 380
20,000 Leagues Under the Sea (1954)	score: 0.9789385605497993	strength: 68
12 Angry Men (1957)	score: 0.9776576120448436	strength: 109
Close Shave, A (1995)	score: 0.9775948291054827	strength: 92
African Queen, The (1951)	score: 0.9764692222674887	strength: 138
Sting, The (1973)	score: 0.9751512937740359	strength: 204
Wrong Trousers, The (1993)	score: 0.9748681355460885	strength: 103
Wallace & Gromit: The Best of Aardman Animation (1996)	score: 0.9741816128302572	strength: 58
Tiempo transcurrido: 22 seconds

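In this local run the explicit repartition(8) made things worse: 22 seconds versus 10 without it, presumably because of the extra shuffle it adds. Honestly, this looks like a value the framework should work out at runtime and apply before running the join.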
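Spark 3.x goes some way in that direction with Adaptive Query Execution (AQE), which can coalesce shuffle partitions at runtime using statistics from completed stages. It is disabled by default in 3.0.x, the version used here. A minimal sketch of turning it on follows; the app name is an assumption, and I have not measured whether it helps this particular job.

```scala
import org.apache.spark.sql.SparkSession

object AdaptiveExecutionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MovieSimilaritiesDataset") // assumed name, for illustration
      .master("local[*]")
      // Re-optimize the query plan at runtime using shuffle statistics.
      .config("spark.sql.adaptive.enabled", "true")
      // Let AQE merge small shuffle partitions after each shuffle.
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .getOrCreate()

    println(spark.conf.get("spark.sql.adaptive.enabled"))
    spark.stop()
  }
}
```

The same settings can be passed on the command line with spark-submit --conf spark.sql.adaptive.enabled=true.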
