https://docs.delta.io/latest/quick-start.html#create-a-table&language-scala

First, install Apache Spark. I am a macOS user, and I do not recommend using Homebrew for this, because it will not install the third-party libraries.

I recommend downloading it from https://spark.apache.org

The latest version is 3.5.0 as of 28 November 2023.

Then, run spark-shell with Delta Lake support:

ATTENTION: make sure the Delta Lake version you pick matches your Spark version in the compatibility matrix:

https://docs.delta.io/latest/quick-start.html#set-up-interactive-shell

Before running any shell or command with Delta, let's first make sure Spark itself works on the local machine.

┌<▪> ~/spark-3.5.0-bin-hadoop3
└➤
bin/spark-submit --class org.apache.spark.examples.JavaSparkPi --master local examples/jars/spark-examples_2.12-3.5.0.jar
23/11/28 10:20:52 INFO SparkContext: Running Spark version 3.5.0
23/11/28 10:20:52 INFO SparkContext: OS info Mac OS X, 14.1.1, x86_64
23/11/28 10:20:52 INFO SparkContext: Java version 21.0.1
23/11/28 10:20:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
...
23/11/28 10:20:54 INFO DAGScheduler: Job 0 finished: reduce at JavaSparkPi.java:54, took 0,686980 s
Pi is roughly 3.14132
23/11/28 10:20:54 INFO SparkContext: SparkContext is stopping with exitCode 0.
...
23/11/28 10:20:54 INFO ShutdownHookManager: Deleting directory /private/var/folders/0x/gmzly6vn2f7ggfk47bdcsbnc0000gn/T/spark-cd55e3bf-9b5c-454b-8fc5-bd0f16abbdb6

With the above output we know the Spark installation works. Be careful, though: the way the examples jar and the classes inside it are invoked has changed between versions.

Now, I am going to run spark-shell with Delta Lake support.

┌<▸> ~
└➤
bin/spark-shell --packages io.delta:delta-spark_2.12:3.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

23/11/27 13:19:33 WARN Utils: Your hostname, MacBook-Pro-de-Alonso.local resolves to a loopback address: 127.0.0.1; using 192.168.1.37 instead (on interface en0)
23/11/27 13:19:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/usr/local/Cellar/apache-spark/3.5.0/libexec/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/aironman/.ivy2/cache
The jars for the packages stored in: /Users/aironman/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2958dc03-f45d-455d-bf22-16798fe37d4f;1.0
    confs: [default]
    found io.delta#delta-spark_2.12;3.0.0 in central
    found io.delta#delta-storage;3.0.0 in central
    found org.antlr#antlr4-runtime;4.9.3 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.0.0/delta-spark_2.12-3.0.0.jar ...
    [SUCCESSFUL ] io.delta#delta-spark_2.12;3.0.0!delta-spark_2.12.jar (1366ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-storage/3.0.0/delta-storage-3.0.0.jar ...
    [SUCCESSFUL ] io.delta#delta-storage;3.0.0!delta-storage.jar (39ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.9.3/antlr4-runtime-4.9.3.jar ...
    [SUCCESSFUL ] org.antlr#antlr4-runtime;4.9.3!antlr4-runtime.jar (59ms)
:: resolution report :: resolve 2080ms :: artifacts dl 1471ms
    :: modules in use:
    io.delta#delta-spark_2.12;3.0.0 from central in [default]
    io.delta#delta-storage;3.0.0 from central in [default]
    org.antlr#antlr4-runtime;4.9.3 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   3   |   3   |   3   |   0   ||   3   |   3   |
    ---------------------------------------------------------------------

:: problems summary ::
:::: ERRORS
    unknown resolver null


:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
:: retrieving :: org.apache.spark#spark-submit-parent-2958dc03-f45d-455d-bf22-16798fe37d4f
    confs: [default]
    3 artifacts copied, 0 already retrieved (5230kB/9ms)
23/11/27 13:19:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.37:4040
Spark context available as 'sc' (master = local[*], app id = local-1701087581574).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/

Using Scala version 2.12.18 (Java HotSpot(TM) 64-Bit Server VM, Java 21.0.1)
Type in expressions to have them evaluated.
Type :help for more information.
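
A quick sanity check inside the shell (a minimal sketch, not part of the recorded session): confirm that the Delta extension and catalog passed on the command line are actually set.

// Both values should echo back exactly what we passed with --conf:
spark.conf.get("spark.sql.extensions")            // io.delta.sql.DeltaSparkSessionExtension
spark.conf.get("spark.sql.catalog.spark_catalog") // org.apache.spark.sql.delta.catalog.DeltaCatalog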

Let's load a clientes.csv file, something like:

┌<▪> ~
└➤
cat clientes.csv
id,nombre,apellido,edad,sexo
1,"Juan","López",30,"Hombre"
2,"María","García",25,"Mujer"
3,"Antonio","Martínez",40,"Hombre"
4,"Laura","Rodríguez",20,"Mujer"
5,"Pedro","Pérez",35,"Hombre"
6,"Ana","González",27,"Mujer"
7,"David","Sánchez",42,"Hombre"
8,"Isabel","Pérez",29,"Mujer"
9,"Luis","Hernández",37,"Hombre"
10,"Carmen","García",24,"Mujer"

scala>
scala> val words = sc.textFile("clientes.csv")
words: org.apache.spark.rdd.RDD[String] = clientes.csv MapPartitionsRDD[1] at textFile at <console>:23

This is the classic word-count example, except that splitting on the empty string actually counts individual characters, as the output shows (a corrected sketch follows the output below).
scala> words.flatMap(_.split("")).map((_,1)).reduceByKey(_+_).foreach(println)
(s,3)
(d,8)
(e,26)
(G,3)
(M,7)
(z,9)
(7,3)
(5,3)
(a,16)
(ó,1)
(i,5)
(A,2)
(",60)
(4,4)
(8,1)
(u,9)
(I,1)
(o,13)
(9,2)
(3,4)
(L,3)
(p,2)
(x,1)
(R,1)
(P,3)
(é,2)
(í,5)
(S,1)
(t,2)
(0,4)
(b,7)
(6,1)
(C,1)
(1,2)
(g,1)
(m,7)
(h,1)
(c,3)
(2,7)
(n,11)
(j,5)
(á,3)
(J,1)
(v,1)
(H,6)
(,,44)
(l,4)
(r,22)
(D,1)
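
If we actually want word counts rather than character counts, a minimal sketch splitting on commas instead of the empty string (not run in this session; quotes stay attached to the fields):

// Count each comma-separated field of the CSV instead of each character.
words.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)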

Let's import some libraries…

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df= spark.read.csv("clientes.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 3 more fields]

scala> df.show(false)
+---+-------+---------+----+------+
|_c0|_c1    |_c2      |_c3 |_c4   |
+---+-------+---------+----+------+
|id |nombre |apellido |edad|sexo  |
|1  |Juan   |López    |30  |Hombre|
|2  |María  |García   |25  |Mujer |
|3  |Antonio|Martínez |40  |Hombre|
|4  |Laura  |Rodríguez|20  |Mujer |
|5  |Pedro  |Pérez    |35  |Hombre|
|6  |Ana    |González |27  |Mujer |
|7  |David  |Sánchez  |42  |Hombre|
|8  |Isabel |Pérez    |29  |Mujer |
|9  |Luis   |Hernández|37  |Hombre|
|10 |Carmen |García   |24  |Mujer |
+---+-------+---------+----+------+
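
Note that the header row (id, nombre, …) was read as data because we did not tell Spark the file has one. A minimal sketch of reading the same file with real column names (the rest of this walkthrough keeps the generic _c0.._c4 names, so this is just an aside):

// Treat the first line as a header and let Spark infer the column types.
val dfWithHeader = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("clientes.csv")
dfWithHeader.printSchema()   // columns: id, nombre, apellido, edad, sexo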

I can create a Parquet file from this CSV, but…

scala> df.write.mode("overwrite").parquet("clientes.parquet")

The next command creates a folder named clientes-delta with Parquet data files and a _delta_log directory inside; in other words, it creates a Delta Lake table…

scala> df.write.format("delta").save("clientes-delta")

We can load the Delta table into a read-only DataFrame:
scala> val dfclientes = spark.read.format("delta").load("clientes-delta")
dfclientes: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 3 more fields]

scala> dfclientes.show(false)
+---+-------+---------+----+------+
|_c0|_c1    |_c2      |_c3 |_c4   |
+---+-------+---------+----+------+
|id |nombre |apellido |edad|sexo  |
|1  |Juan   |López    |30  |Hombre|
|2  |María  |García   |25  |Mujer |
|3  |Antonio|Martínez |40  |Hombre|
|4  |Laura  |Rodríguez|20  |Mujer |
|5  |Pedro  |Pérez    |35  |Hombre|
|6  |Ana    |González |27  |Mujer |
|7  |David  |Sánchez  |42  |Hombre|
|8  |Isabel |Pérez    |29  |Mujer |
|9  |Luis   |Hernández|37  |Hombre|
|10 |Carmen |García   |24  |Mujer |
+---+-------+---------+----+------+

But although you might think that a Parquet file is already a Delta table, it is not (Delta Lake is not Impala): you cannot read a plain Parquet folder as if it were a Delta Lake table…

scala> val anotherDFClientes = spark.read.format("delta").load("clientes.parquet")
org.apache.spark.sql.delta.DeltaAnalysisException: [DELTA_TABLE_NOT_FOUND] Delta table `clientes.parquet` doesn't exist.
  at org.apache.spark.sql.delta.DeltaErrorsBase.nonExistentDeltaTable(DeltaErrors.scala:1393)
  at org.apache.spark.sql.delta.DeltaErrorsBase.nonExistentDeltaTable$(DeltaErrors.scala:1392)
  at org.apache.spark.sql.delta.DeltaErrors$.nonExistentDeltaTable(DeltaErrors.scala:3039)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.toBaseRelation$lzycompute(DeltaTableV2.scala:241)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2.toBaseRelation(DeltaTableV2.scala:229)
  at org.apache.spark.sql.delta.sources.DeltaDataSource.$anonfun$createRelation$5(DeltaDataSource.scala:249)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
  at org.apache.spark.sql.delta.sources.DeltaDataSource.recordFrameProfile(DeltaDataSource.scala:49)
  at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:208)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
  ... 51 elided
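
That said, Delta Lake does ship a conversion utility for turning an existing Parquet folder into a Delta table in place. A minimal sketch against the clientes.parquet folder written above (not run in this session; as far as I know it only adds a _delta_log directory and keeps the existing Parquet data files):

import io.delta.tables.DeltaTable

// Convert the plain Parquet folder into a Delta table; the identifier format is parquet.`<path>`.
val converted = DeltaTable.convertToDelta(spark, "parquet.`clientes.parquet`")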

Another way to load a Delta Lake table: first we import these libraries…
scala> import io.delta._
import io.delta._

scala> import io.delta.tables._
import io.delta.tables._

scala> val deltaClientesTable = DeltaTable.forPath("clientes-delta")
deltaClientesTable: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@40fce182

scala> deltaClientesTable.toDF.show(false)
+---+-------+---------+----+------+
|_c0|_c1    |_c2      |_c3 |_c4   |
+---+-------+---------+----+------+
|id |nombre |apellido |edad|sexo  |
|1  |Juan   |López    |30  |Hombre|
|2  |María  |García   |25  |Mujer |
|3  |Antonio|Martínez |40  |Hombre|
|4  |Laura  |Rodríguez|20  |Mujer |
|5  |Pedro  |Pérez    |35  |Hombre|
|6  |Ana    |González |27  |Mujer |
|7  |David  |Sánchez  |42  |Hombre|
|8  |Isabel |Pérez    |29  |Mujer |
|9  |Luis   |Hernández|37  |Hombre|
|10 |Carmen |García   |24  |Mujer |
+---+-------+---------+----+------+

I am going to update this Delta Lake table:
scala> val set = Map("_c0" -> expr("_c0 + 100"), "_c1" -> expr("Alonso"), "_c2" -> expr("Isidoro"), "_c3" -> 47, "_c4" -> "Hombre")
set: scala.collection.immutable.Map[String,Any] = Map(_c3 -> 47, _c0 -> (_c0 + 100), _c4 -> Hombre, _c1 -> Alonso, _c2 -> Isidoro)

scala> import org.apache.spark.sql.Column
import org.apache.spark.sql.Column

Watch out: this is an update only if the row exists; if it does not, Spark will not create it.
scala> val setNew = Map("_c0" -> "_c0 + 100", "_c1" -> "'Alonso'", "_c2" -> "'Isidoro'", "_c3" -> "47", "_c4" -> "'Hombre'")
scala> deltaClientesTable.updateExpr("_c0 = 1", setNew)

scala> deltaClientesTable.toDF.show(false)
+-----+-------+---------+----+------+
|_c0  |_c1    |_c2      |_c3 |_c4   |
+-----+-------+---------+----+------+
|id   |nombre |apellido |edad|sexo  |
|101.0|Alonso |Isidoro  |47  |Hombre|
|2    |María  |García   |25  |Mujer |
|3    |Antonio|Martínez |40  |Hombre|
|4    |Laura  |Rodríguez|20  |Mujer |
|5    |Pedro  |Pérez    |35  |Hombre|
|6    |Ana    |González |27  |Mujer |
|7    |David  |Sánchez  |42  |Hombre|
|8    |Isabel |Pérez    |29  |Mujer |
|9    |Luis   |Hernández|37  |Hombre|
|10   |Carmen |García   |24  |Mujer |
+-----+-------+---------+----+------+       
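
The same update can be written with the typed API, which takes a Column condition and a Map of column name to Column instead of SQL expression strings. A minimal sketch, equivalent to the updateExpr call above (not re-run here):

import org.apache.spark.sql.functions.{col, expr, lit}

// Equivalent of updateExpr("_c0 = 1", setNew) using Column objects:
deltaClientesTable.update(
  col("_c0") === "1",
  Map(
    "_c0" -> expr("_c0 + 100"),
    "_c1" -> lit("Alonso"),
    "_c2" -> lit("Isidoro"),
    "_c3" -> lit("47"),
    "_c4" -> lit("Hombre")
  )
)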

If you want to insert new data into this Delta Lake table, you have to merge a new DataFrame into it. Yes, it is a bit odd; maybe it will change in the future…

Let's load a new DataFrame…

scala> val dfDelta = spark.read.csv("clientes-delta.csv")
dfDelta: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 3 more fields]

scala> dfDelta.show(false)
+---+-------+---------+----+------+
|_c0|_c1    |_c2      |_c3 |_c4   |
+---+-------+---------+----+------+
|id |nombre |apellido |edad|sexo  |
|11 |Daniel |Pérez    |32  |Hombre|
|12 |María  |Rodríguez|26  |Mujer |
|13 |Antonio|García   |41  |Hombre|
|14 |Laura  |Martínez |21  |Mujer |
|15 |Pedro  |Pérez    |36  |Hombre|
|16 |Ana    |González |28  |Mujer |
|17 |David  |Sánchez  |43  |Hombre|
|18 |Isabel |Pérez    |30  |Mujer |
|19 |Luis   |Hernández|38  |Hombre|
|20 |Carmen |García   |25  |Mujer |
+---+-------+---------+----+------+
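
As an aside, if these ids are guaranteed to be new, a plain append write also works; merge is what you need when some keys may already exist in the table. A minimal sketch, not run in this session:

// Append-only write into the existing Delta table, no upsert semantics.
dfDelta.write.format("delta").mode("append").save("clientes-delta")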

So deltaClientesTable is the Delta table from before, with one row updated. Now I am inserting, well, merging the new DataFrame into it…

scala> deltaClientesTable.as("target").merge(dfDelta.as("source"), "target._c0 = source._c0").whenMatched().updateAll().whenNotMatched().insertAll().execute()


scala> deltaClientesTable.toDF.show(false)
+-----+-------+---------+---+------+
|_c0  |_c1    |_c2      |_c3|_c4   |
+-----+-------+---------+---+------+
|10   |Carmen |García   |24 |Mujer |
|101.0|Alonso |Isidoro  |47 |Hombre|
|11   |Daniel |Pérez    |32 |Hombre|
|12   |María  |Rodríguez|26 |Mujer |
|13   |Antonio|García   |41 |Hombre|
|14   |Laura  |Martínez |21 |Mujer |
|15   |Pedro  |Pérez    |36 |Hombre|
|16   |Ana    |González |28 |Mujer |
|17   |David  |Sánchez  |43 |Hombre|
|18   |Isabel |Pérez    |30 |Mujer |
|19   |Luis   |Hernández|38 |Hombre|
|2    |María  |García   |25 |Mujer |
|20   |Carmen |García   |25 |Mujer |
|3    |Antonio|Martínez |40 |Hombre|
|4    |Laura  |Rodríguez|20 |Mujer |
|5    |Pedro  |Pérez    |35 |Hombre|
|6    |Ana    |González |27 |Mujer |
|7    |David  |Sánchez  |42 |Hombre|
|8    |Isabel |Pérez    |29 |Mujer |
|9    |Luis   |Hernández|37 |Hombre|
+-----+-------+---------+---+------+
only showing top 20 rows

work in progress
