How to Manage Documents Using DocumentService in OpenRAG: Ingestion, Retrieval, and Deletion
You can manage documents in OpenRAG by invoking the DocumentService class to handle file uploads with automatic SHA-256 deduplication, Docling-based chunking, and OpenSearch indexing, while the REST API and Python SDK provide high-level wrappers for these operations.
The langflow-ai/openrag repository provides a DocumentService that orchestrates the document lifecycle, from raw file upload to vector storage. Whether you are building custom FastAPI endpoints or integrating through the official SDK, understanding how DocumentService manages documents is essential for production RAG pipelines.
DocumentService Architecture and Data Flow
The ingestion pipeline in src/services/document_service.py follows a deterministic sequence to ensure data integrity and efficient vectorization.
File Reception and Deduplication
When process_upload_file receives a Starlette UploadFile, it immediately streams the content to temporary storage while computing a deterministic SHA-256 hash via hash_id. The service queries OpenSearch using opensearch_client.exists to check for existing documents. If a duplicate is found, the system returns status: "unchanged" unless the replace_duplicates flag is enabled.
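The deduplication step can be illustrated with a minimal sketch. It is not the repository's code: stream_sha256 and decide_upload_action are hypothetical helpers, the in-memory set stands in for the OpenSearch existence check, and the "replace" and "new" labels are invented for illustration (only "unchanged" comes from the description above).

```python
import hashlib
import io


def stream_sha256(stream, chunk_size=1 << 20):
    """Hash a binary stream in fixed-size blocks so the whole file is never held in memory."""
    digest = hashlib.sha256()
    while True:
        block = stream.read(chunk_size)
        if not block:
            break
        digest.update(block)
    return digest.hexdigest()


def decide_upload_action(file_hash, known_hashes, replace_duplicates=False):
    """Classify an upload by its content hash.

    known_hashes is a hypothetical stand-in for the OpenSearch existence check.
    """
    if file_hash in known_hashes:
        # A duplicate is skipped unless the caller asked for replacement.
        return "replace" if replace_duplicates else "unchanged"
    return "new"


if __name__ == "__main__":
    payload = b"annual report contents"
    file_hash = stream_sha256(io.BytesIO(payload))
    print(decide_upload_action(file_hash, set()))
    print(decide_upload_action(file_hash, {file_hash}))
    print(decide_upload_action(file_hash, {file_hash}, replace_duplicates=True))
```

Because the hash depends only on the file's bytes, renaming a file does not defeat the check, while changing a single byte produces a different identifier.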
Processing and Chunking
For new documents, the service invokes TaskProcessor.process_document_standard, which converts files to a Docling-compatible representation. The extract_relevant function in src/utils/document_processing.py flattens text and tables into per-page chunks. Subsequently, chunk_texts_for_embeddings uses tiktoken to count tokens and group chunks into batches that respect the embedding model’s context window.
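Token-aware batching of this kind can be sketched as a greedy grouping loop. The function below is an illustration only, not the body of chunk_texts_for_embeddings: batch_by_token_budget is a hypothetical name, and the default counter splits on whitespace as a rough stand-in for a real tokenizer.

```python
def whitespace_token_count(text):
    """Crude stand-in for a tokenizer: counts whitespace-separated words."""
    return len(text.split())


def batch_by_token_budget(texts, max_tokens, count_tokens=whitespace_token_count):
    """Greedily group texts so each batch's total token count stays within max_tokens.

    A single text that exceeds max_tokens on its own still gets a batch to itself;
    splitting oversized texts is out of scope for this sketch.
    """
    batches = []
    current = []
    current_tokens = 0
    for text in texts:
        n = count_tokens(text)
        # Close the running batch when adding this text would exceed the budget.
        if current and current_tokens + n > max_tokens:
            batches.append(current)
            current, current_tokens = [], 0
        current.append(text)
        current_tokens += n
    if current:
        batches.append(current)
    return batches


if __name__ == "__main__":
    pages = ["alpha beta", "gamma delta", "epsilon"]
    print(batch_by_token_budget(pages, max_tokens=3))
```

To count real model tokens, a tiktoken-based counter such as lambda t: len(tiktoken.get_encoding("cl100k_base").encode(t)) could be passed as count_tokens; which encoding matches the configured embedding model is an assumption to verify.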
Indexing
Finally, the service writes batches to the configured OpenSearch index (retrieved from config.settings.get_index_name) using the user-specific client from session_manager.get_user_opensearch_client.
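As a rough picture of what a batched write to OpenSearch involves, the sketch below builds a request body in the newline-delimited format that the OpenSearch Bulk API expects (one action line followed by one document line per chunk). The field names, the per-chunk ID scheme, and the build_bulk_body helper are all assumptions for illustration; the actual document schema used by DocumentService is not shown in this article.

```python
import json


def build_bulk_body(index_name, file_hash, chunks):
    """Serialize chunks into an NDJSON bulk body: an action line, then a source line, per chunk."""
    lines = []
    for position, chunk in enumerate(chunks):
        # Hypothetical ID scheme: content hash plus chunk position.
        action = {"index": {"_index": index_name, "_id": f"{file_hash}_{position}"}}
        # Hypothetical field names; the real mapping may differ.
        source = {
            "document_id": file_hash,
            "page": chunk["page"],
            "text": chunk["text"],
            "embedding": chunk["embedding"],
        }
        lines.append(json.dumps(action))
        lines.append(json.dumps(source))
    # The Bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    sample = [{"page": 1, "text": "Revenue grew.", "embedding": [0.1, 0.2]}]
    print(build_bulk_body("documents", "abc123", sample), end="")
```

Deriving chunk IDs from the content hash means that re-indexing the same file overwrites its earlier chunks instead of accumulating copies, which is one way a replacement upload could stay idempotent.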
Ingesting Documents into OpenRAG
You can ingest documents through three interfaces depending on your integration requirements.
Direct Service Invocation
For background workers or custom business logic, instantiate DocumentService directly and call process_upload_file.
import asyncio

from src.services.document_service import DocumentService
from src.session_manager import SessionManager


async def ingest_directly(upload_file):
    """
    upload_file: Starlette UploadFile instance
    """
    doc_service = DocumentService(session_manager=SessionManager())
    result = await doc_service.process_upload_file(
        upload_file,
        owner_user_id="user-123",
        jwt_token="eyJhbGci...",  # Optional OIDC token
        owner_name="Jane Doe",
        owner_email="jane.doe@example.com",  # Placeholder address
        delete_after_ingest=False,
        replace_duplicates=False,
    )
    print(f"Ingestion status: {result['status']}")
    print(f"Document ID: {result.get('hash_id')}")


# asyncio.run(ingest_directly(my_file))
This method provides full control over metadata and processing flags while handling temporary file cleanup automatically via auto_cleanup_tempfile.
REST API Endpoint
The FastAPI endpoint in src/api/v1/documents.py exposes the same logic over HTTP at /api/v1/documents/ingest.
curl -X POST "https://your-openrag-instance/api/v1/documents/ingest" \
  -H "Authorization: Bearer <API_KEY>" \
  -F "file=@/path/to/annual_report.pdf" \
  -F "delete_after_ingest=true" \
  -F "replace_duplicates=true"
The ingest_endpoint forwards the multipart request to DocumentService, returning a task ID for asynchronous tracking.
Python SDK Implementation
The official SDK abstracts HTTP calls through the DocumentsClient class in sdks/python/openrag_sdk/documents.py.
import asyncio

from openrag_sdk.client import OpenRAGClient


async def sdk_ingestion():
    client = OpenRAGClient(
        api_key="YOUR_API_KEY",
        base_url="https://your-openrag-instance",
    )
    # Ingest with polling until completion
    status = await client.documents.ingest(
        file_path="data/manual.pdf",
        wait=True,
        metadata={"department": "engineering"},
    )
    print(f"Task completed: {status.status}")
    print(f"Chunks indexed: {status.chunks_count}")


# asyncio.run(sdk_ingestion())
The SDK handles multipart encoding, status polling via wait_for_task, and error retry logic automatically.
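The polling behaviour can be pictured with a generic loop. This is a sketch under stated assumptions, not the SDK's wait_for_task: poll_until_done, the terminal state names, and the fake status fetcher are all hypothetical and exist only to make the example runnable without a server.

```python
import asyncio

# Hypothetical terminal states; the real task API may use different names.
TERMINAL_STATES = {"completed", "failed"}


async def poll_until_done(fetch_status, task_id, interval=0.01, timeout=1.0):
    """Call fetch_status repeatedly until the task reaches a terminal state or the timeout elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await fetch_status(task_id)
        if status["status"] in TERMINAL_STATES:
            return status
        if loop.time() >= deadline:
            raise TimeoutError(f"task {task_id} did not finish within {timeout}s")
        await asyncio.sleep(interval)


def make_fake_fetcher(states):
    """Build a coroutine that replays a fixed sequence of states, standing in for an HTTP call."""
    remaining = iter(states)

    async def fetch_status(task_id):
        return {"task_id": task_id, "status": next(remaining)}

    return fetch_status


if __name__ == "__main__":
    fetcher = make_fake_fetcher(["pending", "running", "completed"])
    print(asyncio.run(poll_until_done(fetcher, "task-1")))
```

Bounding the loop with a deadline keeps a stuck ingestion task from blocking the caller indefinitely.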
Extracting Context for Chat Applications
For conversational interfaces that require document content without persistent indexing, use process_upload_context instead of the full ingestion pipeline.
import asyncio

from src.services.document_service import DocumentService
from src.session_manager import SessionManager


async def extract_chat_context(upload_file):
    doc_service = DocumentService(session_manager=SessionManager())
    context = await doc_service.process_upload_context(
        upload_file,
        filename="temporary_notes.md",
        use_docling=True,  # Convert PDFs to structured text
    )
    # Returns dict with "content" key containing consolidated text
    print(f"Extracted {len(context['content'])} characters")
    return context["content"]


# asyncio.run(extract_chat_context(my_upload))
This method reads the file into memory, optionally runs Docling conversion via extract_relevant, and returns the consolidated text, with page number markers, under the content key of the result. That makes it well suited to injecting document text into LLM prompts without OpenSearch overhead.
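How per-page chunks might be merged into one marked-up string can be sketched as follows. The consolidate_pages helper, the chunk dictionary shape, and the "[Page N]" marker format are assumptions for illustration; the marker format process_upload_context actually emits is not specified here.

```python
def consolidate_pages(chunks):
    """Join per-page chunks into one string, prefixing each page with a marker line."""
    ordered = sorted(chunks, key=lambda chunk: chunk["page"])
    parts = [
        f"[Page {chunk['page']}]\n{chunk['text'].strip()}"
        for chunk in ordered
        if chunk["text"].strip()  # skip pages with no extractable text
    ]
    return {"content": "\n\n".join(parts)}


if __name__ == "__main__":
    pages = [
        {"page": 2, "text": "Costs fell in Q3."},
        {"page": 1, "text": "  Revenue grew in Q2.  "},
    ]
    print(consolidate_pages(pages)["content"])
```

Keeping page markers in the text lets the model cite where a statement came from even though nothing was indexed.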
Deleting Documents and Indexed Chunks
Document deletion removes all associated chunks from the vector index using a filename-based query.
from src.utils.opensearch_queries import build_filename_delete_body
from config.settings import get_index_name


async def delete_by_filename(session_manager, user_id, filename):
    # Use the per-user client so the user's access controls apply to the delete
    client = session_manager.get_user_opensearch_client(
        user_id=user_id,
        jwt_token=None,
    )
    query = build_filename_delete_body(filename)
    result = await client.delete_by_query(
        index=get_index_name(),
        body=query,
        conflicts="proceed",  # Count version conflicts instead of aborting
    )
    deleted_count = result.get("deleted", 0)
    print(f"Removed {deleted_count} chunks for {filename}")
    return deleted_count
The delete_document_endpoint in src/api/v1/documents.py wraps this logic, accepting a filename parameter and returning the deletion count. When using the SDK, simply call await client.documents.delete(filename="report.pdf").
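For orientation, the sketch below shows what a filename-matching delete body could look like and how the delete-by-query response might be summarized. The exact query that build_filename_delete_body produces is not shown in this article, so sketch_filename_delete_body, the "filename" field name, and the use of a term query are assumptions; the deleted, version_conflicts, and failures keys are standard fields of the OpenSearch delete-by-query response.

```python
def sketch_filename_delete_body(filename):
    """Hypothetical delete-by-query body: exact match on an assumed keyword field named 'filename'."""
    return {"query": {"term": {"filename": filename}}}


def summarize_delete_response(response):
    """Pull the fields worth checking out of a delete-by-query response."""
    return {
        "deleted": response.get("deleted", 0),
        # With conflicts="proceed", conflicting documents are counted here rather than aborting the request.
        "version_conflicts": response.get("version_conflicts", 0),
        "failures": len(response.get("failures", [])),
    }


if __name__ == "__main__":
    print(sketch_filename_delete_body("report.pdf"))
    # Illustrative response shape only, not captured from a real cluster.
    print(summarize_delete_response({"deleted": 12, "version_conflicts": 0, "failures": []}))
```

A nonzero version_conflicts or failures count suggests that some chunks may remain, in which case rerunning the delete or searching for the filename afterwards is a reasonable follow-up check.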
Summary
- Use DocumentService.process_upload_file in src/services/document_service.py for server-side ingestion with automatic deduplication via SHA-256 hashing.
- Configure token-aware batching through chunk_texts_for_embeddings to ensure chunks respect your embedding model's token limits before OpenSearch indexing.
- Extract ephemeral content via process_upload_context for chat-based applications without persisting vectors to the database.
- Remove documents completely using delete_by_query with build_filename_delete_body, which targets all chunks matching the source filename in the OpenSearch index.
- Leverage the Python SDK (DocumentsClient in sdks/python/openrag_sdk/documents.py) for simplified HTTP integration with built-in polling and error handling.
Frequently Asked Questions
How does OpenRAG prevent duplicate document uploads?
DocumentService computes a deterministic SHA-256 hash of the file content using hash_id before processing. In process_upload_file, the service checks opensearch_client.exists for this hash. If found, it immediately returns status: "unchanged" unless the replace_duplicates parameter is set to true, in which case it reprocesses and overwrites the existing document chunks.
What chunking strategy does DocumentService employ?
After Docling conversion via extract_relevant in src/utils/document_processing.py, the service passes extracted text to chunk_texts_for_embeddings. This function uses tiktoken to calculate exact token counts per chunk and groups them into batches that respect the configured embedding model's maximum context window, ensuring efficient vectorization without truncation.
Can I extract document text without storing it in the vector database?
Yes. Call process_upload_context instead of process_upload_file. This method loads the file into memory, optionally runs Docling conversion, and returns a consolidated text string with page markers suitable for LLM context windows. It performs no OpenSearch writes, making it ideal for one-time chat interactions or temporary analysis.
How do I verify that all chunks were deleted for a specific file?
The delete_document_endpoint in src/api/v1/documents.py executes a delete_by_query operation using build_filename_delete_body to match all chunks where the filename field equals your target. The OpenSearch response includes a deleted count indicating exactly how many chunks were removed, which the SDK and API return in the response body for verification.