rag, vector-search, model-serving, genai, databricks, llm, delta-lake, python

Build a RAG App with Databricks Vector Search & Model Serving

Create a vector index from your documents using Databricks Vector Search. Then, build a complete Retrieval-Augmented Generation (RAG) application by querying the index and feeding the results to a foundation model like DBRX, all within the Databricks platform.

intermediate · 1 hour · 5 steps
The play
  1. Prepare Data in a Delta Table
    Start in a Databricks notebook. Create a Spark DataFrame with your text data, save it as a Delta table in Unity Catalog, and enable Change Data Feed on that table. The table is the source for the vector index, and Change Data Feed is what lets a Delta Sync index pick up changes to it.
  2. Create a Vector Search Endpoint
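    A Vector Search endpoint hosts and serves your vector indexes; the embedding model runs on a separate Model Serving endpoint. Create the Vector Search endpoint with `VectorSearchClient` from the `databricks-vectorsearch` package. This is a one-time setup for a given project or use case.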
  3. Create a Delta Sync Index
    Create a Vector Search Index that automatically syncs from your source Delta Table. We specify the source table, the endpoint, the column to index, and the embedding model to use. Databricks handles the embedding process.
  4. Query the Index for Context
    With the index ready, use the `similarity_search` method to find relevant documents for a given query. This is the 'Retrieval' step in RAG. The results are the context we'll provide to the LLM.
  5. Generate an Answer with an LLM
    Use a Databricks Foundation Model endpoint (like DBRX Instruct) to generate a final answer. Combine the retrieved context with the original query in a prompt to give the LLM the information it needs.
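
Steps 4 and 5 depend on the shape of the search response: `similarity_search` returns each hit as a plain list under `result.data_array`, with the column names listed separately under `manifest.columns`. The sketch below runs without a workspace and shows one way to turn such a response into a prompt. The helper names `rows_to_dicts` and `build_prompt` and the sample response are hypothetical, and the response layout is an assumption based on the client's usual output, not something this article specifies.

```python
def rows_to_dicts(response):
    """Map each row of a similarity_search-style response to a dict keyed by column name."""
    columns = [c["name"] for c in response.get("manifest", {}).get("columns", [])]
    rows = response.get("result", {}).get("data_array") or []
    return [dict(zip(columns, row)) for row in rows]


def build_prompt(question, docs, text_key="text"):
    """Combine retrieved documents and the user's question into a single prompt string."""
    context = "\n".join(d[text_key] for d in docs)
    return (
        "You are a helpful assistant. Answer the user's question "
        "based ONLY on the following context.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\n\n"
        "Answer:"
    )


# Hypothetical response, shaped like the output of index.similarity_search(...)
sample_response = {
    "manifest": {"column_count": 2, "columns": [{"name": "text"}, {"name": "score"}]},
    "result": {
        "row_count": 2,
        "data_array": [
            ["MLflow is an open-source platform to manage the ML lifecycle.", 0.71],
            ["Databricks Model Serving deploys models.", 0.42],
        ],
    },
}

docs = rows_to_dicts(sample_response)
prompt = build_prompt("What is MLflow used for?", docs)
print(prompt)
```

Keying rows by column name avoids relying on column positions, which shift if you request more columns from the index.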
Starter code
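# Requires the databricks-vectorsearch and databricks-sdk packages
# (in a notebook: %pip install databricks-vectorsearch databricks-sdk)
from databricks.vector_search.client import VectorSearchClient
from databricks.sdk import WorkspaceClient
import time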

# --- 1. Configuration: Replace with your details ---
CATALOG = "main" # Your Unity Catalog name
SCHEMA = "default" # Your schema name

# Names for the resources we will create
TABLE_NAME = "documents_for_rag"
ENDPOINT_NAME = "rag_app_endpoint"

# --- 2. Setup: Create a source Delta Table ---
print("Step 2: Creating source Delta table...")
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE SCHEMA {SCHEMA}")

data = [
    (1, "Databricks is a data and AI company founded by the original creators of Apache Spark."),
    (2, "Delta Lake is an open-source storage framework that brings ACID transactions to data lakes."),
    (3, "MLflow is an open-source platform to manage the ML lifecycle, including experimentation, reproducibility and deployment."),
    (4, "Databricks Model Serving provides a highly available and low-latency service for deploying models.")
]
columns = ["id", "text"]
df = spark.createDataFrame(data, columns)
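df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(TABLE_NAME)

# Delta Sync indexes require Change Data Feed on the source table
spark.sql(f"ALTER TABLE {TABLE_NAME} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")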

# --- 3. Create Vector Search Endpoint & Index ---
print("Step 3: Creating Vector Search endpoint and index...")
vsc = VectorSearchClient()

if ENDPOINT_NAME not in [e['name'] for e in vsc.list_endpoints().get('endpoints', [])]:
    print(f"Creating endpoint '{ENDPOINT_NAME}'. This may take 10-15 minutes...")
    vsc.create_endpoint(name=ENDPOINT_NAME, endpoint_type="STANDARD")
else:
    print(f"Endpoint '{ENDPOINT_NAME}' already exists.")

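# Wait for endpoint to be ready.
# The state is normally reported under "endpoint_status"; "status" is checked
# as a fallback in case the client version returns a different key.
def endpoint_state(name):
    info = vsc.get_endpoint(name)
    return (info.get("endpoint_status") or info.get("status") or {}).get("state")

while endpoint_state(ENDPOINT_NAME) != 'ONLINE':
    print("Waiting for endpoint to be ready...")
    time.sleep(60)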

SOURCE_TABLE_FULLNAME = f"{CATALOG}.{SCHEMA}.{TABLE_NAME}"
INDEX_FULLNAME = f"{CATALOG}.{SCHEMA}.{TABLE_NAME}_index"

if INDEX_FULLNAME not in [i['name'] for i in vsc.list_indexes(ENDPOINT_NAME).get('vector_indexes', [])]:
    print(f"Creating index '{INDEX_FULLNAME}'...")
    vsc.create_delta_sync_index(
        endpoint_name=ENDPOINT_NAME,
        index_name=INDEX_FULLNAME,
        source_table_name=SOURCE_TABLE_FULLNAME,
        pipeline_type='TRIGGERED',
        primary_key="id",
        embedding_source_column="text",  # column whose text Databricks embeds
        embedding_model_endpoint_name="databricks-bge-large-en"
    )
else:
    print(f"Index '{INDEX_FULLNAME}' already exists. Syncing...")
    vsc.get_index(ENDPOINT_NAME, INDEX_FULLNAME).sync()

# Wait for index to be ready (describe() returns the index metadata, including its status)
while not vsc.get_index(ENDPOINT_NAME, INDEX_FULLNAME).describe().get('status', {}).get('ready', False):
    print("Waiting for index to be ready...")
    time.sleep(30)

print("Endpoint and index are ready.")

# --- 4. Perform RAG ---
print("\nStep 4: Performing RAG query...")
index = vsc.get_index(endpoint_name=ENDPOINT_NAME, index_name=INDEX_FULLNAME)

query_text = "What is MLflow used for?"

# 4a. Retrieve relevant documents
results = index.similarity_search(
    query_text=query_text,
    columns=["text"],
    num_results=2
)
# Each row in data_array is a list: the requested columns in order, followed by the similarity score
retrieved_context = "\n".join([row[0] for row in results.get('result', {}).get('data_array', [])])

# 4b. Augment prompt and generate answer
w = WorkspaceClient()
prompt = f"""You are a helpful assistant. Answer the user's question based ONLY on the following context.

Context:
{retrieved_context}

Question: {query_text}

Answer:"""

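# Query the chat endpoint through the SDK's serving_endpoints API.
# Replace the endpoint name with a chat model available in your workspace.
from databricks.sdk.service.serving import ChatMessage, ChatMessageRole

response = w.serving_endpoints.query(
    name="databricks-dbrx-instruct",
    messages=[ChatMessage(role=ChatMessageRole.USER, content=prompt)],
    max_tokens=256
)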

# --- 5. Display Results ---
print("--- RAG Application Results ---")
print(f"Query: {query_text}")
print(f"\nRetrieved Context:\n{retrieved_context}")
print(f"\nGenerated Answer:\n{response.choices[0].message.content}")
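
The two `while` loops in the starter code poll forever if the endpoint or index never becomes ready. A bounded variant is sketched below. `wait_until` is a hypothetical helper, not part of the Databricks client, and the timeout and interval values are placeholders to adjust for your workspace.

```python
import time


def wait_until(check, timeout_s=1800, interval_s=30, sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns True; raise TimeoutError once timeout_s has elapsed."""
    deadline = clock() + timeout_s
    while True:
        if check():
            return
        if clock() >= deadline:
            raise TimeoutError(f"Condition not met within {timeout_s} seconds")
        sleep(interval_s)


# Demo with a stand-in check that succeeds on the third call.
calls = {"n": 0}


def ready_on_third_call():
    calls["n"] += 1
    return calls["n"] >= 3


wait_until(ready_on_third_call, timeout_s=5, interval_s=0)
print(calls["n"])
```

In the starter code, the readiness condition for the endpoint or the index would be passed as `check`, for example as a lambda that wraps the existing status lookup.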