How to Monitor System Health with MonitorService in OpenRAG
The OpenRAG MonitorService creates and manages OpenSearch Alerting monitors that watch knowledge-filter queries and trigger webhooks when new documents match, enabling real-time system health monitoring with automated notifications.
OpenRAG provides monitoring through its dedicated MonitorService, which lets developers track knowledge-base changes via OpenSearch Alerting. The service plugs into the FastAPI dependency injection system and automates webhook-based alerts whenever monitored queries detect new data. Using the MonitorService methods in src/services/monitor_service.py, you can programmatically create, delete, and list monitors that serve as health checks for your RAG system's data ingestion pipelines.
Core Capabilities of MonitorService
The service exposes four primary operations for system health monitoring, all implemented in src/services/monitor_service.py:
- Create a monitor: create_knowledge_filter_monitor generates document-level monitors for knowledge filters, registers webhook destinations, and returns monitor metadata including IDs and URLs.
- Delete a monitor: delete_monitor removes existing monitors when subscriptions are cancelled, cleaning up OpenSearch Alerting resources.
- List user monitors: list_user_monitors retrieves all monitors belonging to the authenticated user, filtered automatically by OpenSearch security contexts.
- List monitors for a filter: list_monitors_for_filter returns monitors attached to specific knowledge filters, useful for debugging subscription states.
How MonitorService Works Under the Hood
The implementation follows an async pattern using the OpenSearch client's low-level transport. When creating a monitor, the service executes five sequential steps:
- Session Management: Receives a per-user OpenSearch client from SessionManager with proper JWT authentication.
- Webhook Provisioning: _get_or_create_webhook_destination checks for existing webhook destinations or creates new ones in the OpenSearch notifications plugin.
- Query Conversion: _convert_kf_query_to_monitor_query transforms knowledge-filter query definitions into valid OpenSearch monitor query DSL.
- Alerting API Submission: POSTs the monitor configuration to /_plugins/_alerting/monitors using transport.perform_request.
- Lifecycle Management: Stores returned _id values as monitor identifiers for later updates, or for deletion via DELETE requests to /_plugins/_alerting/monitors/{monitor_id}.
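The submission step above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in src/services/monitor_service.py: the monitor body follows the general shape of an OpenSearch document-level monitor, and build_monitor_body, submit_monitor, FakeTransport, the index name, the query string, and the destination ID are all hypothetical. The stub transport mirrors the perform_request(method, url, body=...) call shape so the example runs without a cluster.

```python
import asyncio
from typing import Any


def build_monitor_body(filter_id: str, filter_name: str, query_string: str,
                       index: str, destination_id: str) -> dict[str, Any]:
    """Assemble a document-level monitor definition (assumed field layout)."""
    query_id = f"kf-{filter_id}"  # hypothetical naming scheme
    return {
        "type": "monitor",
        "monitor_type": "doc_level_monitor",
        "name": f"kf-monitor-{filter_name}",
        "enabled": True,
        "schedule": {"period": {"interval": 1, "unit": "MINUTES"}},
        "inputs": [{
            "doc_level_input": {
                "description": f"Watch knowledge filter {filter_id}",
                "indices": [index],
                "queries": [{"id": query_id, "name": query_id,
                             "query": query_string, "tags": []}],
            }
        }],
        "triggers": [{
            "document_level_trigger": {
                "name": "new-matching-documents",
                "severity": "3",
                # Fire when the named query matched at least one new document
                "condition": {"script": {"source": f"query[name={query_id}]",
                                         "lang": "painless"}},
                "actions": [{
                    "name": "notify-webhook",
                    "destination_id": destination_id,
                    "subject_template": {"source": "Knowledge filter match"},
                    "message_template": {"source": "New documents matched {{ctx.monitor.name}}"},
                }],
            }
        }],
    }


class FakeTransport:
    """Stand-in for the client's low-level transport; records each request."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str, Any]] = []

    async def perform_request(self, method: str, url: str, body: Any = None) -> dict[str, Any]:
        self.calls.append((method, url, body))
        return {"_id": "monitor-abc123"}  # made-up response for illustration


async def submit_monitor(transport: Any, body: dict[str, Any]) -> str:
    """POST the monitor and return the _id that identifies it afterwards."""
    response = await transport.perform_request(
        "POST", "/_plugins/_alerting/monitors", body=body
    )
    return response["_id"]


transport = FakeTransport()
body = build_monitor_body("123", "Demo Filter", 'content:"quarterly report"',
                          "documents", "dest-1")
monitor_id = asyncio.run(submit_monitor(transport, body))
```

Keeping the body builder separate from the network call makes the generated definition easy to inspect or unit test before anything is sent to OpenSearch.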
Creating Monitors for Knowledge Filters
When a client subscribes to a knowledge filter through the API endpoint in src/api/knowledge_filter.py, the system invokes create_knowledge_filter_monitor to establish health monitoring:
# From src/api/knowledge_filter.py - subscription handler
monitor_result = await monitor_service.create_knowledge_filter_monitor(
    filter_id=filter_id,
    filter_name=filter_doc["name"],
    query_data=filter_doc["query_data"],
    user_id=user.user_id,
    jwt_token=jwt_token,
    notification_config=body.notification_config,
)
The method returns a dictionary containing success, monitor_id, subscription_id, and webhook_url. If the operation fails, success is False and the API responds with a 500 error. The generated webhook endpoint (/knowledge-filter/{filter_id}/webhook/{subscription_id}) receives POST requests whenever the underlying OpenSearch query matches new documents.
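One way to turn that result dictionary into an HTTP outcome is sketched below. The function name subscription_response, the "error" key, and the 200 status on success are assumptions made for illustration; the actual handler in src/api/knowledge_filter.py may differ.

```python
from typing import Any


def subscription_response(monitor_result: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Map a monitor-creation result to an (HTTP status, JSON body) pair."""
    if not monitor_result.get("success"):
        # Fail closed: a missing "success" flag is treated as a failure
        message = monitor_result.get("error", "Failed to create monitor")
        return 500, {"error": message}
    return 200, {
        "monitor_id": monitor_result["monitor_id"],
        "subscription_id": monitor_result["subscription_id"],
        "webhook_url": monitor_result["webhook_url"],
    }


failure = subscription_response({"success": False, "error": "alerting unavailable"})
success = subscription_response({
    "success": True,
    "monitor_id": "monitor-abc123",
    "subscription_id": "sub-1",
    "webhook_url": "/knowledge-filter/123/webhook/sub-1",
})
```

Checking the flag before persisting anything keeps a subscription record from being written when no monitor backs it.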
Deleting Monitors on Subscription Cancellation
Monitor cleanup occurs automatically when users cancel subscriptions. The delete_monitor method issues authenticated DELETE requests to OpenSearch's Alerting API:
# From src/api/knowledge_filter.py - cancellation handler
await monitor_service.delete_monitor(
    monitor_id=subscription["monitor_id"],
    user_id=user.user_id,
    jwt_token=jwt_token,
)
This returns {"success": True} upon successful removal or an error payload if the monitor ID is invalid or permissions are insufficient. Always verify the monitor exists via list_user_monitors before attempting deletion to avoid 404 errors from OpenSearch.
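The verify-then-delete advice can be sketched with a small helper. StubMonitorService is a hypothetical in-memory stand-in so the example runs on its own, and the return shape of list_user_monitors (a list of dictionaries with a monitor_id key) and the "skipped" flag are assumptions rather than documented behaviour.

```python
import asyncio
from typing import Any


class StubMonitorService:
    """Hypothetical in-memory stand-in for MonitorService."""

    def __init__(self, monitor_ids: list[str]) -> None:
        self._ids = set(monitor_ids)

    async def list_user_monitors(self, user_id: str, jwt_token: str) -> list[dict[str, Any]]:
        return [{"monitor_id": mid} for mid in sorted(self._ids)]

    async def delete_monitor(self, monitor_id: str, user_id: str, jwt_token: str) -> dict[str, Any]:
        self._ids.discard(monitor_id)
        return {"success": True}


async def delete_if_present(service: Any, monitor_id: str,
                            user_id: str, jwt_token: str) -> dict[str, Any]:
    """Delete a monitor only if it still appears in the user's monitor list."""
    monitors = await service.list_user_monitors(user_id, jwt_token)
    if not any(m.get("monitor_id") == monitor_id for m in monitors):
        # Nothing to delete, so skip the request that would return a 404
        return {"success": True, "skipped": True}
    return await service.delete_monitor(
        monitor_id=monitor_id, user_id=user_id, jwt_token=jwt_token
    )


service = StubMonitorService(["m-1"])
first = asyncio.run(delete_if_present(service, "m-1", "user-42", "jwt-token-here"))
second = asyncio.run(delete_if_present(service, "m-1", "user-42", "jwt-token-here"))
```

The check and the delete are separate requests, so a monitor removed in between can still produce an error; callers should keep handling the error payload as well.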
Listing and Debugging Monitors
For administrative interfaces or debugging purposes, the service provides two listing methods. Retrieve all monitors for the current user with:
async def list_my_monitors(user_id: str, jwt: str, monitor_service: MonitorService):
    return await monitor_service.list_user_monitors(user_id, jwt)
To inspect monitors attached to a specific knowledge filter, use list_monitors_for_filter, which helps diagnose webhook delivery issues or subscription synchronization problems between OpenRAG and OpenSearch.
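A synchronization check of this kind might look like the sketch below. It assumes that both the stored subscriptions and the listed monitors are available as lists of dictionaries carrying a monitor_id key; the function name reconcile and both record shapes are hypothetical.

```python
from typing import Any


def reconcile(subscriptions: list[dict[str, Any]],
              monitors: list[dict[str, Any]]) -> dict[str, list[str]]:
    """Compare stored subscriptions against the monitors OpenSearch reports."""
    subscribed = {s["monitor_id"] for s in subscriptions}
    existing = {m["monitor_id"] for m in monitors}
    return {
        # Subscriptions whose monitor no longer exists: webhooks will never fire
        "missing_in_opensearch": sorted(subscribed - existing),
        # Monitors with no subscription: candidates for cleanup
        "orphaned_monitors": sorted(existing - subscribed),
    }


report = reconcile(
    subscriptions=[{"monitor_id": "m-1"}, {"monitor_id": "m-2"}],
    monitors=[{"monitor_id": "m-2"}, {"monitor_id": "m-3"}],
)
```

A non-empty missing_in_opensearch list points at subscriptions that look active in OpenRAG but can no longer deliver webhooks.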
Using MonitorService Outside the API Layer
While the service is injected into FastAPI routes via src/dependencies.py, you can instantiate it directly for CLI tools or background workers:
import asyncio

from services.monitor_service import MonitorService
from managers.session_manager import SessionManager


async def main():
    session_manager = SessionManager("my-secret")
    monitor = MonitorService(session_manager)

    # Create a monitor programmatically
    result = await monitor.create_knowledge_filter_monitor(
        filter_id="123",
        filter_name="Demo Filter",
        query_data={"query": {"match_all": {}}},
        user_id="user-42",
        jwt_token="jwt-token-here",
    )
    print(result)  # {"success": True, "monitor_id": "...", "subscription_id": "...", "webhook_url": "..."}


# await is only valid inside a coroutine, so run main() on an event loop
asyncio.run(main())
This pattern is useful for batch operations or testing monitor configurations without invoking the full HTTP API stack defined in src/api/knowledge_filter.py.
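For the batch case, a bounded-concurrency sketch with asyncio.gather is shown below. StubMonitorService stands in for a real MonitorService so the example runs without OpenSearch; create_monitors, the filter record shape, and the max_concurrency parameter are hypothetical.

```python
import asyncio
from typing import Any


class StubMonitorService:
    """Hypothetical stand-in that mimics the create call's result shape."""

    async def create_knowledge_filter_monitor(self, filter_id: str, filter_name: str,
                                              query_data: dict, user_id: str,
                                              jwt_token: str) -> dict[str, Any]:
        if not query_data:
            return {"success": False, "error": "empty query"}
        return {"success": True, "monitor_id": f"monitor-{filter_id}"}


async def create_monitors(service: Any, filters: list[dict[str, Any]], user_id: str,
                          jwt_token: str, max_concurrency: int = 5) -> dict[str, dict]:
    """Create one monitor per filter, limiting the number of in-flight requests."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def create_one(f: dict[str, Any]) -> dict[str, Any]:
        async with semaphore:
            return await service.create_knowledge_filter_monitor(
                filter_id=f["id"], filter_name=f["name"], query_data=f["query_data"],
                user_id=user_id, jwt_token=jwt_token,
            )

    # gather preserves input order, so results line up with filters
    results = await asyncio.gather(*(create_one(f) for f in filters))
    return {f["id"]: r for f, r in zip(filters, results)}


filters = [
    {"id": "a", "name": "A", "query_data": {"query": {"match_all": {}}}},
    {"id": "b", "name": "B", "query_data": {}},
]
results = asyncio.run(create_monitors(StubMonitorService(), filters, "user-42", "jwt-token-here"))
failed = [fid for fid, r in results.items() if not r["success"]]
```

The semaphore keeps a large batch from flooding the Alerting API, and collecting results by filter ID makes failures easy to retry individually.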
Integration Points
The service integrates with the broader OpenRAG architecture through:
- Dependency Injection: Registered in src/dependencies.py and src/main.py (lines 21-23) for FastAPI route accessibility
- Knowledge Filter API: Consumed by endpoints in src/api/knowledge_filter.py handling subscription lifecycle events
- OpenSearch Security: Leverages JWT tokens and user contexts from SessionManager to enforce multi-tenant isolation of monitoring resources
Summary
- MonitorService in OpenRAG manages OpenSearch Alerting monitors through src/services/monitor_service.py, providing create, delete, and list operations.
- Health monitoring works by converting knowledge-filter queries into OpenSearch monitors that trigger webhooks when matching documents appear.
- Create monitors using create_knowledge_filter_monitor, which provisions webhook destinations at /knowledge-filter/{filter_id}/webhook/{subscription_id}.
- Delete monitors via delete_monitor when cleaning up cancelled subscriptions to prevent orphaned alerting rules.
- List operations include list_user_monitors for user-specific views and list_monitors_for_filter for debugging specific knowledge filters.
- Direct instantiation requires SessionManager injection and supports async usage outside the FastAPI context.
Frequently Asked Questions
How does MonitorService authenticate with OpenSearch?
MonitorService receives a per-user OpenSearch client from SessionManager initialized with JWT tokens. All API calls to /_plugins/_alerting/monitors include these credentials via the low-level transport's perform_request method, ensuring users can only access their own monitors through OpenSearch's security context filtering.
What webhook URL format does OpenRAG generate for monitor alerts?
The service generates unique webhook endpoints following the pattern /knowledge-filter/{filter_id}/webhook/{subscription_id}. These routes are registered as destinations in the OpenSearch notifications plugin via _get_or_create_webhook_destination, allowing external systems to receive real-time POST requests when monitored knowledge filters detect new matching documents.
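The path pattern can be built and parsed with a few lines of standard-library code. This is a sketch only: build_webhook_path and parse_webhook_path are hypothetical helpers, and the percent-encoding of the two identifiers is an assumption, not something the service is known to do.

```python
import re
from typing import Optional
from urllib.parse import quote, unquote

WEBHOOK_PATTERN = re.compile(
    r"^/knowledge-filter/(?P<filter_id>[^/]+)/webhook/(?P<subscription_id>[^/]+)$"
)


def build_webhook_path(filter_id: str, subscription_id: str) -> str:
    """Fill the documented path pattern, escaping characters unsafe in a URL segment."""
    return (
        f"/knowledge-filter/{quote(filter_id, safe='')}"
        f"/webhook/{quote(subscription_id, safe='')}"
    )


def parse_webhook_path(path: str) -> Optional[dict[str, str]]:
    """Recover both identifiers from a webhook path, or None if it does not match."""
    match = WEBHOOK_PATTERN.match(path)
    if match is None:
        return None
    return {key: unquote(value) for key, value in match.groupdict().items()}


path = build_webhook_path("123", "sub-1")
parsed = parse_webhook_path(path)
```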
Can I use MonitorService without the FastAPI dependency injection system?
Yes. While src/dependencies.py injects the service into API routes, you can instantiate it directly by passing a SessionManager instance to the constructor. This enables usage in CLI scripts, background Celery tasks, or testing environments without bootstrapping the full FastAPI application from src/main.py.
What happens if a monitor creation fails in OpenSearch?
If the POST request to /_plugins/_alerting/monitors fails or returns an error, create_knowledge_filter_monitor returns a dictionary with "success": False and error details. The Knowledge Filter API endpoint in src/api/knowledge_filter.py then responds with a 500 Internal Server Error, which prevents the subscription from being created in an inconsistent state.