Handling asynchronous tasks efficiently is crucial in Node.js applications, especially when dealing with time-intensive operations like sending emails, processing images, or performing complex calculations. Without proper management, these tasks can block the event loop, leading to poor performance and a subpar user experience. This is where BullMQ comes into play.
BullMQ is a powerful Node.js package that offers a reliable and scalable queuing system powered by Redis. It enables developers to transfer heavy operations to a queue in the background, keeping the main application responsive. With BullMQ you can successfully manage async queues, plan processes, and easily keep an eye on their progress.
This tutorial will show you how to manage asynchronous tasks with Node.js and BullMQ. The process involves setting up a project folder, performing a time-intensive task without using BullMQ, and enhancing the application by incorporating BullMQ for running tasks in parallel.
Before you begin, ensure you:
Set up a Linux VPS server.
Set up Node.js on your server.
Set up Redis on your server, as BullMQ depends on Redis for managing queues.
Before you can use Node.js and BullMQ for asynchronous tasks, it is necessary to establish your project directory. Set up and customize your Node.js application using these guidelines.
Open your terminal and go to the location of your project. Create a fresh folder and navigate into it:
mkdir bullmq-demo && cd bullmq-demo
Set up a Node.js project using npm. It generates a package.json
file containing the default configurations:
npm init -y
Set up the required packages for your application:
npm install express bullmq ioredis
Here's what each package does:
express
: A fast Node.js web framework commonly used for server creation.bullmq
: An excellent tool for handling queues within Node.js programs.ioredis
: A Redis client for Node.js that BullMQ needs in order to establish a connection with Redis.Create an index.js
file as the primary access point for your application:
touch index.js
Alternatively, you have the option to generate this file by using your code editor.
To set up a simple Express server, include this code in your index.js
file:
const express = require('express');
const app = express();
const port = 3000;
app.use(express.json());
app.listen(port, () => {
console.log(`Server is running on port ${port}`);
});
This code initiates an Express app on port 3000
which handles requests using JSON middleware.
Start the server by running:
node index.js
The below message should appear:
Open up your internet browser and go to either http://your_server_ip:3000
or http://localhost:3000
. You will receive a "Cannot GET /" message as there are no routes set up, as anticipated.
When ready to proceed, you can terminate the server using Ctrl + C
.
This part describes how to include a route in your Express app that performs a time-consuming task in a synchronous way. This will demonstrate how specific tasks can block the event loop and negatively affect your application's performance.
Create a function in the index.js
file that simulates a computationally intensive task:
// Function to simulate a heavy computation
function heavyComputation() {
const start = Date.now();
// Run a loop for 5 seconds
while (Date.now() - start < 5000) {
// Perform a CPU-intensive task
Math.sqrt(Math.random());
}
}
The function runs a loop for about five seconds, performing math operations to mimic a CPU-heavy task.
Create a fresh route in your Express application that calls the heavyComputation
function:
app.get('/heavy-task', (req, res) => {
heavyComputation();
res.send('Heavy computation finished');
});
This route is set up to receive GET requests specifically at the /heavy-task
endpoint. After receiving a request, it carries out the specified intensive computation and then provides a response.
To restart your server, execute the following command:
node index.js
Confirm the server is functioning before moving on to the next stage.
Open your internet browser and type in either http://your_server_ip:3000/heavy-task
or http://localhost:3000/heavy-task
to access the webpage.
The following message should be displayed:
It is important to observe that the response time is approximately five seconds. The delay is a result of the synchronous execution of the intensive computation process.
After the server is up and running, open a new tab on your internet browser and go to http://your_server_ip:3000/
. The response to this request may not be immediate. The system delays taking action until the extensive processing of the previous step.
This happens when the time-consuming task is blocking the Node.js event loop, stopping the server from processing additional incoming requests.
When the server performs a task that takes a lot of time in a synchronous manner, it is unable to respond to additional requests. The act of blocking could result in a suboptimal user experience, particularly in apps that need to be highly responsive.
We saw in the last section how synchronous execution of time-consuming operations can severely affect your application's performance by slowing down the event loop.
This section explains how to implement a high-performance asynchronous queue into your application using BullMQ.
Make changes to the index.js
file to include BullMQ in your application.
At the top of your index.js
file, you should include the following import statements:
const { Queue, Worker } = require('bullmq');
const Redis = require('ioredis');
Next, set up a connection with Redis:
const connection = new Redis();
Redis has been programmed to run on port 6379
and the localhost interface by default. To create a connection to a remote Redis server that has a different port, please enter the appropriate host address and port number:
const connection = new Redis({
host: '127.0.0.1',
port: 6379,
maxRetriesPerRequest: null,
});
Create a new queue called heavyTaskQueue
:
const heavyTaskQueue = new Queue('heavyTaskQueue', { connection });
Change the heavy-task route to add a job to the queue instead of running the task right away:
app.get('/heavy-task', async (req, res) => {
await heavyTaskQueue.add('heavyComputation', {});
res.send('Heavy computation job added to the queue');
});
The application will respond after a lengthy process has completed, handling requests asynchronously, when the /heavy-task
route is accessed.
The worker must be implemented in a separate file. This is essential to ensure that the worker does not coexist with the Express server process. A worker's use of the heavyComputation
function during execution won't interfere with the event loop of the main application.
The index.js
file is structured in the following way:
const express = require('express');
const app = express();
const port = 3000;
app.use(express.json());
const { Queue } = require('bullmq');
const Redis = require('ioredis');
const connection = new Redis({
host: '127.0.0.1',
port: 6379,
maxRetriesPerRequest: null,
});
const heavyTaskQueue = new Queue('heavyTaskQueue', { connection });
app.get('/heavy-task', async (req, res) => {
await heavyTaskQueue.add('heavyComputation', {});
res.send('Heavy computation job added to the queue');
});
app.listen(port, () => {
console.log(`Server is running on port ${port}`);
});
Generate a fresh file and name it worker.js
. The file is intended for executing the worker code in charge of handling tasks obtained from the queue.
Create the worker.js
file:
touch worker.js
Add Worker Code to worker.js
:
const { Worker } = require('bullmq');
const Redis = require('ioredis');
const connection = new Redis({
host: '127.0.0.1',
port: 6379,
maxRetriesPerRequest: null,
});
// Function to simulate a heavy computation
function heavyComputation() {
const start = Date.now();
// Run a loop for 5 seconds
while (Date.now() - start < 5000) {
// Perform a CPU-intensive task
Math.sqrt(Math.random());
}
}
const worker = new Worker(
'heavyTaskQueue',
async job => {
// Time-intensive task here
heavyComputation();
console.log('Heavy computation completed');
},
{ connection }
);
worker.on('completed', job => {
console.log(`Job ${job.id} has completed`);
});
worker.on('failed', (job, err) => {
console.log(`Job ${job.id} has failed with error ${err.message}`);
});
You must now execute worker.js
as an independent Node.js process.
Open a new terminal window or tab, navigate to your project folder, and run the specified command:
node worker.js
Initiate the Express server in your original terminal window:
node index.js
Proceed to conduct testing of the application utilizing BullMQ.
Make a Request to /heavy-task
:
Open your internet browser and type in either http://your_server_ip:3000/heavy-task
or http://localhost:3000/heavy-task
in the URL bar. The following message should be displayed:
Heavy computation job added to the queue.
The rapid response time suggests that there is no blockage in the main thread.
Monitoring your application's queues and jobs is essential for ensuring they are functioning properly and for troubleshooting purposes. BullMQ comes with a functionality called Bull Board, which offers a visual interface for overseeing your queues.
This part explains how to incorporate a dashboard into your application.
Use npm to install the @bull-board/express
package:
npm install @bull-board/express
In order to set up the bull board application, follow these steps:
Insert the code provided at the top of your index.js
file:
const { createBullBoard } = require('@bull-board/api');
const { BullMQAdapter } = require('@bull-board/api/bullMQAdapter');
const { ExpressAdapter } = require('@bull-board/express');
Initialize the Express adapter:
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
Create the Bull Board instance and pass your queue:
createBullBoard({
queues: [new BullMQAdapter(heavyTaskQueue)],
serverAdapter: serverAdapter,
});
Add the following line to mount the dashboard at /admin/queues
:
app.use('/admin/queues', serverAdapter.getRouter());
Make sure to include this line following the setup of your queue and worker.
The final index.js
file looks like below:
// Import Express and Initialize App
const express = require('express');
const app = express();
const port = 3000;
app.use(express.json());
// Import BullMQ and Redis
const { Queue } = require('bullmq');
const Redis = require('ioredis');
// Redis Connection
const connection = new Redis({
host: '127.0.0.1',
port: 6379,
maxRetriesPerRequest: null,
});
// Initialize Queue
const heavyTaskQueue = new Queue('heavyTaskQueue', { connection });
// Define Route to Add Job to Queue
app.get('/heavy-task', async (req, res) => {
await heavyTaskQueue.add('heavyComputation', {});
res.send('Heavy computation job added to the queue');
});
// Import Bull Board and Set Up Dashboard
const { createBullBoard } = require('@bull-board/api');
const { BullMQAdapter } = require('@bull-board/api/bullMQAdapter');
const { ExpressAdapter } = require('@bull-board/express');
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [new BullMQAdapter(heavyTaskQueue)],
serverAdapter: serverAdapter,
});
app.use('/admin/queues', serverAdapter.getRouter());
// Start the Server
app.listen(port, () => {
console.log(`Server is running on port ${port}`);
});
To access the dashboard, follow the steps listed below:
node index.js
Open your browser and go to http://your_server_ip:3000/admin/queues
.
You can easily manage your BullMQ queues by integrating Bull Board into your application. It is much easier to keep an eye on progress and identify issues when you can view your queues and tasks on the dashboard in real-time.
You have now learned how to use BullMQ with Node.js to manage asynchronous processes. Your application's responsiveness and efficiency have been enhanced by moving time-consuming operations to a separate queue.
Your Node.js app is now much more capable of handling heavy demands thanks to the usage of queues.