Queues
Queues are a powerful design pattern that helps you deal with common application scaling and performance challenges. Some examples of problems that Queues can help you solve are:
- Smooth out processing peaks. For example, if users can initiate resource-intensive tasks at arbitrary times, you can add these tasks to a queue instead of performing them synchronously. Then you can have worker processes pull tasks from the queue in a controlled manner. You can easily add new Queue consumers to scale up the back-end task handling as the application scales up.
- Break up monolithic tasks that may otherwise block the Node.js event loop. For example, if a user request requires CPU intensive work like audio transcoding, you can delegate this task to other processes, freeing up user-facing processes to remain responsive.
- Provide a reliable communication channel across various services. For example, you can queue tasks (jobs) in one process or service, and consume them in another. You can be notified (by listening for status events) upon completion, error or other state changes in the job life cycle from any process or service. When Queue producers or consumers fail, their state is preserved and task handling can restart automatically when nodes are restarted.
Nest provides the @nestjs/bullmq package for BullMQ integration and the @nestjs/bull package for Bull integration. Both packages are abstractions/wrappers on top of their respective libraries, which were developed by the same team. Bull is currently in maintenance mode, with the team focusing on fixing bugs, while BullMQ is actively developed, featuring a modern TypeScript implementation and a different set of features. If Bull meets your requirements, it remains a reliable and battle-tested choice. The Nest packages make it easy to integrate either BullMQ or Bull queues into your Nest application in a friendly way.
Both BullMQ and Bull use Redis to persist job data, so you'll need to have Redis installed on your system. Because they are Redis-backed, your Queue architecture can be completely distributed and platform-independent. For example, you can have some Queue producers and consumers and listeners running in Nest on one (or several) nodes, and other producers, consumers and listeners running on other Node.js platforms on other network nodes.
This chapter covers the @nestjs/bullmq and @nestjs/bull packages. We also recommend reading the BullMQ and Bull documentation for more background and specific implementation details.
BullMQ installation
To begin using BullMQ, we first install the required dependencies.
$ npm install --save @nestjs/bullmq bullmq
Once the installation process is complete, we can import the BullModule into the root AppModule.
@@filename(app.module)
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
@Module({
imports: [
BullModule.forRoot({
connection: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}
The forRoot() method is used to register a bullmq package configuration object that will be used by all queues registered in the application (unless specified otherwise). For your reference, the following are a few of the properties within a configuration object:
- connection: ConnectionOptions - Options to configure the Redis connection. See Connections for more information. Optional.
- prefix: string - Prefix for all queue keys. Optional.
- defaultJobOptions: JobOpts - Options to control the default settings for new jobs. See JobOpts for more information. Optional.
- settings: AdvancedSettings - Advanced Queue configuration settings. These should usually not be changed. See AdvancedSettings for more information. Optional.
All the options are optional, providing detailed control over queue behavior. These are passed directly to the BullMQ Queue constructor. Read more about these and other options here.
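For example, a root configuration that combines the connection with a key prefix and default job options might look like the following sketch (the prefix and the default job option values are illustrative, not requirements):

BullModule.forRoot({
  connection: {
    host: 'localhost',
    port: 6379,
  },
  // Namespace all queue keys in Redis under this prefix (illustrative)
  prefix: 'myapp',
  // Defaults applied to every job added to any queue, unless overridden per job
  defaultJobOptions: {
    attempts: 3,
    removeOnComplete: true,
  },
});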
To register a queue, import the BullModule.registerQueue()
dynamic module, as follows:
BullModule.registerQueue({
name: 'audio',
});
info Hint Create multiple queues by passing multiple comma-separated configuration objects to the registerQueue() method.
The registerQueue()
method is used to instantiate and/or register queues. Queues are shared across modules and processes that connect to the same underlying Redis database with the same credentials. Each queue is unique by its name property. A queue name is used as both an injection token (for injecting the queue into controllers/providers), and as an argument to decorators to associate consumer classes and listeners with queues.
You can also override some of the pre-configured options for a specific queue, as follows:
BullModule.registerQueue({
name: 'audio',
connection: {
port: 6380,
},
});
BullMQ also supports parent-child relationships between jobs. This functionality enables the creation of flows where jobs are the nodes of trees of arbitrary depth. To read more about them, check here.
To add a flow, you can do the following:
BullModule.registerFlowProducer({
name: 'flowProducerName',
});
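Once registered, you can inject the flow producer and use it to add a parent job together with its children. The sketch below assumes a 'video' queue and illustrative job names; the flow producer is injected with the @InjectFlowProducer() decorator using the name provided during registration:

import { Injectable } from '@nestjs/common';
import { InjectFlowProducer } from '@nestjs/bullmq';
import { FlowProducer } from 'bullmq';

@Injectable()
export class FlowService {
  constructor(
    @InjectFlowProducer('flowProducerName')
    private readonly flowProducer: FlowProducer,
  ) {}

  async createRenderFlow() {
    // The parent job completes only after all of its children have completed
    return this.flowProducer.add({
      name: 'render',
      queueName: 'video',
      children: [
        { name: 'transcode', data: { resolution: '480p' }, queueName: 'video' },
        { name: 'transcode', data: { resolution: '1080p' }, queueName: 'video' },
      ],
    });
  }
}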
Since jobs are persisted in Redis, each time a specific named queue is instantiated (e.g., when an app is started/restarted), it attempts to process any old jobs that may exist from a previous unfinished session.
Each queue can have one or many producers, consumers, and listeners. Consumers retrieve jobs from the queue in a specific order: FIFO (the default), LIFO, or according to priorities. Controlling queue processing order is discussed here.
Named configurations
If your queues connect to multiple different Redis instances, you can use a technique called named configurations. This feature allows you to register several configurations under specified keys, which you can then refer to in the queue options.
For example, assuming that you have an additional Redis instance (apart from the default one) used by a few queues registered in your application, you can register its configuration as follows:
BullModule.forRoot('alternative-config', {
connection: {
port: 6381,
},
});
In the example above, 'alternative-config'
is just a configuration key (it can be any arbitrary string).
With this in place, you can now point to this configuration in the registerQueue()
options object:
BullModule.registerQueue({
configKey: 'alternative-config',
name: 'video',
});
Producers
Job producers add jobs to queues. Producers are typically application services (Nest providers). To add jobs to a queue, first inject the queue into the service as follows:
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';
import { InjectQueue } from '@nestjs/bullmq';
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
info Hint The @InjectQueue() decorator identifies the queue by its name, as provided in the registerQueue() method call (e.g., 'audio').
Now, add a job by calling the queue's add()
method, passing a user-defined job object. Jobs are represented as serializable JavaScript objects (since that is how they are stored in the Redis database). The shape of the job you pass is arbitrary; use it to represent the semantics of your job object. You also need to give it a name. This allows you to create specialized consumers that will only process jobs with a given name.
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});
Job options
Jobs can have additional options associated with them. Pass an options object after the job
argument in the Queue.add()
method. Some of the job options properties are:
- priority: number - Optional priority value. Ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that using priorities has a slight impact on performance, so use them with caution.
- delay: number - An amount of time (milliseconds) to wait until this job can be processed. Note that for accurate delays, both server and clients should have their clocks synchronized.
- attempts: number - The total number of attempts to try the job until it completes.
- repeat: RepeatOpts - Repeat job according to a cron specification. See RepeatOpts.
- backoff: number | BackoffOpts - Backoff setting for automatic retries if the job fails. See BackoffOpts.
- lifo: boolean - If true, adds the job to the right end of the queue instead of the left (default false).
- jobId: number | string - Override the job ID - by default, the job ID is a unique integer, but you can use this setting to override it. If you use this option, it is up to you to ensure the jobId is unique. If you attempt to add a job with an id that already exists, it will not be added.
- removeOnComplete: boolean | number - If true, removes the job when it successfully completes. A number specifies the amount of jobs to keep. Default behavior is to keep the job in the completed set.
- removeOnFail: boolean | number - If true, removes the job when it fails after all attempts. A number specifies the amount of jobs to keep. Default behavior is to keep the job in the failed set.
- stackTraceLimit: number - Limits the amount of stack trace lines that will be recorded in the stacktrace.
Here are a few examples of customizing jobs with job options.
To delay the start of a job, use the delay
configuration property.
const job = await this.audioQueue.add(
'transcode',
{
foo: 'bar',
},
{ delay: 3000 }, // 3 seconds delayed
);
To add a job to the right end of the queue (process the job as LIFO (Last In First Out)), set the lifo property of the configuration object to true.
const job = await this.audioQueue.add(
'transcode',
{
foo: 'bar',
},
{ lifo: true },
);
To prioritize a job, use the priority
property.
const job = await this.audioQueue.add(
'transcode',
{
foo: 'bar',
},
{ priority: 2 },
);
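Options can also be combined. For instance, the following sketch (with illustrative values) configures automatic retries with an exponential backoff and removes the job from the completed set once it succeeds:

const job = await this.audioQueue.add(
  'transcode',
  {
    foo: 'bar',
  },
  {
    attempts: 3, // try the job up to 3 times in total
    backoff: { type: 'exponential', delay: 1000 }, // wait 1s, 2s, 4s between attempts
    removeOnComplete: true, // do not keep the job once it completes successfully
  },
);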
For a full list of options, check the API documentation here and here.
Consumers
A consumer is a class defining methods that either process jobs added into the queue, or listen for events on the queue, or both. Declare a consumer class using the @Processor()
decorator as follows:
import { Processor } from '@nestjs/bullmq';
@Processor('audio')
export class AudioConsumer {}
info Hint Consumers must be registered as providers so the @nestjs/bullmq package can pick them up.
Where the decorator's string argument (e.g., 'audio'
) is the name of the queue to be associated with the class methods.
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('audio')
export class AudioConsumer extends WorkerHost {
async process(job: Job<any, any, string>): Promise<any> {
let progress = 0;
for (let i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 1;
await job.updateProgress(progress);
}
return {};
}
}
The process method is called whenever the worker is idle and there are jobs to process in the queue. This handler method receives the job
object as its only argument. The value returned by the handler method is stored in the job object and can be accessed later on, for example in a listener for the completed event.
Job objects have multiple methods that allow you to interact with their state. For example, the above code uses the updateProgress() method to update the job's progress. See here for the complete Job object API reference.
In the older version, Bull, you could designate that a job handler method will handle only jobs of a certain type (jobs with a specific name
) by passing that name
to the @Process()
decorator as shown below.
warning Warning This doesn't work with BullMQ; keep reading.
@Process('transcode')
async transcode(job: Job<unknown>) { ... }
This behavior is not supported in BullMQ due to the confusion it generated. Instead, you need to use switch statements to call different services or logic for each job name:
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('audio')
export class AudioConsumer extends WorkerHost {
async process(job: Job<any, any, string>): Promise<any> {
switch (job.name) {
case 'transcode': {
let progress = 0;
for (let i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 1;
await job.updateProgress(progress);
}
return {};
}
case 'concatenate': {
await doSomeLogic2();
break;
}
}
}
}
This is covered in the named processor section of the BullMQ documentation.
Request-scoped consumers
When a consumer is flagged as request-scoped (learn more about the injection scopes here), a new instance of the class will be created exclusively for each job. The instance will be garbage-collected after the job has completed.
@Processor({
name: 'audio',
scope: Scope.REQUEST,
})
Since request-scoped consumer classes are instantiated dynamically and scoped to a single job, you can inject a JOB_REF
through the constructor using a standard approach.
constructor(@Inject(JOB_REF) jobRef: Job) {
console.log(jobRef);
}
info Hint The JOB_REF token is imported from the @nestjs/bullmq package.
Event listeners
BullMQ generates a set of useful events when queue and/or job state changes occur. Nest provides the @OnQueueEvent(event) decorator that allows subscribing to a core set of standard events.
Event listeners must be declared within a consumer class (i.e., within a class decorated with the @Processor()
decorator). To listen for an event, use the @OnQueueEvent(event)
decorator with the event you want to be handled. For example, to listen to the event emitted when a job enters the active state in the audio
queue, use the following construct:
import { Processor, OnQueueEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('audio')
export class AudioConsumer {
@OnQueueEvent('active')
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
// ...
}
You can see the complete list of events as properties of QueueEventsListener here.
Queue management
Queues have an API that allows you to perform management functions like pausing and resuming, retrieving the count of jobs in various states, and several more. You can find the full queue API here. Invoke any of these methods directly on the Queue
object, as shown below with the pause/resume examples.
Pause a queue with the pause()
method call. A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized.
await audioQueue.pause();
To resume a paused queue, use the resume()
method, as follows:
await audioQueue.resume();
Separate processes
Job handlers can also be run in a separate (forked) process (source). This has several advantages:
- The process is sandboxed so if it crashes it does not affect the worker.
- You can run blocking code without affecting the queue (jobs will not stall).
- Much better utilization of multi-core CPUs.
- Fewer connections to Redis.
@@filename(app.module)
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { join } from 'path';
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
processors: [join(__dirname, 'processor.js')],
}),
],
})
export class AppModule {}
warning Warning Please note that because your function is being executed in a forked process, Dependency Injection (and IoC container) won't be available. That means that your processor function will need to contain (or create) all instances of external dependencies it needs.
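For reference, a sandboxed processor module could look like the following minimal sketch. The file simply exports the processing function; the SandboxedJob type is what BullMQ passes to sandboxed processors, and the file name corresponds to the path used in the registration above:

@@filename(processor)
import { SandboxedJob } from 'bullmq';

export default async function (job: SandboxedJob) {
  // Runs in a forked process: the Nest IoC container is not available here,
  // so this module must create (or import) everything it needs itself.
  console.log(`Processing job ${job.id} with data ${JSON.stringify(job.data)}`);
  return {};
}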
Async configuration
You may want to pass bullmq
options asynchronously instead of statically. In this case, use the forRootAsync()
method which provides several ways to deal with async configuration. Likewise, if you want to pass queue options asynchronously, use the registerQueueAsync()
method.
One approach is to use a factory function:
BullModule.forRootAsync({
useFactory: () => ({
connection: {
host: 'localhost',
port: 6379,
},
}),
});
Our factory behaves like any other asynchronous provider (e.g., it can be async
and it's able to inject dependencies through inject
).
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
connection: {
host: configService.get('QUEUE_HOST'),
port: configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
});
Alternatively, you can use the useClass
syntax:
BullModule.forRootAsync({
useClass: BullConfigService,
});
The construction above will instantiate BullConfigService
inside BullModule
and use it to provide an options object by calling createSharedConfiguration()
. Note that this means that the BullConfigService
has to implement the SharedBullConfigurationFactory
interface, as shown below:
@Injectable()
class BullConfigService implements SharedBullConfigurationFactory {
createSharedConfiguration(): BullModuleOptions {
return {
connection: {
host: 'localhost',
port: 6379,
},
};
}
}
In order to prevent the creation of BullConfigService
inside BullModule
and use a provider imported from a different module, you can use the useExisting
syntax.
BullModule.forRootAsync({
imports: [ConfigModule],
useExisting: ConfigService,
});
This construction works the same as useClass with one critical difference - BullModule will look up imported modules to reuse an existing ConfigService instead of instantiating a new one.
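For this construction to work, the reused provider must itself implement the SharedBullConfigurationFactory interface and be exported by the imported module. A minimal sketch (the ConfigService shown here is illustrative, not the one from @nestjs/config):

@Injectable()
export class ConfigService implements SharedBullConfigurationFactory {
  createSharedConfiguration(): BullModuleOptions {
    return {
      connection: {
        host: 'localhost',
        port: 6379,
      },
    };
  }
}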
Bull installation
warning Note If you decided to use BullMQ, skip this section and the sections that follow.
To begin using Bull, we first install the required dependencies.
$ npm install --save @nestjs/bull bull
Once the installation process is complete, we can import the BullModule into the root AppModule.
@@filename(app.module)
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}
The forRoot() method is used to register a bull package configuration object that will be used by all queues registered in the application (unless specified otherwise). A configuration object consists of the following properties:
- limiter: RateLimiter - Options to control the rate at which the queue's jobs are processed. See RateLimiter for more information. Optional.
- redis: RedisOpts - Options to configure the Redis connection. See RedisOpts for more information. Optional.
- prefix: string - Prefix for all queue keys. Optional.
- defaultJobOptions: JobOpts - Options to control the default settings for new jobs. See JobOpts for more information. Optional.
- settings: AdvancedSettings - Advanced Queue configuration settings. These should usually not be changed. See AdvancedSettings for more information. Optional.
All the options are optional, providing detailed control over queue behavior. These are passed directly to the Bull Queue
constructor. Read more about these options here.
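For example, a root configuration that also throttles job processing with the limiter option might look like the following sketch (the values shown are illustrative):

BullModule.forRoot({
  redis: {
    host: 'localhost',
    port: 6379,
  },
  // Process at most 10 jobs per second (illustrative values)
  limiter: {
    max: 10,
    duration: 1000,
  },
  defaultJobOptions: {
    attempts: 3,
  },
});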
To register a queue, import the BullModule.registerQueue()
dynamic module, as follows:
BullModule.registerQueue({
name: 'audio',
});
info Hint Create multiple queues by passing multiple comma-separated configuration objects to the registerQueue() method.
The registerQueue()
method is used to instantiate and/or register queues. Queues are shared across modules and processes that connect to the same underlying Redis database with the same credentials. Each queue is unique by its name property. A queue name is used as both an injection token (for injecting the queue into controllers/providers), and as an argument to decorators to associate consumer classes and listeners with queues.
You can also override some of the pre-configured options for a specific queue, as follows:
BullModule.registerQueue({
name: 'audio',
redis: {
port: 6380,
},
});
Since jobs are persisted in Redis, each time a specific named queue is instantiated (e.g., when an app is started/restarted), it attempts to process any old jobs that may exist from a previous unfinished session.
Each queue can have one or many producers, consumers, and listeners. Consumers retrieve jobs from the queue in a specific order: FIFO (the default), LIFO, or according to priorities. Controlling queue processing order is discussed here.
Named configurations
If your queues connect to multiple Redis instances, you can use a technique called named configurations. This feature allows you to register several configurations under specified keys, which you can then refer to in the queue options.
For example, assuming that you have an additional Redis instance (apart from the default one) used by a few queues registered in your application, you can register its configuration as follows:
BullModule.forRoot('alternative-config', {
redis: {
port: 6381,
},
});
In the example above, 'alternative-config'
is just a configuration key (it can be any arbitrary string).
With this in place, you can now point to this configuration in the registerQueue()
options object:
BullModule.registerQueue({
configKey: 'alternative-config',
name: 'video',
});
Producers
Job producers add jobs to queues. Producers are typically application services (Nest providers). To add jobs to a queue, first inject the queue into the service as follows:
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
info Hint The @InjectQueue() decorator identifies the queue by its name, as provided in the registerQueue() method call (e.g., 'audio').
Now, add a job by calling the queue's add()
method, passing a user-defined job object. Jobs are represented as serializable JavaScript objects (since that is how they are stored in the Redis database). The shape of the job you pass is arbitrary; use it to represent the semantics of your job object.
const job = await this.audioQueue.add({
foo: 'bar',
});
Named jobs
Jobs may have unique names. This allows you to create specialized consumers that will only process jobs with a given name.
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});
warning Warning When using named jobs, you must create processors for each unique name added to a queue, or the queue will complain that you are missing a processor for the given job. See here for more information on consuming named jobs.
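For instance, a consumer that handles the 'transcode' jobs added above could look like the following sketch, using Bull's @Process() decorator with a job name (the handler method name is illustrative):

import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('audio')
export class AudioConsumer {
  @Process('transcode')
  async handleTranscode(job: Job<{ foo: string }>) {
    // Only jobs added with the name 'transcode' reach this handler
    console.log(`Transcoding job ${job.id} with data`, job.data);
  }
}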