Comment annuler des tâches avec Python Asynchronous IO (AsyncIO)
AsyncIO 'create_task()' ne démarre pas une tâche mais planifie son exécution dans la boucle d'événements.

Dans le cadre d'un projet, j'ai utilisé AIOHTTP pour vérifier les réponses de plusieurs sites web distants, URLs. Les URL proviennent d'une liste. Cette liste peut contenir des doublons.
Tout allait bien jusqu'à ce que je remarque que certaines réponses avaient également un code de statut : HTTP-429 'Too Many Requests'. Quelle qu'en soit la raison, surcharge, sécurité, nous voulons nous comporter de manière conviviale et ne pas rappeler des URL identiques, au moins pendant un temps minimum. Parce que nous utilisons AIOHTTP, de nombreuses requêtes attendront la connexion internet, nous n'utilisons pas de drapeau pour vérifier si l'on peut continuer.
Cet article traite de l'interruption (l'annulation) de ces requêtes en attente à l'aide de la méthode 'cancel()'. Il ne s'agit pas de AIOHTTP ou de limitation de débit, c'est un autre sujet.
Exemple : 3 tâches, pas d'annulation
Voici le code sans annulation des tâches en attente. Je n'utilise pas AIOHTTP ici mais 'asyncio.sleep()'. Parce que nous voulons un comportement déterministe, par opposition à un comportement aléatoire, nous spécifions nous-mêmes le temps de sommeil des tâches.
# 3 tasks, no cancellation
import asyncio
async def mytask(task_num, sleep_num):
f_mytask = f'{"mytask" + str(task_num):10}'
print(f'{f_mytask} before sleep({sleep_num})')
await asyncio.sleep(sleep_num)
print(f'{f_mytask} after sleep({sleep_num}), returning ...')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create 3 tasks
tasks = []
sleep_secs = [2, 1.5, 0.8]
for task_id in range(3):
print(f'{f_main} create task{task_id}')
task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
task.id = task_id
tasks.append(task)
while True:
tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
for task in list(tasks_done):
print(f'{f_main} task{task.id} done')
result = task.result()
print(f'{f_main} task{task.id} result: {result}')
tasks.remove(task)
tasks_len = len(tasks)
if tasks_len == 0:
break
asyncio.run(main())
Rien de spécial ici, comme prévu la tâche2 s'est terminée en premier, suivie de la tâche1 et de la tâche0. Le résultat :
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 done
main task2 result: normal return
main wait: 0 done, 2 pending
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 1 pending
main task1 done
main task1 result: normal return
main wait: 0 done, 1 pending
mytask0 after sleep(2), returning ...
main wait: 1 done, 0 pending
main task0 done
main task0 result: normal return
La création d'une tâche en AsyncIO ne signifie pas que la tâche est lancée.
En revanche, lorsque l'on utilise Multiprocessing ou Treads, il n'y a pas de méthode 'start()' dans AsyncIO.
Multiprocessing:
p = multiprocessing.Process(target=task, args ...)
p.start()
Threading :
t = threading.Thread(target=task, args ...)
t.start()
AsyncIO :
t = asyncio.create_task(task, args ...)
La documentation AsyncIO ne mentionne pas quand une tâche est démarrée, ou exécutée pour la première fois, lors de l'utilisation de 'create_task()', à moins que j'aie manqué quelque chose. Son exécution est programmée, ce qui signifie qu'elle peut commencer à s'exécuter immédiatement, mais aussi un peu plus tard, lorsque la boucle d'événements "attend" un ou plusieurs co-routines.
Ceci est important car lorsque vous annulez une tâche, il se peut qu'elle n'ait pas encore été exécutée, ce qui signifie que la co-routine de la tâche n'a pas encore été appelée.
Différents états d'une tâche lors de son annulation
La méthode "cancel()" d'une tâche permet de signaler qu'une tâche doit être annulée. D'après ce qui précède, cela signifie que lorsque la méthode "cancel()" est appliquée, l'état de la tâche peut être le suivant :
- La tâche n'a pas encore commencé à s'exécuter, ou,
- La tâche a commencé à s'exécuter et s'est achevée, ou,
- La tâche a commencé à s'exécuter et attend quelque part.
L'annulation d'une tâche qui n'a pas encore commencé à s'exécuter signifie que la tâche ne s'exécutera PAS. L'annulation d'une tâche qui a commencé à s'exécuter et qui est en attente signifie : abandonner la tâche en attente. Si la tâche est déjà terminée, elle ne sera pas considérée comme annulée.
Ces trois conditions sont illustrées ci-dessous, je fonctionne avec AsyncIO sur Python 3.10. Nous devons "attendre" la tâche, sinon les résultats ne sont pas corrects ! Lorsqu'une tâche est annulée, une erreur asyncio.CancelledError est levée. Le paramètre 'msg' de la méthode 'cancel()' n'est propagé à la boucle principale qu'à partir de Python 3.11.
Annuler une tâche non démarrée
# cancel a not started task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# cancel task before awaitable
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Une erreur CancelledError a été soulevée. Ce qui est important dans le résultat, c'est que la tâche n'est pas démarrée :
main create task
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Annuler une tâche terminée
Ici, nous ajoutons un 'awaitable', 'asyncio.sleep()' après avoir créé la tâche :
# cancel a completed task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 3 sec, allow task to complete
print(f'{f_main} before await sleep')
await asyncio.sleep(3)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
La tâche s'est terminée avant d'être annulée. Le résultat :
main create task
main before await sleep
mytask before await sleep
mytask after await sleep
main after await sleep
main task.done() = True
main task.cancelled() = False
main task.result(): normal return
Annulation d'une tâche non terminée (en attente)
Ici, nous modifions le temps de sommeil dans la boucle principale de 3 secondes à 1 seconde avant d'annuler la tâche.
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Le résultat :
main create task
main before await sleep
mytask before await sleep
main after await sleep
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Capture de l'erreur CancelledError dans la tâche
Il est également possible de capturer l'erreur asyncio.CancelledError dans la tâche. Notez que vous ne pouvez pas vous fier aux valeurs de retour d'une tâche, car la tâche peut ne pas démarrer du tout, voir ci-dessus. Voici le code. En utilisant le dernier exemple ci-dessus, nous ajoutons également la gestion des exceptions à la tâche.
# capture CancelledError in task
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
try:
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
except asyncio.CancelledError as e:
print(f'{f_mytask} CancelledError raised, {e}')
return f'CancelledError return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Voici le résultat. Notez que le paramètre 'msg' de la méthode 'cancel()' est propagé à la tâche.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main task.done() = True
main task.cancelled() = False
main task.result(): CancelledError return
Si vous voulez que l'erreur annulée se propage également à la boucle principale, vous pouvez lever l'exception à nouveau dans la tâche :
# capture CancelledError in task and raise it again
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
try:
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
except asyncio.CancelledError as e:
print(f'{f_mytask} CancelledError raised, {e}')
# raise the exception again
raise
return f'CancelledError return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Dans le résultat, nous voyons l'exception CancelledError d'abord dans la tâche, puis dans la boucle principale. Encore une fois, parce que nous sommes sur Python 3.10, le paramètre 'msg' de la méthode 'cancel()' est seulement propagé à la tâche, pas à la boucle principale.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Exemple : 3 tâches, avec annulation
Voici le code du début de ce billet, "Exemple : 3 tâches, pas d'annulation", modifié pour annuler des tâches. Dans mon cas, je ne suis pas intéressé par la capture de CancelledError dans une tâche, je veux juste que la (les) tâche(s) soit(ent) annulée(s). Pour en revenir à la question initiale, annuler une tâche lorsqu'une certaine tâche renvoie une certaine valeur (statut HTTP-429), nous annulons la tâche0 lorsque nous recevons le résultat de la tâche2.
Si nous voulons obtenir la raison de l'annulation, comme spécifié dans la méthode 'cancel()', nous pouvons lever l'exception dans la boucle en appelant la méthode 'exception()'.
# 3 tasks, with cancellation
import asyncio
async def mytask(task_num, sleep_num):
f_mytask = f'{"mytask" + str(task_num):10}'
print(f'{f_mytask} before sleep({sleep_num})')
await asyncio.sleep(sleep_num)
print(f'{f_mytask} after sleep({sleep_num}), returning ...')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create 3 tasks
tasks = []
sleep_secs = [2, 1.5, 0.8]
for task_id in range(3):
print(f'{f_main} create task{task_id}')
task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
task.id = task_id
tasks.append(task)
while True:
tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
for task in list(tasks_done):
if task.cancelled():
print(f'{f_main} task{task.id} was cancelled')
# get cancel msg, python >= 3.11 only
cancel_msg = None
try:
task.exception()
except asyncio.CancelledError as e:
cancel_msg = e
print(f'{f_main} task{task.id} cancel_msg: {cancel_msg}')
else:
print(f'{f_main} task{task.id} was not cancelled, result: {task.result()}')
# cancel task0 if task2 completed
if task.id == 2:
print(f'{f_main} task{task.id} cancel task0')
tasks[0].cancel(msg='my cancel reason')
tasks.remove(task)
tasks_len = len(tasks)
if tasks_len == 0:
break
asyncio.run(main())
Le résultat :
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 was not cancelled, result: normal return
main task2 cancel task0
main wait: 1 done, 1 pending
main task0 was cancelled
main task0 cancel_msg:
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 0 pending
main task1 was not cancelled, result: normal return
Résumé
L'annulation d'une tâche avec l'AsyncIO est un peu déroutante au début. La méthode AsyncIO "create_task()" ne démarre pas une tâche, elle ne fait que planifier son exécution. La principale chose à retenir est qu'il faut toujours attendre une tâche avant de vérifier ses méthodes "done()" et "cancelled()". L'endroit où capturer l'erreur annulée (CancelledError), dans la boucle principale ou dans la tâche, ou les deux, dépend de votre application. Et pour obtenir la raison de l'annulation dans la boucle principale, vous avez besoin du fichier Python 3.11.
Liens / crédits
asyncio.Task doesn't propagate CancelledError() exception correctly.
https://bugs.python.org/issue45390
Python - Asynchronous I/O - cancel
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel
En savoir plus...
Async
Récent
- Obtenir une liste des vidéos YouTube d'une personne
- De Docker-Composer à Docker Swarm : Configs
- Docker-Composer des projets avec des noms de services identiques
- X automatisation du web et scraping avec Selenium
- Aiohttp avec serveurs DNS personnalisés, Unbound et Docker
- Renvoyer uniquement les valeurs d'une liste d'enregistrements de FastAPI
Les plus consultés
- Utiliser UUIDs au lieu de Integer Autoincrement Primary Keys avec SQLAlchemy et MariaDb
- Utilisation des Python's pyOpenSSL pour vérifier les certificats SSL téléchargés d'un hôte
- Utiliser PyInstaller et Cython pour créer un exécutable Python
- Connexion à un service sur un hôte Docker à partir d'un conteneur Docker
- SQLAlchemy : Utilisation de Cascade Deletes pour supprimer des objets connexes
- Flask RESTful API validation des paramètres de la requête avec les schémas Marshmallow