Tags: delta-lake, pyspark, data-engineering, data-lake, time-travel, acid-transactions, lakehouse

Get Started with Delta Lake and PySpark

Learn to build a reliable data lake using Delta Lake. This guide shows you how to create, read, update, and time-travel your data tables with ACID transactions using PySpark.

Beginner · 15 min · 5 steps
The play
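  1. Set Up PySpark with Delta Lake
    Initialize a SparkSession configured for Delta Lake. This requires the Delta Spark package (`delta-spark`) plus two settings: `spark.sql.extensions`, which registers Delta's SQL extension, and `spark.sql.catalog.spark_catalog`, which makes Delta's catalog the session catalog. Together they let Spark read and write the `delta` format.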
  2. Create Your First Delta Table
    Create a sample PySpark DataFrame and write it to a file path using the `delta` format. Delta Lake stores the rows as Parquet data files plus a transaction log (the `_delta_log` directory) whose numbered commits record which data files make up each version of the table.
  3. Read and Update the Table
    Read the data back from your Delta Lake path. Then create a new DataFrame and overwrite the existing table. The overwrite commits a new version to the transaction log: the old data files are marked as removed in the log but stay on disk (until you run `VACUUM`), so the previous version remains readable.
  4. Time Travel to a Previous Version
    Use Delta Lake's time travel feature to query a previous version of your table. Setting the `versionAsOf` read option lets you read the data as it existed before the overwrite (the `timestampAsOf` option does the same for a point in time).
  5. Perform an Upsert with Merge
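    Use the MERGE operation to perform an upsert (update and insert): source rows whose key matches an existing row update that row, and source rows with no match are inserted. MERGE is a staple of data warehousing; Delta Lake brings it to files in a data lake and applies the whole merge as one atomic commit, so readers see either all of its changes or none of them.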
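Steps 2 to 4 rest on one mechanism: the transaction log is an ordered list of commits, and version N of the table is whatever you get by replaying commits 0 through N. The sketch below is a toy, pure-Python model of that idea, not Delta's implementation: the `ToyDeltaLog` class and `write` helper are hypothetical, and real Delta commits are JSON files named by zero-padded version (`00000000000000000000.json`, and so on) holding `add`, `remove`, `metaData`, `protocol`, and `commitInfo` actions. It shows why an overwrite does not destroy the previous version: the old data file is only marked as removed in a new commit.

```python
from dataclasses import dataclass, field


@dataclass
class ToyDeltaLog:
    """Toy model of a Delta transaction log (hypothetical, not Delta's format).

    Each commit is a list of ("add", file) or ("remove", file) actions.
    Data files stay in `files` even after a "remove", just as Delta leaves
    removed Parquet files on disk until VACUUM deletes them.
    """

    commits: list = field(default_factory=list)
    files: dict = field(default_factory=dict)  # file name -> list of rows

    def commit(self, actions):
        self.commits.append(actions)
        return len(self.commits) - 1  # the new version number

    def snapshot(self, version=None):
        """Replay commits 0..version and return the live file names."""
        if version is None:
            version = len(self.commits) - 1
        live = set()
        for actions in self.commits[: version + 1]:
            for kind, name in actions:
                if kind == "add":
                    live.add(name)
                else:
                    live.discard(name)
        return sorted(live)

    def read(self, version=None):
        """Like versionAsOf: rows of the table as of `version` (latest if None)."""
        rows = []
        for name in self.snapshot(version):
            rows.extend(self.files[name])
        return sorted(rows)


def write(log, rows, mode="append"):
    """Hypothetical writer: store rows as one new file and commit it.

    mode="overwrite" also logically removes every file in the current snapshot.
    """
    name = f"part-{len(log.files):05d}.parquet"
    log.files[name] = list(rows)
    actions = [("add", name)]
    if mode == "overwrite":
        actions = [("remove", f) for f in log.snapshot()] + actions
    return log.commit(actions)


log = ToyDeltaLog()
write(log, [(0, "a"), (1, "b"), (2, "c"), (3, "d")], mode="overwrite")  # version 0
write(log, [(4, "e"), (5, "f")], mode="overwrite")                      # version 1
print(log.read())           # [(4, 'e'), (5, 'f')]
print(log.read(version=0))  # [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
print(sorted(log.files))    # ['part-00000.parquet', 'part-00001.parquet']
```

Because an overwrite only appends a commit, reading version 0 is just a shorter replay. Real Delta also writes periodic Parquet checkpoints of the log so readers do not have to replay every JSON commit from the start.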
Starter code
```python
import shutil
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# This script assumes you have run: pip install pyspark delta-spark

# 1. Configure and create a SparkSession with Delta Lake support
print("Configuring SparkSession with Delta Lake...")
builder = SparkSession.builder.appName("DeltaLakeStarter") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip adds the Delta Lake package that matches the
# pip-installed delta-spark version to spark.jars.packages
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

delta_table_path = "/tmp/delta-starter-table"

# Remove any table left behind by an earlier, interrupted run;
# otherwise the first write below would not be version 0
shutil.rmtree(delta_table_path, ignore_errors=True)

# 2. Create a Delta table (Version 0)
print(f"\nCreating Delta table at: {delta_table_path}")
data_v0 = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "d")
], ["id", "value"])
data_v0.write.format("delta").mode("overwrite").save(delta_table_path)
print("Initial data (Version 0):")
data_v0.show()

# 3. Overwrite the table with new data (creates Version 1)
print("\nOverwriting table to create Version 1...")
data_v1 = spark.createDataFrame([
    (4, "e"),
    (5, "f")
], ["id", "value"])
data_v1.write.format("delta").mode("overwrite").save(delta_table_path)

print("Current data (Version 1):")
current_df = spark.read.format("delta").load(delta_table_path)
current_df.show()

# 4. Use time travel to read the original data (Version 0)
print("\nReading Version 0 using Time Travel:")
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df_v0.show()

# Clean up: delete the table directory (Parquet data files and _delta_log)
print(f"\nCleaning up by deleting {delta_table_path}...")
try:
    shutil.rmtree(delta_table_path)
    print("Cleanup successful.")
except OSError as e:
    print(f"Error during cleanup: {e.strerror}")

spark.stop()
```
Get Started with Delta Lake and PySpark — Action Pack