How to Manage Langflow Flows in OpenRAG Using FlowsService
The FlowsService class in OpenRAG provides a centralized, async-ready interface to backup, reset, and modify Langflow flow configurations programmatically through methods like backup_all_flows(), reset_langflow_flow(), and change_langflow_model_value().
OpenRAG orchestrates retrieval-augmented generation pipelines by storing Langflow flows as JSON files under the flows/ directory. FlowsService, defined in src/services/flows_service.py, is created once at startup and wraps file I/O, timestamped backups, and Langflow API synchronization behind a small set of async methods.
FlowsService Architecture and Lifecycle
The FlowsService encapsulates every runtime interaction with Langflow flows, including backup generation, reset detection, and model value updates.
Core Components
- FlowsService class – Defined in src/services/flows_service.py, this service discovers the flows/ directory relative to the project root and exposes methods for flow manipulation.
- Flows directory – Physical JSON representations of each flow are stored here, typically volume-mounted at ~/.openrag/flows/ as referenced in the _get_flows_directory() method (lines 28‑34).
- Backup subdirectory – Timestamped copies are written to the directory returned by _get_backup_directory() (lines 35‑40) for audit and recovery purposes.
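The directory layout described above can be sketched with pathlib. This is a minimal illustration, not the code in flows_service.py: the function names resolve_flows_directory and resolve_backup_directory are hypothetical stand-ins for _get_flows_directory() and _get_backup_directory(), and the project root is passed in explicitly as an assumption.

```python
from pathlib import Path


def resolve_flows_directory(project_root: Path) -> Path:
    """Hypothetical stand-in: flows/ lives directly under the project root."""
    return project_root / "flows"


def resolve_backup_directory(flows_dir: Path) -> Path:
    """Hypothetical stand-in: backup/ sits inside the flows directory
    and is created on demand if it does not exist yet."""
    backup_dir = flows_dir / "backup"
    backup_dir.mkdir(parents=True, exist_ok=True)
    return backup_dir
```

Creating the backup folder lazily means callers never have to check for it before writing a snapshot.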
Service Instantiation
The service is instantiated once during application startup. In src/main.py, the initialize_services() function (lines 19‑20) creates the FlowsService instance and injects it into the FastAPI application state, making it available to route handlers through the request context.
Backing Up and Resetting Flows
OpenRAG maintains flow integrity through automated backup procedures and reset capabilities that restore flows to their bundled defaults.
Creating Timestamped Backups
Use backup_all_flows() to snapshot every flow. When only_if_changed=True, the service compares the current flow against the last backup and skips unchanged files.
from services.flows_service import FlowsService
flows_service = FlowsService()
# Run inside an async context
backup_summary = await flows_service.backup_all_flows(only_if_changed=True)
print(backup_summary)
The method returns a dictionary with success, backed_up, skipped, and failed keys, detailing which flows were archived and their backup paths.
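To make the only_if_changed behaviour concrete, here is a rough sketch of a changed-only backup for a single file. It is not the OpenRAG implementation: the helper names, the SHA-256 comparison, the `<stem>_<timestamp>.json` naming scheme, and the shape of the returned dictionary are all assumptions chosen for illustration.

```python
import hashlib
import shutil
from datetime import datetime
from pathlib import Path


def file_digest(path: Path) -> str:
    """Return the SHA-256 hex digest of a file's bytes."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def backup_if_changed(flow_file: Path, backup_dir: Path) -> dict:
    """Copy flow_file into backup_dir unless the newest backup is identical."""
    backup_dir.mkdir(parents=True, exist_ok=True)
    # Assumed naming scheme: fixed-width timestamps, so sorting by name
    # also sorts by creation time.
    previous = sorted(backup_dir.glob(f"{flow_file.stem}_*.json"))
    if previous and file_digest(previous[-1]) == file_digest(flow_file):
        return {"flow": flow_file.name, "status": "skipped"}
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    target = backup_dir / f"{flow_file.stem}_{stamp}.json"
    shutil.copy2(flow_file, target)
    return {"flow": flow_file.name, "status": "backed_up", "path": str(target)}
```

Comparing digests rather than modification times avoids creating a new snapshot when a file is merely rewritten with the same content.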
Detecting Manual Resets
The check_flows_reset() method (lines 49‑76 in src/services/flows_service.py) compares each live flow against its bundled JSON file to detect whether a user has manually modified or reset it outside the service.
reset_flows = await flows_service.check_flows_reset()
if reset_flows:
    print("Flows requiring reconfiguration:", reset_flows)
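The comparison between a live flow and its bundled JSON could look roughly like the sketch below. It is only an illustration of the idea, not the logic in check_flows_reset(): the helper names are hypothetical, and the choice to ignore an updated_at field is an assumption about which keys are volatile.

```python
import json
from pathlib import Path


def strip_keys(obj, ignored):
    """Recursively drop keys listed in `ignored` from nested dicts and lists."""
    if isinstance(obj, dict):
        return {k: strip_keys(v, ignored) for k, v in obj.items() if k not in ignored}
    if isinstance(obj, list):
        return [strip_keys(item, ignored) for item in obj]
    return obj


def matches_bundled(live_flow: dict, bundled_path: Path, ignored=("updated_at",)) -> bool:
    """Return True when the live flow equals the bundled JSON, ignoring volatile keys."""
    bundled = json.loads(bundled_path.read_text(encoding="utf-8"))
    # Dict equality in Python does not depend on key order.
    return strip_keys(live_flow, ignored) == strip_keys(bundled, ignored)
```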
Executing Flow Resets
To restore a flow to its shipped state, invoke reset_langflow_flow() (lines 334‑367). This method uploads the original bundled JSON back to the Langflow API, overwriting any local modifications.
result = await flows_service.reset_langflow_flow(flow_type="ingest")
print("Reset status:", result)
The method raises ValueError for invalid flow types or FileNotFoundError if the bundled JSON is missing.
Updating Model Configurations Across Flows
When you switch LLM or embedding providers, change_langflow_model_value() (lines 778‑834) walks through every flow, locates provider-specific component nodes, and updates dropdown values atomically.
Changing Embedding Models
Update the embedding model for a specific provider across all flows:
response = await flows_service.change_langflow_model_value(
    provider="openai",
    embedding_model="text-embedding-3-large",
    force_embedding_update=True,
)
Batch Updating LLM and Embedding Models
Modify both model types in a single call to ensure consistency across retrieval and generation pipelines:
await flows_service.change_langflow_model_value(
    provider="watsonx",
    embedding_model="ibm/slate-125m-embedding",
    llm_model="ibm/granite-13b-chat-v2",
    force_embedding_update=True,
    force_llm_update=True,
)
)
The returned dictionary contains per-flow success flags and lists of updated nodes, allowing you to verify which components received the new model values.
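The node walk behind such an update can be sketched as follows. This is an assumption-laden illustration rather than the code in change_langflow_model_value(): the function name set_model_value is hypothetical, and the sketch assumes the exported flow JSON keeps nodes under data.nodes with each field value at data.node.template.<field>.value.

```python
from copy import deepcopy


def set_model_value(flow: dict, component_type: str, field: str, value: str):
    """Return (updated_flow, touched_node_ids) without mutating the input flow."""
    updated = deepcopy(flow)
    touched = []
    for node in updated.get("data", {}).get("nodes", []):
        node_data = node.get("data", {})
        # Only nodes of the requested component type are candidates.
        if node_data.get("type") != component_type:
            continue
        template = node_data.get("node", {}).get("template", {})
        if field in template:
            template[field]["value"] = value
            touched.append(node.get("id", "<unknown>"))
    return updated, touched
```

Returning the list of touched node IDs mirrors the idea of reporting which components received the new value, and working on a deep copy leaves the original flow available if the update has to be discarded.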
Key Source Files and References
Understanding the file structure helps when extending or debugging the service:
- src/services/flows_service.py – Core logic for FlowsService, including backup routines, reset detection, and model updates.
- src/main.py – Application bootstrap that initializes the service via initialize_services() and exposes it through FastAPI state.
- src/api/settings.py – Contains _get_flows_service() (lines 89‑94), a lazy import helper used by settings-related routes to access the service instance.
Summary
- Instantiate once: Create a single FlowsService instance during startup via initialize_services() in src/main.py.
- Back up regularly: Use backup_all_flows(only_if_changed=True) to maintain timestamped snapshots without redundant writes.
- Detect drift: Call check_flows_reset() to identify flows that deviate from their bundled configurations.
- Restore defaults: Invoke reset_langflow_flow(flow_type) to push original JSON files back to the Langflow API.
- Update models: Use change_langflow_model_value() with the force_embedding_update or force_llm_update flags to propagate provider changes across all flow nodes.
Frequently Asked Questions
How do I access the FlowsService instance in my own scripts?
Import the class directly from src/services/flows_service.py and instantiate it with FlowsService(). The constructor requires no arguments because it dynamically discovers the flows/ directory relative to the project root using _get_flows_directory().
Where are flow backups physically stored on disk?
Backups are written to a backup/ subdirectory inside the flows directory, typically located at ~/.openrag/flows/backup/. The _get_backup_directory() method creates this folder automatically if it does not exist.
Can I reset multiple flows at once, or only individually?
The current implementation in src/services/flows_service.py processes resets individually through reset_langflow_flow(flow_type). To reset multiple flows, iterate over a list of flow types and call the method for each, handling any ValueError or FileNotFoundError exceptions that indicate missing flow definitions.
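A loop of that kind might look like the sketch below. StubFlowsService is an illustrative stand-in so the example runs without OpenRAG installed; the "retrieval" flow type, the shape of the returned dictionaries, and the reset_many helper are assumptions, not part of the real service.

```python
import asyncio


async def reset_many(service, flow_types):
    """Reset each flow type in turn, recording errors instead of stopping."""
    results = {}
    for flow_type in flow_types:
        try:
            results[flow_type] = await service.reset_langflow_flow(flow_type=flow_type)
        except (ValueError, FileNotFoundError) as exc:
            results[flow_type] = {"success": False, "error": str(exc)}
    return results


class StubFlowsService:
    """Hypothetical stand-in that mimics only the error behaviour described above."""

    async def reset_langflow_flow(self, flow_type):
        if flow_type not in {"ingest", "retrieval"}:  # assumed flow types
            raise ValueError(f"Unknown flow type: {flow_type}")
        return {"success": True, "flow_type": flow_type}


summary = asyncio.run(reset_many(StubFlowsService(), ["ingest", "bogus"]))
```

Resetting sequentially keeps the order of API calls predictable; with the real service you would pass the FlowsService instance in place of the stub.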
What happens if I update model values but the flow is currently running?
The change_langflow_model_value() method modifies the underlying JSON configuration and updates the Langflow API immediately. According to the source code, these changes take effect on the next execution cycle, but active running flows may continue using cached model references until they are restarted or the cache expires.