
Celery, Redis and the (in)famous email task example

Starting with Celery is easy. But before you take Celery to production, you have to spend a lot of time to really understand it.

10 October 2020 Updated 11 October 2020
In Celery

Being able to run asynchronous tasks from your web application is in many cases a must-have. One way to achieve this is to use Celery. There are many articles on the internet with examples. I really liked Miguel Grinberg's posts about Celery. Very clear, thank you Miguel.

But Miguel's post and most other articles are just an introduction. When you start with Celery, you suddenly realize that it is not easy. In fact, you had better prepare yourself for a lot of learning before using it in production.

In this post I write down some notes that came up while implementing Celery with Flask and Redis. Most of this is a summary of the documentation on the Celery website and other posts on the internet.

Many others have written about problems and how they solved them

Someone wrote that when running Celery in production, your first priority is to have a plan for what to do if your tasks suddenly stop working. I want to add that this means you first must know how to check whether your broker, workers and tasks are running.
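
For example, a quick check whether the broker and the workers respond can be done with the inspect API. A minimal sketch, assuming the Celery application instance is called celery:

# assumption: the Celery application instance is named 'celery'
inspect = celery.control.inspect(timeout=5)

# ping() returns a dict of responding workers, or None when no worker answers
workers = inspect.ping()
if not workers:
    print('no workers responded, check the broker and the worker processes')
else:
    print('responding workers:', list(workers.keys()))
    # active() shows the tasks each worker is currently executing
    print('active tasks:', inspect.active())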

Search the internet for 'Celery problem' and you get an overwhelming number of pages. Many of these problems are already covered by the Celery documentation, TL;DR. A number of problems when sending email, and how to tackle them, are covered in the nice post 'Working with Asynchronous Celery Tasks - lessons learned', see the links below. I also suggest you check the website distributedpython, see the links below. There are a lot of good recipes there.

What is all this talk about acks_late and retries?

By default Celery will acknowledge tasks immediately. This means a task will be removed from the broker's queue just before it is started. You can change this behavior by specifying acks_late, per task or system wide. In this case Celery acknowledges a task only after successful completion.
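
For example, acks_late can be enabled like this, per task or system wide (a minimal sketch, the task name is just an illustration):

# per task
@celery.task(bind=True, acks_late=True)
def my_task(self):
    ...

# or system wide, in the Celery configuration
task_acks_late = True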

Why is this important?

If a task is interrupted abruptly, then by default Celery will not retry the task. If acks_late was specified, the task is still in the queue and Celery will retry the task. In the first case you can still retry the task by specifying retry parameters like retry_kwargs.

So what's the difference?

This means that acks_late changes the delivery from at-most-once to at-least-once. 

But acks_late may not work as expected

Here we must differentiate between several conditions:

  • Graceful
    The administrator stops the worker using the SIGTERM signal: kill <pid>.
    Result: task not acked.
  • Abrupt
    The administrator kills the worker using the SIGKILL signal: kill -9 <pid>,
    or a system crash (out of memory, hardware failure) terminates the worker process.
    - If the parent process is still running:
      Result: task acked.
    - If the parent process is not running (e.g. power failure):
      Result: task not acked (probably).

In the first case the worker process is stopped gracefully and a running task that is stopped is not acked. This is because the administrator intended to (temporarily) stop task execution. Starting the worker again will start the task again.

In the second case with the parent process running, the parent process sees that the worker is lost, acks the task and sets the state to FAILURE. In case the administrator killed the worker, there was probably a good reason to do this. In case of a system crash, we probably do not want the task to be restarted. If the parent process is not running, for example because it suddenly died, then the task probably will not get acked.

Idempotency

Because our tasks may unexpectedly run multiple times, we must make sure that the result does not change. For example, if a task already sent an email, then running the same task again should not send this email again. This is called idempotency: an operation that can be applied multiple times without changing the result beyond the initial application (Wikipedia).

Acks_late versus retry

Assume a task is sending an email to an external service that is temporarily down. The task will try to connect but the connection will time out. If we use the default behavior, Celery will mark the task as failed. It will not be executed again.

But wait. Without acks_late we can also tell Celery to retry the task. So what should you use? It depends. Read the documentation on the Celery website: 'Should I use retry or acks_late?', see the links below.

How to make sure an email is sent only once?

Assuming that the same task can run multiple times, we can only prevent sending an email two or more times by keeping a flag outside the task, shared by all runs of the task. I already created a table send_email_table which holds a record with (meta)data for every email. We just add a new (Boolean) column EmailHasBeenSent. This also means we must insert a new send_email_table record before (!) calling the task. Without tasks, my email send function looked like this:

class EmailSender:

    def send(self, params):
        # basic checks and insert the email parameters in a new send_email_table record
        preprocess(...)
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

After making changes to use a task for sending an email:

class EmailSender:

    def sender_init(self, params):
        # basic checks and insert the email parameters in a new send_email_table record
        preprocess(...)
        return send_email_table_record_id

    def sender_send(self, send_email_table_record_id):
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

And the code to send an email:

def send_email(**kwargs):

    email_sender = EmailSender()
    send_email_table_record_id = email_sender.sender_init(**kwargs)

    # pass to task using apply_async
    kwargs = {}
    kwargs['send_email_table_record_id'] = send_email_table_record_id
    task_email_sender_sender_send.apply_async(queue=celery_task_queue, kwargs=kwargs)


@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

    email_sender = EmailSender()
    id = email_sender.sender_send(kwargs['send_email_table_record_id'])

The sender_send() method checks the flag EmailHasBeenSent in the record that can be retrieved using the id. If it is True, the email has already been sent and the task returns without doing anything. Otherwise, the task sends the email and sets this flag to True afterwards.
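
A minimal sketch of this check inside sender_send(), assuming a hypothetical helper get_send_email_record() that loads the record from the database:

def sender_send(self, send_email_table_record_id):
    # load the send_email_table record (hypothetical helper)
    record = get_send_email_record(send_email_table_record_id)

    # idempotency check: if the email has already been sent, do nothing
    if record.EmailHasBeenSent:
        return record.id

    # checks, prepare and send, as in the methods above
    block_check(record)
    prepare(record)
    send_email(record)

    # only now mark the email as sent
    record.EmailHasBeenSent = True
    record.save()
    return record.id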

How about retries?

Let us assume that our SMTP provider is not available for some time. Celery makes it very easy to retry a task in case of an exception. We can raise and catch the exception ourselves or use the easier autoretry_for parameter. Based on the Celery website:

@celery.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):

    # error_sending_email is a placeholder for your own error detection
    if error_sending_email:
        raise Exception('error sending email')
    # done

For retries there are a few parameters you can use. My settings (during development):

retry_backoff = 2
retry_backoff_max = 10 * 60
retry_kwargs = {'max_retries': 5}

retry_backoff means that the first retry will delay 2 seconds, the second retry will delay 4 seconds, and the following retries will delay 8, 16, 32, etc. seconds. retry_backoff_max specifies the maximum delay between task retries, in this case ten minutes.
max_retries is the maximum number of retries.

Redis and the visibility_timeout

Our email task runs only for a short time, maybe 10 seconds, even if the SMTP provider is down. But what happens when we have a task that runs for a very long time? From the Celery website: 'The visibility timeout defines the number of seconds to wait for the worker to acknowledge the task before the message is redelivered to another worker.'. The default value of the visibility_timeout is one hour. This can become a problem if your long running task uses acks_late. If your task runs longer than the visibility_timeout, it gets redelivered and you suddenly have two tasks doing the same thing. You can increase the visibility_timeout, but it is better to make sure the task cannot run twice at the same time, for example with a lock:

@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

    if get_lock('task_email_sender_sender_send'):
        # we were able to lock the task
        email_sender = EmailSender()
        id = email_sender.sender_send(kwargs['send_email_table_record_id'])
        ...
        release_lock('task_email_sender_sender_send')
    else:
        # we could not lock the task, retry later
        raise self.retry(countdown=60)
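
The get_lock() and release_lock() calls above are placeholders. One way to implement them is a Redis key that is set with nx (set only if it does not exist) and an expiry, so a crashed worker cannot hold the lock forever. A sketch, assuming a redis_client connection; see also the task locking links below:

import redis

# assumption: a Redis connection, this can be the broker instance or a separate one
redis_client = redis.Redis()

def get_lock(name, timeout=60 * 60):
    # nx=True sets the key only if it does not exist yet,
    # ex=timeout makes the lock expire automatically
    return redis_client.set('lock:' + name, 'locked', nx=True, ex=timeout)

def release_lock(name):
    redis_client.delete('lock:' + name)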

It looks like this can only be a problem when we use acks_late for a task. For the email task I do not use acks_late.

Set a timeout for the task

Anything can go wrong and maybe one or more tasks suddenly never return. This will block or slow down all other tasks. To avoid this, we can set time limits for a task. When the soft time limit (soft_time_limit) expires, a SoftTimeLimitExceeded exception is raised inside the task. When the hard time limit (time_limit) expires, the worker process executing the task is killed and replaced with a new one.

from celery.exceptions import SoftTimeLimitExceeded

@celery.task(bind=True, time_limit=150, soft_time_limit=120, ...)
def task_email_sender_sender_send(self, **kwargs):

    try:
        a_long_time_to_finish()
    except SoftTimeLimitExceeded:
        recover()

Rate limiting, prevent overloading of the SMTP provider

Assume the SMTP provider is down for a long time, or there is a crash in your system and it takes a few hours to fix it. After the problem is solved, your emails will be sent out at maximum speed. This may exceed the limits of your SMTP provider. Fortunately we can specify a rate_limit for a task, for example limit it to 50 tasks per minute:

@celery.task(bind=True, autoretry_for=(Exception,), rate_limit='50/m', retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):
    ....

This is a per worker rate limit, and not a global rate limit. If you have two workers sending email you can set the rate_limit to '25/m'.

The timestamp when the email was initially submitted can be important

In the block_check() function, see above, every recipient mail address is checked against an allow list and a deny list. But there are more checks. For example, the number of messages sent to a recipient is limited to X per Y minutes. If there is a problem somewhere, for example the worker crashed and was restarted after some hours, then all queued emails are sent one after another. If we used the timestamp of the moment of sending, chances are that an email would be blocked.

What we should do is use the timestamp when the message was submitted to the task queue. Fortunately all my database records have a created_on timestamp, and we already know that we created a send_email_table record before (!) passing the message to the task queue. In the task we can use this created_on timestamp to check the time limits.
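
Inside block_check() this could look something like the sketch below. The record attributes, the limits and the helper recipient_message_count() are assumptions:

from datetime import timedelta

def block_check(record, max_messages=10, window_minutes=60):
    # allow list / deny list checks omitted here
    # use created_on (the moment the email was submitted), not the moment of sending
    window_start = record.created_on - timedelta(minutes=window_minutes)

    # hypothetical helper: how many messages were sent to this recipient in the window
    count = recipient_message_count(record.recipient, window_start, record.created_on)
    if count >= max_messages:
        # block this email
        return False
    return True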

Adding attachments

We can send our emails but what about attachments? If you run your application and workers on the same server then there is no problem, you just pass the links or paths of the files. But what if you run your web application on server X and workers on server Y? One way is to use a shared attachments volume you created somewhere. Another way is to store the attachment files in the send_email_table records. This is expensive in CPU cycles and memory but the advantage is that all data is in the database and the worker always has access to the attachments.

Can emails get lost?

As mentioned in the links below: 'Don't keep important data in your Celery queue'. Fortunately I am already inserting a send_email_table record with the email parameters before (!) calling the task. There is also the flag EmailHasBeenSent in this record. This means that it is always possible to resend all emails that, for whatever reason, have not been sent.
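
Because every email has its own send_email_table record with the EmailHasBeenSent flag, resending comes down to selecting the unsent records and queueing the task again. A sketch, where get_unsent_email_records() is a hypothetical query helper:

def resend_unsent_emails():
    # select all records where EmailHasBeenSent is False (hypothetical helper)
    for record in get_unsent_email_records():
        kwargs = {'send_email_table_record_id': record.id}
        task_email_sender_sender_send.apply_async(queue=celery_task_queue, kwargs=kwargs)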

Set Celery global settings

We can set the Celery parameters with the task decorator, but it can be a good idea to set defaults for all your tasks:

    from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded

    TaskBase = celery.Task
    class ContextTask(TaskBase):

        time_limit = 40
        soft_time_limit = 30
        autoretry_for = (Exception, SoftTimeLimitExceeded, TimeLimitExceeded)
        retry_backoff = 2
        retry_backoff_max = 10 * 60
        retry_kwargs = {'max_retries': 5}
        retry_jitter = False
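
To make these defaults apply to every task, assign the class back to the Celery application, for example in the application factory (a sketch, following the well-known Flask + Celery pattern):

    celery.Task = ContextTask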

Summary

Some time ago I thought, let's add Celery to my application. A week, maybe two, should be possible. Yes, it was up and running in this time, but there was still so much to learn. The email examples on the internet are only an introduction. Using Celery feels like learning another language. I think I understand it now. In the beginning there are so many parameters and conditions. Not reading the documentation does not help. Many problems mentioned on the internet are from people who did not read the mostly excellent documentation on the Celery website. Yes, there are some inconsistencies, but they are working on it. Anyway, Celery is up and running and I am learning to write better tasks.

Links / credits

acks_late option seems to have no effect
https://celery-users.narkive.com/Cv8Juuxs/acks-late-option-seems-to-have-no-effect

Celery task exceptions and automatic retries
https://www.distributedpython.com/2018/09/04/error-handling-retry/

Celery throttling - setting rate limit for queues
https://alievmagomed.com/celery-throttling-setting-rate-limit-for-queues/

Custom Celery task states
https://www.distributedpython.com/2018/09/28/celery-task-states/

Distributed Task Locking in Celery
http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html

distributedpython
https://www.distributedpython.com/

Don't keep important data in your Celery queue
https://www.caktusgroup.com/blog/2016/10/18/dont-keep-important-data-your-celery-queue/

Flask Celery task locking
https://stackoverflow.com/questions/53950548/flask-celery-task-locking

Should I use retry or acks_late?
https://docs.celeryproject.org/en/latest/faq.html#should-i-use-retry-or-acks-late

Working with Asynchronous Celery Tasks – lessons learned
https://blog.daftcode.pl/working-with-asynchronous-celery-tasks-lessons-learned-32bb7495586b
