How n8n Handles Scaling and Queue-Based Processing with Bull: Architecture Deep Dive

n8n leverages Bull queues backed by Redis to enable horizontal scaling, where a main process enqueues workflow executions and worker processes consume them, with automatic recovery, priority handling, and real-time progress reporting via Redis pub-sub.

The n8n-io/n8n repository implements a queue-mode architecture that separates workflow orchestration from execution. Understanding how n8n uses Bull for scaling and queue-based processing helps when deploying it for high-throughput workloads across multiple worker instances, where data consistency and fault tolerance matter.

Configuration: ScalingModeConfig and Redis Settings

All queue-related behavior in n8n is driven by the ScalingModeConfig, located in packages/@n8n/config/src/configs/scaling-mode.config.ts. This configuration class bundles Bull-specific options read from environment variables prefixed with QUEUE_.

The BullConfig nested class defines critical parameters including Redis connection details, key prefixes, and stalled-job detection intervals:

// packages/@n8n/config/src/configs/scaling-mode.config.ts
@Config
class BullConfig {
    @Env('QUEUE_BULL_PREFIX')
    prefix: string = 'bull';                     // Redis key prefix

    @Nested
    redis: RedisConfig;                          // host, port, DB, TLS, etc.

    @Env('QUEUE_WORKER_STALLED_INTERVAL')
    stalledInterval: number = 30_000;            // How often Bull checks for stalled jobs (0 = disabled)
    
    // … other settings for lock duration, renew time, etc.
}

This configuration layer lets n8n target different Redis deployment topologies (single-node, cluster, or TLS-secured instances) while keeping queue behavior consistent across the application.
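To make the mapping from environment variables to typed settings concrete, here is a minimal sketch in plain TypeScript. It does not use n8n's @Config/@Env decorators. The names readBullEnv and BullEnvSettings are hypothetical, and only the two variable names and defaults shown in the BullConfig excerpt above come from the source.

```typescript
// Hypothetical helper for illustration only. n8n itself uses @Config/@Env
// decorators; this just mirrors the two defaults from the excerpt above.
interface BullEnvSettings {
  prefix: string;
  stalledInterval: number; // milliseconds; 0 disables the stalled-job check
}

function readBullEnv(env: Record<string, string | undefined>): BullEnvSettings {
  const raw = env.QUEUE_WORKER_STALLED_INTERVAL;
  // Treat a missing or blank variable as "not set".
  const parsed = raw === undefined || raw.trim() === '' ? NaN : Number(raw);

  return {
    prefix: env.QUEUE_BULL_PREFIX ?? 'bull',
    // Fall back to the default when the value is missing or not a valid number.
    stalledInterval: Number.isFinite(parsed) && parsed >= 0 ? parsed : 30_000,
  };
}

console.log(
  readBullEnv({ QUEUE_BULL_PREFIX: 'n8n-bull', QUEUE_WORKER_STALLED_INTERVAL: '15000' }),
);
```

Falling back to the default on an unparsable value is a choice made for this sketch. A real configuration layer might prefer to fail fast instead.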

Scaling Service: Bull Queue Management

The ScalingService (packages/cli/src/scaling/scaling.service.ts) acts as the central coordinator for queue operations. As a dependency-injected singleton, it creates the Bull queue instance, registers global event listeners, and manages periodic maintenance tasks.

Queue Initialization

The setupQueue() method dynamically imports Bull and initializes the queue with validated Redis configuration:

// packages/cli/src/scaling/scaling.service.ts
async setupQueue() {
    const { default: BullQueue } = await import('bull');
    const { RedisClientService } = await import('@/services/redis-client.service');

    const bullPrefix = this.globalConfig.queue.bull.prefix;
    const prefix = RedisClientService.toValidPrefix(bullPrefix);

    this.queue = new BullQueue(QUEUE_NAME, {
        prefix,
        settings: { ...this.globalConfig.queue.bull.settings, maxStalledCount: 0 },
        createClient: (type) => RedisClientService.createClient({ type: `${type}(bull)` }),
    });

    this.registerListeners();          // worker vs main listeners
    this.scheduleQueueMetrics();       // Prometheus
    if (this.instanceSettings.isLeader) this.scheduleQueueRecovery(0);
}
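One reason a prefix may need validating is Redis Cluster. Bull performs atomic operations that span several keys, and Redis Cluster only permits that when all of those keys hash to the same slot, which is achieved by wrapping part of the key in a {hash tag}. The sketch below illustrates that idea. It is an assumption about what a helper like toValidPrefix might do, not a copy of n8n's code; toClusterSafePrefix and its isCluster parameter are hypothetical.

```typescript
// Hypothetical sketch: wrap the prefix in braces so Redis Cluster treats it
// as a hash tag and places all of the queue's keys in the same slot.
function toClusterSafePrefix(prefix: string, isCluster: boolean): string {
  if (!isCluster) return prefix; // single-node Redis needs no hash tag

  let result = prefix;
  if (!result.startsWith('{')) result = '{' + result;
  if (!result.endsWith('}')) result = result + '}';
  return result;
}

console.log(toClusterSafePrefix('bull', true)); // {bull}
console.log(toClusterSafePrefix('bull', false)); // bull
```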

Event Listeners and Process Communication

The service registers distinct listener sets depending on the instance type (main/webhook vs. worker). These listeners handle global:progress events for cross-process communication:

  • Worker listeners respond to abort-job messages to gracefully stop running workflows
  • Main listeners handle send-chunk (UI streaming), respond-to-webhook (HTTP responses), job-finished (completion tracking), and mcp-response (AI tool results)

This pub-sub mechanism lets workers report execution progress back to the main process through Redis, without a direct network connection between worker and main.
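A common way to route messages like these is a discriminated union keyed on a kind field. The sketch below uses the message kinds listed above, but the payload fields are simplified assumptions and do not match n8n's real message types; describeOnMain is a hypothetical function that only returns a description of what a main-side handler would do.

```typescript
// Message kinds come from the list above; payload fields are assumptions.
type WorkerToMainMessage =
  | { kind: 'send-chunk'; executionId: string; chunk: string }
  | { kind: 'respond-to-webhook'; executionId: string; statusCode: number }
  | { kind: 'job-finished'; executionId: string; success: boolean }
  | { kind: 'mcp-response'; executionId: string; sessionId: string };

type MainToWorkerMessage = { kind: 'abort-job' };

type ProgressMessage = WorkerToMainMessage | MainToWorkerMessage;

// Hypothetical main-side dispatcher: one branch per message kind.
function describeOnMain(msg: ProgressMessage): string {
  switch (msg.kind) {
    case 'send-chunk':
      return `forward chunk of execution ${msg.executionId} to the UI`;
    case 'respond-to-webhook':
      return `send HTTP ${msg.statusCode} for execution ${msg.executionId}`;
    case 'job-finished':
      return `execution ${msg.executionId} finished (success=${msg.success})`;
    case 'mcp-response':
      return `deliver tool result to MCP session ${msg.sessionId}`;
    case 'abort-job':
      return 'ignored: abort-job is meant for workers';
  }
}

console.log(describeOnMain({ kind: 'job-finished', executionId: '42', success: true }));
```

Because the switch covers every member of the union, the compiler flags any newly added message kind that lacks a handler.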

Job Processor: Executing Workflows on Workers

The JobProcessor (packages/cli/src/scaling/job-processor.ts) contains the core execution logic that runs on worker instances. When a worker calls ScalingService.setupWorker(concurrency), it registers a Bull processor that delegates to JobProcessor.processJob().

Execution Lifecycle

The processJob() method orchestrates the following steps:

  1. Load execution data from the database using ExecutionRepository
  2. Instantiate the Workflow class with static data, connections, and node types
  3. Attach lifecycle hooks that funnel data through job.progress() calls
  4. Execute the workflow via WorkflowExecute or manualExecutionService.runManually
  5. Report completion via progress messages containing execution results

// packages/cli/src/scaling/job-processor.ts (excerpt)
async processJob(job: Job): Promise<JobResult> {
    // … load execution, create workflow, set up hooks …
    const workflowRun = …;                     // PCancelable<IRun>
    this.runningJobs[job.id] = { … };

    const run = await workflowRun;             // await completion

    const props = await this.fetchJobFinishedResult(executionId);
    const msg: JobFinishedMessage = {
        kind: 'job-finished',
        version: 2,
        executionId,
        workerId: this.instanceSettings.hostId,
        ...props,
    };
    await job.progress(msg);                  // send to main
    return { success: true };
}
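Step 3 of the lifecycle, hooks that funnel data through job.progress(), can be sketched without Bull by standing in a small interface for the job. Everything here is illustrative: ProgressReporter, ExecutionHooks, and createHooks are hypothetical names, and the message payloads are simplified assumptions rather than n8n's actual types.

```typescript
// Simplified stand-in for the one method of Bull's Job that is used here.
interface ProgressReporter {
  progress(message: unknown): Promise<void>;
}

// Hypothetical hook set; n8n's real lifecycle hooks are richer.
interface ExecutionHooks {
  onChunk(chunk: string): Promise<void>;
  onFinished(success: boolean): Promise<void>;
}

function createHooks(
  job: ProgressReporter,
  executionId: string,
  workerId: string,
): ExecutionHooks {
  return {
    // Each hook wraps its data in a message and hands it to job.progress().
    onChunk: (chunk) => job.progress({ kind: 'send-chunk', executionId, workerId, chunk }),
    onFinished: (success) =>
      job.progress({ kind: 'job-finished', version: 2, executionId, workerId, success }),
  };
}

// In-memory fake that records messages instead of publishing them via Redis.
const sent: unknown[] = [];
const fakeJob: ProgressReporter = {
  progress: (message) => {
    sent.push(message);
    return Promise.resolve();
  },
};

const hooks = createHooks(fakeJob, 'exec-1', 'worker-a');
void hooks.onChunk('partial output');
void hooks.onFinished(true);
console.log(sent.length); // 2
```

Keeping the hooks behind a narrow interface also makes the reporting path easy to test without a running Redis instance.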

MCP (AI Tool) Integration

For executions triggered by Model Context Protocol (MCP) tool calls, the processor sends specialized mcp-response messages:

if (job.data.isMcpExecution && job.data.mcpSessionId) {
    const mcpMsg: McpResponseMessage = {
        kind: 'mcp-response',
        executionId,
        mcpType: job.data.mcpType ?? 'service',
        sessionId: job.data.mcpSessionId,
        messageId: job.data.mcpMessageId ?? '',
        response: { success: props.success },
        workerId: this.instanceSettings.hostId,
    };
    await job.progress(mcpMsg);
}

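This keeps AI tool calls, such as those made through LangChain and other AI frameworks, inside the queue-based execution model: the worker runs the workflow, and the main process, which handles mcp-response messages, receives the result.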

Queue Recovery and Metrics

The ScalingService implements automatic recovery to handle orphaned executions when worker processes crash or network partitions occur.

Recovery Mechanism

Running exclusively on the leader main instance, the recovery process:

  1. Queries the database for in-progress execution IDs
  2. Fetches active and waiting jobs from Bull using findJobsByStatus(['active','waiting'])
  3. Marks any execution present in the database but missing from the queue as crashed

// packages/cli/src/scaling/scaling.service.ts (excerpt)
async recoverFromQueue() {
    const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
    const runningJobs = await this.findJobsByStatus(['active','waiting']);
    const queuedIds = new Set(runningJobs.map(job => job.data.executionId));

    const danglingIds = storedIds.filter(id => !queuedIds.has(id));
    await this.executionRepository.markAsCrashed(danglingIds);
}
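The core of the recovery step is a set difference: IDs the database considers in progress minus IDs the queue still knows about. A standalone sketch of just that comparison, with the hypothetical helper name findDanglingIds and plain string IDs as an assumption:

```typescript
// Hypothetical pure helper isolating the comparison used in recovery.
function findDanglingIds(storedIds: string[], queuedIds: Iterable<string>): string[] {
  const queued = new Set(queuedIds); // O(1) membership checks
  return storedIds.filter((id) => !queued.has(id));
}

// Executions 1 and 3 are marked in progress in the database,
// but only execution 2 still has a job in the queue.
console.log(findDanglingIds(['1', '2', '3'], ['2'])); // → ['1', '3']
```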

Metrics Collection

The service emits queue metrics at configurable intervals (queueMetricsInterval), exposing active/waiting job counts and completion statistics for Prometheus monitoring.
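Interval-based metrics are often implemented as counters that are read and reset on each tick. The sketch below shows that pattern only. QueueMetricsTally is a hypothetical class, and the source does not show how n8n actually gathers these numbers.

```typescript
// Hypothetical tally: counts job outcomes between two metric emissions.
class QueueMetricsTally {
  private completed = 0;
  private failed = 0;

  recordCompleted(): void {
    this.completed++;
  }

  recordFailed(): void {
    this.failed++;
  }

  // Returns the counts since the previous flush and resets them.
  flush(): { completed: number; failed: number } {
    const snapshot = { completed: this.completed, failed: this.failed };
    this.completed = 0;
    this.failed = 0;
    return snapshot;
  }
}

const tally = new QueueMetricsTally();
tally.recordCompleted();
tally.recordCompleted();
tally.recordFailed();
console.log(tally.flush()); // counts for this interval
console.log(tally.flush()); // zeros: nothing happened since the last flush
```

In a running service, flush() would be called from a timer that fires every queueMetricsInterval, with the snapshot handed to whatever exports the metrics.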

Practical Implementation Examples

Enqueuing a Workflow Execution

Main processes enqueue jobs via the ScalingService:

// packages/cli/src/scaling/scaling.service.ts usage
import { ScalingService } from '@/scaling/scaling.service';
import { Container } from '@n8n/di';

async function enqueue(workflowId: string, executionId: string) {
    const scaling = Container.get(ScalingService);
    await scaling.addJob(
        {
            workflowId,
            executionId,
            loadStaticData: true,
        },
        { priority: 1 }               // highest priority
    );
}
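In Bull, priority values start at 1 (highest), and larger numbers mean lower priority. The sketch below is a simplified model of that ordering, not Bull's implementation. QueuedJob, its seq field (an enqueue counter), and processingOrder are hypothetical, and the first-in-first-out tie-break for equal priorities is an assumption of this model.

```typescript
// Simplified model of a prioritized job; seq records enqueue order.
interface QueuedJob {
  executionId: string;
  priority: number; // 1 = highest priority
  seq: number;
}

// Lower priority number first; ties resolved by enqueue order.
function processingOrder(jobs: QueuedJob[]): QueuedJob[] {
  return [...jobs].sort((a, b) => a.priority - b.priority || a.seq - b.seq);
}

const order = processingOrder([
  { executionId: 'background-sync', priority: 100, seq: 1 },
  { executionId: 'webhook-reply', priority: 1, seq: 2 },
  { executionId: 'nightly-report', priority: 100, seq: 3 },
]);

console.log(order.map((job) => job.executionId));
// → ['webhook-reply', 'background-sync', 'nightly-report']
```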

Graceful Worker Shutdown

Workers should handle SIGTERM to finish active jobs before exiting:

import { Container } from '@n8n/di';
import { ScalingService } from '@/scaling/scaling.service';

process.on('SIGTERM', async () => {
    const scaling = Container.get(ScalingService);
    await scaling.stop();   // pauses queue, waits for running jobs, then exits
});
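The "wait for running jobs" part of a graceful shutdown is typically a polling loop with an upper bound, so a stuck job cannot block the process forever. A minimal sketch under that assumption follows. waitForRunningJobs, its parameters, and the timeout behavior are hypothetical and not taken from n8n's stop() implementation.

```typescript
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical drain loop: resolves true once no jobs are running,
// or false if the timeout elapses first.
async function waitForRunningJobs(
  getRunningCount: () => number,
  { pollMs = 500, timeoutMs = 30_000 } = {},
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (getRunningCount() > 0) {
    if (Date.now() >= deadline) return false; // give up, caller decides what next
    await sleep(pollMs);
  }
  return true;
}

// Simulated worker whose three running jobs finish one per poll.
let remaining = 3;
waitForRunningJobs(() => remaining--, { pollMs: 1, timeoutMs: 1_000 }).then((drained) => {
  console.log(drained ? 'all jobs finished' : 'timed out');
});
```

Returning a boolean instead of throwing leaves the policy (force exit, log, or retry) to the caller.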

Environment Configuration

Configure Bull behavior through environment variables:

export QUEUE_BULL_REDIS_HOST=redis.mycompany.net
export QUEUE_BULL_REDIS_PORT=6380
export QUEUE_BULL_PREFIX=n8n-bull
export QUEUE_WORKER_STALLED_INTERVAL=15000   # check stalled jobs every 15s
export QUEUE_WORKER_LOCK_DURATION=120000     # 2 min lock lease

Summary

  • ScalingModeConfig centralizes Bull and Redis configuration, enabling flexible deployments across different infrastructure topologies.
  • ScalingService manages the Bull queue lifecycle, registers cross-process event listeners, and schedules recovery and metrics tasks.
  • JobProcessor executes workflows on worker instances, reporting progress via Bull's Redis pub-sub mechanism to the main process.
  • Automatic recovery identifies and marks crashed executions by comparing database state against active queue jobs.
  • MCP integration extends the queue system to support AI tool calls while maintaining separation between main and worker processes.

Frequently Asked Questions

How does n8n handle stalled jobs in Bull queues?

n8n configures Bull with maxStalledCount: 0, which means Bull does not re-process a job it detects as stalled and moves it to the failed state instead. The QUEUE_WORKER_STALLED_INTERVAL environment variable (default 30,000 ms, or 30 seconds) controls how frequently Bull checks for stalled jobs. The recovery mechanism in ScalingService additionally detects executions that have disappeared from the queue entirely and marks them as crashed in the database.

Can workers report real-time progress to the n8n UI during queue-based execution?

Yes. Workers use Bull's job.progress() method to send send-chunk messages that stream execution data back to the main process. The main process then forwards this data to connected UI clients, enabling real-time log viewing and debugging even when workflows run on separate worker instances.

What happens to running jobs when a worker process shuts down?

The ScalingService's stop() method pauses the Bull queue to prevent picking up new jobs, then waits for currently running jobs to complete before exiting. This graceful shutdown ensures that workflow executions are not interrupted mid-process, preventing data inconsistency or partial execution states.

How does n8n prioritize jobs in the Bull queue?

Jobs can be enqueued with a priority via the second parameter of ScalingService.addJob(). In Bull, a lower number means a higher priority (1 is the highest), so jobs with lower priority numbers are processed before those with higher numbers, allowing critical workflows to execute ahead of background tasks.
