Langflow Task Backends: How to Run Flow Execution with AnyIO or Celery

Langflow supports two interchangeable task backends for flow execution: AnyIO for local asynchronous processing and Celery for distributed task queues. The backend is selected automatically based on the celery_enabled configuration flag.

The langflow-ai/langflow repository implements a pluggable task execution architecture that adapts to both development and production workloads. Understanding the available Langflow task backends allows you to optimize flow execution for scalability, whether running lightweight local tests or managing high-throughput distributed pipelines.

Supported Task Backends for Langflow Flow Execution

Langflow provides two distinct backends for executing flow tasks, each designed for specific operational contexts:

  • AnyIO Backend: An in-process asynchronous task runner built on the AnyIO library. This is the default backend, suitable for development and low-load scenarios where tasks execute within the main Langflow process.

  • Celery Backend: A distributed task queue wrapper that delegates execution to external Celery workers. This backend enables horizontal scaling and is ideal for production environments requiring robust task distribution across multiple nodes.

The system uses a unified interface defined in TaskService, ensuring that application code remains agnostic to which backend is active.
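To make the idea of a backend-agnostic interface concrete, here is a minimal sketch. The names `TaskBackendSketch`, `InlineBackend`, and `run_with`, as well as the `launch_task` signature shown here, are illustrative assumptions and not Langflow's actual definitions.

```python
import asyncio
import uuid
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable


class TaskBackendSketch(ABC):
    """Hypothetical stand-in for a task backend interface."""

    name: str

    @abstractmethod
    async def launch_task(
        self, func: Callable[..., Awaitable[Any]], *args: Any
    ) -> tuple[str, Any]:
        """Run func(*args) and return (task_id, result)."""


class InlineBackend(TaskBackendSketch):
    """Toy backend that awaits the coroutine directly in the current loop."""

    name = "inline"

    async def launch_task(self, func, *args):
        task_id = str(uuid.uuid4())
        result = await func(*args)
        return task_id, result


async def run_with(backend: TaskBackendSketch, value: int) -> Any:
    """Caller code depends only on the abstract interface, not on a backend."""

    async def double(x: int) -> int:
        return x * 2

    _task_id, result = await backend.launch_task(double, value)
    return result
```

Because `run_with` only touches the abstract method, a different backend can be swapped in without changing the caller, which is the property the unified interface is meant to provide.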

How Langflow Selects the Task Backend

The backend selection logic resides in src/backend/base/langflow/services/task/service.py. During initialization, TaskService inspects the celery_enabled setting from the global configuration to determine which implementation to instantiate:


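# src/backend/base/langflow/services/task/service.py (excerpt)

# In TaskService.__init__: read the flag once from the settings service
self.use_celery = self.settings_service.settings.celery_enabled

# Method on TaskService: pick the implementation that matches the flag
def get_backend(self) -> TaskBackend:
    if self.use_celery:
        return CeleryBackend()
    return AnyIOBackend()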

The configuration flag originates from src/lfx/src/lfx/services/settings/base.py, where celery_enabled defaults to false. When set to true, Langflow expects a running Celery worker process and a message broker such as Redis or RabbitMQ.
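One consequence of reading the flag during initialization is that flipping it later has no effect until the service is rebuilt. The sketch below illustrates this with stand-in classes; `SettingsSketch`, `TaskServiceSketch`, and the two stub backends are hypothetical, and the assumption that the flag is read only in the constructor is inferred from the excerpt above.

```python
from dataclasses import dataclass


@dataclass
class SettingsSketch:
    """Hypothetical settings object; only the flag matters here."""

    celery_enabled: bool = False  # mirrors the default described above


class AnyIOBackendStub:
    name = "anyio"


class CeleryBackendStub:
    name = "celery"


class TaskServiceSketch:
    """Stand-in service that chooses its backend once, at construction time."""

    def __init__(self, settings: SettingsSketch) -> None:
        self.use_celery = settings.celery_enabled
        self.backend = self.get_backend()

    def get_backend(self):
        if self.use_celery:
            return CeleryBackendStub()
        return AnyIOBackendStub()

    @property
    def backend_name(self) -> str:
        return self.backend.name


settings = SettingsSketch()
service = TaskServiceSketch(settings)
settings.celery_enabled = True            # changing the flag afterwards...
still_local = service.backend_name        # ...does not affect the old service
rebuilt = TaskServiceSketch(settings).backend_name
```

Under these assumptions, `still_local` stays on the in-process backend while `rebuilt` picks up the Celery stub, which matches the restart requirement described later in this article.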

AnyIO Backend: Local Asynchronous Execution

Implementation Details

The AnyIOBackend class in src/backend/base/langflow/services/task/backends/anyio.py manages flow execution using in-process asynchronous primitives. It maintains a dictionary of AnyIOTaskResult objects and runs tasks within an AnyIO task group, providing cancellation scopes and result retrieval mechanisms without external dependencies.
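The bookkeeping described above (a dictionary of results keyed by task ID, with tasks running in the current process) can be sketched as follows. This is not the AnyIOBackend implementation: it swaps in the standard library's `asyncio` for AnyIO so that it runs without extra dependencies, and `InProcessRunner` with its methods is a hypothetical name.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable


class InProcessRunner:
    """Toy in-process runner: maps task IDs to asyncio.Task objects."""

    def __init__(self) -> None:
        self._tasks: dict[str, asyncio.Task] = {}

    def launch(self, func: Callable[..., Awaitable[Any]], *args: Any) -> str:
        """Schedule func(*args) on the running loop and return a local UUID."""
        task_id = str(uuid.uuid4())
        self._tasks[task_id] = asyncio.create_task(func(*args))
        return task_id

    def status(self, task_id: str) -> str:
        """Report a coarse status string for the given task."""
        task = self._tasks[task_id]
        if not task.done():
            return "PENDING"
        if task.cancelled():
            return "CANCELLED"
        return "FAILURE" if task.exception() else "SUCCESS"

    async def result(self, task_id: str) -> Any:
        """Wait for the task to finish and return its value."""
        return await self._tasks[task_id]


async def demo() -> tuple[str, str, int]:
    runner = InProcessRunner()

    async def double(x: int) -> int:
        await asyncio.sleep(0.01)
        return x * 2

    task_id = runner.launch(double, 5)
    before = runner.status(task_id)      # task is scheduled but has not run yet
    value = await runner.result(task_id)
    return before, runner.status(task_id), value
```

Calling `asyncio.run(demo())` launches one task, checks its status before and after completion, and returns the computed value.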

When to Use AnyIO

Use the AnyIO backend when running Langflow in development environments, executing unit tests, or deploying to low-traffic scenarios where distributed infrastructure is unnecessary. Because tasks run inside the main process, dispatch latency is minimal, but every task shares the API server's event loop, so long-running or blocking work can delay request handling.
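The effect of sharing one event loop can be observed in isolation. The following sketch uses plain `asyncio` rather than Langflow or AnyIO; `heartbeat` is a hypothetical stand-in for request handling, and the 0.2 second sleep stands in for blocking work inside a task.

```python
import asyncio
import time


async def heartbeat(ticks: list[int], stop: asyncio.Event) -> None:
    """Stand-in for API request handling: ticks whenever the loop is free."""
    while not stop.is_set():
        ticks.append(1)
        await asyncio.sleep(0.01)


async def count_ticks(offload: bool) -> int:
    """Run 0.2 s of blocking work and count heartbeat ticks in the meantime."""
    ticks: list[int] = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick

    if offload:
        # Run the blocking call in a worker thread; the loop stays responsive.
        await asyncio.to_thread(time.sleep, 0.2)
    else:
        # Blocking call on the loop thread; the heartbeat cannot run.
        time.sleep(0.2)

    stop.set()
    await beat
    return len(ticks)


if __name__ == "__main__":
    print("ticks while blocking the loop:", asyncio.run(count_ticks(False)))
    print("ticks with work offloaded:   ", asyncio.run(count_ticks(True)))
```

When the blocking call runs on the loop thread the heartbeat stalls after its first tick, whereas offloading the same call to a thread lets it keep ticking. Tasks that spend their time awaiting I/O do not have this problem; the concern is CPU-bound or synchronous work.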

Running Tasks with AnyIO

from langflow.services.deps import get_settings_service
from langflow.services.task.service import TaskService

async def process_data(x: int) -> int:
    return x * 2

async def main() -> None:
    # Acquire the SettingsService (already instantiated by Langflow)
    settings = get_settings_service()

    # Build the TaskService (defaults to AnyIOBackend)
    task_service = TaskService(settings)

    # Fire-and-forget execution; await is only valid inside an async function
    task_id = await task_service.fire_and_forget_task(process_data, 5)
    print(f"Launched task {task_id} using backend: {task_service.backend_name}")
    # Output: Launched task <uuid> using backend: anyio

The returned task_id is a UUID generated locally by the AnyIOBackend instance.

Celery Backend: Distributed Task Processing

Implementation Details

The CeleryBackend in src/backend/base/langflow/services/task/backends/celery.py provides a thin wrapper around a Celery application instance. It submits tasks using the standard .delay() method and returns Celery task IDs for tracking. The backend wraps Celery's AsyncResult objects to provide consistent status checking and revocation capabilities.

When Celery is enabled, Langflow delegates to the worker defined in langflow.worker.celery_app. If the broker connection fails, the backend raises Langflow-specific exceptions such as WorkflowServiceUnavailableError or WorkflowResourceError, which keeps error handling consistent across both backends.
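The wrapper pattern described above, submitting through `.delay()` and translating transport failures into a domain-specific exception, can be sketched without a real broker. Everything below is a stand-in: `FakeCeleryTask` and `FakeAsyncResult` mimic only the parts of Celery used here, the exception class is a local placeholder for the Langflow exception of the same name, and catching `ConnectionError` is an assumption about which failure is translated.

```python
from typing import Any


class WorkflowServiceUnavailableError(Exception):
    """Local placeholder for the Langflow exception named above."""


class FakeAsyncResult:
    """Mimics the `id` attribute of a Celery AsyncResult."""

    def __init__(self, task_id: str) -> None:
        self.id = task_id


class FakeCeleryTask:
    """Mimics only the `.delay()` entry point of a Celery task."""

    def __init__(self, broker_up: bool) -> None:
        self.broker_up = broker_up

    def delay(self, *args: Any) -> FakeAsyncResult:
        if not self.broker_up:
            raise ConnectionError("broker unreachable")
        return FakeAsyncResult("fake-task-id")


def submit(task: FakeCeleryTask, *args: Any) -> str:
    """Submit via .delay() and translate transport failures for callers."""
    try:
        return task.delay(*args).id
    except ConnectionError as exc:
        # Callers only need to handle one exception type for both backends.
        raise WorkflowServiceUnavailableError("task broker is unavailable") from exc
```

With the fake broker up, `submit(FakeCeleryTask(True), 4)` returns the task ID; with it down, the caller sees `WorkflowServiceUnavailableError` instead of a transport-level error.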

Configuring Celery

Enable distributed execution by setting the following environment variables or configuration options:


# config.yaml

celery_enabled: true
redis_url: "redis://localhost:6379/0"

Or via shell environment:

export LANGFLOW_CELERY_ENABLED=true
export LANGFLOW_REDIS_URL=redis://localhost:6379/0

After restarting Langflow, TaskService automatically instantiates CeleryBackend:

print(task_service.backend_name)  # Output: celery
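A quick way to sanity-check the two variables shown above is a small standalone script. This is only a sketch: the variable names come from this article, while the set of accepted truthy spellings and the URL checks are assumptions and may differ from how Langflow parses its settings. `read_task_env` is a hypothetical helper.

```python
import os
from urllib.parse import urlparse

TRUTHY = {"1", "true", "yes", "on"}  # assumed truthy spellings


def read_task_env(env: dict[str, str]) -> dict[str, bool]:
    """Summarise the Celery-related variables found in env."""
    enabled = env.get("LANGFLOW_CELERY_ENABLED", "false").strip().lower() in TRUTHY
    parsed = urlparse(env.get("LANGFLOW_REDIS_URL", ""))
    url_ok = parsed.scheme in {"redis", "rediss"} and bool(parsed.hostname)
    return {
        "celery_enabled": enabled,
        "redis_url_ok": url_ok,
        # A broker URL only matters when Celery is switched on.
        "ready": (not enabled) or url_ok,
    }


if __name__ == "__main__":
    print(read_task_env(dict(os.environ)))
```

Running the script in the same shell that starts Langflow shows whether the flag is on and whether the Redis URL is at least well-formed; it does not test that the broker is reachable.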

Submitting Celery Tasks


# tasks.py - Celery-registered function
import time

from langflow.worker import celery_app

@celery_app.task
def heavy_computation(x: int) -> int:
    time.sleep(5)  # Simulate intensive processing
    return x ** 3

# Within Langflow service code (inside an async function)
task_id = await task_service.fire_and_forget_task(heavy_computation, 4)
print(f"Celery task ID: {task_id}")
# Output: Celery task ID: <celery-task-id>

Unified Task Management Across Backends

Regardless of which backend is active, TaskService exposes a consistent API through three primary methods: fire_and_forget_task, launch_task, and revoke_task. This abstraction allows you to cancel running operations without knowing which backend is handling execution:


# Cancel a running task (works with AnyIO or Celery)

await task_service.revoke_task(task_id)
print("Task cancelled successfully")

The revocation behavior differs internally based on the active backend:

  • AnyIO: Cancels the task's anyio.CancelScope, which stops the coroutine within the local process at its next await checkpoint.
  • Celery: Invokes AsyncResult(task_id).revoke(terminate=True), signaling the external worker process to abort the task.
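The in-process half of this behavior can be illustrated with a small cancellation sketch. It uses `asyncio` task cancellation as a stand-in for an AnyIO cancel scope, and `long_running` and `revoke_demo` are hypothetical names. In both libraries cancellation is cooperative: the coroutine is interrupted at its next await point, not in the middle of synchronous code.

```python
import asyncio


async def long_running(log: list[str]) -> None:
    """Simulated task that records whether it was cancelled."""
    try:
        log.append("started")
        await asyncio.sleep(10)  # cancellation is delivered at this await
        log.append("finished")
    except asyncio.CancelledError:
        log.append("cancelled")  # a place for cleanup work
        raise  # re-raise so the runner sees the cancellation


async def revoke_demo() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(long_running(log))
    await asyncio.sleep(0.01)  # give the task time to start
    task.cancel()  # request cancellation, analogous to revoking the task
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task ended because it was cancelled
    return log
```

Running `asyncio.run(revoke_demo())` shows the task starting and then recording its cancellation without ever reaching the "finished" step.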

Summary

  • Langflow provides two task backends for flow execution: AnyIOBackend for local processing and CeleryBackend for distributed queues.
  • The celery_enabled flag in src/lfx/src/lfx/services/settings/base.py controls backend selection via TaskService.get_backend() in src/backend/base/langflow/services/task/service.py.
  • AnyIO is the default backend, running tasks in-process using AnyIO task groups and cancellation scopes.
  • Celery requires external broker configuration (Redis/RabbitMQ) and delegates to langflow.worker.celery_app for scalable execution.
  • Both backends implement the same TaskBackend interface, supporting fire_and_forget_task, launch_task, and revoke_task with identical method signatures.

Frequently Asked Questions

What is the default task backend in Langflow?

The default backend is AnyIO, which executes tasks asynchronously within the main Langflow process. This is determined by the celery_enabled setting defaulting to false in the configuration schema located at src/lfx/src/lfx/services/settings/base.py.

How do I switch from AnyIO to Celery in Langflow?

Set the environment variable LANGFLOW_CELERY_ENABLED=true or add celery_enabled: true to your config.yaml file. You must also configure a message broker URL via LANGFLOW_REDIS_URL or equivalent settings. Upon restart, TaskService will instantiate CeleryBackend instead of AnyIOBackend.

Can I use Redis as the broker for Celery in Langflow?

Yes, Langflow explicitly supports Redis as a broker for Celery. Configure the connection string using the redis_url setting or LANGFLOW_REDIS_URL environment variable (e.g., redis://localhost:6379/0). The Celery backend in src/backend/base/langflow/services/task/backends/celery.py uses this configuration to establish the broker connection.

How does task cancellation work in Langflow?

Task cancellation works through the unified revoke_task(task_id) method available on TaskService. When using AnyIO, this cancels the local AnyIO task scope immediately. When using Celery, this sends a revocation signal to the worker via AsyncResult.revoke(terminate=True), allowing you to abort long-running distributed tasks.
