Tags: delta-lake, pyspark, data-engineering, data-lake, time-travel, acid-transactions, lakehouse
Get Started with Delta Lake and PySpark
Learn to build a reliable data lake with Delta Lake. This guide shows how to use PySpark to create, read, update, and time-travel Delta tables, with each write committed as an ACID transaction.
Beginner · 15 min · 5 steps
The play
- Set Up PySpark with Delta Lake: Initialize a SparkSession configured for Delta Lake. This requires the `delta-spark` package plus two settings, the Delta SQL extension (`spark.sql.extensions`) and the Delta catalog (`spark.sql.catalog.spark_catalog`), which together let Spark read and write the Delta format.
- Create Your First Delta Table: Create a sample PySpark DataFrame and write it to a file path using the `delta` format. Delta Lake stores the data as Parquet files alongside a transaction log (the `_delta_log` directory) that records every change to the table.
- Read and Update the Table: Read the data back from your Delta Lake path. Then create a new DataFrame and overwrite the existing table. The overwrite is committed as a new version in the transaction log; the files behind the previous version are marked as removed rather than deleted immediately, so the previous version stays readable.
- Time Travel to a Previous Version: Use Delta Lake's time travel feature to query an earlier version of the table. Setting the `versionAsOf` read option (for example, to `0`) returns the data as it existed before the update, which demonstrates data versioning.
- Perform an Upsert with Merge: Use the MERGE operation to perform an upsert: source rows that match existing rows are updated, and the remaining source rows are inserted. MERGE is a standard data-warehousing operation that Delta Lake brings to data lakes, and it is committed atomically as a single new table version.
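Steps 2 through 4 all rest on the transaction log: each commit records which data files were added or logically removed, and reading version N means replaying commits 0 through N. The sketch below is a toy, in-memory Python model of that idea only. The class `ToyDeltaTable`, its methods, and the `part-*.parquet` file names are hypothetical; real Delta Lake writes JSON commit files under `_delta_log`, periodically checkpoints them to Parquet, and records many more action types.

```python
class ToyDeltaTable:
    """Toy in-memory model of a Delta-style transaction log (NOT Delta's real format)."""

    def __init__(self):
        self.files = {}    # data file name -> rows; files are never deleted here
        self.commits = []  # commits[v] = list of ("add" | "remove", file name) actions

    def _live_files(self, version):
        """Replay commits 0..version to find which data files make up that version."""
        live = set()
        for actions in self.commits[: version + 1]:
            for kind, name in actions:
                if kind == "add":
                    live.add(name)
                else:  # "remove" only drops the file from the snapshot
                    live.discard(name)
        return live

    def overwrite(self, rows):
        """Write rows to a new immutable file and commit a version that swaps it in."""
        version = len(self.commits)
        name = f"part-{version:05d}.parquet"  # hypothetical file name
        self.files[name] = list(rows)
        # Logically remove every file of the previous version, then add the new one.
        previous = self._live_files(version - 1)
        actions = [("remove", f) for f in sorted(previous)] + [("add", name)]
        self.commits.append(actions)
        return version

    def read(self, version_as_of=None):
        """Return the rows visible at a version (latest by default), like versionAsOf."""
        if version_as_of is None:
            version_as_of = len(self.commits) - 1
        if not 0 <= version_as_of < len(self.commits):
            raise ValueError(f"version {version_as_of} does not exist")
        rows = []
        for name in sorted(self._live_files(version_as_of)):
            rows.extend(self.files[name])
        return rows


table = ToyDeltaTable()
table.overwrite([(0, "a"), (1, "b"), (2, "c"), (3, "d")])  # version 0
table.overwrite([(4, "e"), (5, "f")])                      # version 1
print(table.read())                 # [(4, 'e'), (5, 'f')]
print(table.read(version_as_of=0))  # [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
```

Because an overwrite only appends `remove` actions, the version 0 file is still present and stays readable. In real Delta Lake, such files persist until `VACUUM` deletes them after the retention period.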
Starter code
import shutil
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
# This script assumes you have run: pip install pyspark delta-spark
# (choose a delta-spark release that is compatible with your PySpark version)
# 1. Configure and create a SparkSession with Delta Lake support
print("Configuring SparkSession with Delta Lake...")
builder = SparkSession.builder.appName("DeltaLakeStarter") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
delta_table_path = "/tmp/delta-starter-table"
# 2. Create a Delta table (Version 0)
print(f"\nCreating Delta table at: {delta_table_path}")
data_v0 = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "d")
], ["id", "value"])
data_v0.write.format("delta").mode("overwrite").save(delta_table_path)
print("Initial data (Version 0):")
data_v0.show()
# 3. Overwrite the table with new data (Creates Version 1)
print("\nOverwriting table to create Version 1...")
data_v1 = spark.createDataFrame([
(4, "e"),
(5, "f")
], ["id", "value"])
data_v1.write.format("delta").mode("overwrite").save(delta_table_path)
print("Current data (Version 1):")
current_df = spark.read.format("delta").load(delta_table_path)
current_df.show()
# 4. Use Time Travel to read the original data (Version 0)
print("\nReading Version 0 using Time Travel:")
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df_v0.show()
# 5. Clean up the created Delta table directory
print(f"\nCleaning up by deleting {delta_table_path}...")
try:
    shutil.rmtree(delta_table_path)
    print("Cleanup successful.")
except OSError as e:
    print(f"Error during cleanup: {e.strerror}")
spark.stop()