How to Monitor System Health with MonitorService in OpenRAG

The OpenRAG MonitorService creates and manages OpenSearch Alerting monitors that watch knowledge-filter queries and trigger webhooks when new documents match, enabling real-time system health monitoring with automated notifications.

OpenRAG's MonitorService lets developers track knowledge-base changes through OpenSearch Alerting. The service is wired into the FastAPI dependency injection system and sends webhook-based alerts whenever a monitored query matches new data. Using the methods in src/services/monitor_service.py, you can programmatically create, delete, and list monitors that act as health checks for your RAG system's data ingestion pipelines.

Core Capabilities of MonitorService

The service exposes four primary operations for system health monitoring, all implemented in src/services/monitor_service.py:

  • Create a monitor: create_knowledge_filter_monitor generates document-level monitors for knowledge filters, registers webhook destinations, and returns monitor metadata including IDs and URLs.
  • Delete a monitor: delete_monitor removes existing monitors when subscriptions are cancelled, cleaning up OpenSearch Alerting resources.
  • List user monitors: list_user_monitors retrieves all monitors belonging to the authenticated user, filtered automatically by OpenSearch security contexts.
  • List monitors for a filter: list_monitors_for_filter returns monitors attached to specific knowledge filters, useful for debugging subscription states.

How MonitorService Works Under the Hood

The implementation follows an async pattern using the OpenSearch client's low-level transport. When creating a monitor, the service executes five sequential steps:

  1. Session Management: Receives a per-user OpenSearch client from SessionManager with proper JWT authentication.
  2. Webhook Provisioning: _get_or_create_webhook_destination checks for existing webhook destinations or creates new ones in the OpenSearch notifications plugin.
  3. Query Conversion: _convert_kf_query_to_monitor_query transforms knowledge-filter query definitions into valid OpenSearch monitor query DSL.
  4. Alerting API Submission: POSTs the monitor configuration to /_plugins/_alerting/monitors using transport.perform_request.
  5. Lifecycle Management: Stores returned _id values as monitor identifiers for future deletion or updates via DELETE requests to /_plugins/_alerting/monitors/{monitor_id}.
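Steps 4 and 5 can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not OpenRAG's actual code: build_monitor_body, submit_monitor, remove_monitor, and FakeTransport are hypothetical names, the index name "documents" is a placeholder, and the payload only approximates the general shape of an OpenSearch document-level monitor. Only the endpoint paths and the use of perform_request come from the description above.

```python
import asyncio
from typing import Any

ALERTING_PATH = "/_plugins/_alerting/monitors"  # endpoint named in the steps above


def build_monitor_body(filter_id: str, filter_name: str, query_string: str,
                       destination_id: str) -> dict[str, Any]:
    """Assemble a document-level monitor payload (illustrative shape only)."""
    query_id = f"kf-{filter_id}"
    return {
        "type": "monitor",
        "monitor_type": "doc_level",
        "name": f"kf-monitor-{filter_name}",
        "enabled": True,
        "schedule": {"period": {"interval": 1, "unit": "MINUTES"}},
        "inputs": [{
            "doc_level_input": {
                "description": f"Watch knowledge filter {filter_id}",
                "indices": ["documents"],  # placeholder index name
                "queries": [{"id": query_id, "name": query_id,
                             "query": query_string, "tags": []}],
            }
        }],
        "triggers": [{
            "document_level_trigger": {
                "name": f"{query_id}-trigger",
                "severity": "3",
                "condition": {"script": {"source": f"query[name={query_id}]",
                                         "lang": "painless"}},
                "actions": [{
                    "name": "notify-webhook",
                    "destination_id": destination_id,
                    "subject_template": {"source": "New matching documents"},
                    "message_template": {"source": f"Filter {filter_id} matched"},
                }],
            }
        }],
    }


async def submit_monitor(transport: Any, body: dict[str, Any]) -> str:
    """POST the monitor and return the _id assigned by the Alerting plugin."""
    response = await transport.perform_request("POST", ALERTING_PATH, body=body)
    return response["_id"]


async def remove_monitor(transport: Any, monitor_id: str) -> None:
    """DELETE a monitor by the identifier stored at creation time."""
    await transport.perform_request("DELETE", f"{ALERTING_PATH}/{monitor_id}")


class FakeTransport:
    """Stand-in for the client's transport; it only records calls."""

    def __init__(self) -> None:
        self.calls = []

    async def perform_request(self, method, url, params=None, body=None,
                              headers=None):
        self.calls.append((method, url))
        if method == "POST":
            return {"_id": "monitor-abc"}
        return {"result": "deleted"}


async def demo():
    """Create and then delete a monitor against the fake transport."""
    transport = FakeTransport()
    body = build_monitor_body("123", "Demo Filter", 'owner:"user-42"', "dest-1")
    monitor_id = await submit_monitor(transport, body)
    await remove_monitor(transport, monitor_id)
    return transport.calls
```

Swapping FakeTransport for a real client's transport attribute would send the same two requests to a live cluster, provided the payload matches what the installed Alerting plugin version expects.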

Creating Monitors for Knowledge Filters

When a client subscribes to a knowledge filter through the API endpoint in src/api/knowledge_filter.py, the system invokes create_knowledge_filter_monitor to establish health monitoring:


# From src/api/knowledge_filter.py - subscription handler

monitor_result = await monitor_service.create_knowledge_filter_monitor(
    filter_id=filter_id,
    filter_name=filter_doc["name"],
    query_data=filter_doc["query_data"],
    user_id=user.user_id,
    jwt_token=jwt_token,
    notification_config=body.notification_config,
)

The method returns a dictionary containing success, monitor_id, subscription_id, and webhook_url. If the operation fails, success is False and the API responds with a 500 error. The generated webhook endpoint (/knowledge-filter/{filter_id}/webhook/{subscription_id}) receives a POST request whenever the underlying OpenSearch query matches new documents.
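A caller might translate that result into an HTTP outcome along these lines. The helper to_http_response and the "error" key are hypothetical; the text above only confirms the success flag, the three returned fields, and the 500 status on failure.

```python
from typing import Any


def to_http_response(result: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Map a create_knowledge_filter_monitor result to (status, payload)."""
    if not result.get("success"):
        # Failure: report a 500 so no half-created subscription is stored.
        detail = result.get("error", "monitor creation failed")  # assumed key
        return 500, {"detail": detail}
    return 200, {
        "monitor_id": result["monitor_id"],
        "subscription_id": result["subscription_id"],
        "webhook_url": result["webhook_url"],
    }
```

Keeping the mapping in one small function makes the failure path easy to unit-test without a running OpenSearch cluster.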

Deleting Monitors on Subscription Cancellation

Monitor cleanup occurs automatically when users cancel subscriptions. The delete_monitor method issues authenticated DELETE requests to OpenSearch's Alerting API:


# From src/api/knowledge_filter.py - cancellation handler

await monitor_service.delete_monitor(
    monitor_id=subscription["monitor_id"],
    user_id=user.user_id,
    jwt_token=jwt_token,
)

This returns {"success": True} upon successful removal or an error payload if the monitor ID is invalid or permissions are insufficient. Always verify the monitor exists via list_user_monitors before attempting deletion to avoid 404 errors from OpenSearch.
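The check-then-delete advice could look like the sketch below. It assumes list_user_monitors returns a list of dictionaries that each carry a "monitor_id" key, which the text does not confirm; delete_if_present and StubMonitorService are hypothetical names used only for illustration.

```python
import asyncio
from typing import Any


async def delete_if_present(service: Any, monitor_id: str, user_id: str,
                            jwt_token: str) -> dict[str, Any]:
    """Delete a monitor only if the user's monitor list still contains it."""
    monitors = await service.list_user_monitors(user_id, jwt_token)
    # Assumption: each entry is a dict carrying a "monitor_id" key.
    known_ids = {m["monitor_id"] for m in monitors}
    if monitor_id not in known_ids:
        # Nothing to delete; report it instead of triggering a 404.
        return {"success": True, "skipped": True}
    return await service.delete_monitor(
        monitor_id=monitor_id, user_id=user_id, jwt_token=jwt_token
    )


class StubMonitorService:
    """In-memory stand-in for MonitorService, for illustration only."""

    def __init__(self, monitor_ids):
        self._ids = set(monitor_ids)

    async def list_user_monitors(self, user_id, jwt_token):
        return [{"monitor_id": mid} for mid in sorted(self._ids)]

    async def delete_monitor(self, monitor_id, user_id, jwt_token):
        self._ids.discard(monitor_id)
        return {"success": True}
```

The extra list call costs one round trip, so a caller that deletes many monitors may prefer to fetch the list once and reuse it.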

Listing and Debugging Monitors

For administrative interfaces or debugging purposes, the service provides two listing methods. Retrieve all monitors for the current user with:

async def list_my_monitors(user_id: str, jwt: str, monitor_service: MonitorService):
    return await monitor_service.list_user_monitors(user_id, jwt)

To inspect monitors attached to a specific knowledge filter, use list_monitors_for_filter, which helps diagnose webhook delivery issues or subscription synchronization problems between OpenRAG and OpenSearch.
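One way to diagnose synchronization problems is to compare the subscriptions OpenRAG has stored with the monitors OpenSearch reports. The sketch below is a hypothetical helper working on plain dictionaries; it assumes both sides expose a "monitor_id" key and does not call MonitorService itself.

```python
def find_sync_issues(subscriptions, monitors):
    """Compare stored subscriptions with monitors reported by OpenSearch."""
    # Assumption: both record types carry a "monitor_id" key.
    subscribed = {s["monitor_id"] for s in subscriptions}
    existing = {m["monitor_id"] for m in monitors}
    return {
        # Subscriptions whose monitor no longer exists in OpenSearch.
        "missing_in_opensearch": sorted(subscribed - existing),
        # Monitors with no subscription pointing at them.
        "orphaned_monitors": sorted(existing - subscribed),
    }
```

Feeding it the output of list_monitors_for_filter for one filter, together with that filter's subscription records, would give a quick list of entries to repair or clean up.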

Using MonitorService Outside the API Layer

While the service is injected into FastAPI routes via src/dependencies.py, you can instantiate it directly for CLI tools or background workers:

import asyncio

from services.monitor_service import MonitorService
from managers.session_manager import SessionManager


async def main() -> None:
    session_manager = SessionManager("my-secret")
    monitor = MonitorService(session_manager)

    # Create a monitor programmatically
    result = await monitor.create_knowledge_filter_monitor(
        filter_id="123",
        filter_name="Demo Filter",
        query_data={"query": {"match_all": {}}},
        user_id="user-42",
        jwt_token="jwt-token-here",
    )
    print(result)  # {"success": True, "monitor_id": "...", "subscription_id": "...", "webhook_url": "..."}


# await is only valid inside a coroutine, so run main() on an event loop
asyncio.run(main())

This pattern is useful for batch operations or testing monitor configurations without invoking the full HTTP API stack defined in src/api/knowledge_filter.py.

Integration Points

The service integrates with the broader OpenRAG architecture through:

  • Dependency Injection: Registered in src/dependencies.py and src/main.py (lines 21-23) for FastAPI route accessibility
  • Knowledge Filter API: Consumed by endpoints in src/api/knowledge_filter.py handling subscription lifecycle events
  • OpenSearch Security: Leverages JWT tokens and user contexts from SessionManager to enforce multi-tenant isolation of monitoring resources

Summary

  • MonitorService in OpenRAG manages OpenSearch Alerting monitors through src/services/monitor_service.py, providing create, delete, and list operations.
  • Health monitoring works by converting knowledge-filter queries into OpenSearch monitors that trigger webhooks when matching documents appear.
  • Create monitors using create_knowledge_filter_monitor, which provisions a webhook destination pointing at /knowledge-filter/{filter_id}/webhook/{subscription_id}.
  • Delete monitors via delete_monitor when cleaning up cancelled subscriptions to prevent orphaned alerting rules.
  • List operations include list_user_monitors for user-specific views and list_monitors_for_filter for debugging specific knowledge filters.
  • Direct instantiation requires SessionManager injection and supports async usage outside the FastAPI context.

Frequently Asked Questions

How does MonitorService authenticate with OpenSearch?

MonitorService receives a per-user OpenSearch client from SessionManager initialized with JWT tokens. All API calls to /_plugins/_alerting/monitors include these credentials via the low-level transport's perform_request method, ensuring users can only access their own monitors through OpenSearch's security context filtering.

What webhook URL format does OpenRAG generate for monitor alerts?

The service generates a unique webhook endpoint for each subscription, following the pattern /knowledge-filter/{filter_id}/webhook/{subscription_id}. Each endpoint is registered as a destination in the OpenSearch notifications plugin via _get_or_create_webhook_destination, so it receives a POST request whenever the monitored knowledge filter matches new documents.
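The path pattern can be built and parsed with a few lines of code. In this sketch, build_webhook_path and parse_webhook_path are hypothetical helpers; how OpenRAG actually generates subscription IDs is not described here, so the example takes the ID as an argument.

```python
import re
from typing import Optional

# Path pattern quoted in the answer above.
WEBHOOK_PATTERN = re.compile(
    r"^/knowledge-filter/(?P<filter_id>[^/]+)/webhook/(?P<subscription_id>[^/]+)$"
)


def build_webhook_path(filter_id: str, subscription_id: str) -> str:
    """Format the webhook path for one filter subscription."""
    return f"/knowledge-filter/{filter_id}/webhook/{subscription_id}"


def parse_webhook_path(path: str) -> Optional[dict]:
    """Return the IDs embedded in a webhook path, or None if it does not match."""
    match = WEBHOOK_PATTERN.match(path)
    return match.groupdict() if match else None
```

Parsing the path back into its two IDs is handy when inspecting webhook delivery logs for a particular subscription.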

Can I use MonitorService without the FastAPI dependency injection system?

Yes. While src/dependencies.py injects the service into API routes, you can instantiate it directly by passing a SessionManager instance to the constructor. This enables usage in CLI scripts, background Celery tasks, or testing environments without bootstrapping the full FastAPI application from src/main.py.

What happens if a monitor creation fails in OpenSearch?

If the POST request to /_plugins/_alerting/monitors fails or returns an error, create_knowledge_filter_monitor returns a dictionary with "success": False and error details. The Knowledge Filter API endpoint in src/api/knowledge_filter.py responds with a 500 Internal Server Error in that case, which prevents the subscription from being created in an inconsistent state.
