It has been a while since I last published something, so I am going to refresh how to work with Spark. These are notes that I think I will need some day. I start with basic DataFrame operations, running a spark-shell on version 2.4.4.


// Imports for everything below.
import scala.collection.mutable
import org.apache.spark.sql.functions.{col, explode, lit, map, map_concat, map_from_entries, map_keys, map_values}
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}
import org.apache.spark.sql.{Column, Row, SparkSession}

val structureData = Seq(
  Row("36636","Finance",Row(3000,"USA")),
  Row("40288","Finance",Row(5000,"IND")),
  Row("42114","Sales",Row(3900,"USA")),
  Row("39192","Marketing",Row(2500,"CAN")),
  Row("34534","Sales",Row(6500,"USA"))
)

// Curiously, I have to declare this schema flat (chained .add calls) instead of nesting it like structureData.
// There is a better way to create the schema if you know how the JSON file is constructed, but this is ok:
// https://stackoverflow.com/questions/40957585/create-spark-dataframe-schema-from-json-schema-representation
val structureSchema = new StructType().add("id",StringType).add("dept",StringType).add("properties",new StructType().add("salary",IntegerType).add("location",StringType))
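
Following that StackOverflow link: if the schema is already available as a JSON representation (for example, whatever structureSchema.json prints), the StructType can be rebuilt with DataType.fromJson. A minimal sketch; the JSON string here is just assumed to match the schema above:

import org.apache.spark.sql.types.DataType

// Rebuild the same schema from its JSON representation
val schemaJson = """{"type":"struct","fields":[
  {"name":"id","type":"string","nullable":true,"metadata":{}},
  {"name":"dept","type":"string","nullable":true,"metadata":{}},
  {"name":"properties","type":{"type":"struct","fields":[
    {"name":"salary","type":"integer","nullable":true,"metadata":{}},
    {"name":"location","type":"string","nullable":true,"metadata":{}}]},
   "nullable":true,"metadata":{}}]}"""

val schemaFromJson = DataType.fromJson(schemaJson).asInstanceOf[StructType]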

var df = spark.createDataFrame(spark.sparkContext.parallelize(structureData),structureSchema)

df.printSchema()
root
 |-- id: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- salary: integer (nullable = true)
 |    |-- location: string (nullable = true)

df.show(false)
+-----+---------+-----------+
|id   |dept     |properties |
+-----+---------+-----------+
|36636|Finance  |[3000, USA]|
|40288|Finance  |[5000, IND]|
|42114|Sales    |[3900, USA]|
|39192|Marketing|[2500, CAN]|
|34534|Sales    |[6500, USA]|
+-----+---------+-----------+

val index = df.schema.fieldIndex("properties")
index: Int = 2

val propSchema = df.schema(index).dataType.asInstanceOf[StructType]
propSchema: org.apache.spark.sql.types.StructType = StructType(StructField(salary,IntegerType,true), StructField(location,StringType,true))

var columns = mutable.LinkedHashSet[Column]()
columns: scala.collection.mutable.LinkedHashSet[org.apache.spark.sql.Column] = Set()

propSchema.fields.foreach(field =>{
  columns.add(lit(field.name))
  columns.add(col("properties." + field.name))
})

df = df.withColumn("propertiesMap",map(columns.toSeq:_*))

df.printSchema

df.show(false)

df = df.drop("properties")

df.printSchema
root
 |-- id: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- propertiesMap: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

df.show(false)
+-----+---------+---------------------------------+
|id   |dept     |propertiesMap                    |
+-----+---------+---------------------------------+
|36636|Finance  |[salary -> 3000, location -> USA]|
|40288|Finance  |[salary -> 5000, location -> IND]|
|42114|Sales    |[salary -> 3900, location -> USA]|
|39192|Marketing|[salary -> 2500, location -> CAN]|
|34534|Sales    |[salary -> 6500, location -> USA]|
+-----+---------+---------------------------------+
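
Once the struct is a map, a single value can also be pulled out by key with getItem (a quick sketch):

// Extract one value from the map by key
df.select(col("id"), col("propertiesMap").getItem("salary").as("salary")).show(false)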

// not very useful...
df.select(col("id"),map_keys(col("propertiesMap"))).show(false)
+-----+-----------------------+
|id   |map_keys(propertiesMap)|
+-----+-----------------------+
|36636|[salary, location]     |
|40288|[salary, location]     |
|42114|[salary, location]     |
|39192|[salary, location]     |
|34534|[salary, location]     |
+-----+-----------------------+

// this is useful...
df.select(col("id"),map_values(col("propertiesMap"))).show(false)

val arrayStructureData = Seq(
  Row("James",List(Row("Newark","NY"),Row("Brooklyn","NY")),Map("hair"->"black","eye"->"brown"),Map("height"->"5.9")),
  Row("Michael",List(Row("SanJose","CA"),Row("Sandiago","CA")),Map("hair"->"brown","eye"->"black"),Map("height"->"6")),
  Row("Robert",List(Row("LasVegas","NV")),Map("hair"->"red","eye"->"gray"),Map("height"->"6.3")),
  Row("Maria",null,Map("hair"->"blond","eye"->"red"),Map("height"->"5.6")),
  Row("Jen",List(Row("LAX","CA"),Row("Orange","CA")),Map("white"->"black","eye"->"black"),Map("height"->"5.2"))
)


val arrayStructureSchema = new StructType().add("name",StringType).add("addresses", ArrayType(new StructType().add("city",StringType).add("state",StringType))).add("properties", MapType(StringType,StringType)).add("secondProp", MapType(StringType,StringType))

val concatDF = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)

concatDF.withColumn("mapConcat",map_concat(col("properties"),col("secondProp"))).select("name","mapConcat").show(false)

+-------+---------------------------------------------+
|name   |mapConcat                                    |
+-------+---------------------------------------------+
|James  |[hair -> black, eye -> brown, height -> 5.9] |
|Michael|[hair -> brown, eye -> black, height -> 6]   |
|Robert |[hair -> red, eye -> gray, height -> 6.3]    |
|Maria  |[hair -> blond, eye -> red, height -> 5.6]   |
|Jen    |[white -> black, eye -> black, height -> 5.2]|
+-------+---------------------------------------------+

concatDF.withColumn("mapProperties",map_concat(col("properties"),col("secondProp"))).select("name","addresses","mapProperties").show(false)

concatDF.withColumn("mapFromEntries",map_from_entries(col("addresses"))).select("name","mapFromEntries").show(false)

// doing this shows basically the same information as above...
concatDF.select("name","addresses").show(false)


// Complete Spark SQL map function example

import org.apache.spark.sql.functions.{col, explode, lit, map, map_concat, map_from_entries, map_keys, map_values}
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}
import org.apache.spark.sql.{Column, Row, SparkSession}

import scala.collection.mutable

object MapFunctions extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkSQL_Map_Complete")
    .getOrCreate()
  import spark.implicits._

  val structureData = Seq(
    Row("36636","Finance",Row(3000,"USA")),
    Row("40288","Finance",Row(5000,"IND")),
    Row("42114","Sales",Row(3900,"USA")),
    Row("39192","Marketing",Row(2500,"CAN")),
    Row("34534","Sales",Row(6500,"USA"))
  )

  val structureSchema = new StructType().add("id",StringType).add("dept",StringType).add("properties",new StructType().add("salary",IntegerType).add("location",StringType))

  var df = spark.createDataFrame(spark.sparkContext.parallelize(structureData),structureSchema)
  df.printSchema()
  df.show(false)

  // Convert to Map
  val index = df.schema.fieldIndex("properties")
  val propSchema = df.schema(index).dataType.asInstanceOf[StructType]
  var columns = mutable.LinkedHashSet[Column]()
  propSchema.fields.foreach(field =>{
    columns.add(lit(field.name))
    columns.add(col("properties." + field.name))
  })

  df = df.withColumn("propertiesMap",map(columns.toSeq:_*))
  df.printSchema()
  df = df.drop("properties")
  df.printSchema()
  df.show(false)

  //Retrieve all keys from a Map
  val keys = df.select(explode(map_keys($"propertiesMap"))).as[String].distinct.collect
  print(keys.mkString(","))

  // map_keys
  df.select(col("id"),map_keys(col("propertiesMap"))).show(false)

  //map_values
  df.select(col("id"),map_values(col("propertiesMap"))).show(false)

  //Creating DF with MapType
  val arrayStructureData = Seq(
    Row("James",List(Row("Newark","NY"),Row("Brooklyn","NY")),Map("hair"->"black","eye"->"brown"),Map("height"->"5.9")),
    Row("Michael",List(Row("SanJose","CA"),Row("Sandiago","CA")),Map("hair"->"brown","eye"->"black"),Map("height"->"6")),
    Row("Robert",List(Row("LasVegas","NV")),Map("hair"->"red","eye"->"gray"),Map("height"->"6.3")),
    Row("Maria",null,Map("hair"->"blond","eye"->"red"),Map("height"->"5.6")),
    Row("Jen",List(Row("LAX","CA"),Row("Orange","CA")),Map("white"->"black","eye"->"black"),Map("height"->"5.2"))
  )

  val arrayStructureSchema = new StructType().add("name",StringType).add("addresses", ArrayType(new StructType().add("city",StringType).add("state",StringType))).add("properties", MapType(StringType,StringType)).add("secondProp", MapType(StringType,StringType))

  val concatDF = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)

  concatDF.printSchema()

  concatDF.show()

  concatDF.withColumn("mapConcat",map_concat(col("properties"),col("secondProp"))).select("name","mapConcat").show(false)

  concatDF.withColumn("mapFromEntries",map_from_entries(col("addresses"))).select("name","mapFromEntries").show(false)
}
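
One thing the complete example collects but never uses: the distinct keys gathered into keys can flatten the map back into regular columns. A sketch that assumes it is appended at the end of MapFunctions, where df and keys are still in scope:

  // Turn each map key back into its own column, using the keys collected above
  val keyCols = keys.map(k => col("propertiesMap").getItem(k).as(k))
  df.select((col("id") +: col("dept") +: keyCols): _*).show(false)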


// String to Date
import org.apache.spark.sql.functions.to_date

Seq(("06-03-2009"),("07-24-2009")).toDF("Date").select(
    col("Date"),
    to_date(col("Date"),"MM-dd-yyyy").as("to_date")
  ).show()
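
And the reverse direction, Date back to String, with date_format (a minimal sketch reusing the same dates; the yyyy/MM/dd pattern is just an example):

import org.apache.spark.sql.functions.date_format

Seq(("06-03-2009"),("07-24-2009")).toDF("Date").select(
    to_date(col("Date"),"MM-dd-yyyy").as("to_date"),
    date_format(to_date(col("Date"),"MM-dd-yyyy"), "yyyy/MM/dd").as("formatted")
  ).show()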

