Let's say I have this (Python) code:

from pyspark.sql.functions import col, concat_ws, lit, md5

df_source = spark_session.read.format('jdbc').....
df_reference = sql_context.read.parquet('/path/to/reference.parquet')

# Hash all columns so a changed row can be detected with a single comparison
df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns)))

# Rows whose primary key does not exist in the reference yet
df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
                    .select(lit('Insert').alias('_action'), *df_source_hashed.columns) \
                    .dropDuplicates()
inserts_count = df_inserts.count()

# Rows whose primary key exists in the reference but whose hash differs
df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
                        .where(col('a.hashkey') != col('b.hashkey')) \
                        .select(lit('Update').alias('_action'), *[col('a.' + c) for c in df_source_hashed.columns]) \
                        .dropDuplicates()
updates_count = df_updates.count()

df_output = df_inserts.union(df_updates)


How can I improve performance?

  1. Avoid unnecessary counts if you can. Each count() is an action that runs the whole plan again.
  2. Cache everything you can afford: df_output = df_inserts.union(df_updates).cache()

Then, if you have to, do the count actions.

  1. If you call repartition(1) before writing, replace it with coalesce(1). The former shuffles all data, while the latter merges the existing partitions without shuffling them again.
  2. Repartitioning to a single partition is discouraged unless you can guarantee the data fits into one worker’s memory.
  3. You can compute Insert and Update in one go, so that you don’t have to join with df_reference twice:

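from pyspark.sql.functions import col, when

# No match in the reference -> Insert; match with a different hash -> Update;
# unchanged rows keep a null _action
df_actions = df_source_hashed.alias('a') \
    .join(df_reference.alias('b'), pk_list, how="left") \
    .withColumn('_action', when(col('b.hashkey').isNull(), 'Insert')
                           .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
    .select(col('_action'), *[col('a.' + c) for c in df_source_hashed.columns]) \
    .dropDuplicates() \
    .cache()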
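To make the single left join above more concrete, here is a minimal plain-Python sketch (no Spark needed) of what it decides per row. The helper names row_hash and classify_rows and the sample rows are made up for illustration; they are not part of the original code.

```python
import hashlib

def row_hash(row, columns):
    """MD5 over the concatenated column values, in the spirit of
    md5(concat_ws('', ...)). None values are skipped, as concat_ws does."""
    joined = ''.join(str(row[c]) for c in columns if row[c] is not None)
    return hashlib.md5(joined.encode('utf-8')).hexdigest()

def classify_rows(source_rows, reference_hashes, pk, columns):
    """Return (row, action) pairs; action is 'Insert', 'Update' or None."""
    result = []
    for row in source_rows:
        # Left join: None when the key has no match in the reference
        ref_hash = reference_hashes.get(row[pk])
        if ref_hash is None:
            action = 'Insert'
        elif ref_hash != row_hash(row, columns):
            action = 'Update'
        else:
            action = None  # unchanged row
        result.append((row, action))
    return result

# Hypothetical sample data
columns = ['id', 'name']
reference = [{'id': 1, 'name': 'alice'}, {'id': 2, 'name': 'bob'}]
source = [
    {'id': 1, 'name': 'alice'},   # unchanged
    {'id': 2, 'name': 'bobby'},   # changed
    {'id': 3, 'name': 'carol'},   # new
]

reference_hashes = {r['id']: row_hash(r, columns) for r in reference}
actions = [a for _, a in classify_rows(source, reference_hashes, 'id', columns)]
print(actions)  # [None, 'Update', 'Insert']
```

Unchanged rows end up with no action, which is why the output can later be obtained by simply filtering on a non-null _action.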

  1. Once df_actions is cached, you can count inserts and updates quickly, with only that one join in df_actions:

inserts_count = df_actions.where(col('_action') == 'Insert').count()
updates_count = df_actions.where(col('_action') == 'Update').count()

     And you can get rid of the union:

df_output = df_actions.where(col('_action').isNotNull())

  2. If you have to write that output to parquet anyway, then you can get the counts quickly from the parquet files if they are partitioned by the _action column
    (Spark then only looks into parquet’s metadata to get the count; it does not read any rows):

df_output.coalesce(1).write.partitionBy('_action').format('parquet').mode('overwrite').save('/path/to/output.parquet')

df_output = sql_context.read.parquet('/path/to/output.parquet')
inserts_count = df_output.where(col('_action') == 'Insert').count()
updates_count = df_output.where(col('_action') == 'Update').count()
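If you need both numbers anyway, a grouped count gives them in a single pass instead of two filtered counts. In PySpark that would be something along the lines of df_output.groupBy('_action').count(). The plain-Python sketch below, with made-up sample values, only illustrates the idea:

```python
from collections import Counter

# Hypothetical sample: one '_action' value per output row
actions = ['Insert', 'Update', 'Insert', 'Insert', 'Update']

# One pass over the data yields the count for every action at once
counts = Counter(actions)
inserts_count = counts['Insert']
updates_count = counts['Update']
print(inserts_count, updates_count)  # 3 2
```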

That’s it for now; more on Spark and AWS in the future. Off topic: I’m starting to get tired of this WordPress editing tool.


I will add more content here.

