How to Manage Documents Using DocumentService in OpenRAG: Ingestion, Retrieval, and Deletion

You can manage documents in OpenRAG through the DocumentService class, which handles file uploads with automatic SHA-256 deduplication, Docling-based chunking, and OpenSearch indexing. The REST API and Python SDK provide high-level wrappers for the same operations.

The langflow-ai/openrag repository provides a DocumentService that orchestrates the document lifecycle, from raw file upload to vector storage. Whether you are building custom FastAPI endpoints or integrating through the official SDK, knowing how this service works helps when running RAG pipelines in production.

DocumentService Architecture and Data Flow

The ingestion pipeline in src/services/document_service.py follows a deterministic sequence to ensure data integrity and efficient vectorization.

File Reception and Deduplication: When process_upload_file receives a Starlette UploadFile, it immediately streams the content to temporary storage while computing a deterministic SHA-256 hash via hash_id. The service queries OpenSearch using opensearch_client.exists to check for existing documents. If a duplicate is found, the system returns status: "unchanged" unless the replace_duplicates flag is enabled.

Processing and Chunking: For new documents, the service invokes TaskProcessor.process_document_standard, which converts files to a Docling-compatible representation. The extract_relevant function in src/utils/document_processing.py flattens text and tables into per-page chunks. Subsequently, chunk_texts_for_embeddings uses tiktoken to count tokens and group chunks into batches that respect the embedding model’s context window.

Indexing: Finally, the service writes batches to the configured OpenSearch index (retrieved from config.settings.get_index_name) using the user-specific client from session_manager.get_user_opensearch_client.
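The hashing step can be sketched with the standard library alone. The snippet below is a minimal illustration, not OpenRAG's hash_id implementation: the function name stream_sha256 and the 64 KiB chunk size are assumptions chosen for the example. It shows how a file can be hashed while it is streamed, so the digest is available without holding the whole upload in memory.

```python
import hashlib
import io
from typing import BinaryIO


def stream_sha256(stream: BinaryIO, chunk_size: int = 64 * 1024) -> str:
    """Return the hex SHA-256 digest of a binary stream, read in chunks.

    Hypothetical helper for illustration; OpenRAG's own hash_id may differ.
    """
    digest = hashlib.sha256()
    # iter() with a sentinel keeps reading until read() returns b"" (EOF).
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


# Identical bytes always give the same ID, whatever the file is called.
first = stream_sha256(io.BytesIO(b"quarterly report"))
second = stream_sha256(io.BytesIO(b"quarterly report"))
changed = stream_sha256(io.BytesIO(b"quarterly report v2"))
```

Because the ID depends only on the bytes, renaming a file does not defeat deduplication, while any change to the content produces a new ID.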

Ingesting Documents into OpenRAG

You can ingest documents through three interfaces depending on your integration requirements.

Direct Service Invocation

For background workers or custom business logic, instantiate DocumentService directly and call process_upload_file.

import asyncio
from src.services.document_service import DocumentService
from src.session_manager import SessionManager

async def ingest_directly(upload_file):
    """
    upload_file: Starlette UploadFile instance
    """
    doc_service = DocumentService(session_manager=SessionManager())
    
    result = await doc_service.process_upload_file(
        upload_file,
        owner_user_id="user-123",
        jwt_token="eyJhbGci...",  # Optional OIDC token
        owner_name="Jane Doe",
        owner_email="jane.doe@example.com",  # Placeholder address
        delete_after_ingest=False,
        replace_duplicates=False
    )
    
    print(f"Ingestion status: {result['status']}")
    print(f"Document ID: {result.get('hash_id')}")

# asyncio.run(ingest_directly(my_file))

This method provides full control over metadata and processing flags while handling temporary file cleanup automatically via auto_cleanup_tempfile.

REST API Endpoint

The FastAPI endpoint in src/api/v1/documents.py exposes the same logic over HTTP at /api/v1/documents/ingest.

curl -X POST "https://your-openrag-instance/api/v1/documents/ingest" \
  -H "Authorization: Bearer <API_KEY>" \
  -F "file=@/path/to/annual_report.pdf" \
  -F "delete_after_ingest=true" \
  -F "replace_duplicates=true"

The ingest_endpoint forwards the multipart request to DocumentService, returning a task ID for asynchronous tracking.

Python SDK Implementation

The official SDK abstracts HTTP calls through the DocumentsClient class in sdks/python/openrag_sdk/documents.py.

import asyncio
from openrag_sdk.client import OpenRAGClient

async def sdk_ingestion():
    client = OpenRAGClient(
        api_key="YOUR_API_KEY",
        base_url="https://your-openrag-instance"
    )
    
    # Ingest with polling until completion
    status = await client.documents.ingest(
        file_path="data/manual.pdf",
        wait=True,
        metadata={"department": "engineering"}
    )
    
    print(f"Task completed: {status.status}")
    print(f"Chunks indexed: {status.chunks_count}")

# asyncio.run(sdk_ingestion())

The SDK handles multipart encoding, status polling via wait_for_task, and error retry logic automatically.
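The polling behaviour behind wait=True can be pictured as a simple loop. This is only a sketch under stated assumptions: poll_until_done, the fetch_status callable, the "completed" and "failed" state names, and the interval and timeout defaults are all hypothetical and do not reflect the SDK's actual wait_for_task signature. A fake status source stands in for the HTTP call so the example runs on its own.

```python
import asyncio
from typing import Awaitable, Callable


async def poll_until_done(
    fetch_status: Callable[[], Awaitable[str]],
    interval: float = 0.5,
    timeout: float = 30.0,
) -> str:
    """Call fetch_status until it reports a terminal state or time runs out.

    Hypothetical helper; the terminal state names are assumed for the example.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await fetch_status()
        if status in {"completed", "failed"}:
            return status
        # Give up if the next sleep would take us past the deadline.
        if loop.time() + interval > deadline:
            raise TimeoutError(f"task still '{status}' after {timeout}s")
        await asyncio.sleep(interval)


def make_fake_task(states: list[str]) -> Callable[[], Awaitable[str]]:
    """Build a stand-in status endpoint that walks through the given states."""
    remaining = iter(states)

    async def fetch() -> str:
        return next(remaining)

    return fetch


final = asyncio.run(
    poll_until_done(make_fake_task(["pending", "running", "completed"]), interval=0.01)
)
```

Bounding the loop with a deadline keeps a stuck task from blocking the caller indefinitely, which matters when ingestion runs inside a request handler or a batch job.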

Extracting Context for Chat Applications

For conversational interfaces that require document content without persistent indexing, use process_upload_context instead of the full ingestion pipeline.

import asyncio
from src.services.document_service import DocumentService
from src.session_manager import SessionManager

async def extract_chat_context(upload_file):
    doc_service = DocumentService(session_manager=SessionManager())
    
    context = await doc_service.process_upload_context(
        upload_file,
        filename="temporary_notes.md",
        use_docling=True  # Convert PDFs to structured text
    )

    # Returns dict with "content" key containing consolidated text
    print(f"Extracted {len(context['content'])} characters")
    return context["content"]

# asyncio.run(extract_chat_context(my_upload))

This method reads the file into memory, optionally runs Docling conversion via extract_relevant, and returns a single consolidated string with page number markers. That makes it well suited to injecting content into LLM prompts without OpenSearch overhead.
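To make the "consolidated string with page markers" concrete, here is a small sketch of how per-page text might be merged. The function name consolidate_pages and the "--- Page N ---" marker format are assumptions for illustration; the source does not show the markers OpenRAG actually emits.

```python
def consolidate_pages(pages: list[str]) -> str:
    """Join per-page text into one string, prefixing each page with a marker.

    Hypothetical helper; the marker format is an assumption for illustration.
    """
    parts = []
    for number, text in enumerate(pages, start=1):
        cleaned = text.strip()
        if not cleaned:
            continue  # Skip blank pages but keep the original page numbering.
        parts.append(f"--- Page {number} ---\n{cleaned}")
    return "\n\n".join(parts)


content = consolidate_pages(["Intro text ", "", "Results table"])
```

Keeping the original page numbers in the markers lets a model cite where a passage came from even when empty pages were dropped.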

Deleting Documents and Indexed Chunks

Document deletion removes all associated chunks from the vector index using a filename-based query.

from src.utils.opensearch_queries import build_filename_delete_body
from config.settings import get_index_name

async def delete_by_filename(session_manager, user_id, filename):
    client = session_manager.get_user_opensearch_client(
        user_id=user_id,
        jwt_token=None
    )
    
    query = build_filename_delete_body(filename)
    
    result = await client.delete_by_query(
        index=get_index_name(),
        body=query,
        conflicts="proceed"
    )
    
    deleted_count = result.get("deleted", 0)
    print(f"Removed {deleted_count} chunks for {filename}")
    return deleted_count

The delete_document_endpoint in src/api/v1/documents.py wraps this logic, accepting a filename parameter and returning the deletion count. When using the SDK, simply call await client.documents.delete(filename="report.pdf").
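The article does not show what build_filename_delete_body returns. One plausible shape, sketched below, is an OpenSearch term query on a filename field. The helper name filename_delete_body and the field name "filename" are assumptions; the real field must match the index mapping.

```python
import json


def filename_delete_body(filename: str, field: str = "filename") -> dict:
    """Build a delete_by_query body matching chunks by exact filename.

    Hypothetical stand-in for build_filename_delete_body; the field name
    is an assumption and must match the index mapping in practice.
    """
    if not filename:
        raise ValueError("filename must be a non-empty string")
    # A term query compares the exact stored value, which assumes the
    # field is mapped as a keyword rather than analyzed text.
    return {"query": {"term": {field: filename}}}


body = filename_delete_body("report.pdf")
print(json.dumps(body))
```

Rejecting an empty filename up front is a deliberate guard: a delete request built from a missing value should fail loudly rather than be sent to the cluster.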

Summary

  • Use DocumentService.process_upload_file in src/services/document_service.py for server-side ingestion with automatic deduplication via SHA-256 hashing.
  • Configure token-aware batching through chunk_texts_for_embeddings to ensure chunks respect your embedding model's token limits before OpenSearch indexing.
  • Extract ephemeral content via process_upload_context for chat-based applications without persisting vectors to the database.
  • Remove documents completely using delete_by_query with build_filename_delete_body, which targets all chunks matching the source filename in the OpenSearch index.
  • Leverage the Python SDK (DocumentsClient in sdks/python/openrag_sdk/documents.py) for simplified HTTP integration with built-in polling and error handling.

Frequently Asked Questions

How does OpenRAG prevent duplicate document uploads?

DocumentService computes a deterministic SHA-256 hash of the file content using hash_id before processing. In process_upload_file, the service checks opensearch_client.exists for this hash. If found, it immediately returns status: "unchanged" unless the replace_duplicates parameter is set to true, in which case it reprocesses and overwrites the existing document chunks.
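The decision described above reduces to a small branch. The sketch below uses an in-memory set in place of the OpenSearch existence check. The function name and the "ingest" and "replace" labels are hypothetical; only "unchanged" comes from the behaviour described in this article.

```python
def decide_ingest_action(
    file_hash: str,
    indexed_hashes: set[str],
    replace_duplicates: bool = False,
) -> str:
    """Return what to do with an upload, given the hashes already indexed.

    Hypothetical helper; indexed_hashes stands in for an index lookup.
    """
    if file_hash not in indexed_hashes:
        return "ingest"  # New content: run the full pipeline.
    # Known content: reprocess only when the caller asked for it.
    return "replace" if replace_duplicates else "unchanged"


known = {"abc123"}
new_file = decide_ingest_action("def456", known)
duplicate = decide_ingest_action("abc123", known)
forced = decide_ingest_action("abc123", known, replace_duplicates=True)
```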

What chunking strategy does DocumentService employ?

After Docling conversion via extract_relevant in src/utils/document_processing.py, the service passes extracted text to chunk_texts_for_embeddings. This function uses tiktoken to calculate exact token counts per chunk and groups them into batches that respect the configured embedding model's maximum context window, ensuring efficient vectorization without truncation.
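Token-aware batching of this kind can be sketched as a greedy grouping loop. This is not the chunk_texts_for_embeddings implementation: batch_by_tokens and its parameters are hypothetical, and the default counter splits on whitespace as a stand-in so the example runs without a tokenizer installed.

```python
from typing import Callable


def batch_by_tokens(
    texts: list[str],
    max_tokens: int,
    count_tokens: Callable[[str], int] = lambda t: len(t.split()),
) -> list[list[str]]:
    """Group texts into batches whose total token count stays within max_tokens.

    Hypothetical helper. The default counter splits on whitespace as a
    stand-in; a real pipeline would count with the model's tokenizer.
    """
    batches: list[list[str]] = []
    current: list[str] = []
    current_tokens = 0
    for text in texts:
        n = count_tokens(text)
        # Start a new batch when adding this text would exceed the budget.
        if current and current_tokens + n > max_tokens:
            batches.append(current)
            current, current_tokens = [], 0
        current.append(text)
        current_tokens += n
    if current:
        batches.append(current)
    return batches


batches = batch_by_tokens(["a b c", "d e", "f g h i", "j"], max_tokens=5)
```

With tiktoken installed, a counter such as `lambda t: len(enc.encode(t))`, where `enc = tiktoken.get_encoding("cl100k_base")`, could be passed as count_tokens. Note that this sketch places a single text larger than max_tokens in a batch of its own rather than splitting it, so oversized chunks would still need handling upstream.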

Can I extract document text without storing it in the vector database?

Yes. Call process_upload_context instead of process_upload_file. This method loads the file into memory, optionally runs Docling conversion, and returns a consolidated text string with page markers suitable for LLM context windows. It performs no OpenSearch writes, making it ideal for one-time chat interactions or temporary analysis.

How do I verify that all chunks were deleted for a specific file?

The delete_document_endpoint in src/api/v1/documents.py executes a delete_by_query operation using build_filename_delete_body to match all chunks where the filename field equals your target. The OpenSearch response includes a deleted count indicating exactly how many chunks were removed, which the SDK and API return in the response body for verification.
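Beyond the deleted count, an OpenSearch delete_by_query response also carries version_conflicts and failures fields, which are worth checking when conflicts="proceed" is set. The helper below is a sketch with a hypothetical name, and the sample response is made up for illustration rather than captured from a real cluster.

```python
def summarize_delete_response(response: dict) -> dict:
    """Pull the fields worth checking out of a delete_by_query response.

    Hypothetical helper; missing fields are treated as zero or empty.
    """
    deleted = response.get("deleted", 0)
    conflicts = response.get("version_conflicts", 0)
    failures = response.get("failures", [])
    return {
        "deleted": deleted,
        "version_conflicts": conflicts,
        # "clean" means nothing was skipped or rejected along the way.
        "clean": conflicts == 0 and not failures,
    }


# Illustrative response: 12 chunks removed, 1 skipped due to a conflict.
sample = {"total": 13, "deleted": 12, "version_conflicts": 1, "failures": []}
summary = summarize_delete_response(sample)
```

A nonzero conflict count means some matching chunks were skipped, so rerunning the deletion or searching for the filename afterwards is a reasonable follow-up.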
