Setting Up a Task Queue with Celery and RabbitMQ on Ubuntu 20.04
Task queues are an essential component in distributed systems, enabling efficient management and execution of asynchronous tasks across multiple nodes or workers. This tutorial will guide you through setting up a task queue using Celery, a powerful Python task-queue system, along with RabbitMQ, a robust message broker.
Celery and RabbitMQ
Celery is a powerful and versatile Python task-queue system designed to streamline and manage asynchronous task execution. It serves as a robust framework for distributing tasks among workers, facilitating parallel processing and enhancing overall system scalability. With Celery, developers can orchestrate complex workflows, schedule tasks, and handle task priorities effectively, making it an essential tool for building responsive and efficient applications.
On the other hand, RabbitMQ stands as a reliable message broker that plays a pivotal role in coordinating communication between Celery tasks and workers. As a high-performance messaging platform, RabbitMQ efficiently manages task queues, ensuring seamless message delivery and optimal utilization of resources. Its advanced features, such as message routing, fault tolerance, and clustering, make RabbitMQ a preferred choice for building robust distributed systems that rely on asynchronous task processing.
Together, Celery and RabbitMQ form a potent combination, empowering developers to create scalable, responsive, and fault-tolerant applications capable of handling complex workflows and high volumes of tasks with ease.
Prerequisites
Before starting, ensure you have:
- A cloud server with Ubuntu 20.04 installed.
- A hostname set on the server.
- A non-root user with sudo privileges.
- Basic knowledge of Python and terminal commands.
Installing RabbitMQ
1. Update your package index:
sudo apt update
2. Install RabbitMQ:
sudo apt install rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
3. Enable and start the RabbitMQ service:
sudo systemctl enable rabbitmq-server && sudo systemctl start rabbitmq-server
sudo systemctl status rabbitmq-server
Setting Up Celery
Install pip if it is not yet installed:
sudo apt install python3-pip
Create a Python virtual environment. If the venv module is not installed, install it with:
sudo apt install python3.8-venv
Then run:
python3 -m venv celeryenv
Activate the virtual environment:
source celeryenv/bin/activate
If at any time you need to deactivate the environment (not now), you can type:
deactivate
Install Celery within the virtual environment:
pip install celery
Writing a Celery Application
A Celery application consists of two main components:
- Workers: These wait for messages from RabbitMQ and execute the tasks.
- Clients: These submit messages to RabbitMQ to trigger task execution and retrieve the result later.
Tasks are defined in a module used by both workers and clients. Workers execute the task code, while clients use the function definitions to expose tasks and abstract RabbitMQ publishing complexity.
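Conceptually, this client/worker split is the classic producer-consumer pattern. The following stdlib-only sketch (no Celery or RabbitMQ involved) illustrates the roles each side plays: a queue stands in for the broker, a dict for the result backend.

```python
import queue
import threading

task_queue = queue.Queue()   # stands in for the RabbitMQ broker
results = {}                 # stands in for the result backend

def worker():
    """Worker: wait for messages and execute the tasks they describe."""
    while True:
        message = task_queue.get()
        if message is None:          # sentinel telling the worker to stop
            break
        task_id, func, args = message
        results[task_id] = func(*args)

def client():
    """Client: publish a message describing the task; read the result later."""
    task_queue.put(("task-1", sum, ([1, 2, 3],)))
    task_queue.put(None)

t = threading.Thread(target=worker)
t.start()
client()
t.join()
print(results["task-1"])  # 6
```

Celery and RabbitMQ play the same roles, but across processes and machines rather than threads, with persistence and delivery guarantees the stdlib queue does not provide.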
Step 1. Create Directories
Create a directory for the Python module (celeryapp) and another for storing downloaded files (downloaded_files):
mkdir ~/downloaded_files ~/celeryapp
cd ~/celeryapp
Step 2. Create the Module
Create a module file celeryapp.py containing two functions (download and list_files) that will serve as asynchronous tasks. Replace demo1 in the BASEDIR path with your system username.
from celery import Celery
import urllib.request
import os

# Directory to store downloaded files
BASEDIR = "/home/demo1/downloaded_files"

# Create the Celery app and set the broker location (RabbitMQ)
app = Celery('celeryapp',
             backend='rpc://',
             broker='pyamqp://guest@localhost//')

@app.task
def download(url, filename):
    """
    Download a page and save it to the BASEDIR directory

    :param url: URL to download
    :param filename: Filename to save the URL content in BASEDIR
    """
    response = urllib.request.urlopen(url)
    data = response.read()
    with open(os.path.join(BASEDIR, filename), 'wb') as file:
        file.write(data)

@app.task
def list_files():
    """Return an array of all downloaded files."""
    return os.listdir(BASEDIR)
Explanation of the Code
- Creating the Celery App: The line app = Celery('celeryapp', backend='rpc://', broker='pyamqp://guest@localhost//') does the following:
  - Name: Creates a Celery application named celeryapp.
  - Broker: Specifies the broker (RabbitMQ) on localhost using the AMQP protocol.
  - Backend: Configures the result backend that stores task return values. The rpc backend sends responses to a RabbitMQ queue following a Remote Procedure Call (RPC) pattern.
- Task Functions:
  - download: Downloads content from a URL and saves it to a specified file in BASEDIR.
  - list_files: Lists all files in BASEDIR.
Running the Celery Worker
To start the Celery worker and process tasks, run the following commands within the virtual environment:
source celeryenv/bin/activate
celery -A celeryapp worker
You can also start the worker in debug mode with:
celery -A celeryapp worker --loglevel=debug
This command initializes the Celery worker with the celeryapp module, enabling it to execute tasks defined in celeryapp.py.
Start the Workers as Daemons
1. Create the Service Definition File
Using sudo, create a new service definition file named celeryd.service in the directory /etc/systemd/system/. You can use a text editor like nano or vim to create and edit the file:
sudo nano /etc/systemd/system/celeryd.service
2. Edit the Service File
Inside the celeryd.service file, add the following configuration. Remember to replace demo1 with your actual user and group names.
[Unit]
Description=Celery Daemon
After=network.target
[Service]
Type=forking
User=demo1
Group=demo1
Environment="PATH=/home/demo1/celeryenv/bin"
WorkingDirectory=/home/demo1/celeryapp
ExecStart=/home/demo1/celeryenv/bin/celery multi start worker1 -A celeryapp --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n.log
ExecStop=/home/demo1/celeryenv/bin/celery multi stopwait worker1 --pidfile=/var/run/celery/%n.pid
[Install]
WantedBy=multi-user.target
- Description: Describes the service.
- After: Specifies that the service should start after the network is up.
- Type: Set to forking because celery multi detaches into the background, so systemd must track the forked worker processes rather than the launching command.
- User and Group: Specify the user and group under which the Celery workers should run.
- Environment: Sets the environment variables, including the Python virtual environment path.
- WorkingDirectory: Points to the directory where your Celery application is located.
- ExecStart: Command to start the Celery workers using the celery multi start command.
- ExecStop: Command to stop the Celery workers using the celery multi stopwait command.
- WantedBy: Specifies the target that should be reached for this service to start.
3. Save the changes in the editor and close the file.
4. Create log folder and assign necessary permission:
sudo mkdir -p /var/log/celery /var/run/celery
sudo chown demo1:demo1 /var/log/celery /var/run/celery
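Note that /var/run (an alias for /run) is a tmpfs on Ubuntu, so the celery pid directory created above disappears at reboot. One way to handle this, sketched here, is to let systemd recreate the directory on every start by adding two lines under the [Service] section of celeryd.service:

```
RuntimeDirectory=celery
RuntimeDirectoryMode=0755
```

With RuntimeDirectory= set, systemd creates /run/celery owned by the service's User and Group before ExecStart runs.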
5. Reload systemd
After creating or modifying a systemd service file, it's essential to reload systemd for the changes to take effect:
sudo systemctl daemon-reload
6. Start the Celery Daemon Service
sudo systemctl start celeryd
Run below command to check service status:
sudo systemctl status celeryd
7. Enable Autostart (Optional)
If you want the Celery daemon service to start automatically at server boot, enable it:
sudo systemctl enable celeryd
Now your Celery workers should be daemonized and automatically started on server startup. Adjust the paths, usernames, and group names as per your actual setup.
Monitor your Celery Cluster
Monitoring is essential for maintaining performance and identifying issues. Here are some commands you can use with the celery binary to monitor your Celery workers and tasks:
Check Worker Status
Use the status command to get a list of workers and their status:
celery -A celeryapp status
This command will display the status of each worker, indicating whether they are operational (OK) or if there are any issues.
Inspect Active Tasks
The inspect active command allows you to see what tasks each worker is currently executing:
celery -A celeryapp inspect active
This command provides insight into the active tasks on each worker, helping you track progress and identify any potential bottlenecks.
Get Worker Statistics
Utilize the inspect stats command to gather statistics about your Celery workers, including resource usage and completed tasks:
celery -A celeryapp inspect stats
This command provides comprehensive information such as worker resource usage (the rusage key) and the total number of completed tasks (the total key). It's useful for performance monitoring and capacity planning.
Monitor a Celery Cluster with Flower
To monitor a Celery Cluster with Flower, you can follow these steps:
1. Begin by installing Flower within the virtual environment:
pip install wheel flower
2. If you're using a firewall, ensure that the firewall is open on Flower's default port 5555. To allow traffic on port 5555, run:
sudo ufw allow 5555/tcp
To verify that the rule has been added, you can list the current rules:
sudo ufw status
This should display an entry for port 5555 allowing TCP traffic.
3. Navigate to your Celery app directory. For example:
cd /home/demo1/celeryapp
4. Start Flower using the following command. You can change the port using the --port flag if needed:
celery -A celeryapp flower --port=5555
5. Access the Flower dashboard by opening your browser and going to http://localhost:5555 (or your server's IP address if you are connecting remotely).
Conclusion
Celery is not just about executing tasks; it empowers you to create complex workflows, group tasks efficiently, and integrate seamlessly across different programming languages using webhooks. Its advanced features like task chaining, scheduling, result storage, and real-time monitoring make it a powerful tool for building scalable and distributed systems. While this introduction covers the basics, there's a lot more to explore in Celery for maximizing productivity in asynchronous task handling.