Sam Clarke
1. Introduction to Celery
2. Adding Celery to your application
3. Celery in Production
4. Logging and Monitoring
Introducing Celery into your application is a trade-off between performance and complexity.
Brokers & Backends
1. Introduction to Celery
2. Adding Celery to your application
3. Celery in Production
4. Logging and Monitoring
sudo apt-get install rabbitmq-server  # or redis-server
pip install celery
#  app.py
from celery import Celery
celery = Celery(
   'app',
    broker="amqp://guest@localhost//",
    backend="redis://localhost:6379/0"
)
#  Note: in production, use a dedicated user/vhost
#  with appropriate permissions for broker/backend
#  app.py
from celery import Celery
celery = Celery()
celery.config_from_object('celery_config')
#  celery_config.py
import os

#  note: settings are lower case since version >= 4.0
broker_url = os.environ['BROKER_URL']
result_backend = os.environ['BACKEND_URL']
worker_pool_restarts = True
...
celery worker -A app.celery -c 2
Example output from a Celery worker running locally.
Highlighted are the configuration and the registered tasks.
The Celery task decorator.
#  tasks.py
def send_email(user_id):
    user = User.objects.get(id=user_id)
    subject = 'Welcome, {}'.format(user.username)
    body = 'Hello from Celery'
    #  deliver_email is a placeholder for your mail helper
    deliver_email(user.email, subject, body)
#  call task synchronously
send_email(user_id=1234)
#  tasks.py
from app import celery
@celery.task
def send_email(user_id):
    user = User.objects.get(id=user_id)
    subject = 'Welcome, {}'.format(user.username)
    body = 'Hello from Celery'
    #  deliver_email is a placeholder for your mail helper
    deliver_email(user.email, subject, body)
#  call task asynchronously
send_email.delay(user_id=1234)
#  tasks.py
@celery.task
def send_email(user_id):
    user = User.objects.get(id=user_id)
    subject = 'Welcome, {}'.format(user.username)
    body = 'Hello from Celery'
    #  deliver_email is a placeholder for your mail helper
    deliver_email(user.email, subject, body)
#  why not just pass in the user object?
#  task arguments are serialized (JSON by default), so pass a
#  primary key and re-fetch fresh data inside the task
send_email.delay(user_id=1234)
from app import celery
@celery.task
def add(x, y):
    return x + y
#  This is not the sum you may expect...
result = add.delay(5, 7)
from app import celery
@celery.task
def add(x, y):
    return x + y
#  store a reference to task AsyncResult
async_result = add.delay(5, 7)
#  get async result using the task's AsyncResult method
result = add.AsyncResult(async_result.id)
if result.ready():
    return result.get()
else:
    #  do something else
    ...
Possible default task states:
PENDING, STARTED, SUCCESS, FAILURE or REVOKED.
result = add.AsyncResult(async_result.id)
if result.state == 'SUCCESS':
    #  do something with the result
    return result.get()
else:
    #  do something else
    ...
Sometimes you need more detailed state messages.
#  Report task-specific status (e.g. filesize processed) that
#  Celery's default states don't cover
#  'PROGRESS' is our custom state; 'task' below is the running
#  task instance (self with bind=True, or celery.current_task)
...
percent_done = self._get_progress()
task.update_state(
    state='PROGRESS',
    meta={'progress': percent_done}
)
...
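A minimal sketch of a bound task emitting the custom 'PROGRESS' state (process_upload, upload_id and the chunk loop are made-up names):
#  tasks.py (sketch; bind=True makes self the task instance)
from app import celery
@celery.task(bind=True)
def process_upload(self, upload_id):
    total_chunks = 10
    for chunk in range(total_chunks):
        #  ... process one chunk of the upload here ...
        self.update_state(
            state='PROGRESS',
            meta={'progress': (chunk + 1) * 100 // total_chunks})
    return {'progress': 100}
#  the caller polls the custom state:
#  result = process_upload.AsyncResult(task_id)
#  if result.state == 'PROGRESS':
#      print(result.info['progress'])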
Here we don't rely on Celery's task status to let us know whether the task completed successfully.
user = User(
    username='foo', email='foo@bar.com', email_sent=False)
#  Note that we tell Celery to
#  ignore the result
@celery.task(ignore_result=True)
def send_email(user_id):
    user = User.objects.get(id=user_id)
    ...
    #  email sent, record state in User profile
    user.email_sent = True
    user.save()
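A sketch of the caller-side check, using the same User model, so dispatch is driven by domain state rather than the result backend:
#  somewhere in your web app
user = User.objects.get(id=1234)
if not user.email_sent:
    send_email.delay(user_id=user.id)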
This blog post: http://bit.ly/2eqd1DZ
.s() - shorthand for .signature()
from celery import chain
#  the second task takes result of first task and 10 as args
async_result = chain(
    add.s(5, 7),
    add.s(10)).delay()
#  this is the result of the chain (final task)
result_two = add.AsyncResult(async_result.id).result
#  we can traverse the task tree to get previous results
result_one = add.AsyncResult(async_result.parent.id).result
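If you only need the final value, AsyncResult.get() blocks until the whole chain has run (fine in a shell; avoid in a web request):
#  waits for the full chain: (5 + 7) + 10
final_result = async_result.get()  # 22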
You probably don't want your unit tests to wait for tasks to execute asynchronously (or to depend on a running broker and worker).
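A minimal sketch of an eager-mode test, assuming Celery >= 4 setting names and that add lives in tasks.py:
#  tests.py (sketch)
from app import celery
from tasks import add

celery.conf.update(
    task_always_eager=True,      #  run tasks synchronously, in-process
    task_eager_propagates=True)  #  re-raise task exceptions in the test

def test_add():
    assert add.delay(5, 7).get() == 12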
1. Introduction to Celery
2. Adding Celery to your application
3. Celery in Production
4. Logging and Monitoring
description "celeryd"
start on runlevel [2345]
stop on runlevel [!2345]
setuid celery
setgid celery
respawn
chdir /path/to/app
script
    . /etc/environment
    export PATH=/path/to/.env/bin
    exec celery worker -A app.celery -c 4 -n worker1.%h
end script
1. Introduction to Celery
2. Adding Celery to your application
3. Celery in Production
4. Logging and Monitoring
import logging
from celery.utils.log import get_task_logger
#  Normal logging
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
logger.addHandler(handler)
#  CELERY logging
#  create a common logger for all of your tasks
celery_logger = get_task_logger(__name__)
@celery.task(ignore_result=True)
def send_email(user_id):
    ...
    celery_logger.info(
        'Email Sent to {}'.format(user.username))
pip install flower
celery flower -A app.celery
"RabbitMQ supports priorities since version 3.5.0, and the Redis transport emulates priority support."
In production, it is best to route high-priority tasks to dedicated workers.
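A minimal sketch of such routing via the task_routes setting (the queue name and task path are assumptions):
#  celery_config.py
task_routes = {
    'tasks.send_email': {'queue': 'high_priority'},
}
#  run a dedicated worker that consumes only that queue:
#  celery worker -A app.celery -Q high_priority -c 2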
Even though it adds complexity, Celery is still one of my go-to tools for any new web app.
Even apps of modest size can benefit from a task queue.
@samclarkeg
github.com/samgclarke