# How to Monitor System Health with MonitorService in OpenRAG

> Learn how to monitor system health with MonitorService in OpenRAG. Automate notifications by triggering webhooks for new document matches in your knowledge-filter queries.

- Repository: [langflow-ai/openrag](https://github.com/langflow-ai/openrag)
- Tags: how-to-guide
- Published: 2026-03-13

---

**The OpenRAG MonitorService creates and manages OpenSearch Alerting monitors that watch knowledge-filter queries and trigger webhooks when new documents match, enabling real-time system health monitoring with automated notifications.**

OpenRAG's dedicated MonitorService lets developers track knowledge-base changes through OpenSearch Alerting. The service is wired into the FastAPI dependency injection system and sends webhook-based alerts whenever a monitored query detects new data. Using the `MonitorService` methods in [`src/services/monitor_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/monitor_service.py), you can programmatically create, delete, and list monitors that act as health checks for your RAG system's data ingestion pipelines.

## Core Capabilities of MonitorService

The service exposes four primary operations for system health monitoring, all implemented in [`src/services/monitor_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/monitor_service.py):

- **Create a monitor**: `create_knowledge_filter_monitor` generates document-level monitors for knowledge filters, registers webhook destinations, and returns monitor metadata including IDs and URLs.
- **Delete a monitor**: `delete_monitor` removes existing monitors when subscriptions are cancelled, cleaning up OpenSearch Alerting resources.
- **List user monitors**: `list_user_monitors` retrieves all monitors belonging to the authenticated user, filtered automatically by OpenSearch security contexts.
- **List monitors for a filter**: `list_monitors_for_filter` returns monitors attached to specific knowledge filters, useful for debugging subscription states.

## How MonitorService Works Under the Hood

The implementation follows an async pattern using the OpenSearch client's low-level transport. When creating a monitor, the service executes five sequential steps:

1. **Session Management**: Receives a per-user OpenSearch client from `SessionManager` with proper JWT authentication.
2. **Webhook Provisioning**: `_get_or_create_webhook_destination` checks for existing webhook destinations or creates new ones in the OpenSearch notifications plugin.
3. **Query Conversion**: `_convert_kf_query_to_monitor_query` transforms knowledge-filter query definitions into valid OpenSearch monitor query DSL.
4. **Alerting API Submission**: POSTs the monitor configuration to `/_plugins/_alerting/monitors` using `transport.perform_request`.
5. **Lifecycle Management**: Stores the returned `_id` as the monitor identifier, so the monitor can later be updated or removed with a `DELETE` request to `/_plugins/_alerting/monitors/{monitor_id}`.
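Steps 4 and 5 can be sketched roughly as follows. This is a minimal illustration, not the code in `monitor_service.py`: the helpers `submit_monitor` and `remove_monitor`, the `FakeClient`/`FakeTransport` stand-ins, the `error` key, and the placeholder monitor body are all assumptions made for the example. Only the endpoint paths, the `_id` field, and the use of `transport.perform_request` come from the description above.

```python
import asyncio
from typing import Any

ALERTING_MONITORS_PATH = "/_plugins/_alerting/monitors"


async def submit_monitor(client: Any, monitor_body: dict) -> dict:
    """POST a monitor definition and report the outcome (hypothetical helper)."""
    try:
        response = await client.transport.perform_request(
            "POST", ALERTING_MONITORS_PATH, body=monitor_body
        )
    except Exception as exc:  # a real service would likely catch narrower errors
        return {"success": False, "error": str(exc)}
    # The returned `_id` is kept as the monitor identifier for later cleanup.
    return {"success": True, "monitor_id": response["_id"]}


async def remove_monitor(client: Any, monitor_id: str) -> dict:
    """DELETE a monitor by the identifier stored at creation time (hypothetical helper)."""
    await client.transport.perform_request(
        "DELETE", f"{ALERTING_MONITORS_PATH}/{monitor_id}"
    )
    return {"success": True}


class FakeTransport:
    """In-memory stand-in so the sketch runs without an OpenSearch cluster."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str]] = []

    async def perform_request(self, method, url, params=None, body=None, headers=None):
        self.calls.append((method, url))
        return {"_id": "monitor-1"} if method == "POST" else {}


class FakeClient:
    def __init__(self) -> None:
        self.transport = FakeTransport()


async def main() -> None:
    client = FakeClient()
    # Placeholder body: a real monitor definition is considerably larger.
    created = await submit_monitor(client, {"name": "kf-123"})
    await remove_monitor(client, created["monitor_id"])
    print(client.transport.calls)


if __name__ == "__main__":
    asyncio.run(main())
```

Returning a result dictionary instead of raising mirrors the `success` flag the service reports to its callers, which keeps error handling in the API layer simple.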

## Creating Monitors for Knowledge Filters

When a client subscribes to a knowledge filter through the API endpoint in [`src/api/knowledge_filter.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/knowledge_filter.py), the system invokes `create_knowledge_filter_monitor` to establish health monitoring:

```python
# From src/api/knowledge_filter.py - subscription handler
monitor_result = await monitor_service.create_knowledge_filter_monitor(
    filter_id=filter_id,
    filter_name=filter_doc["name"],
    query_data=filter_doc["query_data"],
    user_id=user.user_id,
    jwt_token=jwt_token,
    notification_config=body.notification_config,
)
```

The method returns a dictionary containing `success`, `monitor_id`, `subscription_id`, and `webhook_url`. If the operation fails, `success` is `False` and the API responds with a 500 error. The generated webhook endpoint (`/knowledge-filter/{filter_id}/webhook/{subscription_id}`) receives POST requests whenever the underlying OpenSearch query matches new documents.
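One way a handler might translate that result into an HTTP response is sketched below. The function name `subscription_response`, the `error` key, the fallback message, and the choice of status 200 on success are assumptions for illustration; only the `success` flag, the three returned fields, and the 500 on failure come from the text above.

```python
def subscription_response(monitor_result: dict) -> tuple[int, dict]:
    """Map a monitor result dict to an (HTTP status, JSON payload) pair."""
    if not monitor_result.get("success"):
        # Failure path: surface the error and signal a server-side problem.
        message = monitor_result.get("error", "Failed to create monitor")
        return 500, {"error": message}
    # Success path: pass through only the fields the client needs.
    payload = {
        key: monitor_result[key]
        for key in ("monitor_id", "subscription_id", "webhook_url")
    }
    return 200, payload


if __name__ == "__main__":
    print(subscription_response({"success": False, "error": "alerting unavailable"}))
```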

## Deleting Monitors on Subscription Cancellation

Monitor cleanup occurs automatically when users cancel subscriptions. The `delete_monitor` method issues authenticated DELETE requests to OpenSearch's Alerting API:

```python
# From src/api/knowledge_filter.py - cancellation handler
await monitor_service.delete_monitor(
    monitor_id=subscription["monitor_id"],
    user_id=user.user_id,
    jwt_token=jwt_token,
)
```

This returns `{"success": True}` upon successful removal or an error payload if the monitor ID is invalid or permissions are insufficient. Always verify the monitor exists via `list_user_monitors` before attempting deletion to avoid 404 errors from OpenSearch.
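The check-then-delete advice could look something like the sketch below. It assumes `list_user_monitors` returns a list of dictionaries that each carry a `monitor_id` key, which the source does not confirm; adjust the lookup to the actual return shape. `delete_monitor_if_present` and `StubMonitorService` are hypothetical names used only to make the example runnable.

```python
import asyncio


async def delete_monitor_if_present(monitor_service, monitor_id, user_id, jwt_token):
    """Delete a monitor only if it appears in the user's monitor list."""
    monitors = await monitor_service.list_user_monitors(user_id, jwt_token)
    # Assumption: each entry is a dict exposing its identifier as "monitor_id".
    known_ids = {monitor.get("monitor_id") for monitor in monitors}
    if monitor_id not in known_ids:
        return {"success": False, "error": f"monitor {monitor_id} not found"}
    return await monitor_service.delete_monitor(
        monitor_id=monitor_id, user_id=user_id, jwt_token=jwt_token
    )


class StubMonitorService:
    """Stand-in with the two methods used above, backed by an in-memory set."""

    def __init__(self, monitor_ids):
        self._ids = set(monitor_ids)

    async def list_user_monitors(self, user_id, jwt_token):
        return [{"monitor_id": monitor_id} for monitor_id in sorted(self._ids)]

    async def delete_monitor(self, monitor_id, user_id, jwt_token):
        self._ids.discard(monitor_id)
        return {"success": True}


if __name__ == "__main__":
    service = StubMonitorService({"m1"})
    print(asyncio.run(delete_monitor_if_present(service, "m1", "user-42", "jwt")))
```

The extra listing call costs one round trip, so it is a trade-off: it avoids a 404 from OpenSearch, but a monitor could still disappear between the check and the delete.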

## Listing and Debugging Monitors

For administrative interfaces or debugging purposes, the service provides two listing methods. Retrieve all monitors for the current user with:

```python
from services.monitor_service import MonitorService


async def list_my_monitors(user_id: str, jwt: str, monitor_service: MonitorService):
    return await monitor_service.list_user_monitors(user_id, jwt)
```

To inspect monitors attached to a specific knowledge filter, use `list_monitors_for_filter`, which helps diagnose webhook delivery issues or subscription synchronization problems between OpenRAG and OpenSearch.
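When diagnosing synchronization problems, it can help to compare the monitor IDs stored with OpenRAG subscriptions against the IDs OpenSearch actually reports. The helper below is a hypothetical sketch of that comparison; it takes plain lists of IDs so it makes no assumption about the return shape of `list_monitors_for_filter`.

```python
def diff_monitor_ids(subscribed_ids, opensearch_ids):
    """Report monitor IDs that exist on only one side."""
    subscribed, live = set(subscribed_ids), set(opensearch_ids)
    return {
        # Subscriptions whose monitor no longer exists: webhooks will never fire.
        "missing_in_opensearch": sorted(subscribed - live),
        # Monitors with no matching subscription: candidates for cleanup.
        "orphaned_in_opensearch": sorted(live - subscribed),
    }


if __name__ == "__main__":
    print(diff_monitor_ids(["m1", "m2"], ["m2", "m3"]))
```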

## Using MonitorService Outside the API Layer

While the service is injected into FastAPI routes via [`src/dependencies.py`](https://github.com/langflow-ai/openrag/blob/main/src/dependencies.py), you can instantiate it directly for CLI tools or background workers:

```python
import asyncio

from services.monitor_service import MonitorService
from managers.session_manager import SessionManager


async def main() -> None:
    session_manager = SessionManager("my-secret")
    monitor = MonitorService(session_manager)

    # Create a monitor programmatically
    result = await monitor.create_knowledge_filter_monitor(
        filter_id="123",
        filter_name="Demo Filter",
        query_data={"query": {"match_all": {}}},
        user_id="user-42",
        jwt_token="jwt-token-here",
    )
    print(result)  # {"success": True, "monitor_id": "...", "subscription_id": "...", "webhook_url": "..."}


# `await` is only valid inside a coroutine, so run main() on an event loop
asyncio.run(main())
```

This pattern is useful for batch operations or testing monitor configurations without invoking the full HTTP API stack defined in [`src/api/knowledge_filter.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/knowledge_filter.py).

## Integration Points

The service integrates with the broader OpenRAG architecture through:

- **Dependency Injection**: Registered in [`src/dependencies.py`](https://github.com/langflow-ai/openrag/blob/main/src/dependencies.py) and [`src/main.py`](https://github.com/langflow-ai/openrag/blob/main/src/main.py) (lines 21-23) so that FastAPI routes can access it
- **Knowledge Filter API**: Consumed by endpoints in [`src/api/knowledge_filter.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/knowledge_filter.py) handling subscription lifecycle events
- **OpenSearch Security**: Leverages JWT tokens and user contexts from `SessionManager` to enforce multi-tenant isolation of monitoring resources

## Summary

- **MonitorService** in OpenRAG manages OpenSearch Alerting monitors through [`src/services/monitor_service.py`](https://github.com/langflow-ai/openrag/blob/main/src/services/monitor_service.py), providing create, delete, and list operations.
- **Health monitoring** works by converting knowledge-filter queries into OpenSearch monitors that trigger webhooks when matching documents appear.
- **Create monitors** using `create_knowledge_filter_monitor`, which provisions webhook destinations at `/knowledge-filter/{filter_id}/webhook/{subscription_id}`.
- **Delete monitors** via `delete_monitor` when cleaning up cancelled subscriptions to prevent orphaned alerting rules.
- **List operations** include `list_user_monitors` for user-specific views and `list_monitors_for_filter` for debugging specific knowledge filters.
- **Direct instantiation** requires `SessionManager` injection and supports async usage outside the FastAPI context.

## Frequently Asked Questions

### How does MonitorService authenticate with OpenSearch?

MonitorService receives a per-user OpenSearch client from `SessionManager` initialized with JWT tokens. All API calls to `/_plugins/_alerting/monitors` include these credentials via the low-level transport's `perform_request` method, ensuring users can only access their own monitors through OpenSearch's security context filtering.

### What webhook URL format does OpenRAG generate for monitor alerts?

The service generates unique webhook endpoints following the pattern `/knowledge-filter/{filter_id}/webhook/{subscription_id}`. These routes are registered as destinations in the OpenSearch notifications plugin via `_get_or_create_webhook_destination`, allowing external systems to receive real-time POST requests when monitored knowledge filters detect new matching documents.
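As a small illustration of that pattern, the sketch below assembles a full webhook URL from a base address and the two identifiers. The function name `build_webhook_url` and the example base URL are assumptions; how OpenRAG actually determines its public base address is not covered here.

```python
from urllib.parse import quote


def build_webhook_url(base_url: str, filter_id: str, subscription_id: str) -> str:
    """Join a base URL with the /knowledge-filter/.../webhook/... path."""
    # Percent-encode the identifiers so unusual characters cannot alter the path.
    path = (
        f"/knowledge-filter/{quote(filter_id, safe='')}"
        f"/webhook/{quote(subscription_id, safe='')}"
    )
    return base_url.rstrip("/") + path


if __name__ == "__main__":
    print(build_webhook_url("https://openrag.example.com/", "123", "sub-1"))
```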

### Can I use MonitorService without the FastAPI dependency injection system?

Yes. While [`src/dependencies.py`](https://github.com/langflow-ai/openrag/blob/main/src/dependencies.py) injects the service into API routes, you can instantiate it directly by passing a `SessionManager` instance to the constructor. This enables usage in CLI scripts, background Celery tasks, or testing environments without bootstrapping the full FastAPI application from [`src/main.py`](https://github.com/langflow-ai/openrag/blob/main/src/main.py).

### What happens if a monitor creation fails in OpenSearch?

If the POST request to `/_plugins/_alerting/monitors` fails or returns an error, `create_knowledge_filter_monitor` returns a dictionary with `"success": False` and error details. The Knowledge Filter API endpoint in [`src/api/knowledge_filter.py`](https://github.com/langflow-ai/openrag/blob/main/src/api/knowledge_filter.py) then responds with a 500 Internal Server Error, so the subscription is not created in an inconsistent state.