How RAGFlow Manages State and Memory for Its AI Agents: A Deep Dive into the Document-Store Architecture
RAGFlow implements a document-level memory architecture that treats every AI agent interaction as a searchable vector document, using per-user indices and hybrid retrieval to maintain conversational state across sessions.
RAGFlow, the open-source RAG (Retrieval-Augmented Generation) engine developed by InfiniFlow, stores AI agent state within the same vector-search infrastructure used for knowledge-base indexing. Unlike simple key-value session stores, RAGFlow persists every chat turn as a document-level memory backed by Elasticsearch, Infinity, or OceanBase, enabling semantic recall alongside lexical matching. This design unifies knowledge retrieval with conversational memory, allowing agents to reference prior interactions with the same precision used for document search.
The Four-Layer Memory Architecture
RAGFlow’s memory management operates through four tightly coupled layers that handle everything from physical storage provisioning to high-level semantic extraction.
Index Provisioning
Before storing any conversational data, RAGFlow creates a dedicated per-user message index named memory_<uid> that matches the dimensionality of the configured embedding model. In memory/services/messages.py, the MessageService.create_index() method initializes this index with the required vector size, ensuring each tenant’s state remains physically isolated from others.
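As a rough illustration of the provisioning step, the sketch below models an index catalog in plain Python. The InMemoryIndexRegistry class and its methods are hypothetical stand-ins, not RAGFlow's API; only the memory_<uid> naming pattern and the rule that the index dimension must match the embedding model come from the description above.

```python
class InMemoryIndexRegistry:
    """Hypothetical stand-in for a vector store's index catalog."""

    def __init__(self):
        self._dims = {}  # index name -> vector dimension

    @staticmethod
    def index_name(uid: str) -> str:
        # Naming pattern described in the article: memory_<uid>
        return f"memory_{uid}"

    def create_index(self, uid: str, vector_dim: int) -> bool:
        """Create the per-user index; return False if it already exists."""
        name = self.index_name(uid)
        if name in self._dims:
            if self._dims[name] != vector_dim:
                # A dimension mismatch would make stored vectors unusable
                raise ValueError(
                    f"{name} was created with dim {self._dims[name]}, got {vector_dim}"
                )
            return False
        self._dims[name] = vector_dim
        return True


registry = InMemoryIndexRegistry()
created_first = registry.create_index("user_123", 768)  # new index
created_again = registry.create_index("user_123", 768)  # idempotent no-op
```

Rejecting a second creation with a different dimension mirrors the constraint that switching embedding models generally requires a fresh index.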
Raw Message Ingestion
Every chat turn is stored as a raw document via MessageService.insert_message(), which assigns a deterministic composite ID (<memory_id>_<message_id>) and a boolean status flag. This layer converts conversational payloads into the document schema expected by the underlying vector store, calling msgStoreConn.insert() to persist the data to the configured backend.
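The composite-ID idea can be sketched in a few lines. The to_store_document helper and the 0/1 encoding of the status flag are assumptions made for illustration; only the <memory_id>_<message_id> pattern is taken from the text above.

```python
def to_store_document(message: dict) -> dict:
    """Map a chat turn to a flat document with a deterministic ID."""
    doc = dict(message)  # shallow copy so the caller's dict is untouched
    # Composite ID pattern from the article: <memory_id>_<message_id>
    doc["id"] = f'{message["memory_id"]}_{message["message_id"]}'
    # Encode the status flag as 0/1 (this field encoding is an assumption)
    doc["status"] = 1 if message.get("status", True) else 0
    return doc


doc = to_store_document(
    {"memory_id": 42, "message_id": 1, "content": "hi", "status": True}
)
```

Because the ID is derived only from the two identifiers, converting the same message twice yields the same document ID, which lets a store treat a repeated write as an overwrite instead of a duplicate.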
Vector-Enabled Retrieval
When an agent needs context, RAGFlow encodes the user’s latest utterance into a dense query vector and constructs a MatchDenseExpr merged with a full-text MatchTextExpr. The MsgTextQuery.question() method in memory/services/query.py orchestrates this hybrid expression building, combining weighted lexical terms with dense similarity filters for precise memory recall.
Structured Memory Extraction
Raw messages are transformed into typed knowledge through LLM-driven extraction. The PromptAssembler.assemble_system_prompt() function in memory/utils/prompt_util.py generates type-specific instructions that direct the model to categorize content as semantic, episodic, or procedural memory. The resulting JSON is stored back into the same index, creating a self-improving knowledge base distinct from unprocessed chat logs.
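To make the write-back step concrete, here is a minimal sketch of turning a model reply into typed records. The JSON shape (a top-level object keyed by memory type, each holding a list of items with a "content" field) and the parse_extraction helper are assumptions; the actual output format is dictated by the prompts in memory/utils/prompt_util.py.

```python
import json

ALLOWED_TYPES = {"semantic", "episodic", "procedural"}


def parse_extraction(llm_output: str, source_id: int) -> list[dict]:
    """Turn the model's JSON reply into typed memory records."""
    try:
        payload = json.loads(llm_output)
    except json.JSONDecodeError:
        return []  # malformed output: store nothing rather than guess
    if not isinstance(payload, dict):
        return []
    records = []
    for memory_type, items in payload.items():
        # Ignore categories the extractor was never asked for
        if memory_type not in ALLOWED_TYPES or not isinstance(items, list):
            continue
        for item in items:
            content = item.get("content") if isinstance(item, dict) else None
            if content:
                records.append(
                    {
                        "message_type": memory_type,
                        "source_id": source_id,  # links back to the raw turn
                        "content": content,
                    }
                )
    return records


reply = (
    '{"semantic": [{"content": "The config file had a wrong value."}],'
    ' "other": [{"content": "ignored"}]}'
)
records = parse_extraction(reply, source_id=1)
```

Validating the reply before storage keeps a malformed or off-schema generation from polluting the index.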
Memory Types and State Isolation
RAGFlow categorizes memory into distinct types defined in common/constants.py within the MemoryType enumeration: RAW (unprocessed chat turns), SEMANTIC (factual knowledge), EPISODIC (event sequences), and PROCEDURAL (task workflows). This taxonomy allows agents to retrieve specific categories of historical information rather than scanning entire conversation logs.
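A small sketch shows how such a taxonomy supports selective retrieval. MemoryTypeSketch is a local illustration, not the enumeration from common/constants.py: the member names follow the text above, while the string values and the filter_by_type helper are assumptions.

```python
from enum import Enum


class MemoryTypeSketch(Enum):
    # Member names follow the article; the string values are assumptions.
    RAW = "raw"
    SEMANTIC = "semantic"
    EPISODIC = "episodic"
    PROCEDURAL = "procedural"


def filter_by_type(messages, wanted):
    """Keep only the messages whose type is in the `wanted` set."""
    wanted_values = {t.value for t in wanted}
    return [m for m in messages if m["message_type"] in wanted_values]


stored = [
    {"message_type": "raw", "content": "I just deployed version 2.1."},
    {"message_type": "semantic", "content": "Service is at version 2.1."},
    {"message_type": "procedural", "content": "Deploy, then run smoke tests."},
]
facts = filter_by_type(stored, {MemoryTypeSketch.SEMANTIC})
```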
Per-User Indexing Strategy
State isolation is enforced through physical separation. All message operations prepend the user ID to the index name, creating memory_<uid> indices that prevent cross-tenant data leakage. During init_settings() in common/settings.py, the msgStoreConn is wired to the same backend as docStoreConn, ensuring that conversation memory and knowledge bases share identical query semantics, latency characteristics, and scaling properties.
Unified Document Store
By leveraging the same connection pool for both knowledge documents and conversation history, RAGFlow eliminates the architectural split common in other agent frameworks. The settings.msgStoreConn instance handles both message persistence and vector search, allowing a single query expression to traverse domain knowledge and personal conversation history simultaneously.
Retrieving Context: Recent Messages and Hybrid Search
Agents require different retrieval strategies depending on whether they need the latest turn or semantically relevant historical data.
Recent Message Shortcut
For immediate context windows, MessageService.get_recent_messages() fetches the N most recent raw messages across specified users and sessions, sorted by the valid_at timestamp. This bypasses vector search overhead when strict chronological recency is required, optimizing latency for ongoing dialogue maintenance.
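The recency shortcut amounts to a filter, a sort, and a slice. The sketch below runs over an in-memory list instead of a real index; the recent_messages function is hypothetical, and the field names are borrowed from the payload shown later in this article.

```python
from datetime import datetime


def recent_messages(messages, session_id, limit):
    """Return the `limit` newest raw messages of a session, newest first."""

    def parse(ts):
        # Older Python versions reject a trailing "Z" in fromisoformat()
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))

    candidates = [
        m
        for m in messages
        if m["session_id"] == session_id and m["message_type"] == "raw"
    ]
    candidates.sort(key=lambda m: parse(m["valid_at"]), reverse=True)
    return candidates[:limit]


history = [
    {"session_id": "sess_abc", "message_type": "raw",
     "valid_at": "2024-10-01T12:00:00Z", "content": "first"},
    {"session_id": "sess_abc", "message_type": "raw",
     "valid_at": "2024-10-01T12:05:00Z", "content": "second"},
    {"session_id": "sess_xyz", "message_type": "raw",
     "valid_at": "2024-10-01T12:10:00Z", "content": "other session"},
    {"session_id": "sess_abc", "message_type": "semantic",
     "valid_at": "2024-10-01T12:06:00Z", "content": "extracted fact"},
]
latest = recent_messages(history, "sess_abc", limit=2)
```

No embedding is computed anywhere in this path, which is the source of the latency advantage over vector search.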
Hybrid Vector + Text Retrieval
For deeper memory recall, MsgTextQuery.question() constructs a hybrid search combining MatchTextExpr for keyword matching and MatchDenseExpr for semantic similarity. This approach captures both exact terminology (e.g., specific error codes) and conceptual relationships (e.g., "deployment issues"), ensuring comprehensive context recovery even when users paraphrase previous statements.
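The effect of combining the two signals can be seen in a toy scorer. This is not how MatchTextExpr or MatchDenseExpr are evaluated by the backend: the keyword-overlap measure, the 0.3 text weight, and the hybrid_rank function are all assumptions chosen to keep the example self-contained.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def keyword_overlap(query, text):
    """Fraction of query terms that also appear in the text."""
    q = set(query.lower().split())
    t = set(text.lower().split())
    return len(q & t) / len(q) if q else 0.0


def hybrid_rank(query, query_vec, docs, text_weight=0.3, top_k=2):
    """Rank docs by a weighted sum of lexical and dense similarity."""
    scored = []
    for doc in docs:
        score = text_weight * keyword_overlap(query, doc["content"]) + (
            1 - text_weight
        ) * cosine(query_vec, doc["content_embed"])
        scored.append((score, doc["content"]))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:top_k]


docs = [
    {"content": "error E42 during deploy", "content_embed": [0.0, 1.0]},
    {"content": "reverting a release", "content_embed": [1.0, 0.0]},
    {"content": "lunch menu", "content_embed": [0.0, 1.0]},
]
ranked = hybrid_rank("rollback error E42", [1.0, 0.0], docs)
```

In this toy data the paraphrase ("reverting a release") is found through vector similarity alone and the error code through keyword overlap alone, while the unrelated document scores zero on both.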
Forgetting and Storage Policies
RAGFlow implements configurable eviction policies to prevent unbounded storage growth. When a memory store exceeds its quota, the system triggers MessageService.pick_messages_to_delete_by_fifo(), which enforces the FIFO (First-In-First-Out) policy identified by MemoryStorageType.FIFO. Eviction is ordered by age rather than relevance: the oldest raw messages are purged first, while the structured memories extracted from them are typically retained, so critical knowledge survives even as raw chat history expires.
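A FIFO selection pass might look like the sketch below. The pick_fifo_victims function, the per-message "size" field, and the byte-based quota are hypothetical; the rule of skipping non-raw records reflects the behavior described above, not verified code.

```python
def pick_fifo_victims(messages, quota_bytes):
    """Return IDs of the oldest raw messages to delete so usage fits the quota."""
    used = sum(m["size"] for m in messages)
    victims = []
    # ISO 8601 UTC strings in one fixed format sort chronologically as text
    for m in sorted(messages, key=lambda m: m["valid_at"]):
        if used <= quota_bytes:
            break
        if m["message_type"] != "raw":
            continue  # keep extracted memories (assumption from the article)
        victims.append(m["id"])
        used -= m["size"]
    return victims


store = [
    {"id": "42_1", "message_type": "raw", "size": 40,
     "valid_at": "2024-10-01T10:00:00Z"},
    {"id": "42_2", "message_type": "semantic", "size": 30,
     "valid_at": "2024-10-01T10:05:00Z"},
    {"id": "42_3", "message_type": "raw", "size": 40,
     "valid_at": "2024-10-01T11:00:00Z"},
    {"id": "42_4", "message_type": "raw", "size": 40,
     "valid_at": "2024-10-01T12:00:00Z"},
]
victims = pick_fifo_victims(store, quota_bytes=100)
```

Here the store holds 150 bytes against a quota of 100, so the two oldest raw turns are selected and the semantic record between them is left in place.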
Implementing Memory Operations in Code
The following examples demonstrate how to interact with RAGFlow’s memory layer using the internal service APIs.
Creating Indices and Inserting Messages
Initialize the global settings once at application startup, then provision per-user indices and insert conversational data:
from memory.services.messages import MessageService
from common.settings import init_settings

# Initialize global connections (execute once at app start)
init_settings()

uid = "user_123"
memory_id = 42
vector_dim = 768  # Must match your embedding model output

# Create per-user index if it does not exist
if not MessageService.has_index(uid, memory_id):
    MessageService.create_index(uid, memory_id, vector_dim)

# Prepare raw message payload
raw_msg = {
    "message_id": 1,
    "message_type": "raw",
    "source_id": None,
    "memory_id": memory_id,
    "user_id": uid,
    "agent_id": "assistant",
    "session_id": "sess_abc",
    "valid_at": "2024-10-01T12:00:00Z",
    "content": "I just deployed version 2.1 of the service.",
    "status": True,
    # Embedding generated upstream; truncated here for readability.
    # A real vector must contain vector_dim values.
    "content_embed": [[0.12, 0.34, 0.56]],
}

MessageService.insert_message([raw_msg], uid, memory_id)
Source: Index existence check in memory/services/messages.py lines 30-33; insertion logic in lines 45-51.
Fetching Recent Context
Retrieve the latest N messages for immediate agent context:
from memory.services.messages import MessageService

# uid and memory_id are the values defined in the previous example
recent = MessageService.get_recent_messages(
    uid_list=[uid],
    memory_ids=[memory_id],
    agent_id="assistant",
    session_id="sess_abc",
    limit=5
)

for msg in recent:
    print(msg["content"])
Source: Implementation in memory/services/messages.py lines 20-38.
Executing Hybrid Searches
Perform combined semantic and lexical queries against the memory store:
from memory.services.messages import MessageService
from memory.services.query import MsgTextQuery
from common.settings import settings

# uid and memory_id are the values defined in the first example
query_text = "How do I roll back a deployment?"

# Build the hybrid match expression and the extracted keywords
expr, keywords = MsgTextQuery().question(
    txt=query_text,
    tbl="messages",
    min_match=0.6
)

# Execute via the configured retriever
results = settings.retriever.search(
    index_names=[MessageService.index_name(uid)],
    memory_ids=[memory_id],
    match_expressions=[expr],
    top_k=10
)
Source: Expression construction in memory/services/query.py lines 43-84.
Extracting Structured Memories
Transform raw conversations into typed memories using LLM prompts:
from memory.utils.prompt_util import PromptAssembler

conversation = """
User: I fixed the bug yesterday.
Assistant: Great! What was the root cause?
User: The config file had a wrong value.
"""

system_prompt = PromptAssembler.assemble_system_prompt({
    "memory_type": ["semantic", "episodic"],
    "timestamp_format": "ISO 8601",
    "max_items_per_type": 3
})

user_prompt = PromptAssembler.assemble_user_prompt(
    conversation=conversation,
    conversation_time="2024-10-01T15:00:00Z"
)

# Concatenate prompts and send to your LLM provider
Source: Prompt generation logic in memory/utils/prompt_util.py lines 21-48.
Summary
- RAGFlow treats agent memory as a document store utilizing the same vector-search backend (Elasticsearch, Infinity, OceanBase) as knowledge-base indexing.
- Per-user isolation is enforced through dedicated indices named memory_<uid>, preventing cross-tenant data access.
- Four memory types (RAW, SEMANTIC, EPISODIC, PROCEDURAL) allow differentiated storage and retrieval of conversational data versus extracted knowledge.
- Hybrid retrieval combines MatchTextExpr for keywords and MatchDenseExpr for semantic similarity, implemented in MsgTextQuery.question().
- FIFO eviction in MessageService.pick_messages_to_delete_by_fifo() manages storage quotas while preserving structured memories.
- Recent message shortcuts via get_recent_messages() optimize latency for immediate context windows without vector search overhead.
Frequently Asked Questions
How does RAGFlow isolate memory between different users?
RAGFlow physically isolates user data by creating separate indices named memory_<uid> for each user or tenant. The MessageService class in memory/services/messages.py automatically prefixes all index operations with the user ID, ensuring that queries and insertions never cross tenant boundaries. This architecture allows horizontal scaling without risk of data leakage between users.
What types of memories does RAGFlow extract from conversations?
According to common/constants.py, RAGFlow defines four MemoryType categories: RAW (unprocessed chat turns), SEMANTIC (extracted facts and concepts), EPISODIC (event sequences and experiences), and PROCEDURAL (task workflows and methods). The PromptAssembler in memory/utils/prompt_util.py generates LLM prompts that instruct the model to classify and extract content into these specific types, creating a structured knowledge base beyond simple chat logs.
How does RAGFlow handle memory overflow or context limits?
When storage quotas are exceeded, RAGFlow invokes MessageService.pick_messages_to_delete_by_fifo() to enforce a FIFO (First-In-First-Out) eviction policy defined by MemoryStorageType.FIFO. This removes the oldest raw messages first while typically preserving structured memories (semantic, episodic, procedural) that have been extracted from those conversations, ensuring critical knowledge persists even as ephemeral chat history is purged.
Can RAGFlow retrieve memories using both semantic and keyword search?
Yes. The MsgTextQuery.question() method in memory/services/query.py constructs hybrid search expressions that combine MatchTextExpr for lexical matching with MatchDenseExpr for vector similarity. This allows agents to retrieve memories based on exact keyword matches (such as specific error codes) while also capturing semantically related content (such as conceptually similar troubleshooting scenarios), providing comprehensive context recovery regardless of phrasing variations.