angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Celery, Redis et l'exemple (in)célèbre de tâche de courrier électronique

Commencer avec Celery est facile. Mais avant de passer à la production de Celery , il faut passer beaucoup de temps pour vraiment le comprendre.

10 octobre 2020
Dans Celery
post main image
https://unsplash.com/@designnnco

Dans de nombreux cas, il est indispensable de pouvoir exécuter des tâches asynchrones à partir de votre application web. Une façon d'y parvenir est d'utiliser Celery. Il existe de nombreux articles sur l'internet et quelques exemples sont donnés. J'ai beaucoup aimé les articles de Miguel Grinberg sur Celery. Très clair, merci Miguel.

Mais le post de Miguel et la plupart des autres articles ne sont qu'une introduction. Lorsque vous commencez avec Celery , vous vous rendez soudain compte que ce n'est pas facile. En fait, il vaut mieux se préparer à beaucoup d'apprentissage avant de l'utiliser en production.

Dans ce billet, j'écris quelques notes qui sont apparues lors de la mise en œuvre de Celery avec Flask et Redis. Il s'agit pour l'essentiel d'un résumé des documents présents sur le site web Celery et d'autres postes sur Internet.

Beaucoup d'autres ont écrit sur les problèmes et la façon dont ils les ont résolus

Quelqu'un a écrit que lorsque vous exécutez Celery en production, votre première priorité est d'avoir un plan de ce qu'il faut faire si vos tâches cessent soudainement de fonctionner. Je tiens à ajouter que cela signifie que vous devez d'abord savoir comment vérifier si votre courtier, vos travailleurs et vos tâches sont en cours d'exécution.

Si vous cherchez sur Internet le problème "Celery ", vous obtenez une quantité impressionnante de pages. Nombre de ces problèmes sont déjà couverts par la documentation Celery , TL;DR. Un certain nombre de problèmes liés à l'envoi de courriers électroniques et à la manière de les résoudre sont abordés dans un beau post intitulé "Working with Asynchronous Celery Tasks - lessons learned", voir les liens ci-dessous. Je vous suggère également de consulter le site web distributedpython, voir les liens ci-dessous. Vous y trouverez de nombreuses bonnes recettes.

Qu'est-ce que c'est que cette discussion sur le acks_late et les essais ?

Par défaut, Celery accuse réception des tâches immédiatement. Cela signifie qu'une tâche sera retirée de la file d'attente du courtier juste avant d'être lancée. Vous pouvez modifier ce comportement en spécifiant acks_late, par tâche ou pour l'ensemble du système. Dans ce cas, Celery n'accuse réception d'une tâche qu'après son achèvement.

Pourquoi est-ce important ?

Si une tâche est interrompue brusquement, alors par défaut Celery ne réessaiera pas la tâche. Si acks_late a été spécifié, la tâche est toujours dans la file d'attente et Celery va réessayer la tâche. Dans le premier cas, vous pouvez toujours réessayer la tâche en spécifiant des paramètres de réessai comme retry_kwargs.

Quelle est donc la différence ?

Cela signifie que acks_late change la livraison de "at-most-once" à "at-least-once".

Mais acks_late peut ne pas fonctionner comme prévu

Il faut ici faire la distinction entre plusieurs conditions :

  • Gracieux
    L'administrateur arrête le travailleur en utilisant le signal SIGTERM : kill <pid>.
    task not acked
  • Abrupt
    L'administrateur tue le travailleur en utilisant le signal SIGKILL : CTRL-C ou en utilisant le signal kill -9 <pid>,
    ou,
    un crash système (perte de mémoire, panne de matériel) met fin au processus du travailleur.
    - Si le processus parent est en cours d'exécution :
    task acked
    - Si le processus parent n'est pas en cours d'exécution, par exemple une panne de courant) :
    task not acked (probablement)

Dans le premier cas, le processus du travailleur est arrêté gracieusement et une tâche en cours qui est arrêtée n'est pas acked. Cela est dû au fait que l'administrateur avait l'intention d'arrêter (temporairement) l'exécution de la tâche. Le fait de redémarrer le travailleur relancera la tâche.

Dans le second cas, le processus parent en cours d'exécution voit que le travailleur est perdu, accède à la tâche et définit l'état comme FAILURE. Dans le cas où l'administrateur a tué le travailleur, il y avait probablement une bonne raison de le faire. En cas de panne du système, nous ne voulons probablement pas que la tâche soit redémarrée. Si le processus parent ne fonctionne pas, par exemple parce qu'il est soudainement mort, alors la tâche ne recevra probablement pas acked.

Idempotence

Nos tâches peuvent fonctionner à l'improvisteplusieurs fois, nous devons veiller à ce que le résultat ne change pas. Par exemple, si une tâche a déjà envoyé un courrier électronique, le fait d'exécuter à nouveau la même tâche ne doit pas entraîner l'envoi de ce courrier. C'est ce qu'on appelle l'idempotence. Exécuter plusieurs fois sans modifier le résultat au-delà de l'application initiale (Wikipedia).

Acks_late contre réessai

Supposons qu'une tâche consiste à envoyer un courrier électronique à un service externe qui est temporairement hors service. La tâche essaiera de se connecter mais la connexion sera interrompue. Si nous utilisons le comportement par défaut, Celery marquera l'échec de la tâche. Elle ne sera pas exécutée à nouveau.

Mais attendez. Sans acks_late , nous pouvons également dire à Celery de réessayer la tâche. Que devez-vous utiliser ? Cela dépend. Lisez la documentation sur le site web Celery 'Should I use retry or acks_late?'.

Comment s'assurer qu'un courriel n'est envoyé qu'une seule fois ?

En supposant qu'une même tâche puisse être exécutée plusieurs fois, nous ne pouvons empêcher l'envoi d'un courriel deux fois ou plus qu'en ayant un drapeau global aux tâches, en dehors de la tâche. J'ai déjà créé une table send_email_table qui contient des enregistrements avec des (méta) données pour chaque e-mail. Nous venons d'ajouter une nouvelle colonne (booléenne) EmailHasBeenSent. Cela signifie également que nous devons insérer un nouvel enregistrement send_email_table avant ( !) d'appeler la tâche. Sans utiliser les tâches, ma fonction d'envoi de courrier électronique ressemblait à ceci :

class EmailSender:

    def send(params):
        # basic checks and insert the email parameters in a new  send_email_table  record
        preprocess(...)
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

Après avoir apporté des modifications pour utiliser une tâche pour l'envoi d'un e-mail :

class EmailSender:

    def sender_init_function(params):
        # basic checks and insert the email parameters in a new  send_email_table  record
        preprocess(...)
        return  send_email_table_record_id

    def sender_send(send_email_table_record_id):
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

Et le code pour envoyer un e-mail :

def send_email(**kwargs):

    email_sender = EmailSender()
     send_email_table_record_id = email_sender.sender_init(**kwargs)

    # pass to task using apply_async
	kwargs  = {}
	kwargs['send_email_table_record_id'] =  send_email_table_record_id
    task_email_sender_sender_send.apply_async(queue=celery_task_queue,  kwargs=kwargs)


@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

    email_sender = EmailSender()
    id = email_sender.sender_send(kwargs['send_email_table_record_id'])

La méthode sender_send() vérifie le drapeau EmailHasBeenSent dans l'enregistrement qui peut être récupéré en utilisant l'identifiant. Si la réponse est "True", l'e-mail a déjà été envoyé et la tâche revient sans rien faire. Sinon, la tâche enverra le courrier électronique et mettra ce drapeau à True après l'envoi du courrier électronique.

Et pour les tentatives de récupération ?

Supposons que notre fournisseur SMTP ne soit pas disponible pendant un certain temps. Celery permet de réessayer très facilement une tâche en cas d'exception. Nous pouvons soulever et attraper l'exception nous-mêmes ou utiliser le paramètre plus facile autoretry_for . Sur le site web de Celery :

@celery.task(bind=True,  autoretry_for=(Exception,),  retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):

    if error_sending_email:
        raise Exception(f'error sending email')
    # done

Il existe quelques paramètres que vous pouvez utiliser pour les tentatives de relance. Mon paramètre (pendant le développement) :

retry_backoff  = 2
retry_backoff_max = 10 * 60
retry_kwargs  = {'max_retries': 5}

retry_backoff signifie que la première tentative sera retardée de 2 secondes, la deuxième tentative sera retardée de 4 secondes, les tentatives suivantes seront retardées de 8, 16, 32, etc. secondes. retry_backoff_max indique le délai maximum entre les tentatives de tâches, dans ce cas une heure.
max_retries est le nombre maximum de tentatives.

Redis et le visibility_timeout

Notre tâche de courrier électronique est de courte durée, peut-être 10 secondes, même si le fournisseur SMTP est en panne. Mais que se passe-t-il lorsque nous avons une tâche qui dure très longtemps ? Extrait du site web Celery : Le délai de visibilité définit le nombre de secondes à attendre pour que le travailleur accuse réception de la tâche avant que le message ne soit transmis à un autre travailleur". La valeur par défaut de visibility_timeout est d'une heure. Cela peut devenir un problème si votre tâche de longue durée utilise acks_late. Si votre tâche dure plus longtemps que le visibility_timeout, elle est à nouveau livrée et vous avez soudainement deux tâches qui font la même chose. Vous pouvez augmenter le visibility_timeout mais il est préférable de rendre l'exécution de la tâche atomique, par exemple :

@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

	if get_lock('task_email_sender_sender_send'):
		# we were able to lock the task
	
		email_sender = EmailSender()
		id = email_sender.sender_send(kwargs['id'])
		...
		release_lock()

	else:
		# we could not lock the task
		# retry

Il semble que cela ne puisse être un problème que lorsque nous utilisons acks_late pour une tâche. Pour la tâche de courrier électronique, je n'utilise pas acks_late.

Définir un délai d'attente pour la tâche

Tout peut mal tourner et peut-être qu'une ou plusieurs tâches ne reviennent jamais. Cela bloquera ou ralentira toutes les autres tâches. Pour éviter cela, nous pouvons fixer des délais pour une tâche. Lorsque le task_soft_time_limit expire, une exception SoftTimeLimitExceeded est levée. Lorsque le task_time_limit expire, la tâche est tuée et remplacée par une nouvelle.

@celery.task(bind=True,  task_time_limit=150,  task_soft_time_limit=120, ...)
def task_email_sender_sender_send(self, **kwargs):

    try:
        a_long_time_to_finish()
    except  SoftTimeLimitExceeded:
        recover()

Limitation du débit, pour éviter la surcharge du fournisseur SMTP

Supposons que le fournisseur SMTP soit hors service pendant une longue période ou qu'il y ait un crash dans votre système et qu'il faille quelques heures pour le réparer. Une fois le problème résolu, vos e-mails seront envoyés à la vitesse maximale. Cela peut dépasser les limites de votre fournisseur SMTP . Heureusement, nous pouvons spécifier un taux_limite pour une tâche, par exemple limiter à 50 tâches par minute :

@celery.task(bind=True,  autoretry_for=(Exception,), rate_limit='50/m',  retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):
    ....

Il s'agit d'une limite de taux par travailleur, et non d'une limite de taux globale. Si deux de vos employés envoient des courriers électroniques, vous pouvez fixer la rate_limit à "25/m".

L'horodatage de l'envoi initial du courriel peut être important

Dans la fonction block_check(), voir ci-dessus, chaque adresse de courrier électronique du destinataire est comparée à une liste d'autorisation et une liste de refus. Mais il existe d'autres contrôles. Par exemple, le nombre de messages envoyés à un destinataire est limité à X par Y minutes. S'il y a un problème quelque part, par exemple si l'employé a planté, puis redémarré après quelques heures, tous les courriels en file d'attente sont envoyés les uns après les autres. Si nous utilisions l'horodatage du moment de l'envoi, il y a de fortes chances qu'un courriel soit bloqué.

Ce que nous devrions faire, c'est utiliser l'horodatage du moment où le message a été soumis à la file d'attente des tâches. Heureusement, tous les enregistrements de ma base de données ont un horodatage created_on et nous savons déjà que nous avons créé un enregistrement send_mail_table avant ( !) de passer le message dans la file d'attente des tâches. Dans la tâche, nous pouvons utiliser cet horodatage created_on pour vérifier les limites de temps.

Ajout de pièces jointes

Nous pouvons envoyer nos courriels, mais qu'en est-il des pièces jointes ? Si vous exécutez votre application et vos travailleurs sur le même serveur, il n'y a pas de problème, il vous suffit de transmettre les liens ou les chemins d'accès des fichiers. Mais qu'en est-il si vous exécutez votre application web sur le serveur X et vos employés sur le serveur Y ? Une solution consiste à utiliser un volume de pièces jointes partagé que vous avez créé quelque part. Une autre façon est de stocker les fichiers joints dans les enregistrements send_email_table . Cela coûte cher en cycles et en mémoire CPU , mais l'avantage est que toutes les données se trouvent dans la base de données et que le travailleur a toujours accès aux pièces jointes.

Les courriels peuvent-ils se perdre ?

Comme mentionné dans les liens ci-dessous : Ne gardez pas de données importantes dans votre file d'attente Celery ". Heureusement, j'ai déjà inséré un enregistrement send_mail_table avec les paramètres du courrier électronique avant ( !) d'appeler la tâche. Il y a également un drapeau EmailHasBeenSent dans cet enregistrement. Cela signifie qu'il est toujours possible de renvoyer tous les courriels qui, pour une raison quelconque, n'ont pas été envoyés.

Définir les paramètres globaux de Celery

Nous pouvons définir les paramètres Celery avec le décorateur de tâches, mais il peut être judicieux de définir des valeurs par défaut pour toutes vos tâches.

    TaskBase = celery.Task
    class ContextTask(TaskBase):

        time_limit = 40
        soft_time_limit = 30
         autoretry_for  = (Exception,  SoftTimeLimitExceeded, TimeLimitExceeded)
         retry_backoff  = 2
         retry_backoff_max = 10 * 60
         retry_kwargs  = {'max_retries': 5}
        retry_jitter = False

Résumé

Il y a quelque temps, j'ai pensé : ajoutons Celery à ma demande. Une semaine, peut-être deux, devrait être possible. Oui, c'était déjà en place pendant ce temps, mais il y avait encore tant à apprendre. Les exemples de courrier électronique sur Internet ne sont qu'une introduction. Utiliser Celery , c'est comme apprendre une autre langue. Je crois que je le comprends maintenant. Au début, il y a tellement de paramètres, de conditions. Ne pas lire n'aide pas. De nombreux problèmes mentionnés sur Internet proviennent de personnes qui n'ont pas lu la documentation, pour la plupart excellente, du site web Celery . Oui, il y a quelques incohérences, mais ils y travaillent. Quoi qu'il en soit, le site Celery est opérationnel et j'apprends à faire de meilleures tâches.

Liens / crédits

acks_late option seems to have no effect
https://celery-users.narkive.com/Cv8Juuxs/acks-late-option-seems-to-have-no-effect

Celery task exceptions and automatic retries
https://www.distributedpython.com/2018/09/04/error-handling-retry/

Celery throttling - setting rate limit for queues
https://alievmagomed.com/celery-throttling-setting-rate-limit-for-queues/

Custom Celery task states
https://www.distributedpython.com/2018/09/28/celery-task-states/

Distributed Task Locking in Celery
http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html

distributedpython
https://www.distributedpython.com/

Don't keep important data in your Celery queue
https://www.caktusgroup.com/blog/2016/10/18/dont-keep-important-data-your-celery-queue/

Flask Celery task locking
https://stackoverflow.com/questions/53950548/flask-celery-task-locking

Should I use retry or acks_late?
https://docs.celeryproject.org/en/latest/faq.html#should-i-use-retry-or-acks-late

Working with Asynchronous Celery Tasks – lessons learned
https://blog.daftcode.pl/working-with-asynchronous-celery-tasks-lessons-learned-32bb7495586b

En savoir plus...

Celery Email Redis

Laissez un commentaire

Commentez anonymement ou connectez-vous pour commenter.

Commentaires

Laissez une réponse

Répondez de manière anonyme ou connectez-vous pour répondre.