angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Celery, Redis y el (in)famoso ejemplo de tarea de correo electrónico

Empezar con Celery es fácil. Pero antes de llevar Celery a la producción, tienes que pasar mucho tiempo para entenderlo realmente.

10 octubre 2020
En Celery
post main image
https://unsplash.com/@designnnco

Ser capaz de ejecutar tareas asincrónicas desde su aplicación web es en muchos casos una necesidad. Una forma de lograrlo es usar Celery. Hay muchos artículos en Internet y se dan algunos ejemplos. Me gustó mucho Miguel Grinberg's posts sobre Celery. Muy claro, gracias Miguel.

Pero el post de Miguel y la mayoría de los otros artículos son sólo una introducción. Cuando empiezas con Celery te das cuenta de repente que no es fácil. De hecho, es mejor que te prepares para aprender mucho antes de usarlo en la producción.

En este post escribo algunas notas que surgieron al implementar Celery con Flask y Redis. La mayor parte de esto es un resumen de los documentos presentes en el sitio web de Celery y otras publicaciones en Internet.

Muchos otros han escrito sobre problemas y cómo los resolvieron

Alguien escribió que cuando se ejecuta Celery en producción tu primera prioridad es tener un plan de qué hacer si tus tareas dejan de funcionar de repente. Quiero añadir que esto significa que primero debes saber cómo comprobar si tu corredor, trabajadores y tareas están funcionando.

Busca en Internet "Celery problema" y obtendrás una cantidad abrumadora de páginas. Muchos de estos problemas ya están cubiertos por la documentación de Celery , TL;DR. Una serie de problemas al enviar el correo electrónico y a cómo abordarlos están cubiertos en un bonito post 'Working with Asynchronous Celery Tasks - lessons learned', ver los enlaces de abajo. También te sugiero que revises el sitio web distributedpython, ver los enlaces de abajo. Hay muchas buenas recetas aquí.

¿Qué es toda esta charla sobre acks_late y reintentos?

Por defecto, Celery reconocerá las tareas inmediatamente. Esto significa que una tarea será eliminada de la cola del corredor justo antes de que se inicie. Puede cambiar este comportamiento especificando acks_late, por tarea o en todo el sistema. En este caso Celery reconoce una tarea sólo después de completarla con éxito.

¿Por qué es esto importante?

Si una tarea es interrumpida abruptamente, entonces por defecto Celery no reintentará la tarea. Si se especificó acks_late , la tarea aún está en la cola y Celery reintentará la tarea. En el primer caso, todavía puede reintentar la tarea especificando parámetros de reintento como retry_kwargs.

Entonces, ¿cuál es la diferencia?

Esto significa que acks_late cambia la entrega de at-más-una vez a at-menos-una vez.

Pero acks_late puede no funcionar como se esperaba

Aquí debemos diferenciar entre varias condiciones:

  • Graceful
    El administrador detiene al trabajador usando la señal SIGTERM : kill <pid>.
    task not acked
  • Abrupto
    El administrador mata al trabajador usando la señal SIGKILL : CTRL-C o usando kill -9 <pid>,
    o,
    una caída del sistema (fuera de la memoria, fallo de hardware) termina el proceso del trabajador.
    - Si el proceso padre está en marcha:
    task acked
    - Si el proceso padre no está en marcha, por ejemplo, un fallo de alimentación):
    task not acked (probablemente)

En el primer caso el proceso de trabajo se detiene con gracia y una tarea en ejecución que se detiene no es acked. Esto se debe a que el administrador tenía la intención de detener (temporalmente) la ejecución de la tarea. Si se inicia el trabajador de nuevo, se iniciará la tarea de nuevo.

En el segundo caso con el proceso padre en ejecución, el proceso padre ve que el trabajador se ha perdido, acciona la tarea y establece el estado a FAILURE. En caso de que el administrador matara al trabajador, probablemente había una buena razón para hacer esto. En caso de una caída del sistema, probablemente no queremos que la tarea se reinicie. Si el proceso padre no se está ejecutando, por ejemplo porque murió repentinamente, entonces la tarea probablemente no obtendrá acked.

Idempotencia

Nuestras tareas pueden funcionar inesperadamentevarias veces debemos asegurarnos de que el resultado no cambie. Por ejemplo, si una tarea ya ha enviado un correo electrónico, entonces al ejecutar la misma tarea de nuevo no debe enviar este correo electrónico de nuevo. Esto se llama idempotencia. Ejecutar varias veces sin cambiar el resultado más allá de la aplicación inicial (Wikipedia).

Acks_late contra reintento

Supongamos que una tarea es enviar un correo electrónico a un servicio externo que está temporalmente caído. La tarea intentará conectarse pero la conexión se agotará. Si usamos el comportamiento por defecto, Celery marcará la tarea como fallida. No se volverá a ejecutar.

Pero esperen. Sin acks_late también podemos decirle a Celery que vuelva a intentar la tarea. ¿Qué debería usar? Depende. Lee la documentación en la página web de Celery 'Should I use retry or acks_late?'.

¿Cómo asegurarse de que un correo electrónico se envíe sólo una vez?

Asumiendo que la misma tarea puede ejecutarse varias veces, sólo podemos evitar el envío de un correo electrónico dos veces o más teniendo una bandera global a las tareas, fuera de la tarea. Ya he creado una tabla send_email_table que contiene registros con (meta) datos para cada correo electrónico. Sólo añadimos una nueva columna (booleana) EmailHasBeenSent. Esto también significa que debemos insertar un nuevo registro send_email_table antes de (!) llamar a la tarea. Sin usar las tareas mi función de envío de correo electrónico se veía así:

class EmailSender:

    def send(params):
        # basic checks and insert the email parameters in a new  send_email_table  record
        preprocess(...)
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

Después de hacer cambios para usar una tarea para enviar un correo electrónico:

class EmailSender:

    def sender_init_function(params):
        # basic checks and insert the email parameters in a new  send_email_table  record
        preprocess(...)
        return  send_email_table_record_id

    def sender_send(send_email_table_record_id):
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

Y el código para enviar un correo electrónico:

def send_email(**kwargs):

    email_sender = EmailSender()
     send_email_table_record_id = email_sender.sender_init(**kwargs)

    # pass to task using apply_async
	kwargs  = {}
	kwargs['send_email_table_record_id'] =  send_email_table_record_id
    task_email_sender_sender_send.apply_async(queue=celery_task_queue,  kwargs=kwargs)


@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

    email_sender = EmailSender()
    id = email_sender.sender_send(kwargs['send_email_table_record_id'])

El método sender_send() comprueba la bandera EmailHasBeenSent en el registro que puede ser recuperado usando el id. Si es True, el correo electrónico ya ha sido enviado y la tarea regresa sin hacer nada. En caso contrario, la tarea enviará el correo electrónico y establece este indicador como True después de enviar el correo electrónico.

¿Qué hay de los reintentos?

Asumamos que nuestro proveedor de SMTP no está disponible por algún tiempo. Celery hace que sea muy fácil reintentar una tarea en caso de una excepción. Podemos aumentar y capturar la excepción nosotros mismos o usar el más fácil parámetro autoretry_for . Desde el sitio web de Celery :

@celery.task(bind=True,  autoretry_for=(Exception,),  retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):

    if error_sending_email:
        raise Exception(f'error sending email')
    # done

Para reintentos hay algunos parámetros que puede usar. Mi configuración (durante el desarrollo):

retry_backoff  = 2
retry_backoff_max = 10 * 60
retry_kwargs  = {'max_retries': 5}

retry_backoff significa que el primer reintento retrasará 2 segundos, el segundo reintento retrasará 4 segundos, los siguientes reintentos retrasarán 8, 16, 32, etc. segundos. retry_backoff_max especifica el máximo retardo entre reintentos de tareas, en este caso una hora.
max_retries es el número máximo de reintentos.

Redis y el visibility_timeout

Nuestra tarea de correo electrónico se está ejecutando en poco tiempo, tal vez 10 segundos, incluso si el proveedor de SMTP está caído. ¿Pero qué pasa cuando tenemos una tarea que se está ejecutando durante mucho tiempo? Desde el sitio web de Celery : "El tiempo de visibilidad define el número de segundos de espera para que el trabajador reconozca la tarea antes de que el mensaje sea entregado a otro trabajador. El valor por defecto del visibility_timeout es una hora. Esto puede convertirse en un problema si su tarea de larga duración utiliza acks_late. Si su tarea se ejecuta más tiempo que el visibility_timeout, se vuelve a entregar y de repente tiene dos tareas haciendo lo mismo. Puedes aumentar el visibility_timeout pero es mejor hacer que la ejecución de la tarea sea atómica, por ejemplo:

@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

	if get_lock('task_email_sender_sender_send'):
		# we were able to lock the task
	
		email_sender = EmailSender()
		id = email_sender.sender_send(kwargs['id'])
		...
		release_lock()

	else:
		# we could not lock the task
		# retry

Parece que esto sólo puede ser un problema cuando usamos acks_late para una tarea. Para la tarea de correo electrónico no uso acks_late.

Establecer un tiempo de espera para la tarea

Cualquier cosa puede salir mal y tal vez una o más tareas de repente nunca regresan. Esto bloqueará o ralentizará todas las demás tareas. Para evitar esto, podemos establecer tiempos de espera para una tarea. Cuando el task_soft_time_limit expira, se levanta una excepción SoftTimeLimitExceeded . Cuando el task_time_limit expira, la tarea se mata y se reemplaza por una nueva.

@celery.task(bind=True,  task_time_limit=150,  task_soft_time_limit=120, ...)
def task_email_sender_sender_send(self, **kwargs):

    try:
        a_long_time_to_finish()
    except  SoftTimeLimitExceeded:
        recover()

La limitación de la tasa, previene la sobrecarga del proveedor de SMTP

Supongamos que el proveedor de SMTP está caído por mucho tiempo o que hay una caída en su sistema y toma unas pocas horas arreglarlo. Después de que el problema sea resuelto, sus correos electrónicos serán enviados a máxima velocidad. Esto puede exceder los límites de su proveedor de SMTP Afortunadamente podemos especificar un rate_limit esto para una tarea, por ejemplo el límite de 50 tareas por minuto:

@celery.task(bind=True,  autoretry_for=(Exception,), rate_limit='50/m',  retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):
    ....

Este es un límite de tasa por trabajador, y no un límite de tasa global. Si tienes dos trabajadores enviando correo electrónico puedes establecer el límite de tarifa en '25/m'.

La fecha y hora en que se envió el correo electrónico inicialmente puede ser importante

En la función block_check(), véase más arriba, cada dirección de correo del destinatario se coteja con una lista de permitidas y una lista de denegadas. Pero hay más comprobaciones. Por ejemplo, el número de mensajes enviados a un destinatario está limitado a X por Y minutos. Si hay un problema en algún lugar, por ejemplo el trabajador se estrelló, y luego se reinició después de algunas horas, entonces todos los correos en cola se envían uno tras otro. Si usamos la marca de tiempo del momento de envío, lo más probable es que un correo electrónico sea bloqueado.

Lo que deberíamos hacer es usar la marca de tiempo del momento en que el mensaje fue enviado a la cola de tareas. Afortunadamente todos los registros de mi base de datos tienen una marca de tiempo created_on y ya sabemos que creamos un registro send_mail_table antes (!) de pasar el mensaje a la cola de tareas. En la tarea podemos usar esta marca de tiempo created_on para comprobar los límites de tiempo.

Agregar archivos adjuntos

Podemos enviar nuestros correos electrónicos, pero ¿qué pasa con los archivos adjuntos? Si ejecutas tu aplicación y los trabajadores en el mismo servidor entonces no hay problema, sólo pasas los enlaces o rutas de los archivos. ¿Pero qué pasa si ejecutas tu aplicación web en el servidor X y los trabajadores en el servidor Y? Una forma es usar un volumen de archivos adjuntos compartidos que hayas creado en algún lugar. Otra forma es almacenar los archivos adjuntos en los registros send_email_table . Esto es costoso en ciclos y memoria de CPU pero la ventaja es que todos los datos están en la base de datos y el trabajador siempre tiene acceso a los archivos adjuntos.

¿Pueden perderse los correos electrónicos?

Como se menciona en los enlaces de abajo: "No guardes datos importantes en tu cola de Celery ". Afortunadamente ya estoy insertando un registro de send_mail_table con los parámetros del correo electrónico antes (!) de llamar a la tarea. También hay una bandera EmailHasBeenSent en este registro. Esto significa que siempre es posible reenviar todos los correos electrónicos que por cualquier razón no han sido enviados.

Configurar Celery de forma global

Podemos establecer los parámetros Celery con el decorador de tareas pero puede ser una buena idea establecer valores por defecto para todas sus tareas.

    TaskBase = celery.Task
    class ContextTask(TaskBase):

        time_limit = 40
        soft_time_limit = 30
         autoretry_for  = (Exception,  SoftTimeLimitExceeded, TimeLimitExceeded)
         retry_backoff  = 2
         retry_backoff_max = 10 * 60
         retry_kwargs  = {'max_retries': 5}
        retry_jitter = False

Resumen

Hace algún tiempo pensé, añadamos Celery a mi aplicación. Una semana, tal vez dos, debería ser posible. Sí, estaba en funcionamiento en este tiempo, pero todavía había mucho que aprender. Los ejemplos de correo electrónico en Internet son sólo una introducción. Usar Celery se siente como aprender otro idioma. Creo que ahora lo entiendo. Al principio hay tantos parámetros, condiciones. No leer no ayuda. Muchos de los problemas mencionados en Internet son de personas que no leyeron la excelente documentación del sitio web de Celery . Sí, hay algunas inconsistencias pero trabajan en ello. De todos modos, Celery está en funcionamiento y estoy aprendiendo a hacer mejores tareas.

Enlaces / créditos

acks_late option seems to have no effect
https://celery-users.narkive.com/Cv8Juuxs/acks-late-option-seems-to-have-no-effect

Celery task exceptions and automatic retries
https://www.distributedpython.com/2018/09/04/error-handling-retry/

Celery throttling - setting rate limit for queues
https://alievmagomed.com/celery-throttling-setting-rate-limit-for-queues/

Custom Celery task states
https://www.distributedpython.com/2018/09/28/celery-task-states/

Distributed Task Locking in Celery
http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html

distributedpython
https://www.distributedpython.com/

Don't keep important data in your Celery queue
https://www.caktusgroup.com/blog/2016/10/18/dont-keep-important-data-your-celery-queue/

Flask Celery task locking
https://stackoverflow.com/questions/53950548/flask-celery-task-locking

Should I use retry or acks_late?
https://docs.celeryproject.org/en/latest/faq.html#should-i-use-retry-or-acks-late

Working with Asynchronous Celery Tasks – lessons learned
https://blog.daftcode.pl/working-with-asynchronous-celery-tasks-lessons-learned-32bb7495586b

Deje un comentario

Comente de forma anónima o inicie sesión para comentar.

Comentarios

Deje una respuesta.

Responda de forma anónima o inicie sesión para responder.