# Langflow Task Backends: How to Run Flow Execution with AnyIO or Celery

> Explore Langflow task backends for flow execution. Learn how to leverage AnyIO for local async processing or Celery for distributed queues with simple configuration.

- Repository: [langflow-ai/langflow](https://github.com/langflow-ai/langflow)
- Tags: how-to-guide
- Published: 2026-02-24

---

**Langflow supports two interchangeable task backends for flow execution: AnyIO for local asynchronous processing and Celery for distributed task queues. `TaskService` selects between them based on the `celery_enabled` configuration flag.**

The `langflow-ai/langflow` repository implements a pluggable task execution architecture that serves both development and production workloads. Knowing how the two **Langflow task backends** differ helps you choose between lightweight in-process execution for local testing and distributed workers for high-throughput pipelines.

## Supported Task Backends for Langflow Flow Execution

Langflow provides two distinct backends for executing flow tasks, each designed for specific operational contexts:

- **AnyIO Backend**: An in-process asynchronous task runner built on the AnyIO library. This is the default backend, suitable for development and low-load scenarios where tasks execute within the main Langflow process.

- **Celery Backend**: A distributed task queue wrapper that delegates execution to external Celery workers. This backend enables horizontal scaling and is ideal for production environments requiring robust task distribution across multiple nodes.

The system uses a unified interface defined in `TaskService`, ensuring that application code remains agnostic to which backend is active.
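
To illustrate what "agnostic to which backend is active" means in practice, here is a minimal sketch of a shared interface with two interchangeable implementations. Everything in it (`InProcessBackend`, `RecordingBackend`, `select_backend`, and the reduced one-method `TaskBackend` protocol) is hypothetical and written for this guide; Langflow's real classes have more methods and may differ in their signatures.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable, Protocol


class TaskBackend(Protocol):
    """Hypothetical, reduced interface; the real TaskBackend may differ."""

    name: str

    async def launch_task(
        self, func: Callable[..., Awaitable[Any]], *args: Any
    ) -> str: ...


class InProcessBackend:
    """Stand-in for an in-process backend: schedules the coroutine on the current loop."""

    name = "in-process"

    def __init__(self) -> None:
        self.tasks: dict[str, asyncio.Task] = {}

    async def launch_task(self, func, *args):
        task_id = str(uuid.uuid4())
        self.tasks[task_id] = asyncio.create_task(func(*args))
        return task_id


class RecordingBackend:
    """Stand-in for a queue-based backend: only records what would be enqueued."""

    name = "queue"

    def __init__(self) -> None:
        self.enqueued: list[tuple[str, tuple]] = []

    async def launch_task(self, func, *args):
        task_id = str(uuid.uuid4())
        self.enqueued.append((func.__name__, args))
        return task_id


def select_backend(use_queue: bool) -> TaskBackend:
    # A single boolean picks the implementation, similar in spirit to `celery_enabled`.
    return RecordingBackend() if use_queue else InProcessBackend()


async def double(x: int) -> int:
    return x * 2


async def submit(backend: TaskBackend) -> str:
    # The calling code is identical no matter which backend was selected.
    return await backend.launch_task(double, 5)
```

The design point is that `submit` depends only on the protocol, so swapping the backend requires no change at the call site.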

## How Langflow Selects the Task Backend

The backend selection logic resides in [`src/backend/base/langflow/services/task/service.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/task/service.py). During initialization, `TaskService` inspects the `celery_enabled` setting from the global configuration to determine which implementation to instantiate:

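```python
# src/backend/base/langflow/services/task/service.py (abridged)

# During TaskService initialization: read the flag from the settings service
self.use_celery = self.settings_service.settings.celery_enabled

# TaskService method: choose the backend implementation from that flag
def get_backend(self) -> TaskBackend:
    if self.use_celery:
        return CeleryBackend()
    return AnyIOBackend()
```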

The configuration flag originates from [`src/lfx/src/lfx/services/settings/base.py`](https://github.com/langflow-ai/langflow/blob/main/src/lfx/src/lfx/services/settings/base.py), where `celery_enabled` defaults to `false`. When set to `true`, Langflow expects a running Celery worker process and a message broker such as Redis or RabbitMQ.
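
As a rough illustration of how an environment variable can drive this choice, the sketch below parses `LANGFLOW_CELERY_ENABLED` by hand. This is not how Langflow reads its settings (it uses its own settings class), and the set of accepted truthy strings, along with the helper names `celery_enabled_from_env` and `describe_backend`, are assumptions made for the example.

```python
import os

# Assumed set of truthy spellings; Langflow's settings parser may accept others.
TRUTHY = {"1", "true", "yes", "on"}


def celery_enabled_from_env(environ=None) -> bool:
    """Return True only if LANGFLOW_CELERY_ENABLED holds a truthy string."""
    env = os.environ if environ is None else environ
    raw = env.get("LANGFLOW_CELERY_ENABLED", "false")  # unset means disabled
    return raw.strip().lower() in TRUTHY


def describe_backend(environ=None) -> str:
    """Name the backend that the flag would select."""
    return "celery" if celery_enabled_from_env(environ) else "anyio"
```

Passing a plain dictionary instead of relying on `os.environ` makes the helper easy to check, for example `describe_backend({"LANGFLOW_CELERY_ENABLED": "true"})`.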

## AnyIO Backend: Local Asynchronous Execution

### Implementation Details

The `AnyIOBackend` class in [`src/backend/base/langflow/services/task/backends/anyio.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/task/backends/anyio.py) manages flow execution using in-process asynchronous primitives. It maintains a dictionary of `AnyIOTaskResult` objects and runs tasks within an AnyIO task group, providing cancellation scopes and result retrieval mechanisms without external dependencies.
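
The idea of keeping a dictionary of per-task results can be sketched as follows. To stay dependency-free, this example uses the standard library's `asyncio` rather than AnyIO, and `TaskRecord` and `InProcessRegistry` are hypothetical names, only loosely analogous to `AnyIOTaskResult` and `AnyIOBackend`.

```python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class TaskRecord:
    """Hypothetical per-task record holding status, result, and error."""

    status: str = "PENDING"
    result: Any = None
    error: Optional[BaseException] = None


class InProcessRegistry:
    def __init__(self) -> None:
        self.records: dict[str, TaskRecord] = {}
        self._tasks: dict[str, asyncio.Task] = {}

    async def _run(
        self, task_id: str, func: Callable[..., Awaitable[Any]], *args: Any
    ) -> None:
        record = self.records[task_id]
        try:
            record.result = await func(*args)
            record.status = "DONE"
        except Exception as exc:
            # Store the failure on the record instead of letting it escape.
            record.error = exc
            record.status = "FAILED"

    def launch(self, func, *args) -> str:
        """Schedule func(*args); must be called while an event loop is running."""
        task_id = str(uuid.uuid4())
        self.records[task_id] = TaskRecord()
        self._tasks[task_id] = asyncio.create_task(self._run(task_id, func, *args))
        return task_id

    async def wait(self, task_id: str) -> TaskRecord:
        """Wait for the task to finish and return its record."""
        await self._tasks[task_id]
        return self.records[task_id]
```

Because the registry lives in process memory, task IDs and results do not survive a restart, which is one practical difference from a broker-backed queue.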

### When to Use AnyIO

Use the AnyIO backend when running Langflow in development environments, executing unit tests, or deploying to low-traffic scenarios where distributed infrastructure is unnecessary. Because tasks run inside the main process, dispatch latency is minimal, but tasks share the event loop with the API server, so long-running or blocking work can delay API responses.
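
The cost of sharing an event loop can be demonstrated with a small standard-library experiment. This sketch does not measure Langflow itself: `heartbeat` merely stands in for the API server handling other requests, and all function names are made up for the illustration.

```python
import asyncio
import time


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    # Stands in for other work the server wants to do on the same loop.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


def blocking_work(seconds: float) -> str:
    time.sleep(seconds)  # blocks whichever thread runs it
    return "done"


async def ticks_during(offload: bool, seconds: float = 0.2) -> int:
    """Count heartbeat ticks recorded while blocking_work is running."""
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    before = len(ticks)
    if offload:
        # Run the blocking call in a worker thread; the loop stays responsive.
        await asyncio.to_thread(blocking_work, seconds)
    else:
        # Run the blocking call on the loop thread; nothing else can progress.
        blocking_work(seconds)
    during = len(ticks) - before
    stop.set()
    await beat
    return during
```

Calling `asyncio.run(ticks_during(offload=False))` reports no heartbeat ticks during the blocking call, while `offload=True` lets the heartbeat keep ticking. Distributing heavy work to separate worker processes addresses the same problem at a larger scale.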

### Running Tasks with AnyIO

```python
import asyncio

from langflow.services.deps import get_settings_service
from langflow.services.task.service import TaskService


async def process_data(x: int) -> int:
    return x * 2


async def main() -> None:
    # Acquire the SettingsService (already instantiated by Langflow)
    settings = get_settings_service()

    # Build the TaskService (defaults to AnyIOBackend)
    task_service = TaskService(settings)

    # Fire-and-forget execution: returns a task ID without waiting for the result
    task_id = await task_service.fire_and_forget_task(process_data, 5)
    print(f"Launched task {task_id} using backend: {task_service.backend_name}")
    # Output: Launched task <uuid> using backend: anyio


asyncio.run(main())
```

The returned `task_id` is a UUID generated locally by the `AnyIOBackend` instance.

## Celery Backend: Distributed Task Processing

### Implementation Details

The `CeleryBackend` in [`src/backend/base/langflow/services/task/backends/celery.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/task/backends/celery.py) provides a thin wrapper around a Celery application instance. It submits tasks using the standard `.delay()` method and returns Celery task IDs for tracking. The backend wraps Celery's `AsyncResult` objects to provide consistent status checking and revocation capabilities.

When Celery is enabled, Langflow delegates to the worker defined in `langflow.worker.celery_app`. If the broker connection fails, the backend raises Langflow-specific exceptions such as `WorkflowServiceUnavailableError` or `WorkflowResourceError`, so callers can handle errors the same way under either backend.
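
The general pattern of translating low-level failures into service-level exceptions might look like the sketch below. The two exception classes are local stand-ins that only borrow the names mentioned above; they are not imported from Langflow, and the real classes may have different base classes or constructors. The `send` callable, the helper `submit_with_translation`, and the choice of which built-in error maps to which exception are all assumptions.

```python
class WorkflowServiceUnavailableError(Exception):
    """Local stand-in for illustration; not Langflow's actual class."""


class WorkflowResourceError(Exception):
    """Local stand-in for illustration; not Langflow's actual class."""


def submit_with_translation(send, *args):
    """Call `send` (a hypothetical enqueue function) and translate failures."""
    try:
        return send(*args)
    except ConnectionError as exc:
        # Broker unreachable: raise a service-level error and keep the cause.
        raise WorkflowServiceUnavailableError("task broker is unreachable") from exc
    except MemoryError as exc:
        # Assumed mapping: resource exhaustion becomes a resource error.
        raise WorkflowResourceError("not enough resources to enqueue task") from exc
```

Chaining with `from exc` keeps the original traceback available for debugging while callers catch only the service-level types.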

### Configuring Celery

Enable distributed execution by setting the following configuration options:

```yaml
# config.yaml
celery_enabled: true
redis_url: "redis://localhost:6379/0"
```

Or via shell environment:

```bash
export LANGFLOW_CELERY_ENABLED=true
export LANGFLOW_REDIS_URL=redis://localhost:6379/0
```

After restarting Langflow, `TaskService` automatically instantiates `CeleryBackend`:

```python
print(task_service.backend_name)  # Output: celery
```

### Submitting Celery Tasks

```python
# tasks.py - Celery-registered function
import time

from langflow.worker import celery_app


@celery_app.task
def heavy_computation(x: int) -> int:
    time.sleep(5)  # Simulate intensive processing
    return x ** 3
```

```python
# Within Langflow service code (must run inside an async function)
task_id = await task_service.fire_and_forget_task(heavy_computation, 4)
print(f"Celery task ID: {task_id}")
# Output: Celery task ID: <celery-task-id>
```

## Unified Task Management Across Backends

Regardless of which backend is active, `TaskService` exposes a consistent API through three primary methods: `fire_and_forget_task`, `launch_task`, and `revoke_task`. This abstraction allows you to cancel running operations without knowing which backend is handling execution:

```python
# Cancel a running task (works with AnyIO or Celery); must run inside an async function
await task_service.revoke_task(task_id)
print("Task cancelled successfully")
```

The revocation behavior differs internally based on the active backend:

- **AnyIO**: Cancels the task's `anyio.CancelScope`, immediately stopping the coroutine within the local process.
- **Celery**: Invokes `AsyncResult(task_id).revoke(terminate=True)`, signaling the external worker process to abort the task.
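
The in-process side of revocation can be sketched with the standard library. This example substitutes `asyncio.Task.cancel()` for the `anyio.CancelScope` described above, and `CancellableRunner` is a hypothetical class written for this guide, not part of Langflow.

```python
import asyncio


class CancellableRunner:
    """Hypothetical in-process runner that tracks tasks by ID."""

    def __init__(self) -> None:
        self._tasks: dict[str, asyncio.Task] = {}

    def launch(self, task_id: str, coro) -> None:
        """Schedule a coroutine; must be called while an event loop is running."""
        self._tasks[task_id] = asyncio.create_task(coro)

    async def revoke(self, task_id: str) -> bool:
        """Request cancellation; return False if there was nothing to cancel."""
        task = self._tasks.pop(task_id, None)
        if task is None or task.done():
            return False  # unknown ID or already finished
        task.cancel()
        try:
            await task  # wait until the cancellation has been delivered
        except asyncio.CancelledError:
            pass
        return True
```

Cancellation is raised inside the coroutine at its next `await`, so `try`/`finally` blocks in the task still run and can release resources. A task that never awaits cannot be interrupted this way.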

## Summary

- Langflow provides two **task backends** for flow execution: `AnyIOBackend` for local processing and `CeleryBackend` for distributed queues.
- The `celery_enabled` flag in [`src/lfx/src/lfx/services/settings/base.py`](https://github.com/langflow-ai/langflow/blob/main/src/lfx/src/lfx/services/settings/base.py) controls backend selection via `TaskService.get_backend()` in [`src/backend/base/langflow/services/task/service.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/task/service.py).
- **AnyIO** is the default backend, running tasks in-process using AnyIO task groups and cancellation scopes.
- **Celery** requires external broker configuration (Redis/RabbitMQ) and delegates to `langflow.worker.celery_app` for scalable execution.
- Both backends implement the same `TaskBackend` interface, supporting `fire_and_forget_task`, `launch_task`, and `revoke_task` with identical method signatures.

## Frequently Asked Questions

### What is the default task backend in Langflow?

The default backend is **AnyIO**, which executes tasks asynchronously within the main Langflow process. This is determined by the `celery_enabled` setting defaulting to `false` in the configuration schema located at [`src/lfx/src/lfx/services/settings/base.py`](https://github.com/langflow-ai/langflow/blob/main/src/lfx/src/lfx/services/settings/base.py).

### How do I switch from AnyIO to Celery in Langflow?

Set the environment variable `LANGFLOW_CELERY_ENABLED=true` or add `celery_enabled: true` to your `config.yaml` file. You must also configure a message broker URL via `LANGFLOW_REDIS_URL` or equivalent settings. After a restart, `TaskService` instantiates `CeleryBackend` instead of `AnyIOBackend`.

### Can I use Redis as the broker for Celery in Langflow?

Yes, Langflow explicitly supports Redis as a broker for Celery. Configure the connection string using the `redis_url` setting or `LANGFLOW_REDIS_URL` environment variable (e.g., `redis://localhost:6379/0`). The Celery backend in [`src/backend/base/langflow/services/task/backends/celery.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/task/backends/celery.py) uses this configuration to establish the broker connection.

### How does task cancellation work in Langflow?

Task cancellation works through the unified `revoke_task(task_id)` method available on `TaskService`. When using **AnyIO**, this cancels the local AnyIO task scope immediately. When using **Celery**, this sends a revocation signal to the worker via `AsyncResult.revoke(terminate=True)`, allowing you to abort long-running distributed tasks.