Let's say that I have this (Python) code:
```python
df_source = spark_session.read.format('jdbc').....
df_reference = sql_context.read.parquet('/path/to/reference.parquet')

df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
    .cache()

df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
    .select(lit('Insert').alias('_action'), *df_source_hashed) \
    .dropDuplicates() \
    .cache()
inserts_count = df_inserts.count()

df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
    .select(lit('Update').alias('_action'), *df_source_hashed) \
    .where(col('a.hashkey') != col('b.hashkey')) \
    .dropDuplicates() \
    .cache()
updates_count = df_updates.count()

df_output = df_inserts.union(df_updates)
df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
```
How can I improve performance?
- Avoid unnecessary counts if you can; each `count()` is an action that triggers its own Spark job.
- Cache everything you can afford:

```python
df_output = df_inserts.union(df_updates).cache()
```

Then, if you have to, run the count actions against the cached DataFrame.
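A minimal sketch of that pattern, reusing the `_action` column and the `df_inserts`/`df_updates` DataFrames from the question; the first action materializes the cache, and the second count then reuses it instead of re-running the joins:

```python
from pyspark.sql.functions import col

# Build the union once and cache it; the first action below materializes the cache.
df_output = df_inserts.union(df_updates).cache()

# Both counts reuse the cached rows instead of re-executing the two joins.
inserts_count = df_output.where(col('_action') == 'Insert').count()
updates_count = df_output.where(col('_action') == 'Update').count()
```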
- The `repartition(1)` should be replaced by `coalesce(1)`. The former shuffles all the data, while the latter just reads in the existing partitions without shuffling them again (see the sketch below).
- Repartitioning to a single partition is discouraged unless you can guarantee that the data fits into one worker's memory.
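A minimal sketch of the write with `coalesce(1)` in place of `repartition(1)`, using the same output path as in the question (and only worth doing if a single output file is really required):

```python
# coalesce(1) merges the existing partitions into one without a full shuffle;
# repartition(1) would shuffle every row first.
df_output.coalesce(1) \
    .write.format('parquet') \
    .mode('overwrite') \
    .save('/path/to/output.parquet')
```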
- You can compute Insert and Update in one go, so that you don’t have to join with df_reference twice.
```python
df_actions = df_source_hashed.alias('a') \
    .join(df_reference.alias('b'), pk_list, how='left') \
    .withColumn('_action',
                when(col('b.hashkey').isNull(), 'Insert')
                .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
    .select(col('_action'), *[col('a.' + c) for c in df_source_hashed.columns]) \
    .dropDuplicates() \
    .cache()
# Rows present on both sides with the same hashkey get a null _action (unchanged).
# The source-side columns are selected via the 'a' alias to avoid ambiguous names after the join.
```
- Since `df_actions` is cached, you can count inserts and updates quickly, with only that one join behind it:

```python
inserts_count = df_actions.where(col('_action') == 'Insert').count()
updates_count = df_actions.where(col('_action') == 'Update').count()
```

And you can get rid of the union; unchanged rows end up with a null `_action`, so the output is just:

```python
df_output = df_actions.where(col('_action').isNotNull())
```
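Alternatively, if you want both counts from a single job instead of two filtered counts, you could group by `_action` on the cached `df_actions`; a rough sketch:

```python
# One aggregation job returns the row count per _action value
# (unchanged rows show up under the None key and are simply ignored).
counts = {row['_action']: row['count']
          for row in df_actions.groupBy('_action').count().collect()}

inserts_count = counts.get('Insert', 0)
updates_count = counts.get('Update', 0)
```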
- If you have to write that output to Parquet anyway, you can get the counts quickly from the Parquet output if it is partitioned by the `_action` column (Spark then only looks at the Parquet metadata to get the count; it does not read any rows):

```python
df_output.coalesce(1) \
    .write.partitionBy('_action') \
    .format('parquet') \
    .mode('overwrite') \
    .save('/path/to/output.parquet')

df_output = sql_context.read.parquet('/path/to/output.parquet')
inserts_count = df_output.where(col('_action') == 'Insert').count()
updates_count = df_output.where(col('_action') == 'Update').count()
```
That's it for now; more on Spark and AWS in the future. Off topic: I'm starting to get tired of this WordPress editing tool.
Update
I will add more content here.