How to Manage Langflow Flows in OpenRAG Using FlowsService

The FlowsService class in OpenRAG provides a centralized, async interface for backing up, resetting, and modifying Langflow flow configurations programmatically through methods such as backup_all_flows(), reset_langflow_flow(), and change_langflow_model_value().

OpenRAG orchestrates retrieval-augmented generation pipelines whose Langflow flows are stored as JSON files under the flows/ directory. FlowsService, defined in src/services/flows_service.py, is a single shared service that wraps the file I/O, backups, and Langflow API synchronization behind a small set of async methods.

FlowsService Architecture and Lifecycle

The FlowsService encapsulates every runtime interaction with Langflow flows, including backup generation, reset detection, and model value updates.

Core Components

  • FlowsService class – Defined in src/services/flows_service.py, this service discovers the flows/ directory relative to the project root and exposes methods for flow manipulation.
  • Flows directory – Physical JSON representations of each flow are stored here, typically volume-mounted at ~/.openrag/flows/ as referenced in the _get_flows_directory() method (lines 28‑34).
  • Backup subdirectory – Timestamped copies are written on demand to the directory returned by _get_backup_directory() (lines 35‑40), for audit and recovery purposes.
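
The directory layout described above can be sketched with pathlib. This is a minimal illustration, not the code in src/services/flows_service.py: the function names resolve_flows_directory and resolve_backup_directory are hypothetical stand-ins for _get_flows_directory() and _get_backup_directory(), and the project root is a temporary directory so the sketch runs anywhere.

```python
import tempfile
from pathlib import Path


def resolve_flows_directory(project_root: Path) -> Path:
    """Return <project_root>/flows (hypothetical stand-in for _get_flows_directory)."""
    return project_root / "flows"


def resolve_backup_directory(flows_dir: Path) -> Path:
    """Return <flows_dir>/backup, creating it if it does not exist yet."""
    backup_dir = flows_dir / "backup"
    backup_dir.mkdir(parents=True, exist_ok=True)
    return backup_dir


# Demonstrate against a throwaway project root
with tempfile.TemporaryDirectory() as tmp:
    flows_dir = resolve_flows_directory(Path(tmp))
    backup_dir = resolve_backup_directory(flows_dir)
    backup_exists = backup_dir.is_dir()
    backup_relative = backup_dir.relative_to(tmp).as_posix()
```

Creating the backup folder lazily with exist_ok=True means callers never need to check for it before writing a snapshot.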

Service Instantiation

The service is instantiated once during application startup. In src/main.py, the initialize_services() function (lines 19‑20) creates the FlowsService instance and injects it into the FastAPI application state, making it available to route handlers through the request context.

Backing Up and Resetting Flows

OpenRAG maintains flow integrity through automated backup procedures and reset capabilities that restore flows to their bundled defaults.

Creating Timestamped Backups

Use backup_all_flows() to snapshot every flow. When only_if_changed=True, the service compares each current flow against its most recent backup and skips flows that have not changed.

from services.flows_service import FlowsService

flows_service = FlowsService()

# Run inside an async context, such as an async route handler or a coroutine passed to asyncio.run()
backup_summary = await flows_service.backup_all_flows(only_if_changed=True)
print(backup_summary)

The method returns a dictionary with success, backed_up, skipped, and failed keys, detailing which flows were archived and their backup paths.
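
To make the only_if_changed behavior concrete, here is a self-contained sketch of one way change detection could work. It is an assumption, not the OpenRAG implementation: the backup_flows function, the SHA-256 comparison, the per-flow backup folders, and the timestamp format are all hypothetical. Only the summary keys (success, backed_up, skipped, failed) come from the description above.

```python
import hashlib
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def _digest(path: Path) -> str:
    """Hash file contents so two files can be compared cheaply."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def backup_flows(flows_dir: Path, only_if_changed: bool = True) -> dict:
    """Copy each flow JSON into backup/<flow name>/<timestamp>.json (hypothetical layout)."""
    summary = {"success": True, "backed_up": [], "skipped": [], "failed": []}
    for flow_file in sorted(flows_dir.glob("*.json")):
        try:
            flow_backups = flows_dir / "backup" / flow_file.stem
            flow_backups.mkdir(parents=True, exist_ok=True)
            # Timestamped names sort chronologically, so the last entry is the newest
            previous = sorted(flow_backups.glob("*.json"))
            if only_if_changed and previous and _digest(previous[-1]) == _digest(flow_file):
                summary["skipped"].append(flow_file.name)
                continue
            stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
            target = flow_backups / f"{stamp}.json"
            target.write_bytes(flow_file.read_bytes())
            summary["backed_up"].append({"flow": flow_file.name, "backup_path": str(target)})
        except OSError as exc:
            summary["success"] = False
            summary["failed"].append({"flow": flow_file.name, "error": str(exc)})
    return summary


# First call writes a backup; second call finds nothing changed and skips it
with tempfile.TemporaryDirectory() as tmp:
    demo_dir = Path(tmp)
    (demo_dir / "ingest.json").write_text('{"name": "ingest"}')
    first = backup_flows(demo_dir)
    second = backup_flows(demo_dir)
```

Comparing against only the newest backup keeps the check cheap while still avoiding redundant snapshots.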

Detecting Manual Resets

The check_flows_reset() method (lines 49‑76 in src/services/flows_service.py) compares each live flow against its bundled JSON file to detect whether a user has manually modified or reset it outside the service.

reset_flows = await flows_service.check_flows_reset()
if reset_flows:
    print("Flows requiring reconfiguration:", reset_flows)
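
The comparison step can be illustrated with a short sketch. It assumes that a flow counts as reset when its live definition is identical to the bundled one; the real check_flows_reset() may compare only selected fields or normalize the JSON differently. The helper names normalize and find_reset_flows and the sample data are hypothetical.

```python
import json


def normalize(flow: dict) -> str:
    """Serialize with sorted keys so key order does not affect the comparison."""
    return json.dumps(flow, sort_keys=True, separators=(",", ":"))


def find_reset_flows(live: dict, bundled: dict) -> list:
    """Return flow types whose live definition is identical to the bundled default."""
    return [
        flow_type
        for flow_type, bundled_flow in bundled.items()
        if flow_type in live and normalize(live[flow_type]) == normalize(bundled_flow)
    ]


# Hypothetical data: "ingest" matches its default, "retrieval" was customized
bundled_flows = {
    "ingest": {"name": "ingest", "model": "default"},
    "retrieval": {"name": "retrieval", "model": "default"},
}
live_flows = {
    "ingest": {"model": "default", "name": "ingest"},
    "retrieval": {"name": "retrieval", "model": "custom"},
}
reset_candidates = find_reset_flows(live_flows, bundled_flows)
```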

Executing Flow Resets

To restore a flow to its shipped state, invoke reset_langflow_flow() (lines 334‑367). This method uploads the original bundled JSON back to the Langflow API, overwriting any local modifications.

result = await flows_service.reset_langflow_flow(flow_type="ingest")
print("Reset status:", result)

The method raises ValueError for invalid flow types or FileNotFoundError if the bundled JSON is missing.

Updating Model Configurations Across Flows

When you switch LLM or embedding providers, change_langflow_model_value() (lines 778‑834) walks through every flow, locates provider-specific component nodes, and updates dropdown values atomically.

Changing Embedding Models

Update the embedding model for a specific provider across all flows:

response = await flows_service.change_langflow_model_value(
    provider="openai",
    embedding_model="text-embedding-3-large",
    force_embedding_update=True,
)

Batch Updating LLM and Embedding Models

Modify both model types in a single call to ensure consistency across retrieval and generation pipelines:

await flows_service.change_langflow_model_value(
    provider="watsonx",
    embedding_model="ibm/slate-125m-embedding",
    llm_model="ibm/granite-13b-chat-v2",
    force_embedding_update=True,
    force_llm_update=True,
)

The returned dictionary contains per-flow success flags and lists of updated nodes, allowing you to verify which components received the new model values.
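
The node walk can be sketched on a plain dictionary. This is an illustration under stated assumptions rather than the logic in change_langflow_model_value(): the function set_model_value is hypothetical, the nesting data → nodes → data → node → template → field → value is assumed from the shape of exported Langflow flow JSON, and the component type and field names in the sample are placeholders.

```python
import copy


def set_model_value(flow: dict, component_type: str, field: str, new_value: str):
    """Return an updated copy of the flow plus the ids of nodes that changed."""
    updated = copy.deepcopy(flow)  # leave the caller's flow untouched
    changed = []
    for node in updated.get("data", {}).get("nodes", []):
        node_data = node.get("data", {})
        if node_data.get("type") != component_type:
            continue  # not a component for this provider
        template = node_data.get("node", {}).get("template", {})
        if field in template and template[field].get("value") != new_value:
            template[field]["value"] = new_value
            changed.append(node.get("id"))
    return updated, changed


# Hypothetical flow with one embedding node and one LLM node
sample_flow = {"data": {"nodes": [
    {"id": "embed-1", "data": {"type": "OpenAIEmbeddings",
        "node": {"template": {"model": {"value": "text-embedding-3-small"}}}}},
    {"id": "llm-1", "data": {"type": "OpenAIModel",
        "node": {"template": {"model_name": {"value": "gpt-4o-mini"}}}}},
]}}
updated_flow, changed_nodes = set_model_value(
    sample_flow, "OpenAIEmbeddings", "model", "text-embedding-3-large"
)
```

Returning the list of changed node ids mirrors the idea of reporting which components received the new value, so a caller can verify the update before pushing the flow anywhere.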

Key Source Files and References

Understanding the file structure helps when extending or debugging the service:

  • src/services/flows_service.py – Core logic for FlowsService, including backup routines, reset detection, and model updates.
  • src/main.py – Application bootstrap that initializes the service via initialize_services() and exposes it through FastAPI state.
  • src/api/settings.py – Contains _get_flows_service() (lines 89‑94), a lazy import helper used by settings-related routes to access the service instance.

Summary

  • Instantiate once: Create a single FlowsService instance during startup via initialize_services() in src/main.py.
  • Backup regularly: Use backup_all_flows(only_if_changed=True) to maintain timestamped snapshots without redundant writes.
  • Detect resets: Call check_flows_reset() to compare live flows against their bundled JSON files and list the flows that need reconfiguration.
  • Restore defaults: Invoke reset_langflow_flow(flow_type) to push original JSON files back to the Langflow API.
  • Update models: Use change_langflow_model_value() with force_embedding_update or force_llm_update flags to propagate provider changes across all flow nodes.

Frequently Asked Questions

How do I access the FlowsService instance in my own scripts?

Import the class directly from src/services/flows_service.py and instantiate it with FlowsService(). The constructor requires no arguments because it dynamically discovers the flows/ directory relative to the project root using _get_flows_directory().

Where are flow backups physically stored on disk?

Backups are written to a backup/ subdirectory inside the flows directory, typically located at ~/.openrag/flows/backup/. The _get_backup_directory() method creates this folder automatically if it does not exist.

Can I reset multiple flows at once, or only individually?

The current implementation in src/services/flows_service.py processes resets individually through reset_langflow_flow(flow_type). To reset multiple flows, iterate over a list of flow types and call the method for each, handling ValueError (invalid flow type) and FileNotFoundError (missing bundled JSON) so that one failure does not stop the remaining resets.
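
A loop of that kind might look like the sketch below. The helper reset_flows is hypothetical, and FakeFlowsService exists only so the example runs without OpenRAG installed; its accepted flow types and return shape are assumptions. With the real service, pass your FlowsService instance instead.

```python
import asyncio


async def reset_flows(service, flow_types):
    """Reset each flow in turn, recording failures instead of aborting the loop."""
    results = {}
    for flow_type in flow_types:
        try:
            results[flow_type] = await service.reset_langflow_flow(flow_type=flow_type)
        except (ValueError, FileNotFoundError) as exc:
            # Invalid flow type or missing bundled JSON: note it and continue
            results[flow_type] = {"success": False, "error": str(exc)}
    return results


class FakeFlowsService:
    """Stand-in for FlowsService; the return shape here is an assumption."""

    async def reset_langflow_flow(self, flow_type):
        if flow_type not in {"ingest", "retrieval"}:
            raise ValueError(f"Unknown flow type: {flow_type}")
        return {"success": True, "flow_type": flow_type}


outcome = asyncio.run(reset_flows(FakeFlowsService(), ["ingest", "bogus"]))
```

Resetting sequentially rather than concurrently keeps the order of API calls predictable, which makes partial failures easier to diagnose.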

What happens if I update model values but the flow is currently running?

The change_langflow_model_value() method modifies the underlying JSON configuration and updates the Langflow API immediately. According to the source code, these changes take effect on the next execution cycle, but active running flows may continue using cached model references until they are restarted or the cache expires.
