Recently I read a fantastic post on Twitter from Aurimas Griciūnas, @Aurimas_Gr:

What do you need to know about 𝗦𝗽𝗮𝗿𝗸 𝗣𝗮𝗿𝗮𝗹𝗹𝗲𝗹𝗶𝘀𝗺?

While optimizing Spark Applications you will usually tweak two elements – performance and resource utilization.

Understanding parallelism in Spark and tuning it according to the situation will help you in both.

𝗦𝗼𝗺𝗲 𝗙𝗮𝗰𝘁𝘀:

➡️ Spark Executor can have multiple CPU Cores assigned to it.
➡️ Number of CPU Cores per Spark executor is defined by 𝘀𝗽𝗮𝗿𝗸.𝗲𝘅𝗲𝗰𝘂𝘁𝗼𝗿.𝗰𝗼𝗿𝗲𝘀 configuration.
➡️ Single CPU Core can read one file or partition of a splittable file at a single point in time.
➡️ Once read, a file is transformed into one or multiple partitions in memory.

𝗢𝗽𝘁𝗶𝗺𝗶𝘇𝗶𝗻𝗴 𝗥𝗲𝗮𝗱 𝗣𝗮𝗿𝗮𝗹𝗹𝗲𝗹𝗶𝘀𝗺:

❗️ If number of cores is equal to the number of files, files are not splittable and some of them are larger in size – larger files become a bottleneck, Cores responsible for reading smaller files will idle for some time.
❗️ If there are more Cores than the number of files – Cores that do not have files assigned to them will Idle. If we do not perform repartition after reading the files – the cores will remain Idle during processing stages.

✅ Rule of thumb: set number of Cores to be two times less than files being read. Adjust according to your situation.

𝗢𝗽𝘁𝗶𝗺𝗶𝘇𝗶𝗻𝗴 𝗣𝗿𝗼𝗰𝗲𝘀𝘀𝗶𝗻𝗴 𝗣𝗮𝗿𝗮𝗹𝗹𝗲𝗹𝗶𝘀𝗺:

➡️ Use 𝘀𝗽𝗮𝗿𝗸.𝗱𝗲𝗳𝗮𝘂𝗹𝘁.𝗽𝗮𝗿𝗮𝗹𝗹𝗲𝗹𝗶𝘀𝗺 and 𝘀𝗽𝗮𝗿𝗸.𝘀𝗾𝗹.𝘀𝗵𝘂𝗳𝗳𝗹𝗲.𝗽𝗮𝗿𝘁𝗶𝘁𝗶𝗼𝗻𝘀 configurations to set the number of partitions created after performing wide transformations.
➡️ After reading the files there will be as many partitions as there were files or partitions in splittable files.

❗️ After data is loaded as partitions into memory – Spark jobs will suffer from the same set of parallelism inefficiencies like when reading the data.

✅ Rule of thumb: set 𝘀𝗽𝗮𝗿𝗸.𝗱𝗲𝗳𝗮𝘂𝗹𝘁.𝗽𝗮𝗿𝗮𝗹𝗹𝗲𝗹𝗶𝘀𝗺 equal to 𝘀𝗽𝗮𝗿𝗸.𝗲𝘅𝗲𝗰𝘂𝘁𝗼𝗿.𝗰𝗼𝗿𝗲𝘀 times the number of executors times a small number from 2 to 8, tune to specific Spark job.

𝗔𝗱𝗱𝗶𝘁𝗶𝗼𝗻𝗮𝗹 𝗡𝗼𝘁𝗲𝘀:

👉 You can use 𝘀𝗽𝗮𝗿𝗸.𝘀𝗾𝗹.𝗳𝗶𝗹𝗲𝘀.𝗺𝗮𝘅𝗣𝗮𝗿𝘁𝗶𝘁𝗶𝗼𝗻𝗕𝘆𝘁𝗲𝘀 configuration to set maximum size of the partition when reading files. Files that are larger will be split into multiple partitions accordingly.
👉 It has been shown that write throughput starts to bottleneck once there are more than 5 CPU Cores assigned per Executor so keep 𝘀𝗽𝗮𝗿𝗸.𝗲𝘅𝗲𝗰𝘂𝘁𝗼𝗿.𝗰𝗼𝗿𝗲𝘀 at or below 5.

In general, the tips above are useful for optimizing performance and parallelism in Apache Spark. Below, I offer my opinion on each of the points mentioned:

Configuration of CPU Cores: It is important to assign multiple CPU cores to Spark executors to take advantage of parallelism. This is configured through the spark.executor.cores property. Allocating an adequate number of cores can improve the performance of data read and transformation operations.
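As a quick illustration, here is a minimal PySpark sketch of how this property can be set when building a session. The application name and the value of 4 cores are placeholders, not recommendations:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("cores-example")             # hypothetical application name
    .config("spark.executor.cores", "4")  # CPU cores assigned to each executor
    .getOrCreate()
)

Note that spark.executor.cores only takes effect when running against a cluster manager; in local mode the degree of parallelism is set by the master URL (for example, local[4]).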

How can I empirically determine the appropriate number of cores to improve the performance of I/O and data transformation operations?

Empirically determining the appropriate number of cores to improve I/O and data transformation performance in Apache Spark involves testing and experimenting with different configurations. Here are some steps you can take to find the optimal number of cores (a small benchmarking sketch follows the list):

  1. Start with an initial configuration: Set an initial number of cores for the executors; a common starting point is 1 core per executor. Run your Spark applications with that configuration and record the execution time and the performance obtained.
  2. Gradually increase the number of cores: Increase the number of cores assigned to executors and rerun your applications. Record execution times and performance at each iteration.
  3. Observe the impact on performance: Analyze the results obtained in each iteration. See if there are significant improvements in the execution time and performance of your I/O operations and data transformations. Note that in some cases increasing the number of cores may not result in linear improvements and there could be a saturation point.
  4. Find the sweet spot: Based on the results obtained, identify the number of cores that provide the best performance for your specific applications and operations. Consider other factors as well, such as the size and nature of the data, as well as the resources available in your Spark environment.
  5. Conduct extensive tests: To confirm your findings, it is recommended to conduct extensive tests with different workloads and data sets. This will allow you to validate the optimal core configuration and verify that it remains efficient in a variety of situations.
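To make these steps concrete, here is a minimal benchmarking sketch. It is only a sketch under stated assumptions: the input path, the user_id column, and the candidate core counts are hypothetical, and it uses local[N] masters to vary the core count on a single machine, which approximates the effect of changing spark.executor.cores on a cluster.

import time

from pyspark.sql import SparkSession

for cores in [1, 2, 4, 8]:                          # candidate core counts, adjust to your hardware
    spark = (
        SparkSession.builder
        .master(f"local[{cores}]")                  # vary the number of cores used locally
        .appName(f"core-benchmark-{cores}")
        .getOrCreate()
    )
    start = time.time()
    df = spark.read.parquet("data/events.parquet")  # hypothetical input dataset
    df.groupBy("user_id").count() \
        .write.mode("overwrite").parquet("/tmp/benchmark_out")
    print(f"{cores} cores -> {time.time() - start:.1f} s")
    spark.stop()                                    # stop before rebuilding with a new master

Record the timing of each run and look for the point at which adding cores stops paying off.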

Remember that the appropriate number of cores can vary depending on the type of operation, the size and complexity of the data, as well as the resource capacity of your Spark cluster. Therefore, it is essential to customize and adjust the settings according to your specific needs and to carry out tests to obtain the best results.

Remember that there is no single answer for the optimal number of cores per executor, as it depends on the specifics of your application and environment. Combining insight into your hardware, an assessment of your operations, and performance testing will help you make more informed decisions and optimize the performance of your Spark application.

File sizes and partitions: If the input files are not splittable and some of them are larger than others, there may be processing bottlenecks: the cores that finish reading smaller files sit idle while the larger files are still being processed. A good rule of thumb is to allocate roughly half as many cores as there are files to read, adjusting according to the specific situation.

To assign the number of cores in Apache Spark, you can use the spark.executor.cores parameter. This parameter defines the number of CPU cores assigned to each executor in your Spark application.

You can set this parameter in the spark-defaults.conf configuration file or pass it as an option when running your Spark application. Here is an example of how to configure the number of cores:

spark-submit --master <master-url> --executor-cores <num-cores>

Where <num-cores> is the number of cores you want to allocate per executor. Adjust this value based on the recommendation to allocate roughly half as many cores as there are files to read. Keep in mind that you can also take other factors into account, such as the size of the files and the complexity of the operations to be performed.

Remember that the total number of cores used by your Spark application also depends on the number of executors, controlled by spark.executor.instances. If you have multiple executors, the total number of cores is the product of spark.executor.cores and spark.executor.instances.
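As a hypothetical sizing example, requesting 10 executors with 4 cores each yields 40 cores in total. These values are illustrative only; they are usually passed via spark-submit (--num-executors and --executor-cores on YARN), but can also be set when building the session, as in this sketch:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executor.instances", "10")  # number of executors (static allocation)
    .config("spark.executor.cores", "4")       # cores per executor
    .getOrCreate()
)
# Total cores available to the application: 10 executors x 4 cores = 40.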

It is important to experiment and tune these parameters based on the specific characteristics and requirements of your application and environment to achieve optimal performance.

That is, --executor-cores will have a minimum value of 1, so the statement «twice less than the number of files to read» refers to the number of files read in parallel, so that one core per executor will be reading one file in each executor.

The statement «twice less than the number of files to read» means that you can allocate one core per executor to read one file in parallel. If you have, say, 10 files that you want to read in parallel, you could set --executor-cores to 1, which allocates one core per executor and lets each executor read one file.

In this case, if you have 10 files and you set --executor-cores to 1, you would need at least 10 executors in your Spark cluster to read all 10 files simultaneously. Note that this is just an example; the number of executors and cores you need may vary based on file sizes, the complexity of the operations, and other factors in your application.

The main idea is to take advantage of parallelism in reading files to maximize performance. Assigning one core per executor to read a file in parallel can help avoid bottlenecks and efficiently use available resources in your Spark cluster.

Configuration of partitions after wide transformations: Transformations such as groupBy and join can generate a large number of partitions. Setting spark.default.parallelism and spark.sql.shuffle.partitions allows you to control the number of partitions created after performing these transformations. Ensuring that you have an adequate number of partitions can improve the performance of these operations.

In Apache Spark, you can control the size and number of partitions after performing wide transformations, such as groupBy and join, by setting the spark.default.parallelism and spark.sql.shuffle.partitions parameters.

spark.default.parallelism: This parameter determines the default number of partitions when one is not explicitly specified. It is used in RDD operations such as parallelize and groupByKey.

The default value is the total number of cores in your cluster. You can adjust this value as needed to control parallelism in transformations.

spark.sql.shuffle.partitions: This parameter is specifically used to control the number of partitions in operations that involve shuffle, such as join and groupBy. The shuffle operation redistributes data across partitions and can generate a large number of temporary partitions. The default value of spark.sql.shuffle.partitions is 200. You can tune this based on the size of your cluster and the nature of your operations to optimize performance.

For example, if you have a cluster with 8 cores and want more control over the number of partitions after a transformation, you could set spark.default.parallelism to 16 and spark.sql.shuffle.partitions to a suitable value, such as 100.
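A minimal sketch of that hypothetical configuration (the values simply mirror the example above and are not general recommendations):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.default.parallelism", "16")      # RDD operations: ~2 partitions per core on an 8-core cluster
    .config("spark.sql.shuffle.partitions", "100")  # DataFrame/SQL shuffles: joins, groupBy aggregations
    .getOrCreate()
)

spark.sql.shuffle.partitions can also be changed in the middle of a session with spark.conf.set("spark.sql.shuffle.partitions", "100"), whereas spark.default.parallelism is fixed once the context has been created.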

In addition to these parameters, you can also use Spark-specific functions to adjust the number of partitions at a particular stage. For example, you can use repartition or coalesce on a DataFrame to change the number of partitions before performing an operation involving shuffle.
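For instance, here is a brief sketch of both calls on a hypothetical DataFrame (the path, column name, and partition counts are illustrative assumptions):

# Assumes "spark" is an existing SparkSession.
df = spark.read.parquet("data/events.parquet")  # hypothetical input

wide = df.repartition(200, "user_id")           # increase partitions (full shuffle) before a heavy aggregation
result = wide.groupBy("user_id").count()

compact = result.coalesce(16)                   # reduce partitions without a full shuffle before writing
compact.write.mode("overwrite").parquet("/tmp/user_counts")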

It is important to do empirical testing and tuning to find the right size and number of partitions for your application. Observe the behavior of your application, analyze the use of resources and perform performance measurements to optimize these parameters according to your specific needs.

Maximum partition size when reading files: The spark.sql.files.maxPartitionBytes property allows you to control the maximum size of partitions when reading files. This is useful when you have large files that need to be split into multiple partitions. Properly adjusting this value can help balance partition sizes and improve the performance of read operations.
We can draw a parallel with seeing a partition as a folder containing files, only here we are talking about multiple folders spread across multiple machines.
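As an illustration, here is a minimal sketch that sets this property at session creation. The 64 MB value and the input path are assumptions for the example, not recommendations:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.files.maxPartitionBytes", str(64 * 1024 * 1024))  # 64 MB per read partition (example value)
    .getOrCreate()
)

df = spark.read.parquet("data/large_table.parquet")  # hypothetical large, splittable input
print(df.rdd.getNumPartitions())                     # a 1 GB file would yield roughly 16 partitions of ~64 MB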

Maximum number of cores per executor: It has been observed that allocating more than 5 CPU Cores per executor can cause a bottleneck in write performance. Therefore, it is recommended to keep spark.executor.cores equal to or below 5 to avoid this issue.

Overall, these tips provide a solid foundation for optimizing performance and parallelism in Apache Spark. However, it is important to note that each application and data set is unique, so additional testing and tuning is recommended based on the specific characteristics and requirements of each case.
