angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Celery, Redis и пример (in)известной почтовой задачи

Начиная с Celery легко. Но прежде чем взять Celery на производство, нужно потратить много времени, чтобы по-настоящему его понять.

10 октября 2020
В Celery
post main image
https://unsplash.com/@designnnco

Возможность запуска асинхронных задач из вашего веб-приложения во многих случаях является обязательным условием. Одним из способов достижения этого является использование Celery. В интернете есть много статей и приведены некоторые примеры. Мне очень понравились записи Miguel Grinberg о Celery. Очень понятно, спасибо Miguel.

Но пост Miguel и большинство других статей - это всего лишь введение. Когда вы начинаете с Celery , вы вдруг понимаете, что это непросто. На самом деле, вам лучше подготовиться к многому обучению, прежде чем использовать его в производстве.

В этом посте я пишу некоторые заметки, которые появляются при реализации Celery с Flask и Redis. Большая часть этих документов представляет собой резюме документов, размещенных на веб-сайте Celery , а также других сообщений в Интернете.

Многие другие писали о проблемах и о том, как они их решали.

Кто-то написал, что при запуске Celery в производстве вашим первым приоритетом является план того, что делать, если ваши задачи внезапно перестанут работать. Хочу добавить, что это означает, что сначала вы должны знать, как проверить, работает ли ваш брокер, рабочие и задачи.

Ищите в интернете 'Celery problem' и получаете огромное количество страниц. Многие из этих проблем уже освещены в документации Celery , TL;DR. Ряд проблем при отправке электронной почты и как их решать освещены в хорошем посте 'Работа с асинхронным Celery Задачи - усвоенные уроки', смотрите ссылки ниже. Также предлагаю вам посетить сайт distributedpython, смотрите ссылки ниже. Здесь много хороших рецептов.

Что это за разговоры о acks_late и повторах?

По умолчанию Celery сразу же подтверждает задачи. Это означает, что задача будет удалена из очереди брокера непосредственно перед ее запуском. Вы можете изменить такое поведение, указав acks_late, для каждой задачи или системы в целом. В этом случае Celery признает задачу только после успешного завершения.

Почему это важно?

Если задача прерывается внезапно, то по умолчанию Celery не будет делать повторных попыток. Если была указана acks_late , то задача все еще находится в очереди и Celery повторит попытку. В первом случае вы все равно можете повторить попытку, указав параметры повторной попытки, такие как retry_kwargs.

Так в чем же разница?

Это означает, что acks_late изменяет доставку с at-most-once на at-least-once.

Но acks_late может работать не так, как ожидалось.

Здесь мы должны различать несколько условий:

  • Graceful
    Администратор останавливает работника, используя сигнал SIGTERM : kill <pid>.
    задача not acked
  • Abrupt
    Администратор убивает работника, используя сигнал SIGKILL : CTRL-C или используя kill -9 <pid>,
    или,
    - падение системы (из памяти, отказ оборудования) завершает рабочий процесс.
    - Если запущен родительский процесс:
    task acked
    - Если не запущен родительский процесс, например, отключение питания):
    задача not acked (возможно)

В первом случае рабочий процесс останавливается грациозно, и выполняющаяся задача, которая останавливается, не является acked. Это связано с тем, что администратор намеревался (временно) остановить выполнение задачи. При повторном запуске рабочего процесса задача будет запущена заново.

Во втором случае при запуске родительского процесса родительский процесс видит, что работник потерян, принимает задачу и устанавливает состояние FAILURE. В случае, если администратор убил рабочего, вероятно, на то была веская причина. В случае падения системы мы, скорее всего, не хотим перезапускать задачу. Если родительский процесс не запущен, например, из-за внезапного самопроизвольного самопроизвольного самопроизвольного выхода из строя, то задача, скорее всего, не получит acked.

Idempotency

Наши задачи могут выполняться неожиданноМногократное повторение должно быть уверенным, что результат не изменится. Например, если задача уже отправила письмо, то при повторном запуске этой же задачи это письмо больше не должно быть отправлено. Это называется idempotency. Запускайте несколько раз, не изменяя результат за пределами исходного приложения (Википедия).

Acks_late против повторной попытки

Предположим, что задача отправляет электронное сообщение внешней службе, которая временно не работает. Задача попытается подключиться, но соединение будет зависеть от таймаута. Если мы используем поведение по умолчанию, Celery пометит задачу как неудачную. Она не будет выполнена снова.

Но подождите. Без acks_late мы также можем сказать Celery повторить попытку. Что следует использовать? Зависит. Смотрите документацию на сайте Celery 'Should I use retry or acks_late?'.

Как убедиться, что электронное письмо отправляется только один раз?

Предполагая, что одна и та же задача может выполняться несколько раз, мы можем предотвратить отправку письма только два и более раз, имея глобальный флаг для задач, не входящих в задачу. Я уже создал таблицу send_email_table , которая содержит записи с (мета) данными для каждого письма. Мы просто добавляем новую (булевую) колонку EmailHasBeenSent. Это также означает, что мы должны вставить новую запись send_email_table перед (!) вызовом задачи. Без использования задач моя функция отправки электронной почты выглядела следующим образом:

class EmailSender:

    def send(params):
        # basic checks and insert the email parameters in a new  send_email_table  record
        preprocess(...)
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

После внесения изменений для использования задачи для отправки сообщения электронной почты:

class EmailSender:

    def sender_init_function(params):
        # basic checks and insert the email parameters in a new  send_email_table  record
        preprocess(...)
        return  send_email_table_record_id

    def sender_send(send_email_table_record_id):
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

И код для отправки сообщения:

def send_email(**kwargs):

    email_sender = EmailSender()
     send_email_table_record_id = email_sender.sender_init(**kwargs)

    # pass to task using apply_async
	kwargs  = {}
	kwargs['send_email_table_record_id'] =  send_email_table_record_id
    task_email_sender_sender_send.apply_async(queue=celery_task_queue,  kwargs=kwargs)


@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

    email_sender = EmailSender()
    id = email_sender.sender_send(kwargs['send_email_table_record_id'])

Метод sender_send() проверяет флаг EmailHasBeenSent в записи, которая может быть получена с помощью идентификатора. Если True, то сообщение уже отправлено, и задача возвращается, ничего не делая. В противном случае задача отправит письмо и установит флаг True после отправки письма.

Как насчет повторных попыток?

Предположим, что наш провайдер SMTP некоторое время недоступен. Celery делает очень простой способ повторных попыток в случае исключения. Мы можем сами поднять и перехватить исключение или использовать более простой параметр autoretry_for . Сайт Celery :

@celery.task(bind=True,  autoretry_for=(Exception,),  retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):

    if error_sending_email:
        raise Exception(f'error sending email')
    # done

Для повторных попыток есть несколько параметров, которые вы можете использовать. Моя настройка (во время разработки):

retry_backoff  = 2
retry_backoff_max = 10 * 60
retry_kwargs  = {'max_retries': 5}

retry_backoff означает, что первая повторная попытка будет задержана на 2 секунды, вторая повторная попытка будет задержана на 4 секунды, следующие повторные попытки будут задержаны на 8, 16, 32 и т.д. секунды. retry_backoff_max определяет максимальную задержку между повторными попытками задачи, в данном случае один час.
max_retries - максимальное количество повторных попыток.

Redis и visibility_timeout

Наша задача по работе с электронной почтой выполняется в течение короткого времени, может быть, 10 секунд, даже если провайдер SMTP не работает. Но что произойдет, когда у нас будет задача, которая выполняется очень долго? Сайт Celery : 'Таймаут видимости определяет количество секунд, чтобы дождаться, пока работник подтвердит задачу, прежде чем сообщение будет повторно доставлено другому работнику.'. Значение по умолчанию visibility_timeout - один час. Это может стать проблемой, если ваша долго работающая задача использует acks_late. Если ваша задача работает дольше, чем visibility_timeout, то она перезапускается, и у вас внезапно возникают две задачи, выполняющие одно и то же действие. Вы можете увеличить visibility_timeout , но лучше сделать выполнение задачи атомарным, например:

@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

	if get_lock('task_email_sender_sender_send'):
		# we were able to lock the task
	
		email_sender = EmailSender()
		id = email_sender.sender_send(kwargs['id'])
		...
		release_lock()

	else:
		# we could not lock the task
		# retry

Похоже, что проблема может возникнуть только в том случае, если мы используем acks_late для задачи. Для почтовой задачи я не использую acks_late.

Установите таймаут для задачи

Все может пойти не так, и, возможно, одно или несколько заданий внезапно никогда не вернутся. Это заблокирует или замедлит работу всех остальных задач. Чтобы избежать этого, мы можем установить таймауты для задачи. По истечении task_soft_time_limit возникает исключение SoftTimeLimitExceeded . По истечении task_time_limit задача убивается и заменяется на новую.

@celery.task(bind=True,  task_time_limit=150,  task_soft_time_limit=120, ...)
def task_email_sender_sender_send(self, **kwargs):

    try:
        a_long_time_to_finish()
    except  SoftTimeLimitExceeded:
        recover()

Ограничение скорости, предотвращение перегрузки провайдера SMTP .

Предположим, что провайдер SMTP долгое время не работает или в системе произошел сбой, и на его исправление уходит несколько часов. После того, как проблема будет решена, ваши письма будут отправляться на максимальной скорости. Это может превысить лимит вашего провайдера SMTP . К счастью, для задачи можно установить лимит rate_limit, например, лимит до 50 задач в минуту:

@celery.task(bind=True,  autoretry_for=(Exception,), rate_limit='50/m',  retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):
    ....

Это лимит на одного работника, а не глобальный лимит. Если у вас два работника, отправляющих электронную почту, вы можете установить лимит ставки в '25/m'.

Штемпель времени, когда электронное письмо было отправлено изначально, может быть важна.

В функции block_check(), см. выше, каждый почтовый адрес получателя проверяется на соответствие списку разрешенных и запрещенных адресов. Но есть и другие проверки. Например, количество сообщений, отправляемых получателю, ограничено до X в Y минут. Если где-то возникла проблема, например, рабочий разбился, а затем перезагрузился через несколько часов, то все сообщения в очереди отправляются одно за другим. Если бы мы использовали временную метку момента отправки, есть вероятность того, что письмо будет заблокировано.

Мы должны использовать метку времени, когда сообщение было отправлено в очередь заданий. К счастью, все записи в моей базе данных имеют временную метку created_on , и мы уже знаем, что мы создали запись send_mail_table перед (!) передачей сообщения в очередь задачи. В задаче мы можем использовать эту метку времени created_on для проверки временных ограничений.

Добавление вложений

Мы можем отправлять наши электронные письма, но как насчет вложений? Если вы запускаете ваше приложение и сотрудников на одном сервере, то проблем нет, вы просто передаете ссылки или пути к файлам. Но что, если вы запустите ваше веб-приложение на сервере X, а сотрудники - на сервере Y? Одним из способов является использование тома общих прикрепленных файлов, который вы где-то создали. Другой способ - хранить файлы вложений в записях send_email_table . Это дорого стоит в циклах и памяти CPU , но преимущество заключается в том, что все данные находятся в базе данных и работник всегда имеет доступ к прикрепленным файлам.

Может ли электронная почта потеряться?

Как указано в ссылках ниже: 'Не храните важные данные в очереди Celery '. К счастью, я уже вставляю запись send_mail_table с параметрами письма перед (!) вызовом задачи. В этой записи также присутствует флаг EmailHasBeenSent . Это означает, что всегда можно переслать все письма, которые по каким-либо причинам не были отправлены.

Установите глобальные настройки Celery

Мы можем установить параметры Celery с декоратором задач, но это может быть хорошей идеей, чтобы установить значения по умолчанию для всех ваших задач.

    TaskBase = celery.Task
    class ContextTask(TaskBase):

        time_limit = 40
        soft_time_limit = 30
         autoretry_for  = (Exception,  SoftTimeLimitExceeded, TimeLimitExceeded)
         retry_backoff  = 2
         retry_backoff_max = 10 * 60
         retry_kwargs  = {'max_retries': 5}
        retry_jitter = False

Резюме

Некоторое время назад я подумал, давайте добавим Celery в мое приложение. Неделя, может быть две, должна быть возможной. Да, в это время она была запущена, но еще столько всего нужно было узнать. Примеры электронной почты в интернете - это только введение. Использование Celery похоже на изучение другого языка. Думаю, теперь я это понимаю. В начале так много параметров, условий. Не помогает чтение. Многие проблемы, упомянутые в интернете, от людей, которые не прочитали в основном отличную документацию на сайте Celery . Да, есть некоторые несоответствия, но они работают над этим. В любом случае, Celery работает и я учусь делать задачи получше.

Ссылки / кредиты

acks_late option seems to have no effect
https://celery-users.narkive.com/Cv8Juuxs/acks-late-option-seems-to-have-no-effect

Celery task exceptions and automatic retries
https://www.distributedpython.com/2018/09/04/error-handling-retry/

Celery throttling - setting rate limit for queues
https://alievmagomed.com/celery-throttling-setting-rate-limit-for-queues/

Custom Celery task states
https://www.distributedpython.com/2018/09/28/celery-task-states/

Distributed Task Locking in Celery
http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html

distributedpython
https://www.distributedpython.com/

Don't keep important data in your Celery queue
https://www.caktusgroup.com/blog/2016/10/18/dont-keep-important-data-your-celery-queue/

Flask Celery task locking
https://stackoverflow.com/questions/53950548/flask-celery-task-locking

Should I use retry or acks_late?
https://docs.celeryproject.org/en/latest/faq.html#should-i-use-retry-or-acks-late

Working with Asynchronous Celery Tasks – lessons learned
https://blog.daftcode.pl/working-with-asynchronous-celery-tasks-lessons-learned-32bb7495586b

Подробнее

Celery Email Redis

Оставить комментарий

Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

Комментарии

Оставьте ответ

Ответьте анонимно или войдите в систему, чтобы ответить.