How Langflow Integrates with OpenTelemetry for Telemetry Data Collection

Langflow integrates with OpenTelemetry through a thread-safe singleton class that manages metric instruments and validates label schemas, coupled with an asynchronous TelemetryService that sends usage payloads and optionally exposes a Prometheus scraping endpoint.

Langflow leverages OpenTelemetry as its vendor-agnostic telemetry backbone to collect metrics, traces, and logs throughout the Python backend. The integration centers on two core components: the OpenTelemetry singleton defined in src/backend/base/langflow/services/telemetry/opentelemetry.py and the TelemetryService orchestrator implemented in src/backend/base/langflow/services/telemetry/service.py.

Core Architecture Components

The telemetry subsystem splits responsibilities between metric management and payload orchestration:

  • OpenTelemetry singleton – Defines the metric registry, creates the OpenTelemetry Meter, and exposes strongly-typed helper methods like increment_counter() and update_gauge().
  • TelemetryService – Instantiates the singleton at startup, manages asynchronous sending of usage payloads (run events, component usage, exceptions), and wires metrics into the Langflow backend.

Thread-Safe Singleton Meter Provider

The OpenTelemetry class implements a thread-safe singleton using ThreadSafeSingletonMetaUsingWeakref, ensuring a single MeterProvider per process even under heavy multithreading:

class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref):
    _initialized: bool = False
    prometheus_enabled: bool = True

During construction (__init__), the class checks whether a provider already exists. If not, it initializes a new MeterProvider with an optional PrometheusMetricReader when prometheus_enabled=True according to the source code in src/backend/base/langflow/services/telemetry/opentelemetry.py. This design guarantees that all metrics share the same provider and export pipeline.

Metric Registration and Validation

Metrics are declared in _register_metric() and stored in an internal _metrics_registry. Each definition specifies the instrument type, unit, description, and label schema:

self._add_metric(
    name="file_uploads",
    description="The uploaded file size in bytes",
    unit="bytes",
    metric_type=MetricType.OBSERVABLE_GAUGE,
    labels={"flow_id": mandatory_label},
)

The MetricType enumeration supports COUNTER, OBSERVABLE_GAUGE, HISTOGRAM, and UP_DOWN_COUNTER instruments as defined in the source. Each metric’s label schema is validated at runtime via validate_labels(), ensuring mandatory dimensions (e.g., flow_id) are present before recording values.

Lazy Instrument Creation

OpenTelemetry instruments are created lazily through _create_metric() when first accessed. The factory method uses the singleton’s meter to instantiate the appropriate instrument:

if metric.type == MetricType.COUNTER:
    return self.meter.create_counter(...)
elif metric.type == MetricType.OBSERVABLE_GAUGE:
    return ObservableGaugeWrapper(...)

For observable gauges, the ObservableGaugeWrapper implements the OpenTelemetry callback pattern, returning an Observation list via its internal _callback method.

Recording Telemetry Data

High-level helper methods abstract the OpenTelemetry API from the rest of the codebase:

def increment_counter(self, metric_name, labels, value=1.0):
    self.validate_labels(metric_name, labels)
    self._metrics[metric_name].add(value, labels)

Equivalent methods exist for up-down counters, gauges, and histograms. These helpers enforce type safety and label validation, raising descriptive errors if the metric name is unknown or the instrument type mismatches the operation. The unit test suite in src/backend/tests/unit/test_telemetry.py verifies this validation logic and singleton behavior.

Backend Integration and Prometheus Export

TelemetryService creates the OpenTelemetry singleton at startup, passing the Prometheus enable flag from user settings:

self.ot = OpenTelemetry(prometheus_enabled=settings_service.settings.prometheus_enabled)

The service maintains a background worker that asynchronously pushes usage payloads—such as run events, component interactions, and exceptions—to remote telemetry endpoints. When prometheus_enabled is true, the MeterProvider includes a PrometheusMetricReader, automatically serving the /metrics endpoint for standard Prometheus scrapes without additional code.

Practical Implementation Examples

Recording a Custom Metric

Obtain the singleton anywhere in the backend to emit metrics:

from langflow.services.telemetry.opentelemetry import OpenTelemetry

# Reuses the instance created by TelemetryService

otel = OpenTelemetry()

# Increment a counter for API calls per flow

otel.increment_counter(
    metric_name="api_calls",
    labels={"flow_id": "12345"},
    value=1,
)

# Update an observable gauge for queue length

otel.update_gauge(
    metric_name="queue_length",
    value=7,
    labels={"flow_id": "12345"},
)

Adding a New Metric to the Registry

Declare the metric in OpenTelemetry._register_metric():

self._add_metric(
    name="api_latency",
    description="Latency of API calls in milliseconds",
    unit="ms",
    metric_type=MetricType.HISTOGRAM,
    labels={"flow_id": mandatory_label, "endpoint": mandatory_label},
)

Then record observations elsewhere:

otel.observe_histogram(
    metric_name="api_latency",
    value=120.5,
    labels={"flow_id": "12345", "endpoint": "/run"},
)

The metric appears on the Prometheus /metrics endpoint after the next scrape.

Logging Component Usage via TelemetryService

from langflow.services.telemetry.service import TelemetryService
from langflow.services.telemetry.schema import ComponentPayload

telemetry = TelemetryService(settings_service)

payload = ComponentPayload(
    component_name="OpenAIChat",
    flow_id="12345",
    user_id="user_abc",
)

# Queued asynchronously (non-blocking)

await telemetry.log_package_component(payload)

Summary

  • Singleton Pattern: The OpenTelemetry class uses ThreadSafeSingletonMetaUsingWeakref to ensure a single thread-safe MeterProvider across the application.
  • Type-Safe Metrics: The system supports Counter, Observable Gauge, Histogram, and Up-Down Counter instruments with runtime label validation.
  • Prometheus Integration: Setting prometheus_enabled=True automatically configures a PrometheusMetricReader and exposes the /metrics endpoint.
  • Async Payloads: TelemetryService manages background workers for non-blocking transmission of usage events while sharing the same metric instance.
  • Extensibility: New metrics are registered via _add_metric() and recorded through strongly-typed helpers like increment_counter() and observe_histogram().

Frequently Asked Questions

How does Langflow ensure thread safety for OpenTelemetry metrics?

Langflow implements the OpenTelemetry class as a thread-safe singleton using ThreadSafeSingletonMetaUsingWeakref. This metaclass ensures that only one instance—and therefore one MeterProvider—exists per process, preventing race conditions when multiple threads record metrics concurrently.

What metric types does Langflow support through OpenTelemetry?

According to the source code in src/backend/base/langflow/services/telemetry/opentelemetry.py, Langflow supports four instrument types defined in the MetricType enum: COUNTER for monotonic sums, OBSERVABLE_GAUGE for point-in-time values, HISTOGRAM for distribution recording, and UP_DOWN_COUNTER for non-monotonic sums.

How do I add a custom metric to Langflow's telemetry system?

To add a custom metric, modify the _register_metric() method in opentelemetry.py to call self._add_metric() with the metric name, description, unit, metric_type, and label schema. Then use the appropriate helper method—such as increment_counter() or observe_histogram()—anywhere in the backend to record values with validated labels.

Does Langflow support Prometheus scraping out of the box?

Yes. When the prometheus_enabled flag is set to True in settings, the OpenTelemetry singleton includes a PrometheusMetricReader in its MeterProvider. This automatically exposes a /metrics endpoint compatible with standard Prometheus scraping protocols, requiring no additional instrumentation code.

Have a question about this repo?

These articles cover the highlights, but your codebase questions are specific. Give your agent direct access to the source. Share this with your agent to get started:

Share the following with your agent to get started:
curl -s "https://instagit.com/install.md"

Works with
Claude Codex Cursor VS Code OpenClaw Any MCP Client

Maintain an open-source project? Get it listed too →