Celery, Redis and the (in)famous email task example
Starting with Celery is easy. But before you take Celery to production, you have to spend a lot of time to really understand it.
Being able to run asynchronous tasks from your web application is in many cases a must-have. One way to achieve this is to use Celery. There are many articles on the internet with examples. I really liked Miguel Grinberg's posts about Celery. Very clear, thank you Miguel.
But Miguel's post and most other articles are just an introduction. When you start with Celery you suddenly realize that it is not easy. In fact, you had better prepare yourself for a lot of learning before using it in production.
In this post I write some notes that came up when implementing Celery with Flask and Redis. Most of this is a summary of the documents present on the Celery website and other posts on the internet.
Many others have written about problems and how they solved them
Someone wrote that when running Celery in production your first priority is to have a plan of what to do if your tasks suddenly stop working. I want to add that this means that you first must know how to check if your broker, workers and tasks are running.
Search the internet for 'Celery problem' and you get an overwhelming number of pages. Many of these problems are in fact covered by the Celery documentation, but TL;DR. A number of problems with sending email, and how to tackle them, are covered in the nice post 'Working with Asynchronous Celery Tasks - lessons learned', see the links below. I also suggest you check the website distributedpython, see the links below. There are a lot of good recipes there.
What is all this talk about acks_late and retries?
By default, Celery acknowledges a task immediately: the message is removed from the broker's queue just before the task is started. You can change this behavior by specifying acks_late, per task or system wide. In this case Celery acknowledges a task only after the task has been executed.
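A minimal configuration sketch of both ways to enable late acknowledgement, assuming `celery` is your Celery application instance, as in the code further on. The task name my_long_task is hypothetical.

```python
# Per task: acknowledge the message only after the task has been executed
@celery.task(bind=True, acks_late=True)
def my_long_task(self, **kwargs):  # hypothetical task
    ...

# System wide: default for all tasks (lowercase setting name, Celery 4 and later)
celery.conf.task_acks_late = True
```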
Why is this important?
If a task is interrupted abruptly, then by default Celery will not run the task again, because its message was already removed from the queue. If acks_late was specified, the message is still in the queue and the task will be delivered and executed again. In the first case you can still retry the task by specifying retry parameters like retry_kwargs.
So what's the difference?
This means that acks_late changes the delivery from at-most-once to at-least-once.
But acks_late may not work as expected
Here we must differentiate between several conditions:
- Graceful: the administrator stops the worker using the SIGTERM signal (kill <pid>).
  Result: task not acked.
- Abrupt: the administrator kills the worker using the SIGKILL signal (kill -9 <pid>), or a system crash (out of memory, hardware failure) terminates the worker process.
  - If the parent process is still running: task acked.
  - If the parent process is not running (e.g. power failure): task not acked (probably).
In the first case the worker process is stopped gracefully and a running task that is stopped is not acked. This is because the administrator intended to (temporarily) stop task execution. Starting the worker again will start the task again.
In the second case with the parent process running, the parent process sees that the worker is lost, acks the task and sets the state to FAILURE. In case the administrator killed the worker, there was probably a good reason to do this. In case of a system crash, we probably do not want the task to be restarted. If the parent process is not running, for example because it suddenly died, then the task probably will not get acked.
Idempotency
Because our tasks may unexpectedly run multiple times, we must make sure that running them again does not change the result. For example, if a task already sent an email, then running the same task again should not send this email again. This is called idempotency: an operation can be applied multiple times without changing the result beyond the initial application (Wikipedia).
Acks_late versus retry
Assume a task is sending an email to an external service that is temporarily down. The task will try to connect but the connection will time out. If we use the default behavior, Celery will mark the task as failed. It will not be executed again.
But wait. Without acks_late we can also tell Celery to retry the task. What should you use? It depends. Read the documentation on the Celery website 'Should I use retry or acks_late?'.
How to make sure an email is sent only once?
Assuming that the same task can run multiple times, the only way to prevent an email from being sent two or more times is a flag that is stored outside the task and is visible to all tasks. I already created a table send_email_table which holds a record with (meta)data for every email. We just add a new (Boolean) column EmailHasBeenSent. This also means we must insert a new send_email_table record before (!) calling the task. Without tasks, my email send function looked like this:
class EmailSender:

    def send(self, params):
        # basic checks and insert the email parameters in a new send_email_table record
        preprocess(...)
        # perform checks if we can send the email using an allow list, deny list, etc.
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()
After making changes to use a task for sending an email:
class EmailSender:

    def sender_init(self, **params):
        # basic checks and insert the email parameters in a new send_email_table record
        send_email_table_record_id = preprocess(...)
        return send_email_table_record_id

    def sender_send(self, send_email_table_record_id):
        # perform checks if we can send the email using an allow list, deny list, etc.
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()
And the code to send an email:
def send_email(**kwargs):
    email_sender = EmailSender()
    send_email_table_record_id = email_sender.sender_init(**kwargs)
    # pass only the record id to the task using apply_async
    task_email_sender_sender_send.apply_async(
        queue=celery_task_queue,
        kwargs={'send_email_table_record_id': send_email_table_record_id},
    )


@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):
    email_sender = EmailSender()
    email_sender.sender_send(kwargs['send_email_table_record_id'])
The sender_send() method checks the flag EmailHasBeenSent in the record that is retrieved using the id. If True, the email has already been sent and the task returns without doing anything. Otherwise, the task sends the email and sets this flag to True after sending it.
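A minimal, self-contained sketch of this flag check, assuming an in-memory SQLite table instead of the real database and ORM. The column name email_has_been_sent, the function names and the smtp_send callable are hypothetical stand-ins for the EmailSender methods above.

```python
import sqlite3


def create_send_email_table(conn):
    # Hypothetical minimal schema: one record per email, created before the task is queued
    conn.execute(
        "CREATE TABLE IF NOT EXISTS send_email_table ("
        " id INTEGER PRIMARY KEY,"
        " recipient TEXT NOT NULL,"
        " email_has_been_sent INTEGER NOT NULL DEFAULT 0)"
    )


def sender_init(conn, recipient):
    # Runs in the web application: store the email, return the id passed to the task
    cur = conn.execute(
        "INSERT INTO send_email_table (recipient) VALUES (?)", (recipient,)
    )
    conn.commit()
    return cur.lastrowid


def sender_send(conn, record_id, smtp_send):
    # Runs in the task: safe to call more than once for the same record_id
    row = conn.execute(
        "SELECT recipient, email_has_been_sent FROM send_email_table WHERE id = ?",
        (record_id,),
    ).fetchone()
    if row is None:
        raise ValueError(f"no send_email_table record with id {record_id}")
    recipient, already_sent = row
    if already_sent:
        # A previous run already sent this email: do nothing
        return False
    smtp_send(recipient)
    # Set the flag only after the SMTP provider accepted the message
    conn.execute(
        "UPDATE send_email_table SET email_has_been_sent = 1 WHERE id = ?",
        (record_id,),
    )
    conn.commit()
    return True
```

Calling sender_send() twice for the same record sends the email only once. Two caveats remain: if the worker dies between smtp_send() and the UPDATE, the email can still be sent twice, and check-then-send is not atomic, so two copies of the task running at the same moment could both see the flag as False.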
How about retries?
Let us assume that our SMTP provider is not available for some time. Celery makes it very easy to retry a task in case of an exception. We can catch the exception ourselves and call self.retry(), or use the simpler autoretry_for parameter. From the Celery website:
@celery.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):
    if error_sending_email:
        raise Exception('error sending email')
    # done
For retries there are a few parameters you can use. My settings (during development):
retry_backoff = 2
retry_backoff_max = 10 * 60
retry_kwargs = {'max_retries': 5}
retry_backoff means that the first retry will be delayed 2 seconds, the second retry 4 seconds, and the next retries 8, 16, 32, etc. seconds. When retry_jitter is enabled (the default), the actual delay is a random value between zero and this number. retry_backoff_max specifies the maximum delay between task retries, in this case ten minutes (600 seconds).
max_retries is the maximum number of retries.
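To see what these settings mean in practice, here is a small helper that computes the retry delays with jitter disabled, following the documented rule that a numeric retry_backoff is a factor doubled on every retry and capped at retry_backoff_max. The function name is hypothetical; Celery computes this internally.

```python
def retry_delays(retry_backoff: int, retry_backoff_max: int, max_retries: int) -> list[int]:
    """Delay in seconds before each retry, assuming retry_jitter=False."""
    # retry n (counting from 0) waits retry_backoff * 2**n seconds, capped at the maximum
    return [min(retry_backoff_max, retry_backoff * 2 ** n) for n in range(max_retries)]


print(retry_delays(2, 10 * 60, 5))
```

With retry_backoff = 2 and max_retries = 5 the delays are 2, 4, 8, 16 and 32 seconds, so all retries are used up in about a minute (62 seconds in total). The 600 second cap only matters from the tenth retry, where 2 * 2**9 = 1024 seconds would otherwise be used.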
Redis and the visibility_timeout
Our email task runs only for a short time, maybe 10 seconds, even if the SMTP provider is down. But what happens when we have a task that runs for a very long time? From the Celery website: 'The visibility timeout defines the number of seconds to wait for the worker to acknowledge the task before the message is redelivered to another worker.' The default value of the visibility_timeout is one hour. This can become a problem if your long-running task uses acks_late: if the task runs longer than the visibility_timeout, its message is redelivered and you suddenly have two tasks doing the same thing. You can increase the visibility_timeout (set through broker_transport_options), but it is better to make sure that only one instance of the task can run at a time, for example with a lock:
@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):
    if get_lock('task_email_sender_sender_send'):
        # we were able to lock the task
        try:
            email_sender = EmailSender()
            email_sender.sender_send(kwargs['send_email_table_record_id'])
            ...
        finally:
            # always release the lock, also when sender_send() raises
            release_lock()
    else:
        # we could not lock the task: retry later
        raise self.retry(countdown=10)
It looks like this can only be a problem when we use acks_late for a task. For the email task I do not use acks_late.
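One possible implementation of the get_lock()/release_lock() helpers, sketched as a context manager around the Redis SET command with the NX and EX options. The function name task_lock, the lock key and the timeout are assumptions, not the code used in this post. It expects a redis-py style client (for example redis.Redis()) and releases the lock with a small Lua script, so a worker never deletes a lock that already expired and was taken by another worker.

```python
import uuid
from contextlib import contextmanager

# Compare-and-delete, executed atomically inside Redis:
# only delete the key if it still holds our token
RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
"""


@contextmanager
def task_lock(client, name, timeout):
    """Yield True if the lock `name` was acquired, False otherwise.

    The lock expires after `timeout` seconds, so a crashed worker
    cannot keep it forever.
    """
    token = uuid.uuid4().hex
    # SET name token NX EX timeout: succeeds only if the key does not exist yet
    acquired = bool(client.set(name, token, nx=True, ex=timeout))
    try:
        yield acquired
    finally:
        if acquired:
            client.eval(RELEASE_SCRIPT, 1, name, token)
```

In the task, the sender_send() call would go inside `with task_lock(redis_client, f"lock:send_email:{record_id}", timeout=60) as acquired:`, calling self.retry() when acquired is False. Locking per record id (an assumption) lets different emails be sent in parallel. Pick a timeout longer than the task's hard time limit. redis-py also ships a ready-made lock, Redis.lock(), which you may prefer over a hand-rolled one.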
Set a timeout for the task
Anything can go wrong, and one or more tasks may suddenly never return. This will block or slow down all other tasks. To avoid this, we can set time limits for a task. When the soft time limit (soft_time_limit, or task_soft_time_limit globally) expires, a SoftTimeLimitExceeded exception is raised inside the task, so it can clean up. When the hard time limit (time_limit, or task_time_limit globally) expires, the worker process executing the task is killed and replaced with a new one.
from celery.exceptions import SoftTimeLimitExceeded

@celery.task(bind=True, time_limit=150, soft_time_limit=120, ...)
def task_email_sender_sender_send(self, **kwargs):
    try:
        a_long_time_to_finish()
    except SoftTimeLimitExceeded:
        recover()
Rate limiting, prevent overloading of the SMTP provider
Assume the SMTP provider is down for a long time, or there is a crash in your system and it takes a few hours to fix it. After the problem is solved, your emails will be sent out at maximum speed. This may exceed the limits of your SMTP provider. Fortunately, we can specify a rate_limit for a task, for example to limit it to 50 tasks per minute:
@celery.task(bind=True, autoretry_for=(Exception,), rate_limit='50/m', retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):
    ...
This is a per-worker rate limit, not a global rate limit. If you have two workers sending email, you can set the rate_limit to '25/m'.
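The arithmetic for splitting the provider limit over several workers, as a tiny helper. The function name is hypothetical, and it assumes that every worker instance consuming the email queue uses the same rate_limit. Rounding down keeps the combined rate at or below the provider limit.

```python
def per_worker_rate_limit(provider_limit_per_minute: int, worker_count: int) -> str:
    """Return a Celery rate_limit string such as '25/m' for each worker."""
    if worker_count < 1:
        raise ValueError("worker_count must be at least 1")
    # Round down so that worker_count * per_worker <= provider_limit_per_minute
    per_worker = provider_limit_per_minute // worker_count
    if per_worker < 1:
        raise ValueError("provider limit is too low for this number of workers")
    return f"{per_worker}/m"


print(per_worker_rate_limit(50, 2))
```

With three workers and a limit of 50 per minute this gives '16/m', so at most 48 emails per minute are sent.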
The timestamp when the email initially was submitted can be important
In the block_check() function, see above, every recipient email address is checked against an allow list and a deny list. But there are more checks. For example, the number of messages sent to a recipient is limited to X per Y minutes. If there is a problem somewhere, for example the worker crashed and was restarted after some hours, then all queued emails are sent one after another. If we used the timestamp of the moment of sending, chances are that some of these emails would be blocked.
What we should do is use the timestamp when the message was submitted to the task queue. Fortunately, all my database records have a created_on timestamp, and we already know that we create a send_email_table record before (!) passing the message to the task queue. In the task we can use this created_on timestamp to check the time limits.
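A sketch of the X-per-Y-minutes check based on created_on instead of the current time. The function name and parameters are hypothetical; in a real application the earlier timestamps would come from the created_on column of previous send_email_table records for the same recipient.

```python
from datetime import datetime, timedelta
from typing import Iterable


def is_over_recipient_limit(
    earlier_created_on: Iterable[datetime],
    created_on: datetime,
    max_messages: int,
    window: timedelta,
) -> bool:
    """Return True if this email would exceed max_messages per window.

    earlier_created_on: created_on timestamps of other emails to the same recipient
    created_on: created_on timestamp (submission time) of the email being checked
    """
    window_start = created_on - window
    # Count only emails submitted inside the window ending at this email's submission time
    in_window = [t for t in earlier_created_on if window_start < t <= created_on]
    return len(in_window) >= max_messages
```

With a limit of 2 per hour, three emails submitted at 12:00, 12:30 and 13:00 pass this check even if a restarted worker only processes them at 15:00. Checking against the moments of sending (15:00:00, 15:00:01, ...) would block the third one.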
Adding attachments
We can send our emails but what about attachments? If you run your application and workers on the same server then there is no problem, you just pass the links or paths of the files. But what if you run your web application on server X and workers on server Y? One way is to use a shared attachments volume you created somewhere. Another way is to store the attachment files in the send_email_table records. This is expensive in CPU cycles and memory but the advantage is that all data is in the database and the worker always has access to the attachments.
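A sketch of the second option with SQLite, storing the bytes of each attachment in a BLOB column. As an assumption it uses a separate send_email_attachment table linked to the send_email_table record, rather than a column in that record, so an email can have several attachments. Table and function names are hypothetical.

```python
import sqlite3


def create_attachment_table(conn):
    # One row per attachment, linked to the send_email_table record by id
    conn.execute(
        "CREATE TABLE IF NOT EXISTS send_email_attachment ("
        " id INTEGER PRIMARY KEY,"
        " send_email_table_id INTEGER NOT NULL,"
        " filename TEXT NOT NULL,"
        " content BLOB NOT NULL)"
    )


def store_attachment(conn, send_email_table_id, filename, content: bytes):
    # Runs in the web application, before the task is queued
    cur = conn.execute(
        "INSERT INTO send_email_attachment (send_email_table_id, filename, content)"
        " VALUES (?, ?, ?)",
        (send_email_table_id, filename, content),
    )
    conn.commit()
    return cur.lastrowid


def load_attachments(conn, send_email_table_id):
    # Runs in the worker: no shared file system needed, only database access
    rows = conn.execute(
        "SELECT filename, content FROM send_email_attachment"
        " WHERE send_email_table_id = ? ORDER BY id",
        (send_email_table_id,),
    ).fetchall()
    return [(filename, bytes(content)) for filename, content in rows]
```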
Can emails get lost?
As mentioned in the links below: 'Don't keep important data in your Celery queue'. Fortunately, I am already inserting a send_email_table record with the email parameters before (!) calling the task, and this record has the flag EmailHasBeenSent. This means that it is always possible to resend all emails that, for whatever reason, have not been sent.
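A sketch of finding those emails again, assuming an SQLite send_email_table with created_on stored as ISO 8601 strings (which compare correctly as text when all values use the same format and time zone). Table, column and function names are hypothetical.

```python
import sqlite3
from datetime import datetime


def create_send_email_table(conn):
    # Hypothetical minimal schema; created_on is stored as an ISO 8601 string
    conn.execute(
        "CREATE TABLE IF NOT EXISTS send_email_table ("
        " id INTEGER PRIMARY KEY,"
        " created_on TEXT NOT NULL,"
        " email_has_been_sent INTEGER NOT NULL DEFAULT 0)"
    )


def find_unsent_email_ids(conn, submitted_before: datetime):
    # Only pick records older than submitted_before, so emails that are
    # probably still waiting in the queue are not queued a second time
    rows = conn.execute(
        "SELECT id FROM send_email_table"
        " WHERE email_has_been_sent = 0 AND created_on < ?"
        " ORDER BY id",
        (submitted_before.isoformat(),),
    ).fetchall()
    return [row[0] for row in rows]
```

Each returned id could then be passed to task_email_sender_sender_send.apply_async() again. Because sender_send() checks EmailHasBeenSent, queueing an email that is in fact still in the queue does not result in a second email.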
Set Celery global settings
We can set the Celery parameters with the task decorator but it can be a good idea to set defaults for all your tasks.
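from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded

TaskBase = celery.Task

class ContextTask(TaskBase):
    time_limit = 40
    soft_time_limit = 30
    autoretry_for = (Exception, SoftTimeLimitExceeded, TimeLimitExceeded)
    retry_backoff = 2
    retry_backoff_max = 10 * 60
    retry_kwargs = {'max_retries': 5}
    retry_jitter = False

# use ContextTask as the base class for all tasks of this Celery app
celery.Task = ContextTask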
Summary
Some time ago I thought, let's add Celery to my application; a week, maybe two, should be enough. Yes, it was up and running in this time, but there was still so much to learn. The email examples on the internet are only an introduction. Using Celery feels like learning another language. I think I understand it now. In the beginning there are so many parameters and conditions. Not reading does not help: many problems mentioned on the internet come from people who did not read the mostly excellent documentation on the Celery website. Yes, there are some inconsistencies, but they are working on it. Anyway, Celery is up and running and I am learning to make better tasks.
Links / credits
acks_late option seems to have no effect
https://celery-users.narkive.com/Cv8Juuxs/acks-late-option-seems-to-have-no-effect
Celery task exceptions and automatic retries
https://www.distributedpython.com/2018/09/04/error-handling-retry/
Celery throttling - setting rate limit for queues
https://alievmagomed.com/celery-throttling-setting-rate-limit-for-queues/
Custom Celery task states
https://www.distributedpython.com/2018/09/28/celery-task-states/
Distributed Task Locking in Celery
http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html
distributedpython
https://www.distributedpython.com/
Don't keep important data in your Celery queue
https://www.caktusgroup.com/blog/2016/10/18/dont-keep-important-data-your-celery-queue/
Flask Celery task locking
https://stackoverflow.com/questions/53950548/flask-celery-task-locking
Should I use retry or acks_late?
https://docs.celeryproject.org/en/latest/faq.html#should-i-use-retry-or-acks-late
Working with Asynchronous Celery Tasks – lessons learned
https://blog.daftcode.pl/working-with-asynchronous-celery-tasks-lessons-learned-32bb7495586b