# How Langflow Integrates with OpenTelemetry for Telemetry Data Collection

> Discover how Langflow integrates with OpenTelemetry for seamless telemetry data collection. Learn about its thread-safe singleton and async TelemetryService for metric instrumentation and payload sending.

- Repository: [Langflow/langflow](https://github.com/langflow-ai/langflow)
- Tags: how-to-guide
- Published: 2026-02-24

---

**Langflow integrates with OpenTelemetry through a thread-safe singleton class that manages metric instruments and validates label schemas, coupled with an asynchronous `TelemetryService` that sends usage payloads and optionally exposes a Prometheus scraping endpoint.**

Langflow leverages OpenTelemetry as its vendor-agnostic telemetry backbone to collect metrics, traces, and logs throughout the Python backend. The integration centers on two core components: the `OpenTelemetry` singleton defined in [`src/backend/base/langflow/services/telemetry/opentelemetry.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/telemetry/opentelemetry.py) and the `TelemetryService` orchestrator implemented in [`src/backend/base/langflow/services/telemetry/service.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/telemetry/service.py).

## Core Architecture Components

The telemetry subsystem splits responsibilities between metric management and payload orchestration:

- **`OpenTelemetry` singleton** – Defines the metric registry, creates the OpenTelemetry **Meter**, and exposes strongly-typed helper methods like `increment_counter()` and `update_gauge()`.
- **`TelemetryService`** – Instantiates the singleton at startup, manages asynchronous sending of usage payloads (run events, component usage, exceptions), and wires metrics into the Langflow backend.

## Thread-Safe Singleton Meter Provider

The `OpenTelemetry` class implements a **thread-safe singleton** using `ThreadSafeSingletonMetaUsingWeakref`, ensuring a single `MeterProvider` per process even under heavy multithreading:

```python
class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref):
    _initialized: bool = False
    prometheus_enabled: bool = True
```

During construction (`__init__`), the class checks whether a provider already exists. If not, it creates a new `MeterProvider` and, when `prometheus_enabled=True`, attaches a **PrometheusMetricReader** to it (see [`src/backend/base/langflow/services/telemetry/opentelemetry.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/telemetry/opentelemetry.py)). Because every instrument is created from this one provider, all metrics share the same export pipeline.
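To make the singleton behavior concrete, here is a minimal, dependency-free sketch of a weak-reference singleton metaclass. It is not Langflow's implementation of `ThreadSafeSingletonMetaUsingWeakref`: the names `WeakSingletonMeta`, `DemoTelemetry`, and `build_concurrently` are hypothetical, and double-checked locking is simply one common way to obtain this guarantee.

```python
import threading
import weakref


class WeakSingletonMeta(type):
    """Hypothetical stand-in for a thread-safe, weakref-based singleton metaclass."""

    _instances = weakref.WeakValueDictionary()
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        # Fast path: reuse the live instance without taking the lock.
        instance = cls._instances.get(cls)
        if instance is None:
            with cls._lock:
                # Re-check inside the lock so only one thread constructs.
                instance = cls._instances.get(cls)
                if instance is None:
                    instance = super().__call__(*args, **kwargs)
                    cls._instances[cls] = instance
        return instance


class DemoTelemetry(metaclass=WeakSingletonMeta):
    def __init__(self, *, prometheus_enabled: bool = True):
        self.prometheus_enabled = prometheus_enabled


def build_concurrently(n_threads: int = 8) -> list:
    """Construct DemoTelemetry from several threads and collect the results."""
    results = []
    threads = [
        threading.Thread(target=lambda: results.append(DemoTelemetry()))
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

In this sketch the first call wins: `__init__` runs only once, so constructor arguments passed on later calls are ignored while the original instance is still alive.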

## Metric Registration and Validation

Metrics are declared in `_register_metric()` and stored in an internal `_metrics_registry`. Each definition specifies the instrument type, unit, description, and label schema:

```python
self._add_metric(
    name="file_uploads",
    description="The uploaded file size in bytes",
    unit="bytes",
    metric_type=MetricType.OBSERVABLE_GAUGE,
    labels={"flow_id": mandatory_label},
)
```

The `MetricType` enumeration supports **COUNTER**, **OBSERVABLE_GAUGE**, **HISTOGRAM**, and **UP_DOWN_COUNTER** instruments as defined in the source. Each metric’s label schema is validated at runtime via `validate_labels()`, ensuring mandatory dimensions (e.g., `flow_id`) are present before recording values.
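The registry-plus-validation idea can be sketched without the OpenTelemetry SDK. The snippet below is an illustration only: `MetricDef`, `REGISTRY`, `add_metric`, and this standalone `validate_labels` function are hypothetical stand-ins, the enum values are invented, and the convention that a label maps to `True` when it is mandatory is an assumption rather than Langflow's actual code.

```python
from dataclasses import dataclass, field
from enum import Enum


class MetricType(Enum):
    # Member names follow the article; the string values are assumptions.
    COUNTER = "counter"
    OBSERVABLE_GAUGE = "observable_gauge"
    HISTOGRAM = "histogram"
    UP_DOWN_COUNTER = "up_down_counter"


@dataclass(frozen=True)
class MetricDef:
    """Hypothetical registry entry: instrument type, unit, and label schema."""

    name: str
    description: str
    unit: str
    metric_type: MetricType
    # Maps label name -> True if mandatory, False if optional (assumed convention).
    labels: dict = field(default_factory=dict)


REGISTRY: dict = {}


def add_metric(metric: MetricDef) -> None:
    REGISTRY[metric.name] = metric


def validate_labels(metric_name: str, labels: dict) -> None:
    """Raise ValueError if the metric is unknown or a mandatory label is missing."""
    metric = REGISTRY.get(metric_name)
    if metric is None:
        raise ValueError(f"Metric '{metric_name}' is not registered")
    missing = [k for k, required in metric.labels.items() if required and k not in labels]
    if missing:
        raise ValueError(f"Missing mandatory labels for '{metric_name}': {missing}")


add_metric(
    MetricDef(
        name="file_uploads",
        description="The uploaded file size in bytes",
        unit="bytes",
        metric_type=MetricType.OBSERVABLE_GAUGE,
        labels={"flow_id": True},
    )
)
```

Calling `validate_labels("file_uploads", {"flow_id": "12345"})` passes silently, while omitting `flow_id` raises a `ValueError` that names the missing label.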

## Lazy Instrument Creation

OpenTelemetry instruments are created lazily through `_create_metric()` when first accessed. The factory method uses the singleton’s `meter` to instantiate the appropriate instrument:

```python
if metric.type == MetricType.COUNTER:
    return self.meter.create_counter(...)
elif metric.type == MetricType.OBSERVABLE_GAUGE:
    return ObservableGaugeWrapper(...)
```

For observable gauges, the `ObservableGaugeWrapper` implements the OpenTelemetry callback pattern, returning an `Observation` list via its internal `_callback` method.
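The callback pattern is easier to see in isolation. The sketch below is a simplified, SDK-free approximation, not the real `ObservableGaugeWrapper`: `GaugeWrapperSketch` and `set_value` are hypothetical names, and the `Observation` tuple is a stand-in that mirrors the shape of OpenTelemetry's `Observation` (a value plus attributes) so the example runs without the SDK installed.

```python
from typing import NamedTuple


class Observation(NamedTuple):
    """Stand-in mirroring the shape of OpenTelemetry's Observation."""

    value: float
    attributes: dict


class GaugeWrapperSketch:
    """Hypothetical wrapper that stores the latest value per label set."""

    def __init__(self, name: str):
        self.name = name
        self._values = {}

    def set_value(self, value: float, labels: dict) -> None:
        # Dicts are unhashable, so key on a sorted tuple of label pairs.
        self._values[tuple(sorted(labels.items()))] = value

    def _callback(self, options=None) -> list:
        # Invoked by the metric reader at collection time, not at update time.
        return [Observation(value, dict(key)) for key, value in self._values.items()]


gauge = GaugeWrapperSketch("file_uploads")
gauge.set_value(1024, {"flow_id": "12345"})
gauge.set_value(2048, {"flow_id": "12345"})  # overwrites the earlier reading
gauge.set_value(512, {"flow_id": "67890"})
observations = gauge._callback()
```

The key design point is that updating a gauge only stores a value; nothing is exported until the reader calls the callback, which then reports one observation per distinct label set.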

## Recording Telemetry Data

High-level helper methods abstract the OpenTelemetry API from the rest of the codebase:

```python
def increment_counter(self, metric_name, labels, value=1.0):
    self.validate_labels(metric_name, labels)
    self._metrics[metric_name].add(value, labels)
```

Equivalent methods exist for **up-down counters**, **gauges**, and **histograms**. These helpers enforce type safety and label validation, raising descriptive errors if the metric name is unknown or the instrument type mismatches the operation. The unit test suite in [`src/backend/tests/unit/test_telemetry.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/tests/unit/test_telemetry.py) verifies this validation logic and singleton behavior.
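As a rough illustration of the type checking described above, the following sketch dispatches to an instrument only after confirming it exists and has the expected type. Everything here is hypothetical: `MetricsFacade`, `FakeCounter`, and `FakeHistogram` are stand-ins that imitate the `add()` and `record()` methods of OpenTelemetry instruments, and the choice of `KeyError` versus `TypeError` is an assumption. Label validation is left out to keep the focus on the type check.

```python
class FakeCounter:
    """Stand-in for an OpenTelemetry Counter; records calls for inspection."""

    def __init__(self):
        self.calls = []

    def add(self, value, attributes):
        self.calls.append((value, dict(attributes)))


class FakeHistogram:
    """Stand-in for an OpenTelemetry Histogram."""

    def __init__(self):
        self.calls = []

    def record(self, value, attributes):
        self.calls.append((value, dict(attributes)))


class MetricsFacade:
    """Hypothetical helper layer: look up the instrument, check its type, record."""

    def __init__(self, instruments: dict):
        self._metrics = instruments

    def _get(self, metric_name: str, expected_type: type):
        instrument = self._metrics.get(metric_name)
        if instrument is None:
            raise KeyError(f"Unknown metric '{metric_name}'")
        if not isinstance(instrument, expected_type):
            raise TypeError(f"Metric '{metric_name}' is not a {expected_type.__name__}")
        return instrument

    def increment_counter(self, metric_name, labels, value=1.0):
        self._get(metric_name, FakeCounter).add(value, labels)

    def observe_histogram(self, metric_name, value, labels):
        self._get(metric_name, FakeHistogram).record(value, labels)


facade = MetricsFacade({"api_calls": FakeCounter(), "api_latency": FakeHistogram()})
facade.increment_counter("api_calls", {"flow_id": "12345"})
```

With this shape, calling `increment_counter("api_latency", ...)` fails fast with a `TypeError` instead of silently recording a histogram metric through the wrong operation.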

## Backend Integration and Prometheus Export

`TelemetryService` creates the `OpenTelemetry` singleton at startup, passing the Prometheus enable flag from user settings:

```python
self.ot = OpenTelemetry(prometheus_enabled=settings_service.settings.prometheus_enabled)
```

The service maintains a **background worker** that asynchronously pushes usage payloads (run events, component interactions, and exceptions) to remote telemetry endpoints. When `prometheus_enabled` is true, the `MeterProvider` includes a `PrometheusMetricReader`, so recorded metrics are available to standard Prometheus scrapes of the `/metrics` endpoint without additional instrumentation code.
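The background-worker idea can be sketched with a plain `asyncio.Queue`. This is a generic illustration under stated assumptions, not Langflow's `TelemetryService`: the class `UsageReporter`, its methods, and the `fake_send` coroutine are all hypothetical, and the real service may handle batching, retries, and shutdown differently.

```python
import asyncio
import contextlib


class UsageReporter:
    """Hypothetical queue-backed sender; not Langflow's TelemetryService."""

    def __init__(self, send):
        self._send = send  # async callable that delivers one payload
        self._queue = asyncio.Queue()
        self._worker = None

    def start(self) -> None:
        if self._worker is None:
            self._worker = asyncio.create_task(self._run())

    async def _run(self) -> None:
        while True:
            payload = await self._queue.get()
            try:
                await self._send(payload)
            except Exception:
                # Telemetry should never break the application; drop on failure.
                pass
            finally:
                self._queue.task_done()

    async def log(self, payload: dict) -> None:
        # Returns as soon as the payload is queued; delivery happens later.
        await self._queue.put(payload)

    async def stop(self) -> None:
        await self._queue.join()  # flush everything already queued
        if self._worker is not None:
            self._worker.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._worker
            self._worker = None


async def demo() -> list:
    sent = []

    async def fake_send(payload: dict) -> None:
        await asyncio.sleep(0)  # stands in for an HTTP request
        sent.append(payload)

    reporter = UsageReporter(fake_send)
    reporter.start()
    await reporter.log({"event": "run", "flow_id": "12345"})
    await reporter.log({"event": "component", "name": "OpenAIChat"})
    await reporter.stop()
    return sent
```

Running `asyncio.run(demo())` queues two payloads, lets the worker deliver them in order, and then shuts the worker down. Callers of `log()` never wait on the network, which is the property the article attributes to the service's background worker.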

## Practical Implementation Examples

### Recording a Custom Metric

Obtain the singleton anywhere in the backend to emit metrics. The metric names below are illustrative: a metric must already be declared in the registry, otherwise the helper raises an error.

```python
from langflow.services.telemetry.opentelemetry import OpenTelemetry

# Reuses the instance created by TelemetryService
otel = OpenTelemetry()

# Increment a counter for API calls per flow
otel.increment_counter(
    metric_name="api_calls",
    labels={"flow_id": "12345"},
    value=1,
)

# Update an observable gauge for queue length
otel.update_gauge(
    metric_name="queue_length",
    value=7,
    labels={"flow_id": "12345"},
)
```

### Adding a New Metric to the Registry

Declare the metric in `OpenTelemetry._register_metric()`:

```python
self._add_metric(
    name="api_latency",
    description="Latency of API calls in milliseconds",
    unit="ms",
    metric_type=MetricType.HISTOGRAM,
    labels={"flow_id": mandatory_label, "endpoint": mandatory_label},
)
```

Then record observations elsewhere:

```python
# `otel` is the OpenTelemetry() singleton obtained in the previous example
otel.observe_histogram(
    metric_name="api_latency",
    value=120.5,
    labels={"flow_id": "12345", "endpoint": "/run"},
)
```

When Prometheus export is enabled, the metric appears in the output of the `/metrics` endpoint on the next scrape.

### Logging Component Usage via TelemetryService

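```python
from langflow.services.telemetry.schema import ComponentPayload
from langflow.services.telemetry.service import TelemetryService


async def report_component_usage(settings_service) -> None:
    # settings_service is the application's settings service instance
    telemetry = TelemetryService(settings_service)

    # Field names are illustrative; check ComponentPayload in schema.py
    # for the fields defined by your Langflow version.
    payload = ComponentPayload(
        component_name="OpenAIChat",
        flow_id="12345",
        user_id="user_abc",
    )

    # Queued asynchronously (non-blocking); `await` requires a coroutine
    await telemetry.log_package_component(payload)
```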

## Summary

- **Singleton Pattern**: The `OpenTelemetry` class uses `ThreadSafeSingletonMetaUsingWeakref` to ensure a single thread-safe `MeterProvider` across the application.
- **Type-Safe Metrics**: The system supports **Counter**, **Observable Gauge**, **Histogram**, and **Up-Down Counter** instruments with runtime label validation.
- **Prometheus Integration**: Setting `prometheus_enabled=True` automatically configures a `PrometheusMetricReader` and exposes the `/metrics` endpoint.
- **Async Payloads**: `TelemetryService` manages background workers for non-blocking transmission of usage events while sharing the same metric instance.
- **Extensibility**: New metrics are registered via `_add_metric()` and recorded through strongly-typed helpers like `increment_counter()` and `observe_histogram()`.

## Frequently Asked Questions

### How does Langflow ensure thread safety for OpenTelemetry metrics?

Langflow implements the `OpenTelemetry` class as a **thread-safe singleton** using `ThreadSafeSingletonMetaUsingWeakref`. This metaclass ensures that only one instance, and therefore one `MeterProvider`, exists per process, so threads that request the singleton concurrently cannot race to construct duplicate providers.

### What metric types does Langflow support through OpenTelemetry?

According to the source code in [`src/backend/base/langflow/services/telemetry/opentelemetry.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/telemetry/opentelemetry.py), Langflow supports four instrument types defined in the `MetricType` enum: **COUNTER** for monotonic sums, **OBSERVABLE_GAUGE** for point-in-time values, **HISTOGRAM** for distribution recording, and **UP_DOWN_COUNTER** for non-monotonic sums.

### How do I add a custom metric to Langflow's telemetry system?

To add a custom metric, modify the `_register_metric()` method in [`opentelemetry.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/telemetry/opentelemetry.py) to call `self._add_metric()` with the metric name, description, unit, `metric_type`, and label schema. Then use the appropriate helper method, such as `increment_counter()` or `observe_histogram()`, anywhere in the backend to record values with validated labels.

### Does Langflow support Prometheus scraping out of the box?

Yes. When the `prometheus_enabled` flag is set to `True` in settings, the `OpenTelemetry` singleton includes a `PrometheusMetricReader` in its `MeterProvider`. This automatically exposes a `/metrics` endpoint compatible with standard Prometheus scraping protocols, requiring no additional instrumentation code.