Setting Up a Task Queue with Celery and RabbitMQ on Ubuntu 20.04

30.05.2024
Reading time: 8 min
Hostman Team
Technical writer

Task queues are an essential component in distributed systems, enabling efficient management and execution of asynchronous tasks across multiple nodes or workers. This tutorial will guide you through setting up a task queue using Celery, a powerful Python task-queue system, along with RabbitMQ, a robust message broker.

Celery and RabbitMQ

Celery is a powerful and versatile Python task-queue system designed to streamline and manage asynchronous task execution. It serves as a robust framework for distributing tasks among workers, facilitating parallel processing and enhancing overall system scalability. With Celery, developers can orchestrate complex workflows, schedule tasks, and handle task priorities effectively, making it an essential tool for building responsive and efficient applications.

RabbitMQ, in turn, is a reliable message broker that plays a pivotal role in coordinating communication between Celery clients and workers. As a high-performance messaging platform, RabbitMQ efficiently manages task queues, ensuring dependable message delivery and good resource utilization. Its advanced features, such as message routing, fault tolerance, and clustering, make it a preferred choice for building robust distributed systems that rely on asynchronous task processing.

Together, Celery and RabbitMQ form a potent combination, empowering developers to create scalable, responsive, and fault-tolerant applications capable of handling complex workflows and high volumes of tasks with ease.

Prerequisites

Before starting, ensure you have:

  • A cloud server with Ubuntu 20.04 installed.
  • A hostname set on the server.
  • A non-root user with sudo privileges.
  • Basic knowledge of Python and terminal commands.

Installing RabbitMQ

1. Update your package index:

sudo apt update

2. Install RabbitMQ and enable the management plugin, which provides a web UI on port 15672:

sudo apt install rabbitmq-server

sudo rabbitmq-plugins enable rabbitmq_management

3. Enable and start the RabbitMQ service:

sudo systemctl enable rabbitmq-server && sudo systemctl start rabbitmq-server

Verify that the service is running:

sudo systemctl status rabbitmq-server

Setting Up Celery

Install pip if it is not yet installed:

sudo apt install python3-pip

Create a Python virtual environment. If the venv module is not installed, install it first:

sudo apt install python3.8-venv

Then run:

python3 -m venv celeryenv

Activate the virtual environment:

source celeryenv/bin/activate

If at any time you need to deactivate the environment (not now), you can type:

deactivate

Install Celery within the virtual environment:

pip install celery

Writing a Celery Application

A Celery application consists of two main components:

  1. Workers: These wait for messages from RabbitMQ and execute the tasks.
  2. Clients: These submit messages to RabbitMQ to trigger task execution and retrieve the result later.

Tasks are defined in a module shared by both workers and clients. Workers execute the task code, while clients import the same function definitions to enqueue tasks without having to deal with RabbitMQ publishing directly.

Step 1. Create Directories

Create a directory for the Python module (celeryapp) and another for storing downloaded files (downloaded_files):

mkdir ~/downloaded_files ~/celeryapp
cd ~/celeryapp

Step 2. Create the Module

Create a module file celeryapp.py containing two functions (download and list_files) that will serve as asynchronous tasks. Replace demo1 in the BASEDIR path with your system username.

from celery import Celery
import urllib.request
import os

# Directory to store downloaded files
BASEDIR = "/home/demo1/downloaded_files"

# Create the Celery app and set the broker location (RabbitMQ)
app = Celery('celeryapp',
             backend='rpc://',
             broker='pyamqp://guest@localhost//')

@app.task
def download(url, filename):
    """
    Download a page and save it to the BASEDIR directory
    :param url: URL to download
    :param filename: Filename to save the URL content in BASEDIR
    """
    response = urllib.request.urlopen(url)
    data = response.read()
    with open(os.path.join(BASEDIR, filename), 'wb') as file:
        file.write(data)

@app.task
def list_files():
    """Return an array of all downloaded files."""
    return os.listdir(BASEDIR)

Explanation of the Code

  • Creating the Celery App: The line app = Celery('celeryapp', backend='rpc://', broker='pyamqp://guest@localhost//') does the following:

    • Name: Creates a Celery application named celeryapp.

    • Broker: Specifies the broker (RabbitMQ) on localhost using the AMQP protocol.

    • Backend: Configures the response backend to store task return values. The rpc backend sends responses to a RabbitMQ queue following a Remote Procedure Call (RPC) pattern.

  • Task Functions:

    • download: Downloads content from a URL and saves it to a specified file in BASEDIR.
    • list_files: Lists all files in BASEDIR.
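
Because a Celery task is still an ordinary Python function, you can sanity-check the module without a broker by calling the functions directly, which runs them in the current process instead of on a worker. A minimal sketch, assuming you run it from the ~/celeryapp directory and BASEDIR already exists:

from celeryapp import download, list_files

# Direct calls execute the task bodies locally; RabbitMQ and workers
# are not involved at all.
download("https://example.com", "example.html")
print(list_files())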

Running the Celery Worker

To start the Celery worker and process tasks, run the following commands within the virtual environment:

source celeryenv/bin/activate
celery -A celeryapp worker

You can also start the worker in debug mode:

celery -A celeryapp worker --loglevel=debug

This command initializes the Celery worker with the celeryapp module, enabling it to execute tasks defined in celeryapp.py.
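
With the worker running, tasks can be triggered from any Python client that imports the module. A minimal sketch, assuming a second terminal with the virtual environment activated and ~/celeryapp as the working directory (the URL and filename are placeholders):

from celeryapp import download, list_files

# Enqueue the download task on RabbitMQ; the running worker picks it up.
result = download.delay("https://example.com", "example.html")
result.get(timeout=30)  # block until the worker finishes (download returns None)

# Enqueue list_files and read its return value back through the rpc:// backend.
print(list_files.delay().get(timeout=10))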

Start the Workers as Daemons

1. Create the Service Definition File

Using sudo, create a new service definition file named celeryd.service in the directory /etc/systemd/system/. You can use a text editor like nano or vim to create and edit the file:

sudo nano /etc/systemd/system/celeryd.service

2. Edit the Service File

Inside the celeryd.service file, add the following configuration. Remember to replace demo1 with your actual user and group names.

[Unit]
Description=Celery Daemon
After=network.target

[Service]
Type=forking
User=demo1
Group=demo1
Environment="PATH=/home/demo1/celeryenv/bin"
WorkingDirectory=/home/demo1/celeryapp
RuntimeDirectory=celery
ExecStart=/home/demo1/celeryenv/bin/celery multi start worker1 -A celeryapp --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n.log
ExecStop=/home/demo1/celeryenv/bin/celery multi stopwait worker1 --pidfile=/var/run/celery/%n.pid

[Install]
WantedBy=multi-user.target

  • Description: Describes the service.
  • After: Specifies that the service should start after the network is up.
  • Type: Set to forking because celery multi detaches from the terminal and the launching process exits once the workers are running.
  • User and Group: Specify the user and group under which the Celery workers should run.
  • Environment: Sets the environment variables, including the Python virtual environment path.
  • WorkingDirectory: Points to the directory where your Celery application is located.
  • RuntimeDirectory: Has systemd create /var/run/celery (where the PID files live) with the right ownership on every start, since /var/run is a tmpfs that is cleared at reboot.
  • ExecStart: Command to start the Celery workers using the celery multi start command.
  • ExecStop: Command to stop the Celery workers using the celery multi stopwait command.
  • WantedBy: Specifies the target that should be reached for this service to start.

3. Save the changes in the editor and close the file.

4. Create the log folder and assign the necessary permissions (systemd creates /var/run/celery itself on each start, thanks to the RuntimeDirectory directive):

sudo mkdir -p /var/log/celery
sudo chown demo1:demo1 /var/log/celery

5. Reload systemd

After creating or modifying a systemd service file, it's essential to reload systemd for the changes to take effect:

sudo systemctl daemon-reload

6. Start the Celery Daemon Service

sudo systemctl start celeryd

Check the service status:

sudo systemctl status celeryd

7. Enable Autostart (Optional)

If you want the Celery daemon service to start automatically at server boot, enable it:

sudo systemctl enable celeryd

Now your Celery workers should be daemonized and automatically started on server startup. Adjust the paths, usernames, and group names as per your actual setup.

Monitor your Celery Cluster

Monitoring is essential for maintaining performance and identifying issues. Here are some commands you can use with the celery binary to monitor your Celery workers and tasks:

Check Worker Status

Use the status command to get a list of workers and their status:

celery -A celeryapp status

This command will display the status of each worker, indicating whether they are operational (OK) or if there are any issues.

Inspect Active Tasks

The inspect active command allows you to see what tasks each worker is currently executing:

celery -A celeryapp inspect active

This command provides insight into the active tasks on each worker, helping you track progress and identify any potential bottlenecks.

Get Worker Statistics

Utilize the inspect stats command to gather statistics about your Celery workers, including resource usage and completed tasks:

celery -A celeryapp inspect stats

This command provides comprehensive information such as worker resource usage (rusage key) and the total number of completed tasks (total key). It's useful for performance monitoring and capacity planning.
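
The same information is also available programmatically through Celery's inspection API, which is handy for scripting health checks. A minimal sketch, assuming the celeryapp module from earlier:

from celeryapp import app

# Build an inspector bound to the app's broker connection
insp = app.control.inspect(timeout=5)

print(insp.ping())    # liveness of each worker
print(insp.active())  # tasks currently executing, per worker
print(insp.stats())   # per-worker statistics (rusage, task counts, etc.)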

Monitor a Celery Cluster with Flower

To monitor a Celery Cluster with Flower, you can follow these steps:

1. Begin by installing Flower within the virtual environment:

pip install wheel flower

2. If you're using a firewall, make sure Flower's default port 5555 is open. To allow traffic on port 5555 with ufw, run:

sudo ufw allow 5555/tcp

To verify that the rule has been added, you can list the current rules:

sudo ufw status

This should display an entry for port 5555 allowing TCP traffic.

3. Navigate to your Celery app directory. For example:

cd /home/demo1/celeryapp

4. Start Flower using the following command. You can change the port using the --port flag if needed:

celery -A celeryapp flower --port=5555

5. Access the Flower dashboard by opening your browser and going to http://<server_ip>:5555 (or localhost:5555 when running locally).
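
Flower also exposes a small REST API, so the same data shown in the dashboard can be consumed from scripts. A minimal sketch using only the standard library, assuming Flower is reachable on localhost:5555:

import json
import urllib.request

# Query Flower's REST API for all workers it currently knows about
with urllib.request.urlopen("http://localhost:5555/api/workers") as resp:
    workers = json.load(resp)

print(list(workers))  # worker node names, e.g. ['worker1@yourhost']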

Conclusion

Celery is not just about executing tasks; it empowers you to create complex workflows, group tasks efficiently, and integrate seamlessly across different programming languages using webhooks. Its advanced features like task chaining, scheduling, result storage, and real-time monitoring make it a powerful tool for building scalable and distributed systems. While this introduction covers the basics, there's a lot more to explore in Celery for maximizing productivity in asynchronous task handling.
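
As a small taste of those workflow features, here is a hedged sketch that chains the two demo tasks from this tutorial so that list_files runs only after download has finished (si() makes the second signature ignore the first task's return value):

from celery import chain
from celeryapp import download, list_files

# download runs first; list_files runs after it completes
workflow = chain(download.s("https://example.com", "example.html"),
                 list_files.si())
print(workflow.apply_async().get(timeout=30))  # prints the directory listing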

