angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Cómo cancelar tareas con Python Asynchronous IO (AsyncIO)

AsyncIO 'create_task()' no inicia una tarea sino que la programa para su ejecución en el bucle de eventos.

2 mayo 2023
En Async
post main image
https://pixabay.com/users/stocksnap-894430

Para un proyecto estaba usando AIOHTTP para comprobar las respuestas de muchos sitios web remotos, URLs. Las URLs provenían de una lista. Esta lista puede contener duplicados.

Todo bien hasta que noté que algunas respuestas también tenían el código de estado HTTP-429 'Demasiadas peticiones'. Cualquiera que sea la razón, sobrecarga, seguridad, queremos comportarnos amigablemente y no queremos llamar URLs idénticas de nuevo, al menos por un tiempo mínimo. Como estamos usando AIOHTTP, muchas peticiones estarán esperando la conexión a internet, no estamos usando un flag para comprobar si pueden continuar.

Este post trata sobre abortar (cancelar) estas peticiones en espera usando el método 'cancel()'. No es sobre AIOHTTP o limitación de tasa, ese es otro tema.

Ejemplo: 3 tareas, sin cancelación

A continuación se muestra el código sin cancelar las tareas en espera. Aquí no uso AIOHTTP sino 'asyncio.sleep()'. Como queremos un comportamiento determinista, en lugar de aleatorio, nosotros mismos especificamos el tiempo de reposo de las tareas.

# 3 tasks, no cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'

    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)

    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            print(f'{f_main} task{task.id} done')
            result = task.result()
            print(f'{f_main} task{task.id} result: {result}')    
            tasks.remove(task)

        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())

Nada especial aquí, como era de esperar la tarea2 completó primero, seguida por la tarea1 y la tarea0. El resultado:

main       create task0
main       create task1
main       create task2
mytask0    before sleep(2)
mytask1    before sleep(1.5)
mytask2    before sleep(0.8)
main       wait: 0 done, 3 pending
main       wait: 0 done, 3 pending
mytask2    after sleep(0.8), returning ...
main       wait: 1 done, 2 pending
main       task2 done
main       task2 result: normal return
main       wait: 0 done, 2 pending
mytask1    after sleep(1.5), returning ...
main       wait: 1 done, 1 pending
main       task1 done
main       task1 result: normal return
main       wait: 0 done, 1 pending
mytask0    after sleep(2), returning ...
main       wait: 1 done, 0 pending
main       task0 done
main       task0 result: normal return

Crear una tarea en AsyncIO no significa que la tarea se inicie.

Por el contrario, cuando se utiliza Multiprocessing o Treads, en AsyncIO no existe el método 'start()'.

Multiprocessing:

p = multiprocessing.Process(target=task, args ...)
p.start()

Threading:

t = threading.Thread(target=task, args ...)
t.start()

AsyncIO:

t = asyncio.create_task(task, args ...)

La documentación de AsyncIO no menciona cuando se inicia una tarea, o se ejecuta por primera vez, cuando se utiliza 'create_task()', a no ser que me haya perdido algo. Está programada para su ejecución, lo que significa que puede empezar a ejecutarse inmediatamente pero también algún tiempo después, cuando el bucle de eventos está 'esperando' alguna(s) co-rutina(s).

Esto es importante porque cuando cancelas una tarea, puede que aún no se haya ejecutado, lo que significa que la co-rutina de la tarea aún no ha sido llamada.

Diferentes estados de una tarea cuando se cancela una tarea

Con el método 'cancel()' de una tarea podemos marcar que una tarea debe ser cancelada. De lo anterior se deduce que cuando se aplica el método 'cancel()', el estado de la tarea puede ser:

  • La tarea no ha comenzado a ejecutarse todavía, o,
  • La tarea comenzó a ejecutarse y se completó, o,
  • La tarea comenzó a ejecutarse y está esperando en algún lugar

Cancelar una tarea que aún no ha comenzado a ejecutarse significa que la tarea NO se ejecutará. Cancelar una tarea que comenzó a ejecutarse y está en espera significa: abortar la tarea en espera. Si la tarea ya se ha completado, entonces la tarea no se marcará como cancelada.

Estas tres condiciones se muestran a continuación, estoy corriendo con AsyncIO en Python 3.10. ¡Debemos 'esperar' la tarea, de lo contrario los resultados no son correctos! Cuando se cancela una tarea, se genera un error asyncio.CancelledError. El parámetro 'msg' del método 'cancel()' sólo se propaga al bucle principal a partir de Python 3.11.

Cancelar una tarea no iniciada

# cancel a not started task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # cancel task before awaitable
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Se ha producido un CancelledError. Lo importante en el resultado es que la tarea no se ha iniciado:

main       create task
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Cancelar una tarea finalizada

Aquí añadimos un 'awaitable', 'asyncio.sleep()' después de crear la tarea:

# cancel a completed task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 3 sec, allow task to complete
    print(f'{f_main} before await sleep')
    await asyncio.sleep(3)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

La tarea terminó antes de ser cancelada. El resultado:

main       create task
main       before await sleep
mytask     before await sleep
mytask     after await sleep
main       after await sleep
main       task.done() = True
main       task.cancelled() = False
main       task.result(): normal return

Cancelar una tarea no completada (en espera)

Aquí cambiamos el tiempo de espera en el bucle principal de 3 segundos a 1 segundo antes de cancelar la tarea.

# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Resultado:

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Capturar el CancelledError en la tarea

También es posible capturar el error asyncio.CancelledError en la tarea. Tenga en cuenta que no puede confiar en los valores de retorno de una tarea, porque la tarea puede no iniciarse en absoluto, véase más arriba. Aquí está el código. Usando el último ejemplo anterior, añadimos el manejo de excepciones también a la tarea.

# capture CancelledError in task
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        return f'CancelledError return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

El resultado. Observe que el parámetro 'msg' del método 'cancel()' se propaga a la tarea.

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
mytask     CancelledError raised, my cancel reason
main       task.done() = True
main       task.cancelled() = False
main       task.result(): CancelledError return

Si también quieres que el error cancelado se propague al bucle principal, puedes lanzar la excepción de nuevo en la tarea:

# capture CancelledError in task and raise it again
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        # raise the exception again
        raise
        return f'CancelledError return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

En el resultado vemos el CancelledError primero en la tarea y luego en el bucle principal. De nuevo, como estamos en Python 3.10, el parámetro 'msg' del método 'cancel()' sólo se propaga a la tarea, no al bucle principal.

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
mytask     CancelledError raised, my cancel reason
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Ejemplo: 3 tareas, con cancelación

A continuación se muestra el código del inicio de este post, 'Ejemplo: 3 tareas, sin cancelación', modificado para cancelar tareas. En mi caso no me interesa capturar el CancelledError en una tarea, sólo quiero que se cancelen la(s) tarea(s). Volviendo a la pregunta original, cancelar una tarea cuando una determinada tarea devuelve un determinado valor (estado HTTP-429), cancelamos la tarea0 al recibir el resultado de la tarea2.

Si queremos obtener la razón de la cancelación, como se especifica en el método 'cancel()', podemos lanzar la excepción en el bucle llamando al método 'exception()'.

# 3 tasks, with cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'

    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)

    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            if task.cancelled():
                print(f'{f_main} task{task.id} was cancelled')
                # get cancel msg, python >= 3.11 only
                cancel_msg = None
                try:
                    task.exception()
                except asyncio.CancelledError as e:
                    cancel_msg = e
                print(f'{f_main} task{task.id} cancel_msg: {cancel_msg}')
            else:
                print(f'{f_main} task{task.id} was not cancelled, result: {task.result()}')    

                # cancel task0 if task2 completed
                if task.id == 2:
                    print(f'{f_main} task{task.id} cancel task0')    
                    tasks[0].cancel(msg='my cancel reason')

            tasks.remove(task)

        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())

El resultado:

main       create task0
main       create task1
main       create task2
mytask0    before sleep(2)
mytask1    before sleep(1.5)
mytask2    before sleep(0.8)
main       wait: 0 done, 3 pending
main       wait: 0 done, 3 pending
mytask2    after sleep(0.8), returning ...
main       wait: 1 done, 2 pending
main       task2 was not cancelled, result: normal return
main       task2 cancel task0
main       wait: 1 done, 1 pending
main       task0 was cancelled
main       task0 cancel_msg: 
mytask1    after sleep(1.5), returning ...
main       wait: 1 done, 0 pending
main       task1 was not cancelled, result: normal return

Resumen

Cancelar una tarea con AsyncIO es un poco confuso al principio. El método 'create_task()' de AsyncIO no inicia una tarea, sólo la programa para su ejecución. Lo principal a recordar es siempre 'esperar' una tarea antes de comprobar sus métodos 'done()' y 'cancelled()'. Dónde capturar el CancelledError, en el bucle principal o en la tarea, o en ambos, depende de tu aplicación. Y para obtener la razón de la cancelación en el bucle principal necesitas Python 3.11 arriba.

Enlaces / créditos

asyncio.Task doesn't propagate CancelledError() exception correctly.
https://bugs.python.org/issue45390

Python - Asynchronous I/O - cancel
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel

Leer más

Async

Deje un comentario

Comente de forma anónima o inicie sesión para comentar.

Comentarios

Deje una respuesta.

Responda de forma anónima o inicie sesión para responder.