Task queues are an essential component in distributed systems, enabling efficient management and execution of asynchronous tasks across multiple nodes or workers. This tutorial will guide you through setting up a task queue using Celery, a powerful Python task-queue system, along with RabbitMQ, a robust message broker.
Celery is a versatile Python task-queue system designed to streamline and manage asynchronous task execution. It serves as a robust framework for distributing tasks among workers, facilitating parallel processing and enhancing overall system scalability. With Celery, developers can orchestrate complex workflows, schedule tasks, and handle task priorities effectively, making it an essential tool for building responsive and efficient applications.
On the other hand, RabbitMQ stands as a reliable message broker that plays a pivotal role in coordinating communication between Celery tasks and workers. As a high-performance messaging platform, RabbitMQ efficiently manages task queues, ensuring seamless message delivery and optimal utilization of resources. Its advanced features, such as message routing, fault tolerance, and clustering, make RabbitMQ a preferred choice for building robust distributed systems that rely on asynchronous task processing.
Together, Celery and RabbitMQ form a potent combination, empowering developers to create scalable, responsive, and fault-tolerant applications capable of handling complex workflows and high volumes of tasks with ease.
Before starting, ensure you have sudo privileges.
1. Update your package index:
sudo apt update
2. Install RabbitMQ:
sudo apt install rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
3. Enable and start the RabbitMQ service:
sudo systemctl enable rabbitmq-server && sudo systemctl start rabbitmq-server
sudo systemctl status rabbitmq-server
Install pip if it is not yet installed:
sudo apt install python3-pip
Create a Python virtual environment. If the venv module is not installed, install it with:
sudo apt install python3.8-venv
Then run:
python3 -m venv celeryenv
Activate the virtual environment:
source celeryenv/bin/activate
If at any time you need to deactivate the environment (not now), you can type:
deactivate
Install Celery within the virtual environment:
pip install celery
A Celery application consists of two main components: workers and clients. Tasks are defined in a module shared by both. Workers execute the task code, while clients use the same function definitions to submit tasks, abstracting away the complexity of publishing messages to RabbitMQ.
Create a directory for the Python module (celeryapp) and another for storing downloaded files (downloaded_files):
mkdir ~/downloaded_files ~/celeryapp
cd ~/celeryapp
Create a module file celeryapp.py containing two functions (download and list_files) that will serve as asynchronous tasks. Replace demo1 in the BASEDIR path with your system username.
from celery import Celery
import urllib.request
import os

# Directory to store downloaded files
BASEDIR = "/home/demo1/downloaded_files"

# Create the Celery app and set the broker location (RabbitMQ)
app = Celery('celeryapp',
             backend='rpc://',
             broker='pyamqp://guest@localhost//')


@app.task
def download(url, filename):
    """
    Download a page and save it to the BASEDIR directory

    :param url: URL to download
    :param filename: Filename to save the URL content in BASEDIR
    """
    response = urllib.request.urlopen(url)
    data = response.read()
    with open(os.path.join(BASEDIR, filename), 'wb') as file:
        file.write(data)


@app.task
def list_files():
    """Return an array of all downloaded files."""
    return os.listdir(BASEDIR)
Explanation of the Code
Creating the Celery App: The line app = Celery('celeryapp', backend='rpc://', broker='pyamqp://guest@localhost//') does the following:
Name: Creates a Celery application named celeryapp.
Broker: Specifies the broker (RabbitMQ) on localhost using the AMQP protocol.
Backend: Configures the result backend to store task return values. The rpc backend sends responses to a RabbitMQ queue following a Remote Procedure Call (RPC) pattern.
Task Functions:
download: Downloads content from a URL and saves it to a specified file in BASEDIR.
list_files: Lists all files in BASEDIR.
To start the Celery worker and process tasks, run the following commands within the virtual environment:
source celeryenv/bin/activate
celery -A celeryapp worker
You can also start the worker in debug mode by running the following command:
celery -A celeryapp worker --loglevel=debug
This command initializes the Celery worker with the celeryapp module, enabling it to execute tasks defined in celeryapp.py.
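With the worker running, you can call the tasks from a second terminal (with the same virtual environment activated, in the ~/celeryapp directory). The following is a minimal client sketch; the URL and filename are placeholder values:

from celeryapp import download, list_files

# .delay() publishes a task message to RabbitMQ and returns
# immediately with an AsyncResult handle.
result = download.delay('https://www.example.com', 'example.html')

# Block until the worker finishes; this works because the rpc
# backend stores return values. download returns None on success.
result.get(timeout=30)

# Queue list_files and print the array of file names it returns.
print(list_files.delay().get(timeout=10))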
1. Create the Service Definition File
Using sudo, create a new service definition file named celeryd.service in the directory /etc/systemd/system/. You can use a text editor like nano or vim to create and edit the file:
sudo nano /etc/systemd/system/celeryd.service
2. Edit the Service File
Inside the celeryd.service file, add the following configuration. Remember to replace demo1 with your actual user and group names.
[Unit]
Description=Celery Daemon
After=network.target
[Service]
Type=forking
User=demo1
Group=demo1
Environment="PATH=/home/demo1/celeryenv/bin"
WorkingDirectory=/home/demo1/celeryapp
ExecStart=/home/demo1/celeryenv/bin/celery multi start worker1 -A celeryapp --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n.log
ExecStop=/home/demo1/celeryenv/bin/celery multi stopwait worker1 --pidfile=/var/run/celery/%n.pid
[Install]
WantedBy=multi-user.target
3. Save the changes in the editor and close the file.
4. Create log folder and assign necessary permission:
sudo mkdir -p /var/log/celery /var/run/celery
sudo chown demo1:demo1 /var/log/celery /var/run/celery
5. Reload systemd
After creating or modifying a systemd service file, it's essential to reload systemd for the changes to take effect:
sudo systemctl daemon-reload
6. Start the Celery Daemon Service
sudo systemctl start celeryd
Run the following command to check the service status:
sudo systemctl status celeryd
7. Enable Autostart (Optional)
If you want the Celery daemon service to start automatically at server boot, enable it:
sudo systemctl enable celeryd
Now your Celery workers should be daemonized and automatically started on server startup. Adjust the paths, usernames, and group names as per your actual setup.
Monitoring is essential for maintaining performance and identifying issues. Here are some commands you can use with the celery binary to monitor your Celery workers and tasks:
Use the status command to get a list of workers and their status:
celery -A celeryapp status
This command will display the status of each worker, indicating whether they are operational (OK) or if there are any issues.
The inspect active command allows you to see what tasks each worker is currently executing:
celery -A celeryapp inspect active
This command provides insight into the active tasks on each worker, helping you track progress and identify any potential bottlenecks.
Utilize the inspect stats command to gather statistics about your Celery workers, including resource usage and completed tasks:
celery -A celeryapp inspect stats
This command provides comprehensive information such as worker resource usage (the rusage key) and the total number of completed tasks (the total key). It's useful for performance monitoring and capacity planning.
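The same data is also available programmatically through Celery's inspect API, which is useful for scripted health checks. A minimal sketch, reusing the app object from celeryapp.py:

from celeryapp import app

# Query all running workers over the broker. Each call returns a
# dict keyed by worker name, or None if no workers respond.
inspector = app.control.inspect()

print(inspector.ping())    # liveness check per worker
print(inspector.active())  # tasks currently executing
print(inspector.stats())   # same data as `celery inspect stats`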
To monitor a Celery Cluster with Flower, you can follow these steps:
1. Begin by installing Flower within the virtual environment:
pip install wheel flower
2. If you're using a firewall, ensure that the firewall is open on Flower's default port 5555. To allow traffic on port 5555, run:
sudo ufw allow 5555/tcp
To verify that the rule has been added, you can list the current rules:
sudo ufw status
This should display an entry for port 5555 allowing TCP traffic.
3. Navigate to your Celery app directory. For example:
cd /home/demo1/celeryapp
4. Start Flower using the following command. You can change the port using the --port flag if needed:
celery -A celeryapp flower --port=5555
5. Access the Flower dashboard by opening your browser and going to http://localhost:5555 (use your server's IP address in place of localhost if you access it remotely).
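Flower also exposes an HTTP API that you can query from scripts instead of the dashboard. A minimal sketch using Python's standard library, assuming Flower is running on localhost:5555 and its /api/workers endpoint is reachable:

import json
import urllib.request

# Ask Flower for the current state of all known workers.
with urllib.request.urlopen('http://localhost:5555/api/workers') as response:
    workers = json.loads(response.read())

# Print each worker's name and whether Flower returned data for it.
for name, info in workers.items():
    print(name, '->', 'online' if info else 'no data')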
Celery is not just about executing tasks; it empowers you to create complex workflows, group tasks efficiently, and integrate seamlessly across different programming languages using webhooks. Its advanced features like task chaining, scheduling, result storage, and real-time monitoring make it a powerful tool for building scalable and distributed systems. While this introduction covers the basics, there's a lot more to explore in Celery for maximizing productivity in asynchronous task handling.
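As a taste of those features, the tasks from this tutorial can be composed with Celery's canvas primitives. A brief sketch; the URLs are placeholders, and .si() creates immutable signatures so each task ignores the previous task's return value:

from celery import chain, group
from celeryapp import download, list_files

# Run two downloads in sequence, then list the directory contents.
workflow = chain(
    download.si('https://www.example.com', 'example1.html'),
    download.si('https://www.example.org', 'example2.html'),
    list_files.si(),
)
print(workflow().get(timeout=60))

# Alternatively, run several downloads in parallel as a group.
group(
    download.si('https://www.example.com', 'a.html'),
    download.si('https://www.example.org', 'b.html'),
).apply_async()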