It has been a while since I last published something, so I am going to refresh how to work with Spark. These are some notes that I think I will need some day, starting with basic DataFrame operations. I am running them in a spark-shell, currently version 2.4.4.
// The import list can probably be trimmed.
import scala.collection.mutable
import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}
import org.apache.spark.sql.functions.{col, explode, lit, map, map_concat, map_from_entries, map_keys, map_values, to_date}
val structureData = Seq(
Row("36636","Finance",Row(3000,"USA")),
Row("40288","Finance",Row(5000,"IND")),
Row("42114","Sales",Row(3900,"USA")),
Row("39192","Marketing",Row(2500,"CAN")),
Row("34534","Sales",Row(6500,"USA"))
)
// Curiously, I have to write this out flat instead of doing it the way structureData is built.
// Creating the schema by hand; there is a better way if you know how the JSON file is constructed, but this works:
// https://stackoverflow.com/questions/40957585/create-spark-dataframe-schema-from-json-schema-representation
val structureSchema = new StructType().add("id",StringType).add("dept",StringType).add("properties",new StructType().add("salary",IntegerType).add("location",StringType))
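// As the StackOverflow link above suggests, a schema can also be parsed from its JSON
// representation with DataType.fromJson. This is just a hedged sketch; the JSON string
// below is an illustrative fragment, not taken from a real file.
import org.apache.spark.sql.types.DataType
val schemaJson = """{"type":"struct","fields":[
  {"name":"id","type":"string","nullable":true,"metadata":{}},
  {"name":"dept","type":"string","nullable":true,"metadata":{}}]}"""
val schemaFromJson = DataType.fromJson(schemaJson).asInstanceOf[StructType]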
var df = spark.createDataFrame(spark.sparkContext.parallelize(structureData),structureSchema)
df.printSchema()
root
|-- id: string (nullable = true)
|-- dept: string (nullable = true)
|-- properties: struct (nullable = true)
| |-- salary: integer (nullable = true)
| |-- location: string (nullable = true)
df.show(false)
+-----+---------+-----------+
|id |dept |properties |
+-----+---------+-----------+
|36636|Finance |[3000, USA]|
|40288|Finance |[5000, IND]|
|42114|Sales |[3900, USA]|
|39192|Marketing|[2500, CAN]|
|34534|Sales |[6500, USA]|
+-----+---------+-----------+
val index = df.schema.fieldIndex("properties")
index: Int = 2
val propSchema = df.schema(index).dataType.asInstanceOf[StructType]
propSchema: org.apache.spark.sql.types.StructType = StructType(StructField(salary,IntegerType,true), StructField(location,StringType,true))
var columns = mutable.LinkedHashSet[Column]()
columns: scala.collection.mutable.LinkedHashSet[org.apache.spark.sql.Column] = Set()
// map() expects alternating key/value arguments, so collect lit(fieldName), col(value) pairs
propSchema.fields.foreach(field =>{
columns.add(lit(field.name))
columns.add(col("properties." + field.name))
})
df = df.withColumn("propertiesMap",map(columns.toSeq:_*))
df.printSchema
df.show(false)
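// A possible alternative (my addition, not in the original notes): since Spark 2.4,
// map_from_arrays can build the same map from a keys array and a values array.
// The cast to string is needed because a map column has a single value type.
import org.apache.spark.sql.functions.{array, map_from_arrays}
df.withColumn("propertiesMap2",
  map_from_arrays(array(lit("salary"), lit("location")),
    array(col("properties.salary").cast("string"), col("properties.location")))
).select("id", "propertiesMap2").show(false)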
df = df.drop("properties")
df.printSchema
root
|-- id: string (nullable = true)
|-- dept: string (nullable = true)
|-- propertiesMap: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
df.show(false)
+-----+---------+---------------------------------+
|id |dept |propertiesMap |
+-----+---------+---------------------------------+
|36636|Finance |[salary -> 3000, location -> USA]|
|40288|Finance |[salary -> 5000, location -> IND]|
|42114|Sales |[salary -> 3900, location -> USA]|
|39192|Marketing|[salary -> 2500, location -> CAN]|
|34534|Sales |[salary -> 6500, location -> USA]|
+-----+---------+---------------------------------+
// map_keys on its own is not very useful here: every row has the same keys...
df.select(col("id"),map_keys(col("propertiesMap"))).show(false)
+-----+-----------------------+
|id |map_keys(propertiesMap)|
+-----+-----------------------+
|36636|[salary, location] |
|40288|[salary, location] |
|42114|[salary, location] |
|39192|[salary, location] |
|34534|[salary, location] |
+-----+-----------------------+
// map_values is more useful: it extracts just the values...
df.select(col("id"),map_values(col("propertiesMap"))).show(false)
val arrayStructureData = Seq(
Row("James",List(Row("Newark","NY"),Row("Brooklyn","NY")),Map("hair"->"black","eye"->"brown"), Map("height"->"5.9")),
Row("Michael",List(Row("SanJose","CA"),Row("Sandiago","CA")),Map("hair"->"brown","eye"->"black"),Map("height"->"6")),
Row("Robert",List(Row("LasVegas","NV")),Map("hair"->"red","eye"->"gray"),Map("height"->"6.3")),
Row("Maria",null,Map("hair"->"blond","eye"->"red"),Map("height"->"5.6")),
Row("Jen",List(Row("LAX","CA"),Row("Orange","CA")),Map("white"->"black","eye"->"black"),Map("height"->"5.2"))
)
val arrayStructureSchema = new StructType().add("name",StringType).add("addresses", ArrayType(new StructType().add("city",StringType).add("state",StringType))).add("properties", MapType(StringType,StringType)).add("secondProp", MapType(StringType,StringType))
val concatDF = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
concatDF.withColumn("mapConcat",map_concat(col("properties"),col("secondProp"))).select("name","mapConcat").show(false)
+-------+---------------------------------------------+
|name |mapConcat |
+-------+---------------------------------------------+
|James |[hair -> black, eye -> brown, height -> 5.9] |
|Michael|[hair -> brown, eye -> black, height -> 6] |
|Robert |[hair -> red, eye -> gray, height -> 6.3] |
|Maria |[hair -> blond, eye -> red, height -> 5.6] |
|Jen |[white -> black, eye -> black, height -> 5.2]|
+-------+---------------------------------------------+
concatDF.withColumn("mapProperties",map_concat(col("properties"),col("secondProp"))).select("name","addresses","mapProperties").show(false)
concatDF.withColumn("mapFromEntries",map_from_entries(col("addresses"))).select("name","mapFromEntries").show(false)
// Selecting the addresses column directly shows essentially the same thing as above...
concatDF.select("name","addresses").show(false)
// Complete Spark SQL map function example
import org.apache.spark.sql.functions.{col, explode, lit, map, map_concat, map_from_entries, map_keys, map_values}
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}
import org.apache.spark.sql.{Column, Row, SparkSession}
import scala.collection.mutable
object MapFunctions extends App {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkSQL_Map_Complete")
.getOrCreate()
import spark.implicits._
val structureData = Seq(
Row("36636","Finance",Row(3000,"USA")),
Row("40288","Finance",Row(5000,"IND")),
Row("42114","Sales",Row(3900,"USA")),
Row("39192","Marketing",Row(2500,"CAN")),
Row("34534","Sales",Row(6500,"USA"))
)
val structureSchema = new StructType().add("id",StringType).add("dept",StringType).add("properties",new StructType().add("salary",IntegerType).add("location",StringType))
var df = spark.createDataFrame(spark.sparkContext.parallelize(structureData),structureSchema)
df.printSchema()
df.show(false)
// Convert to Map
val index = df.schema.fieldIndex("properties")
val propSchema = df.schema(index).dataType.asInstanceOf[StructType]
var columns = mutable.LinkedHashSet[Column]()
propSchema.fields.foreach(field =>{
columns.add(lit(field.name))
columns.add(col("properties." + field.name))
})
df = df.withColumn("propertiesMap",map(columns.toSeq:_*))
df.printSchema()
df = df.drop("properties")
df.printSchema()
df.show(false)
//Retrieve all keys from a Map
val keys = df.select(explode(map_keys($"propertiesMap"))).as[String].distinct.collect
print(keys.mkString(","))
// map_keys
df.select(col("id"),map_keys(col("propertiesMap"))).show(false)
//map_values
df.select(col("id"),map_values(col("propertiesMap"))).show(false)
//Creating DF with MapType
val arrayStructureData = Seq(
Row("James",List(Row("Newark","NY"),Row("Brooklyn","NY")),Map("hair"->"black","eye"->"brown"), Map("height"->"5.9")),
Row("Michael",List(Row("SanJose","CA"),Row("Sandiago","CA")),Map("hair"->"brown","eye"->"black"),Map("height"->"6")),
Row("Robert",List(Row("LasVegas","NV")),Map("hair"->"red","eye"->"gray"),Map("height"->"6.3")),
Row("Maria",null,Map("hair"->"blond","eye"->"red"),Map("height"->"5.6")),
Row("Jen",List(Row("LAX","CA"),Row("Orange","CA")),Map("white"->"black","eye"->"black"),Map("height"->"5.2"))
)
val arrayStructureSchema = new StructType().add("name",StringType).add("addresses", ArrayType(new StructType().add("city",StringType).add("state",StringType))).add("properties", MapType(StringType,StringType)).add("secondProp", MapType(StringType,StringType))
val concatDF = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
concatDF.printSchema()
concatDF.show()
concatDF.withColumn("mapConcat",map_concat(col("properties"),col("secondProp"))).select("name","mapConcat").show(false)
concatDF.withColumn("mapFromEntries",map_from_entries(col("addresses"))).select("name","mapFromEntries").show(false)
}
// String to Date
Seq(("06-03-2009"),("07-24-2009")).toDF("Date").select(
col("Date"),
to_date(col("Date"),"MM-dd-yyyy").as("to_date")
).show()
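// And the reverse (my addition): date_format renders a date column back into a string
// with the requested pattern.
import org.apache.spark.sql.functions.date_format
Seq(("06-03-2009"),("07-24-2009")).toDF("Date").select(
  col("Date"),
  date_format(to_date(col("Date"),"MM-dd-yyyy"), "yyyy/MM/dd").as("formatted")
).show()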