Article
document-ingestion, rag, python, automation, vector-database, data-preprocessing, langchain, script
Automate Document Ingestion for RAG Systems
Use the Document Ingestion Pipeline script to automate data preprocessing for your RAG applications. This guide shows how to set up the script to fetch, clean, chunk, and embed documents from local files and URLs, preparing them for a vector store.
intermediate | 30 min | 5 steps
The play
- Set Up Your Environment: The Document Ingestion Pipeline is a script built on several data processing and AI libraries. Install the necessary Python packages using pip. This example uses LangChain components, which are common for such pipelines; the starter code also needs langchain-community, unstructured, sentence-transformers, and faiss-cpu.
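- Initialize the Pipeline: Instantiate the Document Ingestion Pipeline. You'll typically configure it with an embedding model and text-splitting parameters such as chunk size and overlap. Chunk size caps the length of each chunk (in characters, with the splitter's default settings), and overlap repeats the end of one chunk at the start of the next so that context is not lost at chunk boundaries.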
- Ingest Local Files: Use the pipeline to process documents from your local filesystem. The script's format detection automatically selects the correct loader for file types such as .txt, .pdf, or .md. To test, use a file named 'data/my_document.txt' (the starter code creates this file for you).
- Ingest from URLs: The pipeline can also fetch and process content directly from web pages. Provide a list of URLs to ingest. The script will download the HTML, extract the main text content, and process it just like a local file.
- Build and Save the Vector Store: Combine all processed documents and use the pipeline to build a vector store. This final step embeds all text chunks and saves them to a searchable index (e.g., FAISS), which your RAG application can then query.
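To make the chunk size and overlap parameters from the steps above concrete, here is a minimal sketch of fixed-width chunking with overlap in plain Python. The helper `chunk_text` is hypothetical and is not part of the pipeline or of LangChain; the splitter used in the starter code also tries to break on separators such as paragraphs and sentences, so its output will differ. The sketch only illustrates how the two numbers interact.

```python
def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into character windows of at most chunk_size,
    where each window repeats the last chunk_overlap characters
    of the previous one. Hypothetical helper for illustration only."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window has reached the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = chunk_text(sample, chunk_size=10, chunk_overlap=3)
for chunk in chunks:
    print(chunk)
```

With a chunk size of 10 and an overlap of 3, the window advances 7 characters at a time, so the last three characters of each chunk reappear at the start of the next. A larger overlap preserves more context across boundaries at the cost of more chunks to embed and store.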
Starter code
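import os

# Recent LangChain releases ship these integrations in the langchain-community
# and langchain-text-splitters packages; older releases exposed them under langchain.*.
from langchain_community.document_loaders import UnstructuredFileLoader, UnstructuredURLLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
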
class DocumentIngestionPipeline:
    """A plausible implementation of the described Document Ingestion Pipeline."""

    def __init__(self, embedding_model_name='all-MiniLM-L6-v2', chunk_size=1000, chunk_overlap=100):
        self.text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        self.embeddings = HuggingFaceEmbeddings(model_name=embedding_model_name)
        print("Pipeline initialized.")

    def ingest_from_path(self, file_path: str):
        """Ingests and processes a single document from a local file path."""
        print(f"Ingesting from file: {file_path}")
        loader = UnstructuredFileLoader(file_path)
        documents = loader.load()
        return self.text_splitter.split_documents(documents)

    def ingest_from_urls(self, urls: list[str]):
        """Ingests and processes documents from a list of URLs."""
        print(f"Ingesting from URLs: {urls}")
        loader = UnstructuredURLLoader(urls=urls, show_progress_bar=True)
        documents = loader.load()
        return self.text_splitter.split_documents(documents)

    def build_vector_store(self, docs):
        """Creates a FAISS vector store from a list of documents."""
        print(f"Building vector store with {len(docs)} document chunks...")
        return FAISS.from_documents(docs, self.embeddings)

if __name__ == '__main__':
    # --- 1. Create dummy data for the example ---
    if not os.path.exists('data'):
        os.makedirs('data')
    with open('data/my_document.txt', 'w') as f:
        f.write("This is the first document. It contains important information about AI pipelines.")
    print("Created 'data/my_document.txt' for demonstration.")

    # --- 2. Initialize and run the pipeline ---
    pipeline = DocumentIngestionPipeline(
        embedding_model_name='all-MiniLM-L6-v2',
        chunk_size=500,
        chunk_overlap=50
    )

    # --- 3. Ingest documents from different sources ---
    local_chunks = pipeline.ingest_from_path('data/my_document.txt')
    url_chunks = pipeline.ingest_from_urls(['https://www.promptingguide.ai/'])

    # --- 4. Combine chunks and build the vector store ---
    all_chunks = local_chunks + url_chunks
    if all_chunks:
        vector_store = pipeline.build_vector_store(all_chunks)

        # --- 5. Save the store and test a query ---
        db_path = "my_faiss_index"
        vector_store.save_local(db_path)
        print(f"Vector store saved to '{db_path}'.")

        # Optional: Load and query to verify
        db = FAISS.load_local(db_path, pipeline.embeddings, allow_dangerous_deserialization=True)
        query = "What are AI pipelines?"
        results = db.similarity_search(query, k=1)
        print(f"\nQuery: '{query}'")
        print(f"Top result: {results[0].page_content[:200]}...")
    else:
        print("No documents were ingested. Vector store not created.")
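The introduction lists cleaning as one of the pipeline's stages, but the starter code passes loader output directly to the text splitter. The following is a minimal sketch of what a cleaning pass might look like, using only the standard library. The function `clean_text` and its specific rules are assumptions made for illustration, not something the original script is known to contain; adjust the rules to your own data.

```python
import re
import unicodedata


def clean_text(raw: str) -> str:
    """Normalize whitespace and strip control characters from extracted text.
    Hypothetical helper; the rules here are illustrative assumptions."""
    # Fold Unicode compatibility forms (e.g. non-breaking spaces become plain spaces).
    text = unicodedata.normalize("NFKC", raw)
    # Use "\n" as the only line ending.
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    # Drop control characters, keeping newlines and tabs.
    text = "".join(
        ch for ch in text
        if ch in "\n\t" or unicodedata.category(ch) != "Cc"
    )
    # Collapse runs of spaces and tabs into a single space.
    text = re.sub(r"[ \t]+", " ", text)
    # Trim leading and trailing whitespace on every line.
    text = "\n".join(line.strip() for line in text.split("\n"))
    # Allow at most one blank line between paragraphs.
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


raw = "  Title\u00a0 here \r\n\r\n\r\n\r\nBody\ttext\x00 with   gaps.  \n"
cleaned = clean_text(raw)
print(repr(cleaned))
```

One way to use such a helper would be to apply it to each loaded document's `page_content` before calling `split_documents`, so that chunk boundaries are computed on the cleaned text rather than on raw extraction output.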