https://docs.delta.io/latest/quick-start.html#create-a-table&language-scala
First, install Apache Spark. I am a macOS user, and I do not recommend installing it with Homebrew because it will not install the third-party libraries.
I recommend downloading it from https://spark.apache.org
The latest version is 3.5.0 as of 28 Nov 2023.
Then, run spark-shell with Delta Lake support.
ATTENTION: be sure about the Delta Lake version; it must match your Spark version in the compatibility matrix:
https://docs.delta.io/latest/quick-start.html#set-up-interactive-shell
Before running any shell or command, we should first check that Spark itself works on our local machine.
┌<▪> ~/spark-3.5.0-bin-hadoop3
└➤
bin/spark-submit --class org.apache.spark.examples.JavaSparkPi --master local examples/jars/spark-examples_2.12-3.5.0.jar
23/11/28 10:20:52 INFO SparkContext: Running Spark version 3.5.0
23/11/28 10:20:52 INFO SparkContext: OS info Mac OS X, 14.1.1, x86_64
23/11/28 10:20:52 INFO SparkContext: Java version 21.0.1
23/11/28 10:20:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
...
23/11/28 10:20:54 INFO DAGScheduler: Job 0 finished: reduce at JavaSparkPi.java:54, took 0,686980 s
Pi is roughly 3.14132
23/11/28 10:20:54 INFO SparkContext: SparkContext is stopping with exitCode 0.
...
23/11/28 10:20:54 INFO ShutdownHookManager: Deleting directory /private/var/folders/0x/gmzly6vn2f7ggfk47bdcsbnc0000gn/T/spark-cd55e3bf-9b5c-454b-8fc5-bd0f16abbdb6
The output above confirms the installation works. Be careful, because the way to invoke the example jar and the classes inside it has changed across Spark versions.
Now I am going to run spark-shell with Delta Lake support.
┌<▸> ~
└➤
bin/spark-shell --packages io.delta:delta-spark_2.12:3.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
23/11/27 13:19:33 WARN Utils: Your hostname, MacBook-Pro-de-Alonso.local resolves to a loopback address: 127.0.0.1; using 192.168.1.37 instead (on interface en0)
23/11/27 13:19:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/usr/local/Cellar/apache-spark/3.5.0/libexec/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/aironman/.ivy2/cache
The jars for the packages stored in: /Users/aironman/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2958dc03-f45d-455d-bf22-16798fe37d4f;1.0
confs: [default]
found io.delta#delta-spark_2.12;3.0.0 in central
found io.delta#delta-storage;3.0.0 in central
found org.antlr#antlr4-runtime;4.9.3 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.0.0/delta-spark_2.12-3.0.0.jar ...
[SUCCESSFUL ] io.delta#delta-spark_2.12;3.0.0!delta-spark_2.12.jar (1366ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-storage/3.0.0/delta-storage-3.0.0.jar ...
[SUCCESSFUL ] io.delta#delta-storage;3.0.0!delta-storage.jar (39ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.9.3/antlr4-runtime-4.9.3.jar ...
[SUCCESSFUL ] org.antlr#antlr4-runtime;4.9.3!antlr4-runtime.jar (59ms)
:: resolution report :: resolve 2080ms :: artifacts dl 1471ms
:: modules in use:
io.delta#delta-spark_2.12;3.0.0 from central in [default]
io.delta#delta-storage;3.0.0 from central in [default]
org.antlr#antlr4-runtime;4.9.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 3 | 3 | 3 | 0 || 3 | 3 |
---------------------------------------------------------------------
:: problems summary ::
:::: ERRORS
unknown resolver null
:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
:: retrieving :: org.apache.spark#spark-submit-parent-2958dc03-f45d-455d-bf22-16798fe37d4f
confs: [default]
3 artifacts copied, 0 already retrieved (5230kB/9ms)
23/11/27 13:19:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.37:4040
Spark context available as 'sc' (master = local[*], app id = local-1701087581574).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.5.0
/_/
Using Scala version 2.12.18 (Java HotSpot(TM) 64-Bit Server VM, Java 21.0.1)
Type in expressions to have them evaluated.
Type :help for more information.
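A quick sanity check that the Delta extension is actually active: this simply echoes back the configuration we passed on the command line, so it should print io.delta.sql.DeltaSparkSessionExtension.
scala> spark.conf.get("spark.sql.extensions")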
Let's load a clientes.csv file, something like this:
┌<▪> ~
└➤
cat clientes.csv
id,nombre,apellido,edad,sexo
1,"Juan","López",30,"Hombre"
2,"María","García",25,"Mujer"
3,"Antonio","Martínez",40,"Hombre"
4,"Laura","Rodríguez",20,"Mujer"
5,"Pedro","Pérez",35,"Hombre"
6,"Ana","González",27,"Mujer"
7,"David","Sánchez",42,"Hombre"
8,"Isabel","Pérez",29,"Mujer"
9,"Luis","Hernández",37,"Hombre"
10,"Carmen","García",24,"Mujer"
scala>
scala> val words = sc.textFile("clientes.csv")
words: org.apache.spark.rdd.RDD[String] = clientes.csv MapPartitionsRDD[1] at textFile at <console>:23
This is the classic word-count example, although splitting on the empty string actually counts characters rather than words (a word-level variant follows the output):
scala> words.flatMap(_.split("")).map((_,1)).reduceByKey(_+_).foreach(println)
(s,3)
(d,8)
(e,26)
(G,3)
(M,7)
(z,9)
(7,3)
(5,3)
(a,16)
(ó,1)
(i,5)
(A,2)
(",60)
(4,4)
(8,1)
(u,9)
(I,1)
(o,13)
(9,2)
(3,4)
(L,3)
(p,2)
(x,1)
(R,1)
(P,3)
(é,2)
(í,5)
(S,1)
(t,2)
(0,4)
(b,7)
(6,1)
(C,1)
(1,2)
(g,1)
(m,7)
(h,1)
(c,3)
(2,7)
(n,11)
(j,5)
(á,3)
(J,1)
(v,1)
(H,6)
(,,44)
(l,4)
(r,22)
(D,1)
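For an actual word count, split on a real delimiter instead of the empty string. A minimal sketch, splitting on commas (the surrounding quotes stay attached to the tokens):
scala> words.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).foreach(println)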
Let's import some libraries…
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> val df= spark.read.csv("clientes.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 3 more fields]
scala> df.show(false)
+---+-------+---------+----+------+
|_c0|_c1 |_c2 |_c3 |_c4 |
+---+-------+---------+----+------+
|id |nombre |apellido |edad|sexo |
|1 |Juan |López |30 |Hombre|
|2 |María |García |25 |Mujer |
|3 |Antonio|Martínez |40 |Hombre|
|4 |Laura |Rodríguez|20 |Mujer |
|5 |Pedro |Pérez |35 |Hombre|
|6 |Ana |González |27 |Mujer |
|7 |David |Sánchez |42 |Hombre|
|8 |Isabel |Pérez |29 |Mujer |
|9 |Luis |Hernández|37 |Hombre|
|10 |Carmen |García |24 |Mujer |
+---+-------+---------+----+------+
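Notice that spark.read.csv treated the header row as data, which is why the columns are called _c0 to _c4. A sketch of a header-aware read, assuming you prefer real column names and inferred types:
scala> val dfTyped = spark.read.option("header", "true").option("inferSchema", "true").csv("clientes.csv")
That would give you columns id, nombre, apellido, edad and sexo. I will keep the default _cN names below so the rest of the session matches.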
I can create a Parquet file from this CSV, but…
scala> df.write.mode("overwrite").parquet("clientes.parquet")
That creates a plain folder named clientes.parquet. The next command, in contrast, creates a Delta Lake table: a folder named clientes-delta with the same Parquet data files inside plus a _delta_log directory holding the transaction log…
scala> df.write.format("delta").save("clientes-delta")
We can load the Delta table into a read-only DataFrame:
scala> val dfclientes = spark.read.format("delta").load("clientes-delta")
dfclientes: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 3 more fields]
scala> dfclientes.show(false)
+---+-------+---------+----+------+
|_c0|_c1 |_c2 |_c3 |_c4 |
+---+-------+---------+----+------+
|id |nombre |apellido |edad|sexo |
|1 |Juan |López |30 |Hombre|
|2 |María |García |25 |Mujer |
|3 |Antonio|Martínez |40 |Hombre|
|4 |Laura |Rodríguez|20 |Mujer |
|5 |Pedro |Pérez |35 |Hombre|
|6 |Ana |González |27 |Mujer |
|7 |David |Sánchez |42 |Hombre|
|8 |Isabel |Pérez |29 |Mujer |
|9 |Luis |Hernández|37 |Hombre|
|10 |Carmen |García |24 |Mujer |
+---+-------+---------+----+------+
But although you might think a Parquet file is a Delta table, it is not; Delta Lake is not Impala.
You cannot read a plain Parquet directory as a Delta Lake table…
scala> val anotherDFClientes = spark.read.format("delta").load("clientes.parquet")
org.apache.spark.sql.delta.DeltaAnalysisException: [DELTA_TABLE_NOT_FOUND] Delta table `clientes.parquet` doesn't exist.
at org.apache.spark.sql.delta.DeltaErrorsBase.nonExistentDeltaTable(DeltaErrors.scala:1393)
at org.apache.spark.sql.delta.DeltaErrorsBase.nonExistentDeltaTable$(DeltaErrors.scala:1392)
at org.apache.spark.sql.delta.DeltaErrors$.nonExistentDeltaTable(DeltaErrors.scala:3039)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.toBaseRelation$lzycompute(DeltaTableV2.scala:241)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.toBaseRelation(DeltaTableV2.scala:229)
at org.apache.spark.sql.delta.sources.DeltaDataSource.$anonfun$createRelation$5(DeltaDataSource.scala:249)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
at org.apache.spark.sql.delta.sources.DeltaDataSource.recordFrameProfile(DeltaDataSource.scala:49)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:208)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
... 51 elided
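That said, Delta Lake does ship a utility to convert an existing Parquet directory in place. A minimal sketch, assuming the directory is unpartitioned:
scala> import io.delta.tables._
scala> DeltaTable.convertToDelta(spark, "parquet.`clientes.parquet`")
This writes a _delta_log next to the existing Parquet files, and afterwards the directory loads fine with format("delta").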
There is another way to load a Delta Lake table. First we import these libraries…
scala> import io.delta._
import io.delta._
scala> import io.delta.tables._
import io.delta.tables._
scala> val deltaClientesTable = DeltaTable.forPath("clientes-delta")
deltaClientesTable: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@40fce182
scala> deltaClientesTable.toDF.show(false)
+---+-------+---------+----+------+
|_c0|_c1 |_c2 |_c3 |_c4 |
+---+-------+---------+----+------+
|id |nombre |apellido |edad|sexo |
|1 |Juan |López |30 |Hombre|
|2 |María |García |25 |Mujer |
|3 |Antonio|Martínez |40 |Hombre|
|4 |Laura |Rodríguez|20 |Mujer |
|5 |Pedro |Pérez |35 |Hombre|
|6 |Ana |González |27 |Mujer |
|7 |David |Sánchez |42 |Hombre|
|8 |Isabel |Pérez |29 |Mujer |
|9 |Luis |Hernández|37 |Hombre|
|10 |Carmen |García |24 |Mujer |
+---+-------+---------+----+------+
I am going to update this Delta Lake table.
scala> val set = Map("_c0" -> expr("_c0 + 100"), "_c1" -> expr("Alonso"), "_c2" -> expr("Isidoro"), "_c3" -> 47, "_c4" -> "Hombre")
set: scala.collection.immutable.Map[String,Any] = Map(_c3 -> 47, _c0 -> (_c0 + 100), _c4 -> Hombre, _c1 -> Alonso, _c2 -> Isidoro)
scala> import org.apache.spark.sql.Column
import org.apache.spark.sql.Column
Watch out: updateExpr takes a Map[String, String], so the mixed-type map above is not usable; and it only updates rows that match the condition. If no row matches, Spark will not create one.
scala> val setNew = Map("_c0" -> "_c0 + 100", "_c1" -> "'Alonso'", "_c2" -> "'Isidoro'", "_c3" -> "47", "_c4" -> "'Hombre'")
scala> deltaClientesTable.updateExpr("_c0 = 1", setNew)
scala> deltaClientesTable.toDF.show(false)
+-----+-------+---------+----+------+
|_c0 |_c1 |_c2 |_c3 |_c4 |
+-----+-------+---------+----+------+
|id |nombre |apellido |edad|sexo |
|101.0|Alonso |Isidoro |47 |Hombre|
|2 |María |García |25 |Mujer |
|3 |Antonio|Martínez |40 |Hombre|
|4 |Laura |Rodríguez|20 |Mujer |
|5 |Pedro |Pérez |35 |Hombre|
|6 |Ana |González |27 |Mujer |
|7 |David |Sánchez |42 |Hombre|
|8 |Isabel |Pérez |29 |Mujer |
|9 |Luis |Hernández|37 |Hombre|
|10 |Carmen |García |24 |Mujer |
+-----+-------+---------+----+------+
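Note the 101.0 in _c0: every column was read as a string, so the expression _c0 + 100 is evaluated numerically and comes back as a double. For reference, there is also a typed update API that takes Column expressions instead of SQL strings; a minimal sketch, not executed in this session:
deltaClientesTable.update(col("_c0") === "2", Map("_c1" -> lit("Alonso"), "_c2" -> lit("Isidoro")))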
If you want to insert new rows into this Delta Lake table, one way is to merge a new DataFrame into it. Yes, it feels odd at first; maybe it will change in the future…
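For a plain append with no upsert semantics, a simple write is enough; a sketch, assuming someDF is any DataFrame with a matching schema:
someDF.write.format("delta").mode("append").save("clientes-delta")
Merge is the right tool when matching rows should be updated and the rest inserted, which is what we do below.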
Let's load a new DataFrame…
scala> val dfDelta = spark.read.csv("clientes-delta.csv")
dfDelta: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 3 more fields]
scala> dfDelta.show(false)
+---+-------+---------+----+------+
|_c0|_c1 |_c2 |_c3 |_c4 |
+---+-------+---------+----+------+
|id |nombre |apellido |edad|sexo |
|11 |Daniel |Pérez |32 |Hombre|
|12 |María |Rodríguez|26 |Mujer |
|13 |Antonio|García |41 |Hombre|
|14 |Laura |Martínez |21 |Mujer |
|15 |Pedro |Pérez |36 |Hombre|
|16 |Ana |González |28 |Mujer |
|17 |David |Sánchez |43 |Hombre|
|18 |Isabel |Pérez |30 |Mujer |
|19 |Luis |Hernández|38 |Hombre|
|20 |Carmen |García |25 |Mujer |
+---+-------+---------+----+------+
So deltaClientesTable is the previous Delta table, now with one updated row. Next I am inserting, or rather merging, a new DataFrame…
scala> deltaClientesTable.as("target").merge(dfDelta.as("source"), "target._c0 = source._c0").whenMatched().updateAll().whenNotMatched().insertAll().execute()
scala> deltaClientesTable.toDF.show(false)
+-----+-------+---------+---+------+
|_c0 |_c1 |_c2 |_c3|_c4 |
+-----+-------+---------+---+------+
|10 |Carmen |García |24 |Mujer |
|101.0|Alonso |Isidoro |47 |Hombre|
|11 |Daniel |Pérez |32 |Hombre|
|12 |María |Rodríguez|26 |Mujer |
|13 |Antonio|García |41 |Hombre|
|14 |Laura |Martínez |21 |Mujer |
|15 |Pedro |Pérez |36 |Hombre|
|16 |Ana |González |28 |Mujer |
|17 |David |Sánchez |43 |Hombre|
|18 |Isabel |Pérez |30 |Mujer |
|19 |Luis |Hernández|38 |Hombre|
|2 |María |García |25 |Mujer |
|20 |Carmen |García |25 |Mujer |
|3 |Antonio|Martínez |40 |Hombre|
|4 |Laura |Rodríguez|20 |Mujer |
|5 |Pedro |Pérez |35 |Hombre|
|6 |Ana |González |27 |Mujer |
|7 |David |Sánchez |42 |Hombre|
|8 |Isabel |Pérez |29 |Mujer |
|9 |Luis |Hernández|37 |Hombre|
+-----+-------+---------+---+------+
only showing top 20 rows
Work in progress.