Setting Up a Task Queue with Celery and RabbitMQ on Ubuntu 20.04

Shahid Ali
Technical writer
RabbitMQ
30.05.2024
Reading time: 8 min

Task queues are an essential component in distributed systems, enabling efficient management and execution of asynchronous tasks across multiple nodes or workers. This tutorial will guide you through setting up a task queue using Celery, a powerful Python task-queue system, along with RabbitMQ, a robust message broker.

Celery and RabbitMQ

Celery is a powerful and versatile Python task-queue system designed to streamline and manage asynchronous task execution. It serves as a robust framework for distributing tasks among workers, facilitating parallel processing and enhancing overall system scalability. With Celery, developers can orchestrate complex workflows, schedule tasks, and handle task priorities effectively, making it an essential tool for building responsive and efficient applications.

On the other hand, RabbitMQ stands as a reliable message broker that plays a pivotal role in coordinating communication between Celery tasks and workers. As a high-performance messaging platform, RabbitMQ efficiently manages task queues, ensuring seamless message delivery and optimal utilization of resources. Its advanced features, such as message routing, fault tolerance, and clustering, make RabbitMQ a preferred choice for building robust distributed systems that rely on asynchronous task processing.

Together, Celery and RabbitMQ form a potent combination, empowering developers to create scalable, responsive, and fault-tolerant applications capable of handling complex workflows and high volumes of tasks with ease.

Prerequisites

Before starting, ensure you have:

  • A cloud server with Ubuntu 20.04 installed.
  • A hostname configured for the server.
  • A non-root user with sudo privileges.
  • Basic knowledge of Python and terminal commands.

Installing RabbitMQ

1. Update your package index:

sudo apt update


2. Install RabbitMQ:

sudo apt install rabbitmq-server

sudo rabbitmq-plugins enable rabbitmq_management


3. Enable and start the RabbitMQ service:

sudo systemctl enable rabbitmq-server && sudo systemctl start rabbitmq-server


Verify that the service is running:

sudo systemctl status rabbitmq-server
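Besides the systemd status, a quick way to confirm that the broker accepts connections is to open a TCP connection to it. The sketch below assumes RabbitMQ listens on its default AMQP port 5672 on localhost; port_is_open is an illustrative helper name, not part of RabbitMQ or Celery.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused, unreachable, or timed out: treat all as "not open".
        return False


# Assumption: RabbitMQ uses its default AMQP port (5672) on this machine.
print("RabbitMQ reachable:", port_is_open("localhost", 5672))
```

A result of False right after installation usually means the service has not started yet or listens on a different port.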


Setting Up Celery

Install pip if it is not yet installed:

sudo apt install python3-pip

Create a Python virtual environment. If the venv module is not installed, install it first:

sudo apt install python3.8-venv

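Then create the environment from your home directory (Ubuntu 20.04 provides the interpreter as python3):

cd ~
python3 -m venv celeryenv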

Activate the virtual environment:

source celeryenv/bin/activate


If at any time you need to deactivate the environment (not now), you can type:

deactivate

Install Celery within the virtual environment:

pip install celery
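To confirm that the virtual environment is active and that Celery was installed into it, a short check like the one below can help. It is a minimal sketch that uses only the standard library; in_virtualenv and is_installed are illustrative helper names.

```python
import importlib.util
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter runs inside a venv."""
    # In a venv, sys.prefix points at the environment while
    # sys.base_prefix still points at the system installation.
    return sys.prefix != sys.base_prefix


def is_installed(package: str) -> bool:
    """Return True if the named top-level package can be found."""
    return importlib.util.find_spec(package) is not None


print("virtual environment active:", in_virtualenv())
print("celery installed:", is_installed("celery"))
```

Both lines should report True when the script is run with the interpreter from celeryenv.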

Writing a Celery Application

A Celery application consists of two main components:

  1. Workers: These wait for messages from RabbitMQ and execute the tasks.
  2. Clients: These submit messages to RabbitMQ to trigger task execution and retrieve the result later.

Tasks are defined in a module used by both workers and clients. Workers execute the task code, while clients use the function definitions to expose tasks and abstract RabbitMQ publishing complexity.
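The division of work between clients and workers can be illustrated without Celery at all. The following is an in-process analogy only, assuming a standard-library queue in place of RabbitMQ and a dictionary in place of the result backend; the names task_queue, results, and add are made up for the illustration.

```python
import queue
import threading

task_queue = queue.Queue()   # stands in for the RabbitMQ queue
results = {}                 # stands in for the result backend


def add(x, y):
    """A 'task' known to both the client and the worker."""
    return x + y


def worker():
    """Wait for messages, run the requested task, store the result."""
    while True:
        message = task_queue.get()
        if message is None:          # sentinel: shut the worker down
            break
        task_id, func, args = message
        results[task_id] = func(*args)


thread = threading.Thread(target=worker)
thread.start()

# The client only publishes a message; it does not run the task itself.
task_queue.put(("task-1", add, (2, 3)))
task_queue.put(None)
thread.join()

print(results["task-1"])  # 5
```

Celery follows the same shape, except that the queue lives in RabbitMQ and the workers can run in separate processes or on separate machines.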

Step 1. Create Directories

Create a directory for the Python module (celeryapp) and another for storing downloaded files (downloaded_files):

mkdir ~/downloaded_files  ~/celeryapp
cd ~/celeryapp

Step 2. Create the Module

Create a module file celeryapp.py containing two functions (download and list_files) that will serve as asynchronous tasks. Replace demo1 in the BASEDIR path with your system username.

from celery import Celery
import urllib.request
import os

# Directory to store downloaded files
BASEDIR = "/home/demo1/downloaded_files"

# Create the Celery app and set the broker location (RabbitMQ)
app = Celery('celeryapp',
             backend='rpc://',
             broker='pyamqp://guest@localhost//')

@app.task
def download(url, filename):
    """
    Download a page and save it to the BASEDIR directory
    :param url: URL to download
    :param filename: Filename to save the URL content in BASEDIR
    """
    response = urllib.request.urlopen(url)
    data = response.read()
    with open(os.path.join(BASEDIR, filename), 'wb') as file:
        file.write(data)

@app.task
def list_files():
    """Return an array of all downloaded files."""
    return os.listdir(BASEDIR)

Explanation of the Code

  • Creating the Celery App: The line app = Celery('celeryapp', backend='rpc://', broker='pyamqp://guest@localhost//') does the following:

    • Name: Creates a Celery application named celeryapp.

    • Broker: Specifies the broker (RabbitMQ) on localhost using the AMQP protocol.

    • Backend: Configures the response backend to store task return values. The rpc backend sends responses to a RabbitMQ queue following a Remote Procedure Call (RPC) pattern.

  • Task Functions:

    • download: Downloads content from a URL and saves it to a specified file in BASEDIR.
    • list_files: Lists all files in BASEDIR.
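The broker URL packs several settings into one string. As a rough illustration, the sketch below splits it with the standard library; describe_broker_url is a hypothetical helper, and the virtual-host rule in the comment is stated as an assumption about how the trailing slashes are interpreted.

```python
from urllib.parse import urlsplit


def describe_broker_url(url: str) -> dict:
    """Split a broker URL into its transport, user, host, port and vhost."""
    parts = urlsplit(url)
    return {
        "transport": parts.scheme,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,  # None means the transport's default port
        # Assumption: the virtual host is the path without its leading
        # slash, so a URL ending in "//" selects the default vhost "/".
        "vhost": parts.path[1:],
    }


print(describe_broker_url("pyamqp://guest@localhost//"))
```

For the URL used in this tutorial, the transport is pyamqp, the user is guest, the host is localhost, and no explicit port is set.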

Running the Celery Worker

To start the Celery worker and process tasks, run the following commands with the virtual environment activated, from the application directory:

source ~/celeryenv/bin/activate
cd ~/celeryapp
celery -A celeryapp worker

You can also start the worker with debug logging:

celery -A celeryapp worker --loglevel=debug

This command initializes the Celery worker with the celeryapp module, enabling it to execute tasks defined in celeryapp.py.
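With a worker running in one terminal, a client in a second terminal can submit tasks. The following is a minimal sketch, assuming celeryapp.py from Step 2 is importable (for example, Python started in ~/celeryapp with the virtual environment activated). submit_and_list is an illustrative helper; delay() and get() are Celery's usual calls for sending a task and waiting for its result, and https://example.com is a placeholder URL.

```python
def submit_and_list(download_task, list_task, url, filename, timeout=30):
    """Queue a download, wait for it to finish, then list the stored files."""
    # delay() publishes the task message to RabbitMQ and returns at once
    # with a result handle; get() blocks until the worker reports back.
    download_task.delay(url, filename).get(timeout=timeout)
    return list_task.delay().get(timeout=timeout)


def main():
    # Imported here because the module only exists on the tutorial server.
    from celeryapp import download, list_files

    # Assumption: https://example.com stands in for any reachable URL.
    files = submit_and_list(download, list_files,
                            "https://example.com", "example.html")
    print(files)
```

Calling main() from a Python shell should print a list that includes example.html once the worker has processed both tasks.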

Start the Workers as Daemons

1. Create the Service Definition File

Using sudo, create a new service definition file named celeryd.service in the directory /etc/systemd/system/. You can use a text editor like nano or vim to create and edit the file:

sudo nano /etc/systemd/system/celeryd.service

2. Edit the Service File

Inside the celeryd.service file, add the following configuration. Remember to replace demo1 with your actual user and group names.

[Unit]
Description=Celery Daemon
After=network.target

[Service]
Type=forking
User=demo1
Group=demo1
Environment="PATH=/home/demo1/celeryenv/bin"
WorkingDirectory=/home/demo1/celeryapp
ExecStart=/home/demo1/celeryenv/bin/celery multi start worker1 -A celeryapp --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n.log
ExecStop=/home/demo1/celeryenv/bin/celery multi stopwait worker1 --pidfile=/var/run/celery/%n.pid

[Install]
WantedBy=multi-user.target

  • Description: Describes the service.
  • After: Specifies that the service should start after the network is up.
  • Type: Set to forking because celery multi starts the workers in the background and then exits.
  • User and Group: Specify the user and group under which the Celery workers should run.
  • Environment: Sets the environment variables, including the Python virtual environment path.
  • WorkingDirectory: Points to the directory where your Celery application is located.
  • ExecStart: Command to start the Celery workers using the celery multi start command.
  • ExecStop: Command to stop the Celery workers using the celery multi stopwait command.
  • WantedBy: Specifies the target that pulls in this service once it is enabled; multi-user.target is the normal multi-user boot state.
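Since the user name appears in several places in the unit file, it can be convenient to generate the file rather than edit it by hand. The sketch below is one possible approach, assuming the home directory is /home/<user> and the group defaults to the user name; UNIT_TEMPLATE and render_unit are illustrative names.

```python
UNIT_TEMPLATE = """\
[Unit]
Description=Celery Daemon
After=network.target

[Service]
Type=forking
User={user}
Group={group}
Environment="PATH=/home/{user}/celeryenv/bin"
WorkingDirectory=/home/{user}/celeryapp
ExecStart=/home/{user}/celeryenv/bin/celery multi start worker1 -A celeryapp --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n.log
ExecStop=/home/{user}/celeryenv/bin/celery multi stopwait worker1 --pidfile=/var/run/celery/%n.pid

[Install]
WantedBy=multi-user.target
"""


def render_unit(user, group=None):
    """Fill in the user and group; the group defaults to the user name."""
    return UNIT_TEMPLATE.format(user=user, group=group or user)


# Assumption: "alice" is a placeholder account name.
print(render_unit("alice"))
```

The printed text can then be pasted into /etc/systemd/system/celeryd.service.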

3. Save the changes in the editor and close the file.

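4. Create the log and PID directories and give your user ownership of them:

sudo mkdir -p /var/log/celery /var/run/celery
sudo chown demo1:demo1 /var/log/celery /var/run/celery

On Ubuntu, /var/run is a temporary filesystem, so /var/run/celery disappears on reboot and has to be recreated before the service can start again. One option is to add RuntimeDirectory=celery to the [Service] section, which makes systemd create /run/celery for the service user at each start.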

5. Reload systemd

After creating or modifying a systemd service file, it's essential to reload systemd for the changes to take effect:

sudo systemctl daemon-reload

6. Start the Celery Daemon Service

sudo systemctl start celeryd

Check the service status:

sudo systemctl status celeryd

7. Enable Autostart (Optional)

If you want the Celery daemon service to start automatically at server boot, enable it:

sudo systemctl enable celeryd

Now your Celery workers should be daemonized and automatically started on server startup. Adjust the paths, usernames, and group names as per your actual setup.

Monitor your Celery Cluster

Monitoring is essential for maintaining performance and identifying issues. Here are some commands you can use with the celery binary to monitor your Celery workers and tasks:

Check Worker Status

Use the status command to get a list of workers and their status:

celery -A celeryapp status

This command will display the status of each worker, indicating whether they are operational (OK) or if there are any issues.

Inspect Active Tasks

The inspect active command allows you to see what tasks each worker is currently executing:

celery -A celeryapp inspect active

This command provides insight into the active tasks on each worker, helping you track progress and identify any potential bottlenecks.

Get Worker Statistics

Utilize the inspect stats command to gather statistics about your Celery workers, including resource usage and completed tasks:

celery -A celeryapp inspect stats

This command provides comprehensive information such as worker resource usage (rusage key) and the total number of completed tasks (total key). It's useful for performance monitoring and capacity planning.
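The same statistics are available from Python through the app's control interface, which makes it easy to reduce them to a single number per worker. The sketch below assumes the reply has the shape described above, with a total entry that maps task names to counts; completed_per_worker and the sample data are illustrative.

```python
def completed_per_worker(stats):
    """Map each worker name to its total number of completed tasks."""
    # Each worker's "total" entry maps task names to completion counts.
    return {
        worker: sum(info.get("total", {}).values())
        for worker, info in (stats or {}).items()
    }


def main():
    # Imported here because the module only exists on the tutorial server.
    from celeryapp import app

    # inspect().stats() returns None when no worker replies in time.
    stats = app.control.inspect().stats()
    print(completed_per_worker(stats))


# Hypothetical reply from one worker, shaped like the "total" key above.
sample = {"celery@rabbitmq": {"total": {"celeryapp.download": 3,
                                        "celeryapp.list_files": 2}}}
print(completed_per_worker(sample))  # {'celery@rabbitmq': 5}
```

Calling main() in ~/celeryapp with a worker running should print the live totals instead of the sample.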

Monitor a Celery Cluster with Flower

To monitor a Celery Cluster with Flower, you can follow these steps:

1. Begin by installing Flower within the virtual environment

pip install wheel flower

2. If you're using a firewall, ensure that the firewall is open on Flower's default port 5555. To allow traffic on port 5555, run:

sudo ufw allow 5555/tcp

To verify that the rule has been added, you can list the current rules:

sudo ufw status

This should display an entry for port 5555 allowing TCP traffic.

3. Navigate to your Celery app directory. For example:

cd ~/celeryapp

4. Start Flower using the following command. You can change the port using the --port flag if needed:

celery -A celeryapp flower --port=5555

5. Access the Flower dashboard by opening your browser and going to http://<your server IP>:5555 (or localhost:5555 if the browser runs on the same machine).


Conclusion

Celery is not just about executing tasks; it empowers you to create complex workflows, group tasks efficiently, and integrate seamlessly across different programming languages using webhooks. Its advanced features like task chaining, scheduling, result storage, and real-time monitoring make it a powerful tool for building scalable and distributed systems. While this introduction covers the basics, there's a lot more to explore in Celery for maximizing productivity in asynchronous task handling.
