How n8n Handles Scaling and Queue-Based Processing with Bull: Architecture Deep Dive
n8n leverages Bull queues backed by Redis to enable horizontal scaling, where a main process enqueues workflow executions and worker processes consume them, with automatic recovery, priority handling, and real-time progress reporting via Redis pub-sub.
The n8n-io/n8n repository implements a robust queue-mode architecture that separates workflow orchestration from execution. Understanding n8n scaling and queue-based processing with Bull is essential for deploying production-grade automation infrastructure that can handle high-throughput workloads across multiple worker instances while maintaining data consistency and fault tolerance.
Configuration: ScalingModeConfig and Redis Settings
All queue-related behavior in n8n is driven by the ScalingModeConfig, located in packages/@n8n/config/src/configs/scaling-mode.config.ts. This configuration class bundles Bull-specific options read from environment variables prefixed with QUEUE_.
The BullConfig nested class defines critical parameters including Redis connection details, key prefixes, and stalled-job detection intervals:
// packages/@n8n/config/src/configs/scaling-mode.config.ts
@Config
class BullConfig {
  @Env('QUEUE_BULL_PREFIX')
  prefix: string = 'bull'; // Redis key prefix
  @Nested
  redis: RedisConfig; // host, port, DB, TLS, etc.
  @Env('QUEUE_WORKER_STALLED_INTERVAL')
  stalledInterval: number = 30_000; // How often Bull checks for stalled jobs, in ms (0 = disabled)
  // … other settings for lock duration, renew time, etc.
}
This configuration abstraction allows n8n to adapt to any Redis deployment topology—whether single-node, cluster, or TLS-secured instances—while standardizing queue behavior across the application.
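As a rough illustration of how the QUEUE_-prefixed variables could map onto these settings, the sketch below reads them from a plain environment object. The readBullSettings function, the BullSettings interface, and the Redis host and port defaults are assumptions made for this example; it does not reproduce n8n's decorator-based config loader.

```typescript
// Hypothetical sketch: not n8n's @Config/@Env implementation.
interface BullSettings {
  prefix: string;
  redisHost: string;
  redisPort: number;
  stalledInterval: number; // ms; 0 disables the stalled-job check
}

type Env = Record<string, string | undefined>;

// Parse an integer env var, falling back to a default when unset or invalid.
function intFromEnv(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw.trim() === '') return fallback;
  const parsed = Number.parseInt(raw, 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

function readBullSettings(env: Env): BullSettings {
  return {
    prefix: env['QUEUE_BULL_PREFIX'] ?? 'bull',
    redisHost: env['QUEUE_BULL_REDIS_HOST'] ?? 'localhost', // assumed default
    redisPort: intFromEnv(env, 'QUEUE_BULL_REDIS_PORT', 6379), // assumed default
    stalledInterval: intFromEnv(env, 'QUEUE_WORKER_STALLED_INTERVAL', 30_000),
  };
}

const defaults = readBullSettings({});
const custom = readBullSettings({
  QUEUE_BULL_PREFIX: 'n8n-bull',
  QUEUE_BULL_REDIS_PORT: '6380',
  QUEUE_WORKER_STALLED_INTERVAL: '15000',
});
console.log(defaults, custom);
```

Passing the environment in as an argument, rather than reading process.env directly, keeps the sketch easy to test with different inputs.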
Scaling Service: Bull Queue Management
The ScalingService (packages/cli/src/scaling/scaling.service.ts) acts as the central coordinator for queue operations. As a dependency-injected singleton, it creates the Bull queue instance, registers global event listeners, and manages periodic maintenance tasks.
Queue Initialization
The setupQueue() method dynamically imports Bull and initializes the queue with validated Redis configuration:
// packages/cli/src/scaling/scaling.service.ts
async setupQueue() {
  const { default: BullQueue } = await import('bull');
  const { RedisClientService } = await import('@/services/redis-client.service');
  const bullPrefix = this.globalConfig.queue.bull.prefix;
  const prefix = RedisClientService.toValidPrefix(bullPrefix);
  this.queue = new BullQueue(QUEUE_NAME, {
    prefix,
    // Spread the configured Bull settings, then force maxStalledCount to 0
    settings: { ...this.globalConfig.queue.bull.settings, maxStalledCount: 0 },
    createClient: (type) => RedisClientService.createClient({ type: `${type}(bull)` }),
  });
  this.registerListeners(); // worker vs main listeners
  this.scheduleQueueMetrics(); // Prometheus
  if (this.instanceSettings.isLeader) this.scheduleQueueRecovery(0);
}
Event Listeners and Process Communication
The service registers distinct listener sets depending on the instance type (main/webhook vs. worker). These listeners handle global:progress events for cross-process communication:
- Worker listeners respond to abort-job messages to gracefully stop running workflows
- Main listeners handle send-chunk (UI streaming), respond-to-webhook (HTTP responses), job-finished (completion tracking), and mcp-response (AI tool results)
This pub-sub mechanism allows workers to report execution progress back to the main process without direct network calls.
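The routing described above can be pictured as a dispatch on each message's kind field. The sketch below is a simplified model: the kind values come from this article, while the payload fields, the QueueMessage union, and the routeMessage function are hypothetical and do not mirror n8n's actual type definitions.

```typescript
// Hypothetical message shapes; only the `kind` values come from the article.
type QueueMessage =
  | { kind: 'abort-job'; jobId: string }
  | { kind: 'send-chunk'; executionId: string; chunk: string }
  | { kind: 'respond-to-webhook'; executionId: string; statusCode: number }
  | { kind: 'job-finished'; executionId: string; success: boolean }
  | { kind: 'mcp-response'; executionId: string; sessionId: string };

type InstanceType = 'main' | 'worker';

// Returns a description of the action taken, or null when this instance ignores the message.
function routeMessage(instance: InstanceType, msg: QueueMessage): string | null {
  if (instance === 'worker') {
    // In this model, workers only react to abort requests.
    return msg.kind === 'abort-job' ? `abort job ${msg.jobId}` : null;
  }
  switch (msg.kind) {
    case 'send-chunk':
      return `stream chunk for ${msg.executionId}`;
    case 'respond-to-webhook':
      return `send HTTP ${msg.statusCode} for ${msg.executionId}`;
    case 'job-finished':
      return `mark ${msg.executionId} as ${msg.success ? 'succeeded' : 'failed'}`;
    case 'mcp-response':
      return `forward MCP result to session ${msg.sessionId}`;
    case 'abort-job':
      return null; // abort requests are handled by workers, not by main
  }
}

console.log(routeMessage('main', { kind: 'send-chunk', executionId: '42', chunk: 'hello' }));
```

Using a discriminated union means the compiler flags any message kind that a switch forgets to handle.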
Job Processor: Executing Workflows on Workers
The JobProcessor (packages/cli/src/scaling/job-processor.ts) contains the core execution logic that runs on worker instances. When a worker calls ScalingService.setupWorker(concurrency), it registers a Bull processor that delegates to JobProcessor.processJob().
Execution Lifecycle
The processJob() method orchestrates the following steps:
- Load execution data from the database using ExecutionRepository
- Instantiate the Workflow class with static data, connections, and node types
- Attach lifecycle hooks that funnel data through job.progress() calls
- Execute the workflow via WorkflowExecute or manualExecutionService.runManually
- Report completion via progress messages containing execution results
// packages/cli/src/scaling/job-processor.ts (excerpt)
async processJob(job: Job): Promise<JobResult> {
  // … load execution, create workflow, set up hooks …
  const workflowRun = …; // PCancelable<IRun>
  this.runningJobs[job.id] = { … };
  const run = await workflowRun; // await completion
  const props = await this.fetchJobFinishedResult(executionId);
  const msg: JobFinishedMessage = {
    kind: 'job-finished',
    version: 2,
    executionId,
    workerId: this.instanceSettings.hostId,
    ...props,
  };
  await job.progress(msg); // send to main
  return { success: true };
}
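To make the reporting step concrete, the sketch below models a job whose progress() call simply records messages, plus a worker-side function that streams chunks and then sends a job-finished message. FakeJob, processJobSketch, and the success field are hypothetical stand-ins; Bull's real Job.progress() is asynchronous and goes through Redis, and this is not n8n's JobProcessor.

```typescript
// Hypothetical stand-ins for illustration only.
type ProgressMessage =
  | { kind: 'send-chunk'; executionId: string; chunk: string }
  | { kind: 'job-finished'; version: 2; executionId: string; workerId: string; success: boolean };

class FakeJob {
  readonly sent: ProgressMessage[] = [];
  readonly id: string;
  readonly data: { executionId: string };

  constructor(id: string, data: { executionId: string }) {
    this.id = id;
    this.data = data;
  }

  // Records the message instead of publishing it to Redis.
  progress(msg: ProgressMessage): void {
    this.sent.push(msg);
  }
}

function processJobSketch(job: FakeJob, workerId: string, nodeOutputs: string[]): { success: true } {
  const { executionId } = job.data;

  // Lifecycle hook stand-in: each node output is funnelled to main as a progress message.
  for (const chunk of nodeOutputs) {
    job.progress({ kind: 'send-chunk', executionId, chunk });
  }

  // Completion is reported through the same channel as intermediate progress.
  job.progress({ kind: 'job-finished', version: 2, executionId, workerId, success: true });
  return { success: true };
}

const demoJob = new FakeJob('1', { executionId: 'exec-1' });
const demoResult = processJobSketch(demoJob, 'worker-a', ['first', 'second']);
console.log(demoResult, demoJob.sent.map((m) => m.kind));
```

Note that the processor's return value only signals that the job ran to the end; the execution outcome itself travels in the job-finished message.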
MCP (AI Tool) Integration
For executions triggered by Model Context Protocol (MCP) tool calls, the processor sends specialized mcp-response messages:
if (job.data.isMcpExecution && job.data.mcpSessionId) {
  const mcpMsg: McpResponseMessage = {
    kind: 'mcp-response',
    executionId,
    mcpType: job.data.mcpType ?? 'service',
    sessionId: job.data.mcpSessionId,
    messageId: job.data.mcpMessageId ?? '',
    response: { success: props.success },
    workerId: this.instanceSettings.hostId,
  };
  await job.progress(mcpMsg);
}
This enables n8n to integrate with LangChain and other AI frameworks while maintaining the queue-based execution model.
Queue Recovery and Metrics
The ScalingService implements automatic recovery to handle orphaned executions when worker processes crash or network partitions occur.
Recovery Mechanism
Running exclusively on the leader main instance, the recovery process:
- Queries the database for in-progress execution IDs
- Fetches active and waiting jobs from Bull using findJobsByStatus(['active','waiting'])
- Marks any execution present in the database but missing from the queue as crashed
// packages/cli/src/scaling/scaling.service.ts (excerpt)
async recoverFromQueue() {
  // batchSize is defined outside this excerpt
  const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
  const runningJobs = await this.findJobsByStatus(['active','waiting']);
  const queuedIds = new Set(runningJobs.map(job => job.data.executionId));
  // In progress according to the database, but no longer present in the queue
  const danglingIds = storedIds.filter(id => !queuedIds.has(id));
  await this.executionRepository.markAsCrashed(danglingIds);
}
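The comparison at the heart of this routine is a set difference, which can be isolated as a pure function. The sketch below assumes string execution IDs and uses made-up sample data; findDanglingIds is a hypothetical helper name, not a function from the n8n codebase.

```typescript
// Pure-function model of the comparison step; no database or Redis involved.
function findDanglingIds(storedIds: string[], queuedIds: Iterable<string>): string[] {
  const queued = new Set(queuedIds);
  return storedIds.filter((id) => !queued.has(id));
}

const inProgressInDb = ['101', '102', '103', '104'];
const inQueue = ['102', '104', '105']; // '105' is queued but not in the in-progress batch
const dangling = findDanglingIds(inProgressInDb, inQueue);
console.log(dangling); // ['101', '103']: the executions that would be marked as crashed
```

Building a Set from the queued IDs keeps each lookup constant-time, so the cost grows linearly with the batch size rather than with the product of both lists.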
Metrics Collection
The service emits queue metrics at configurable intervals (queueMetricsInterval), exposing active/waiting job counts and completion statistics for Prometheus monitoring.
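One plausible way to structure interval-based reporting is to accumulate completed and failed counts between ticks and reset them each time a snapshot is emitted. The sketch below illustrates that pattern only. QueueMetricsCollector and its method names are hypothetical, and this article does not show how n8n implements the feature.

```typescript
// Hypothetical collector illustrating a "count, snapshot, reset" pattern.
interface QueueSnapshot {
  active: number;
  waiting: number;
  completed: number;
  failed: number;
}

class QueueMetricsCollector {
  private completed = 0;
  private failed = 0;

  recordCompleted(): void {
    this.completed += 1;
  }

  recordFailed(): void {
    this.failed += 1;
  }

  // Combine current queue depth with counters accumulated since the last snapshot, then reset.
  snapshot(pending: { active: number; waiting: number }): QueueSnapshot {
    const result: QueueSnapshot = {
      active: pending.active,
      waiting: pending.waiting,
      completed: this.completed,
      failed: this.failed,
    };
    this.completed = 0;
    this.failed = 0;
    return result;
  }
}

const collector = new QueueMetricsCollector();
collector.recordCompleted();
collector.recordCompleted();
collector.recordFailed();
const firstTick = collector.snapshot({ active: 3, waiting: 10 });
const secondTick = collector.snapshot({ active: 1, waiting: 4 });
console.log(firstTick, secondTick);
```

In a running service, snapshot() would be called from a timer whose period plays the role of queueMetricsInterval.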
Practical Implementation Examples
Enqueuing a Workflow Execution
Main processes enqueue jobs via the ScalingService:
// packages/cli/src/scaling/scaling.service.ts usage
import { ScalingService } from '@/scaling/scaling.service';
import { Container } from '@n8n/di';

async function enqueue(workflowId: string, executionId: string) {
  const scaling = Container.get(ScalingService);
  await scaling.addJob(
    {
      workflowId,
      executionId,
      loadStaticData: true,
    },
    { priority: 1 } // highest priority
  );
}
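The effect of the priority option can be illustrated with a small in-memory model in which a lower number is served first and ties are broken by insertion order. PriorityJobQueue is a hypothetical teaching aid. Bull keeps prioritized jobs in Redis and its internals differ, and jobs added without a priority are deliberately left out of this model.

```typescript
// Simplified in-memory model; not Bull's implementation.
interface QueuedJob {
  executionId: string;
  priority: number; // 1 is the highest priority; larger numbers are served later
}

class PriorityJobQueue {
  private jobs: Array<QueuedJob & { seq: number }> = [];
  private nextSeq = 0;

  add(executionId: string, priority: number): void {
    if (!Number.isInteger(priority) || priority < 1) {
      throw new RangeError('priority must be an integer >= 1');
    }
    this.jobs.push({ executionId, priority, seq: this.nextSeq++ });
  }

  // Remove and return the job with the lowest priority number; ties go to the earliest added.
  next(): QueuedJob | undefined {
    this.jobs.sort((a, b) => a.priority - b.priority || a.seq - b.seq);
    const job = this.jobs.shift();
    return job ? { executionId: job.executionId, priority: job.priority } : undefined;
  }
}

const demoQueue = new PriorityJobQueue();
demoQueue.add('background-report', 100);
demoQueue.add('critical-a', 1);
demoQueue.add('critical-b', 1);
const served = [demoQueue.next(), demoQueue.next(), demoQueue.next()].map((j) => j?.executionId);
console.log(served); // ['critical-a', 'critical-b', 'background-report']
```

Sorting on every call keeps the sketch short; a production queue would use a heap or, as Bull does, delegate ordering to Redis.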
Graceful Worker Shutdown
Workers should handle SIGTERM to finish active jobs before exiting:
process.on('SIGTERM', async () => {
  const scaling = Container.get(ScalingService);
  await scaling.stop(); // pauses queue, waits for running jobs, then exits
});
Environment Configuration
Configure Bull behavior through environment variables:
export QUEUE_BULL_REDIS_HOST=redis.mycompany.net
export QUEUE_BULL_REDIS_PORT=6380
export QUEUE_BULL_PREFIX=n8n-bull
export QUEUE_WORKER_STALLED_INTERVAL=15000 # check stalled jobs every 15s
export QUEUE_WORKER_LOCK_DURATION=120000 # 2 min lock lease
Summary
- ScalingModeConfig centralizes Bull and Redis configuration, enabling flexible deployments across different infrastructure topologies.
- ScalingService manages the Bull queue lifecycle, registers cross-process event listeners, and schedules recovery and metrics tasks.
- JobProcessor executes workflows on worker instances, reporting progress via Bull's Redis pub-sub mechanism to the main process.
- Automatic recovery identifies and marks crashed executions by comparing in-progress executions in the database against the active and waiting jobs in the queue.
- MCP integration extends the queue system to support AI tool calls while maintaining separation between main and worker processes.
Frequently Asked Questions
How does n8n handle stalled jobs in Bull queues?
n8n configures Bull with maxStalledCount: 0, so a job that Bull detects as stalled is failed rather than put back in the queue for another attempt. The QUEUE_WORKER_STALLED_INTERVAL environment variable (default 30 seconds) controls how often Bull runs that check. The recovery mechanism in ScalingService additionally detects executions that have disappeared from the queue entirely and marks them as crashed in the database.
Can workers report real-time progress to the n8n UI during queue-based execution?
Yes. Workers use Bull's job.progress() method to send send-chunk messages that stream execution data back to the main process. The main process then forwards this data to connected UI clients, enabling real-time log viewing and debugging even when workflows run on separate worker instances.
What happens to running jobs when a worker process shuts down?
The ScalingService's stop() method pauses the Bull queue to prevent picking up new jobs, then waits for currently running jobs to complete before exiting. This graceful shutdown ensures that workflow executions are not interrupted mid-process, preventing data inconsistency or partial execution states.
How does n8n prioritize jobs in the Bull queue?
Jobs can be enqueued with a priority option via the second parameter of ScalingService.addJob(). In Bull, a lower number means a higher priority (1 is the highest), so jobs with smaller priority values are processed before those with larger ones, allowing critical workflows to execute ahead of background tasks.