angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Celery, Redis en het (in)beroemde e-mail taakvoorbeeld

Beginnen met Celery is eenvoudig. Maar voordat je Celery in productie neemt, moet je veel tijd besteden om het echt te begrijpen.

10 oktober 2020
In Celery
post main image
https://unsplash.com/@designnnco

Het kunnen uitvoeren van asynchrone taken vanuit uw webapplicatie is in veel gevallen een must. Een manier om dit te bereiken is het gebruik van Celery. Er zijn veel artikelen op het internet en er worden enkele voorbeelden gegeven. Ik vond Miguel echt goed. Grinberg's berichten over Celery. Heel duidelijk, dank u wel Miguel.

Maar Miguel's post en de meeste andere artikelen zijn slechts een inleiding. Wanneer u begint met Celery realiseert u zich plotseling dat het niet gemakkelijk is. In feite kun je je beter voorbereiden op veel leren voordat je het gebruikt in de productie.

In deze post schrijf ik enkele aantekeningen die bij de uitvoering van Celery met Flask en Redis naar voren kwamen. Het grootste deel hiervan is een samenvatting van de documenten die op de website van Celery en andere berichten op het internet te vinden zijn.

Vele anderen hebben geschreven over problemen en hoe ze die hebben opgelost

Iemand schreef dat wanneer je Celery in productie uitvoert, je eerste prioriteit is om een plan te hebben van wat je moet doen als je taken plotseling stoppen met werken. Ik wil toevoegen dat dit betekent dat je eerst moet weten hoe om te controleren of uw makelaar, werknemers en taken worden uitgevoerd.

Zoek op het internet naar 'Celery probleem' en je krijgt een overweldigende hoeveelheid pagina's. Veel van deze problemen worden al gedekt door de Celery documentatie, TL;DR. Een aantal problemen bij het verzenden van e-mail en hoe deze aan te pakken worden behandeld in een mooie post 'Werken met Asynchrone Celery Taken - geleerde lessen', zie de links hieronder. Ik stel ook voor dat je de website distributedpython bekijkt, zie de links hieronder. Er zijn hier veel goede recepten.

Wat is al dat gepraat over acks_late en apportaties?

Standaard zal Celery de taken onmiddellijk bevestigen. Dit betekent dat een taak wordt verwijderd uit de wachtrij van de makelaar net voordat deze wordt gestart. U kunt dit gedrag veranderen door acks_late op te geven, per taak of systeem. In dit geval erkent Celery een taak pas na succesvolle afronding.

Waarom is dit belangrijk?

Als een taak abrupt wordt onderbroken, dan zal Celery de taak standaard niet opnieuw proberen. Als acks_late is opgegeven, staat de taak nog steeds in de wachtrij en zal Celery de taak opnieuw proberen. In het eerste geval kun je de taak nog steeds opnieuw proberen door parameters als retry_kwargs op te geven.

Wat is dan het verschil?

Dit betekent dat acks_late de levering wijzigt van at-most-once naar at-least-once.

Maar acks_late werkt misschien niet zoals verwacht.

Hier moeten we een onderscheid maken tussen verschillende voorwaarden:

  • Sierlijk
    De beheerder stopt de werknemer met het SIGTERM signaal: kill <pid>.
    taak not acked
  • Abrupt
    De beheerder doodt de werknemer met behulp van het SIGKILL signaal: CTRL-C of met kill -9 <pid>,
    of,
    een systeemcrash (uit het geheugen, hardwarefout) beëindigt het worker proces.
    - Als het parent proces loopt:
    task acked
    - Als het parent proces niet loopt, bijv. stroomuitval):
    taak not acked (waarschijnlijk)

In het eerste geval wordt het arbeidersproces sierlijk gestopt en een lopende taak die wordt gestopt is niet acked. Dit komt omdat de beheerder de bedoeling had om de taakuitvoering (tijdelijk) te stoppen. Het opnieuw starten van de werker zal de taak opnieuw starten.

In het tweede geval met het lopende ouderproces ziet het ouderproces dat de werker verloren gaat, accepteert de taak en stelt de status in op FAILURE. In het geval dat de beheerder de werker heeft vermoord, was er waarschijnlijk een goede reden om dit te doen. In het geval van een systeemcrash willen we waarschijnlijk niet dat de taak opnieuw wordt opgestart. Als het ouderproces niet loopt, bijvoorbeeld omdat het plotseling is overleden, dan zal de taak waarschijnlijk niet acked krijgen.

Ideeënpotentie

Onze taken kunnen lopen onverwachtWe moeten er meerdere malen voor zorgen dat het resultaat niet verandert. Als een taak bijvoorbeeld al een e-mail heeft gestuurd, dan moet het opnieuw uitvoeren van dezelfde taak deze e-mail niet opnieuw versturen. Dit heet idempotency. Draai meerdere keren zonder het resultaat te wijzigen buiten de oorspronkelijke toepassing (Wikipedia).

Acks_late versus opnieuw proberen

Stel dat een taak bestaat uit het sturen van een e-mail naar een externe dienst die tijdelijk niet bereikbaar is. De taak zal proberen verbinding te maken, maar de verbinding zal time-out zijn. Als we het standaard gedrag gebruiken, zal Celery de mislukte taak markeren. Het zal niet opnieuw worden uitgevoerd.

Maar wacht. Zonder acks_late kunnen we ook Celery vertellen de taak opnieuw te proberen. Wat moet je gebruiken? Hangt ervan af. Lees de documentatie op de website van Celery 'Should I use retry or acks_late?'.

Hoe zorg je ervoor dat een e-mail slechts één keer wordt verstuurd?

Ervan uitgaande dat dezelfde taak meerdere keren kan worden uitgevoerd, kunnen we alleen voorkomen dat een e-mail twee keer of meer wordt verzonden door een globale vlag te hebben voor de taken, buiten de taak om. Ik heb al een tabel send_email_table gemaakt die records bevat met (meta)gegevens voor elke e-mail. We voegen gewoon een nieuwe (Booleaanse) kolom EmailHasBeenSent toe. Dit betekent ook dat we een nieuw send_email_table record moeten invoegen voordat (!) de taak wordt opgeroepen. Zonder gebruik te maken van taken zag mijn e-mailverzendingsfunctie er zo uit:

class EmailSender:

    def send(params):
        # basic checks and insert the email parameters in a new  send_email_table  record
        preprocess(...)
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

Na het maken van wijzigingen om een taak te gebruiken voor het verzenden van een e-mail:

class EmailSender:

    def sender_init_function(params):
        # basic checks and insert the email parameters in a new  send_email_table  record
        preprocess(...)
        return  send_email_table_record_id

    def sender_send(send_email_table_record_id):
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

En de code om een e-mail te versturen:

def send_email(**kwargs):

    email_sender = EmailSender()
     send_email_table_record_id = email_sender.sender_init(**kwargs)

    # pass to task using apply_async
	kwargs  = {}
	kwargs['send_email_table_record_id'] =  send_email_table_record_id
    task_email_sender_sender_send.apply_async(queue=celery_task_queue,  kwargs=kwargs)


@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

    email_sender = EmailSender()
    id = email_sender.sender_send(kwargs['send_email_table_record_id'])

De sender_send() methode controleert de vlag EmailHasBeenSent in het record dat kan worden opgehaald met het id. Als dat waar is, is de e-mail al verzonden en komt de taak terug zonder iets te doen. Anders zal de taak de e-mail versturen en deze vlag True instellen na het versturen van de e-mail.

Hoe zit het met ophalen?

Laten we aannemen dat onze SMTP provider enige tijd niet beschikbaar is. Celery maakt het zeer eenvoudig om een taak opnieuw te proberen in geval van een uitzondering. We kunnen de uitzondering zelf opheffen en vangen of de eenvoudigere autoretry_for parameter gebruiken. Van de Celery website:

@celery.task(bind=True,  autoretry_for=(Exception,),  retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):

    if error_sending_email:
        raise Exception(f'error sending email')
    # done

Voor opvragingen zijn er een aantal parameters die u kunt gebruiken. Mijn instelling (tijdens de ontwikkeling):

retry_backoff  = 2
retry_backoff_max = 10 * 60
retry_kwargs  = {'max_retries': 5}

retry_backoff betekent dat de eerste poging 2 seconden vertraagt, de tweede 4 seconden, de volgende pogingen 8, 16, 32, etc. seconden. retry_backoff_max specificeert de maximale vertraging tussen taakopnames, in dit geval een uur.
max_retries is het maximale aantal pogingen.

Redis en de visibility_timeout

Onze e-mail taak loopt een korte tijd, misschien 10 seconden, zelfs als de SMTP provider is down. Maar wat gebeurt er als we een taak hebben die erg lang duurt? Van de Celery website: De zichtbaarheids-time-out bepaalt het aantal seconden dat moet worden gewacht tot de werknemer de taak heeft bevestigd voordat het bericht opnieuw aan een andere werknemer wordt geleverd. De standaardwaarde van de visibility_timeout is één uur. Dit kan een probleem worden als uw lang lopende taak acks_late gebruikt. Als je taak langer loopt dan de visibility_timeout, wordt deze opnieuw geleverd en heb je plotseling twee taken die hetzelfde doen. Je kunt de visibility_timeout verhogen, maar het is beter om de taakuitvoering atomair te maken, bijvoorbeeld:

@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

	if get_lock('task_email_sender_sender_send'):
		# we were able to lock the task
	
		email_sender = EmailSender()
		id = email_sender.sender_send(kwargs['id'])
		...
		release_lock()

	else:
		# we could not lock the task
		# retry

Het lijkt erop dat dit alleen een probleem kan zijn als we acks_late gebruiken voor een taak. Voor de e-mail taak gebruik ik geen acks_late.

Stel een time-out in voor de taak

Alles kan fout gaan en misschien komen een of meer taken ineens wel nooit meer terug. Dit zal alle andere taken blokkeren of vertragen. Om dit te voorkomen, kunnen we een time-out instellen voor een taak. Wanneer de task_soft_time_limit afloopt wordt een SoftTimeLimitExceeded uitzondering verhoogd. Wanneer de task_time_limit afloopt, wordt de taak gedood en vervangen door een nieuwe.

@celery.task(bind=True,  task_time_limit=150,  task_soft_time_limit=120, ...)
def task_email_sender_sender_send(self, **kwargs):

    try:
        a_long_time_to_finish()
    except  SoftTimeLimitExceeded:
        recover()

Tariefbeperking, voorkom overbelasting van de SMTP provider

Stel dat de SMTP provider voor een lange tijd offline is of er is een crash in uw systeem en het duurt een paar uur om het te repareren. Nadat het probleem is opgelost, worden uw e-mails op maximale snelheid verzonden. Dit kan de grenzen van uw SMTP provider overschrijden. Gelukkig kunnen we een rate_limit instellen voor een taak, bijvoorbeeld een limiet van 50 taken per minuut:

@celery.task(bind=True,  autoretry_for=(Exception,), rate_limit='50/m',  retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):
    ....

Dit is een tariefgrens per werknemer, en geen globale tariefgrens. Als u twee arbeiders hebt die e-mail versturen, kunt u de rate_limit instellen op '25/m'.

De tijdstempel wanneer de e-mail in eerste instantie werd verzonden kan belangrijk zijn

In de functie block_check(), zie hierboven, wordt elk e-mailadres van de ontvanger gecontroleerd aan de hand van een toestemmingslijst en een weigeringslijst. Maar er zijn meer controles. Zo is bijvoorbeeld het aantal berichten dat naar een ontvanger wordt gestuurd beperkt tot X per Y-minuut. Als er ergens een probleem is, bijvoorbeeld de werknemer is gecrasht, en vervolgens na enkele uren opnieuw gestart, dan worden alle in de wachtrij geplaatste e-mails een voor een verzonden. Als we de tijdstempel van het moment van verzending zouden gebruiken, is de kans groot dat een e-mail wordt geblokkeerd.

Wat we moeten doen is de tijdstempel gebruiken wanneer het bericht in de wachtrij voor de taak is geplaatst. Gelukkig hebben al mijn database records een created_on timestamp en weten we al dat we een send_mail_table record hebben gemaakt voordat (!) het bericht naar de taakwachtrij werd gestuurd. In de taak kunnen we deze created_on timestamp gebruiken om te controleren op tijdslimieten.

Bijlagen toevoegen

We kunnen onze e-mails versturen, maar hoe zit het met de bijlagen? Als u uw applicatie en medewerkers op dezelfde server draait dan is er geen probleem, u passeert gewoon de links of paden van de bestanden. Maar wat als u uw webapplicatie op server X draait en werkers op server Y? Een manier is om een gedeeld volume van bijlagen te gebruiken die je ergens hebt gemaakt. Een andere manier is om de bijlage bestanden op te slaan in de send_email_table records. Dit is duur in CPU cycli en geheugen, maar het voordeel is dat alle gegevens in de database staan en de medewerker altijd toegang heeft tot de bijlagen.

Kunnen e-mails verloren gaan?

Zoals vermeld in de onderstaande links: Bewaar geen belangrijke gegevens in uw Celery wachtrij. Gelukkig ben ik al bezig met het invoegen van een send_mail_table record met de e-mailparameters voordat (!) de taak wordt opgeroepen. Er staat ook een vlag EmailHasBeenSent in dit record. Dit betekent dat het altijd mogelijk is om alle e-mails die om welke reden dan ook niet zijn verzonden opnieuw te verzenden.

Stel Celery globale instellingen in.

We kunnen de Celery parameters instellen met de task decorator, maar het kan een goed idee zijn om de standaardinstellingen voor al uw taken in te stellen.

    TaskBase = celery.Task
    class ContextTask(TaskBase):

        time_limit = 40
        soft_time_limit = 30
         autoretry_for  = (Exception,  SoftTimeLimitExceeded, TimeLimitExceeded)
         retry_backoff  = 2
         retry_backoff_max = 10 * 60
         retry_kwargs  = {'max_retries': 5}
        retry_jitter = False

Samenvatting

Enige tijd geleden dacht ik, laten we Celery toevoegen aan mijn sollicitatie. Een week, misschien twee, moet mogelijk zijn. Ja, het was in deze tijd al in werking, maar er was nog zoveel te leren. De e-mail voorbeelden op het internet zijn slechts een introductie. Het gebruik van Celery voelt als het leren van een andere taal. Ik denk dat ik het nu begrijp. In het begin zijn er zoveel parameters, voorwaarden. Niet lezen helpt niet. Veel problemen die op het internet worden genoemd zijn afkomstig van mensen die de veelal uitstekende documentatie op de Celery website niet hebben gelezen. Ja, er zijn enkele inconsistenties, maar ze werken eraan. Hoe dan ook, Celery is up and running en ik leer betere taken te maken.

Links / credits

acks_late option seems to have no effect
https://celery-users.narkive.com/Cv8Juuxs/acks-late-option-seems-to-have-no-effect

Celery task exceptions and automatic retries
https://www.distributedpython.com/2018/09/04/error-handling-retry/

Celery throttling - setting rate limit for queues
https://alievmagomed.com/celery-throttling-setting-rate-limit-for-queues/

Custom Celery task states
https://www.distributedpython.com/2018/09/28/celery-task-states/

Distributed Task Locking in Celery
http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html

distributedpython
https://www.distributedpython.com/

Don't keep important data in your Celery queue
https://www.caktusgroup.com/blog/2016/10/18/dont-keep-important-data-your-celery-queue/

Flask Celery task locking
https://stackoverflow.com/questions/53950548/flask-celery-task-locking

Should I use retry or acks_late?
https://docs.celeryproject.org/en/latest/faq.html#should-i-use-retry-or-acks-late

Working with Asynchronous Celery Tasks – lessons learned
https://blog.daftcode.pl/working-with-asynchronous-celery-tasks-lessons-learned-32bb7495586b

Laat een reactie achter

Reageer anoniem of log in om commentaar te geven.

Opmerkingen

Laat een antwoord achter

Antwoord anoniem of log in om te antwoorden.