Delta Lake Tutorial with Spark
Delta Lake Tutorial
# %%
# Import necessary libraries
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# %%
# Create (or reuse) the SparkSession for this tutorial, configured for Delta Lake:
#   * spark.jars.packages             -> fetch the Delta Lake artifact
#                                        io.delta:delta-spark_2.12, version 3.2.0
#   * spark.sql.extensions            -> register Delta's SQL session extension
#   * spark.sql.catalog.spark_catalog -> use Delta's catalog implementation
# NOTE: getOrCreate() hands back an already-active session if one exists; in that
# case settings such as spark.jars.packages may not be applied.
spark = (
    SparkSession.builder
    .appName("Delta Lake")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)
1. Create a Simple Dataset
To start, let's create a simple dataset that pairs letters with numbers.
# %%
# Five rows pairing each letter "a".."e" with its 1-based position:
# [("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5)]
data = list(zip("abcde", range(1, 6)))
# Turn the rows into a DataFrame with the columns `letter` and `number`
df = spark.createDataFrame(data, schema=["letter", "number"])
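2. Write Data to a Delta Lake Table
Now we'll write the data to a Delta Lake table stored at the path `data/letters.delta`. Delta Lake provides ACID transactions and time travel, which make it suitable for reliable data storage and versioning. Every write is recorded as a new table version. `DESCRIBE HISTORY` lists these versions, and this first write appears as version 0.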
# %%
# Persist the DataFrame as a Delta table at data/letters.delta, replacing any
# existing contents (mode="overwrite").
df.write.save("data/letters.delta", format="delta", mode="overwrite")

# Print the table's commit log with column values shown untruncated
history_df = spark.sql("DESCRIBE HISTORY 'data/letters.delta'")
history_df.show(truncate=False)
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp |userId|userName|operation|operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|engineInfo |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|0 |2024-07-03 14:01:44.696|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |NULL |Serializable |false |{numFiles -> 6, numOutputRows -> 5, numOutputBytes -> 3936}|NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
3. Read Data from the Delta Lake Table
Let's read the data back from the Delta Lake table and display it. Note that the rows are not guaranteed to come back in the order they were inserted.
# %%
# Load the latest version of the Delta table and print its rows
df = spark.read.load("data/letters.delta", format="delta")
df.show()
+------+------+
|letter|number|
+------+------+
|     d|     4|
|     c|     3|
|     e|     5|
|     b|     2|
|     a|     1|
+------+------+
4. Append Data to the Delta Lake Table
We can append new data to the existing Delta Lake table without modifying the rows that are already there. The append is recorded as a new version (version 1) in the table history.
# %%
# Two additional rows for the letters "f" and "g"
new_data = [
    ("f", 6),
    ("g", 7),
]
df_new = spark.createDataFrame(new_data, schema=["letter", "number"])

# mode="append" adds these rows to the table at data/letters.delta as a new commit
df_new.write.save("data/letters.delta", format="delta", mode="append")

# Print the commit log again, untruncated
history_df = spark.sql("DESCRIBE HISTORY 'data/letters.delta'")
history_df.show(truncate=False)
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp |userId|userName|operation|operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|engineInfo |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|1 |2024-07-03 14:01:52.167|NULL |NULL |WRITE |{mode -> Append, partitionBy -> []} |NULL|NULL |NULL |0 |Serializable |true |{numFiles -> 3, numOutputRows -> 2, numOutputBytes -> 1809}|NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
|0 |2024-07-03 14:01:44.696|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |NULL |Serializable |false |{numFiles -> 6, numOutputRows -> 5, numOutputBytes -> 3936}|NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
# %%
# Reload the table; the latest version now includes the appended rows
df = spark.read.load("data/letters.delta", format="delta")
df.show()
+------+------+
|letter|number|
+------+------+
|     d|     4|
|     c|     3|
|     g|     7|
|     f|     6|
|     e|     5|
|     b|     2|
|     a|     1|
+------+------+
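5. Update Data in the Delta Lake Table
Let's set the number to 100 where the letter is 'a'. In this example we read the table, change the value with a DataFrame transformation, and overwrite the whole table with the result. Delta Lake records the overwrite as a new version (version 2), so the earlier versions remain available. (Delta Lake also supports row-level updates through the `DeltaTable.update` API or the SQL `UPDATE` statement, which avoid overwriting the entire table.)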
# %%
# "Update" by rewriting the table:
#   1. load the current version,
#   2. replace `number` with 100 on rows whose letter is "a" (all other rows keep
#      their existing number),
#   3. overwrite the whole table with the result.
# This commits a new table version containing every row, not a row-level UPDATE.
df = spark.read.load("data/letters.delta", format="delta")

df = df.withColumn(
    "number",
    F.when(F.col("letter") == "a", F.lit(100)).otherwise(F.col("number")),
)

df.write.save("data/letters.delta", format="delta", mode="overwrite")

# Print the commit log, untruncated, to see the new version
history_df = spark.sql("DESCRIBE HISTORY 'data/letters.delta'")
history_df.show(truncate=False)
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp |userId|userName|operation|operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|engineInfo |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|2 |2024-07-03 14:02:04.829|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |1 |Serializable |false |{numFiles -> 7, numOutputRows -> 7, numOutputBytes -> 4963}|NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
|1 |2024-07-03 14:01:52.167|NULL |NULL |WRITE |{mode -> Append, partitionBy -> []} |NULL|NULL |NULL |0 |Serializable |true |{numFiles -> 3, numOutputRows -> 2, numOutputBytes -> 1809}|NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
|0 |2024-07-03 14:01:44.696|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |NULL |Serializable |false |{numFiles -> 6, numOutputRows -> 5, numOutputBytes -> 3936}|NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
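6. Read Data from a Specific Version of the Delta Lake Table
Delta Lake maintains a version history of changes, so we can read the table as it was at an earlier version ("time travel"). Here we read version 1, the state after the append but before the update, so the letter 'a' still has the number 1.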
# %%
# Time travel: read the table as of version 1 (the append commit), which is
# before the overwrite that set the number for "a" to 100.
df_version1 = spark.read.load("data/letters.delta", format="delta", versionAsOf=1)
df_version1.show()
+------+------+
|letter|number|
+------+------+
|     d|     4|
|     c|     3|
|     g|     7|
|     f|     6|
|     e|     5|
|     b|     2|
|     a|     1|
+------+------+
7. Get History of Changes Made to the Delta Lake Table
Delta Lake lets us view the history of changes made to the table, which is useful for auditing and for understanding how the data evolved over time.
# %%
# Fetch the table's commit log (one row per version) and print it with column
# values shown in full rather than truncated
history_df = spark.sql("DESCRIBE HISTORY 'data/letters.delta'")
history_df.show(truncate=False)
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp |userId|userName|operation|operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|engineInfo |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|2 |2024-07-03 14:02:04.829|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |1 |Serializable |false |{numFiles -> 7, numOutputRows -> 7, numOutputBytes -> 4963}|NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
|1 |2024-07-03 14:01:52.167|NULL |NULL |WRITE |{mode -> Append, partitionBy -> []} |NULL|NULL |NULL |0 |Serializable |true |{numFiles -> 3, numOutputRows -> 2, numOutputBytes -> 1809}|NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
|0 |2024-07-03 14:01:44.696|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |NULL |Serializable |false |{numFiles -> 6, numOutputRows -> 5, numOutputBytes -> 3936}|NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+