angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Comment annuler des tâches avec Python Asynchronous IO (AsyncIO)

AsyncIO 'create_task()' ne démarre pas une tâche mais planifie son exécution dans la boucle d'événements.

2 mai 2023
Dans Async
post main image
https://pixabay.com/users/stocksnap-894430

Dans le cadre d'un projet, j'ai utilisé AIOHTTP pour vérifier les réponses de plusieurs sites web distants, URLs. Les URL proviennent d'une liste. Cette liste peut contenir des doublons.

Tout allait bien jusqu'à ce que je remarque que certaines réponses avaient également un code de statut : HTTP-429 'Too Many Requests'. Quelle qu'en soit la raison, surcharge, sécurité, nous voulons nous comporter de manière conviviale et ne pas rappeler des URL identiques, au moins pendant un temps minimum. Parce que nous utilisons AIOHTTP, de nombreuses requêtes attendront la connexion internet, nous n'utilisons pas de drapeau pour vérifier si l'on peut continuer.

Cet article traite de l'interruption (l'annulation) de ces requêtes en attente à l'aide de la méthode 'cancel()'. Il ne s'agit pas de AIOHTTP ou de limitation de débit, c'est un autre sujet.

Exemple : 3 tâches, pas d'annulation

Voici le code sans annulation des tâches en attente. Je n'utilise pas AIOHTTP ici mais 'asyncio.sleep()'. Parce que nous voulons un comportement déterministe, par opposition à un comportement aléatoire, nous spécifions nous-mêmes le temps de sommeil des tâches.

# 3 tasks, no cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'

    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)

    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            print(f'{f_main} task{task.id} done')
            result = task.result()
            print(f'{f_main} task{task.id} result: {result}')    
            tasks.remove(task)

        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())

Rien de spécial ici, comme prévu la tâche2 s'est terminée en premier, suivie de la tâche1 et de la tâche0. Le résultat :

main       create task0
main       create task1
main       create task2
mytask0    before sleep(2)
mytask1    before sleep(1.5)
mytask2    before sleep(0.8)
main       wait: 0 done, 3 pending
main       wait: 0 done, 3 pending
mytask2    after sleep(0.8), returning ...
main       wait: 1 done, 2 pending
main       task2 done
main       task2 result: normal return
main       wait: 0 done, 2 pending
mytask1    after sleep(1.5), returning ...
main       wait: 1 done, 1 pending
main       task1 done
main       task1 result: normal return
main       wait: 0 done, 1 pending
mytask0    after sleep(2), returning ...
main       wait: 1 done, 0 pending
main       task0 done
main       task0 result: normal return

La création d'une tâche en AsyncIO ne signifie pas que la tâche est lancée.

En revanche, lorsque l'on utilise Multiprocessing ou Treads, il n'y a pas de méthode 'start()' dans AsyncIO.

Multiprocessing:

p = multiprocessing.Process(target=task, args ...)
p.start()

Threading :

t = threading.Thread(target=task, args ...)
t.start()

AsyncIO :

t = asyncio.create_task(task, args ...)

La documentation AsyncIO ne mentionne pas quand une tâche est démarrée, ou exécutée pour la première fois, lors de l'utilisation de 'create_task()', à moins que j'aie manqué quelque chose. Son exécution est programmée, ce qui signifie qu'elle peut commencer à s'exécuter immédiatement, mais aussi un peu plus tard, lorsque la boucle d'événements "attend" un ou plusieurs co-routines.

Ceci est important car lorsque vous annulez une tâche, il se peut qu'elle n'ait pas encore été exécutée, ce qui signifie que la co-routine de la tâche n'a pas encore été appelée.

Différents états d'une tâche lors de son annulation

La méthode "cancel()" d'une tâche permet de signaler qu'une tâche doit être annulée. D'après ce qui précède, cela signifie que lorsque la méthode "cancel()" est appliquée, l'état de la tâche peut être le suivant :

  • La tâche n'a pas encore commencé à s'exécuter, ou,
  • La tâche a commencé à s'exécuter et s'est achevée, ou,
  • La tâche a commencé à s'exécuter et attend quelque part.

L'annulation d'une tâche qui n'a pas encore commencé à s'exécuter signifie que la tâche ne s'exécutera PAS. L'annulation d'une tâche qui a commencé à s'exécuter et qui est en attente signifie : abandonner la tâche en attente. Si la tâche est déjà terminée, elle ne sera pas considérée comme annulée.

Ces trois conditions sont illustrées ci-dessous, je fonctionne avec AsyncIO sur Python 3.10. Nous devons "attendre" la tâche, sinon les résultats ne sont pas corrects ! Lorsqu'une tâche est annulée, une erreur asyncio.CancelledError est levée. Le paramètre 'msg' de la méthode 'cancel()' n'est propagé à la boucle principale qu'à partir de Python 3.11.

Annuler une tâche non démarrée

# cancel a not started task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # cancel task before awaitable
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Une erreur CancelledError a été soulevée. Ce qui est important dans le résultat, c'est que la tâche n'est pas démarrée :

main       create task
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Annuler une tâche terminée

Ici, nous ajoutons un 'awaitable', 'asyncio.sleep()' après avoir créé la tâche :

# cancel a completed task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 3 sec, allow task to complete
    print(f'{f_main} before await sleep')
    await asyncio.sleep(3)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

La tâche s'est terminée avant d'être annulée. Le résultat :

main       create task
main       before await sleep
mytask     before await sleep
mytask     after await sleep
main       after await sleep
main       task.done() = True
main       task.cancelled() = False
main       task.result(): normal return

Annulation d'une tâche non terminée (en attente)

Ici, nous modifions le temps de sommeil dans la boucle principale de 3 secondes à 1 seconde avant d'annuler la tâche.

# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Le résultat :

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Capture de l'erreur CancelledError dans la tâche

Il est également possible de capturer l'erreur asyncio.CancelledError dans la tâche. Notez que vous ne pouvez pas vous fier aux valeurs de retour d'une tâche, car la tâche peut ne pas démarrer du tout, voir ci-dessus. Voici le code. En utilisant le dernier exemple ci-dessus, nous ajoutons également la gestion des exceptions à la tâche.

# capture CancelledError in task
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        return f'CancelledError return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Voici le résultat. Notez que le paramètre 'msg' de la méthode 'cancel()' est propagé à la tâche.

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
mytask     CancelledError raised, my cancel reason
main       task.done() = True
main       task.cancelled() = False
main       task.result(): CancelledError return

Si vous voulez que l'erreur annulée se propage également à la boucle principale, vous pouvez lever l'exception à nouveau dans la tâche :

# capture CancelledError in task and raise it again
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        # raise the exception again
        raise
        return f'CancelledError return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Dans le résultat, nous voyons l'exception CancelledError d'abord dans la tâche, puis dans la boucle principale. Encore une fois, parce que nous sommes sur Python 3.10, le paramètre 'msg' de la méthode 'cancel()' est seulement propagé à la tâche, pas à la boucle principale.

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
mytask     CancelledError raised, my cancel reason
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Exemple : 3 tâches, avec annulation

Voici le code du début de ce billet, "Exemple : 3 tâches, pas d'annulation", modifié pour annuler des tâches. Dans mon cas, je ne suis pas intéressé par la capture de CancelledError dans une tâche, je veux juste que la (les) tâche(s) soit(ent) annulée(s). Pour en revenir à la question initiale, annuler une tâche lorsqu'une certaine tâche renvoie une certaine valeur (statut HTTP-429), nous annulons la tâche0 lorsque nous recevons le résultat de la tâche2.

Si nous voulons obtenir la raison de l'annulation, comme spécifié dans la méthode 'cancel()', nous pouvons lever l'exception dans la boucle en appelant la méthode 'exception()'.

# 3 tasks, with cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'

    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)

    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            if task.cancelled():
                print(f'{f_main} task{task.id} was cancelled')
                # get cancel msg, python >= 3.11 only
                cancel_msg = None
                try:
                    task.exception()
                except asyncio.CancelledError as e:
                    cancel_msg = e
                print(f'{f_main} task{task.id} cancel_msg: {cancel_msg}')
            else:
                print(f'{f_main} task{task.id} was not cancelled, result: {task.result()}')    

                # cancel task0 if task2 completed
                if task.id == 2:
                    print(f'{f_main} task{task.id} cancel task0')    
                    tasks[0].cancel(msg='my cancel reason')

            tasks.remove(task)

        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())

Le résultat :

main       create task0
main       create task1
main       create task2
mytask0    before sleep(2)
mytask1    before sleep(1.5)
mytask2    before sleep(0.8)
main       wait: 0 done, 3 pending
main       wait: 0 done, 3 pending
mytask2    after sleep(0.8), returning ...
main       wait: 1 done, 2 pending
main       task2 was not cancelled, result: normal return
main       task2 cancel task0
main       wait: 1 done, 1 pending
main       task0 was cancelled
main       task0 cancel_msg: 
mytask1    after sleep(1.5), returning ...
main       wait: 1 done, 0 pending
main       task1 was not cancelled, result: normal return

Résumé

L'annulation d'une tâche avec l'AsyncIO est un peu déroutante au début. La méthode AsyncIO "create_task()" ne démarre pas une tâche, elle ne fait que planifier son exécution. La principale chose à retenir est qu'il faut toujours attendre une tâche avant de vérifier ses méthodes "done()" et "cancelled()". L'endroit où capturer l'erreur annulée (CancelledError), dans la boucle principale ou dans la tâche, ou les deux, dépend de votre application. Et pour obtenir la raison de l'annulation dans la boucle principale, vous avez besoin du fichier Python 3.11.

Liens / crédits

asyncio.Task doesn't propagate CancelledError() exception correctly.
https://bugs.python.org/issue45390

Python - Asynchronous I/O - cancel
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel

En savoir plus...

Async

Laissez un commentaire

Commentez anonymement ou connectez-vous pour commenter.

Commentaires

Laissez une réponse

Répondez de manière anonyme ou connectez-vous pour répondre.