angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Как отменить задания с помощью Python Асинхронный ввод-вывод (AsyncIO)

AsyncIO 'create_task()' не запускает задачу, а планирует ее выполнение в цикле событий.

2 мая 2023
В Async
post main image
https://pixabay.com/users/stocksnap-894430

Для одного проекта я использовал AIOHTTP для проверки ответов многих удаленных веб-сайтов, URL. URL-адреса поступали из списка. Этот список может содержать дубликаты.

Все было хорошо, пока я не заметил, что некоторые ответы также имели код состояния: HTTP-429 'Слишком много запросов'. Какова бы ни была причина, перегрузка, безопасность, мы хотим вести себя дружелюбно и не хотим вызывать идентичные URL снова, по крайней мере, в течение минимального времени. Поскольку мы используем AIOHTTP, многие запросы будут ожидать подключения к интернету, мы не используем флаг для проверки возможности продолжения.

Этот пост посвящен прерыванию (отмене) этих ожидающих запросов с помощью метода 'cancel()'. Это не относится к AIOHTTP или ограничению скорости, это другая тема.

Пример: 3 задачи, без отмены

Ниже приведен код без отмены ожидающих заданий. Здесь я использую не AIOHTTP , а 'asyncio.sleep()'. Поскольку мы хотим получить детерминированное поведение, а не случайное, мы сами указываем время сна задачи.

# 3 tasks, no cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'

    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)

    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            print(f'{f_main} task{task.id} done')
            result = task.result()
            print(f'{f_main} task{task.id} result: {result}')    
            tasks.remove(task)

        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())

Здесь нет ничего особенного, как и ожидалось, первой завершилась задача2, за ней последовали задача1 и задача0. Результат:

main       create task0
main       create task1
main       create task2
mytask0    before sleep(2)
mytask1    before sleep(1.5)
mytask2    before sleep(0.8)
main       wait: 0 done, 3 pending
main       wait: 0 done, 3 pending
mytask2    after sleep(0.8), returning ...
main       wait: 1 done, 2 pending
main       task2 done
main       task2 result: normal return
main       wait: 0 done, 2 pending
mytask1    after sleep(1.5), returning ...
main       wait: 1 done, 1 pending
main       task1 done
main       task1 result: normal return
main       wait: 0 done, 1 pending
mytask0    after sleep(2), returning ...
main       wait: 1 done, 0 pending
main       task0 done
main       task0 result: normal return

Создание задачи в AsyncIO не означает, что задача запущена.

Напротив, при использовании Multiprocessing или Treads, в AsyncIO нет метода 'start()'.

Multiprocessing:

p = multiprocessing.Process(target=task, args ...)
p.start()

Threading:

t = threading.Thread(target=task, args ...)
t.start()

AsyncIO:

t = asyncio.create_task(task, args ...)

В документации по AsyncIO не упоминается, когда задача запускается или выполняется в первый раз при использовании 'create_task()', если я ничего не упустил. Она запланирована на выполнение, что означает, что она может начать выполняться немедленно, но также и некоторое время спустя, когда цикл событий "ожидает" некоторую со-программу(ы).

Это важно, потому что когда вы отменяете задачу, она может быть еще не запущена, то есть со-программа задачи еще не была вызвана.

Различные состояния задачи при отмене задачи

С помощью метода 'cancel()' задачи мы можем отметить, что задача должна быть отменена. Из вышесказанного следует, что при применении метода 'cancel()' состояние задачи может быть следующим:

  • Задача еще не начала выполняться, или,
  • Задача начала выполняться и завершилась, или,
  • Задача запущена и находится в ожидании.

Отмена задачи, которая еще не начала выполняться, означает: задача НЕ будет выполнена. Отмена задачи, которая начала выполняться и находится в ожидании, означает: прервать ожидающую задачу. Если задача уже завершена, то она не будет помечена как отмененная.

Эти три условия показаны ниже, я работаю с AsyncIO на Python 3.10. Мы должны "ожидать" задачу, иначе результаты будут некорректными! Когда задача отменяется, возникает ошибка asyncio.CancelledError. Параметр 'msg'-метода 'cancel()' передается в главный цикл только начиная с Python 3.11.

Отмена незапущенной задачи

# cancel a not started task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # cancel task before awaitable
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Возникла ошибка CancelledError. Важным в результате является то, что задача не запущена:

main       create task
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Отмена завершенной задачи

Здесь мы добавляем 'awaitable', 'asyncio.sleep()' после создания задачи:

# cancel a completed task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 3 sec, allow task to complete
    print(f'{f_main} before await sleep')
    await asyncio.sleep(3)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Задача завершилась до того, как была отменена. Результат:

main       create task
main       before await sleep
mytask     before await sleep
mytask     after await sleep
main       after await sleep
main       task.done() = True
main       task.cancelled() = False
main       task.result(): normal return

Отмена незавершенной (ожидающей) задачи

Здесь мы изменяем время сна в главном цикле с 3 секунд на 1 секунду перед отменой задачи.

# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Результат:

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Перехват ошибки CancelledError в задаче

Также можно перехватить ошибку asyncio.CancelledError в задаче. Обратите внимание, что нельзя полагаться на возвращаемые значения из задачи, поскольку задача может вообще не запуститься, см. выше. Вот код. Используя последний пример выше, мы добавляем обработку исключений также в задачу.

# capture CancelledError in task
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        return f'CancelledError return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Результат. Обратите внимание, что параметр 'msg'-метода 'cancel()' передается в задачу.

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
mytask     CancelledError raised, my cancel reason
main       task.done() = True
main       task.cancelled() = False
main       task.result(): CancelledError return

Если вы также хотите, чтобы ошибка CancelledError передавалась в главный цикл, то вы можете снова поднять исключение в задаче:

# capture CancelledError in task and raise it again
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        # raise the exception again
        raise
        return f'CancelledError return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

В результате мы видим CancelledError сначала в задаче, а затем в главном цикле. Опять же, поскольку мы находимся на Python 3.10, 'msg'-параметр метода 'cancel()' передается только в задачу, а не в главный цикл.

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
mytask     CancelledError raised, my cancel reason
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Пример: 3 задачи с отменой

Ниже приведен код из начала этого поста, "Пример: 3 задачи, без отмены", измененный для отмены задач. В моем случае я не заинтересован в перехвате CancelledError в задаче, я просто хочу, чтобы задача(и) была отменена. Возвращаясь к исходному вопросу, отменяем задачу, когда определенная задача возвращает определенное значение (статус HTTP-429), мы отменяем задачу0 при получении результата задачи2.

Если мы хотим получить причину отмены, указанную в методе 'cancel()', мы можем поднять исключение в цикле, вызвав метод 'exception()'.

# 3 tasks, with cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'

    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)

    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            if task.cancelled():
                print(f'{f_main} task{task.id} was cancelled')
                # get cancel msg, python >= 3.11 only
                cancel_msg = None
                try:
                    task.exception()
                except asyncio.CancelledError as e:
                    cancel_msg = e
                print(f'{f_main} task{task.id} cancel_msg: {cancel_msg}')
            else:
                print(f'{f_main} task{task.id} was not cancelled, result: {task.result()}')    

                # cancel task0 if task2 completed
                if task.id == 2:
                    print(f'{f_main} task{task.id} cancel task0')    
                    tasks[0].cancel(msg='my cancel reason')

            tasks.remove(task)

        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())

Результат:

main       create task0
main       create task1
main       create task2
mytask0    before sleep(2)
mytask1    before sleep(1.5)
mytask2    before sleep(0.8)
main       wait: 0 done, 3 pending
main       wait: 0 done, 3 pending
mytask2    after sleep(0.8), returning ...
main       wait: 1 done, 2 pending
main       task2 was not cancelled, result: normal return
main       task2 cancel task0
main       wait: 1 done, 1 pending
main       task0 was cancelled
main       task0 cancel_msg: 
mytask1    after sleep(1.5), returning ...
main       wait: 1 done, 0 pending
main       task1 was not cancelled, result: normal return

Резюме

Отмена задачи с помощью AsyncIO сначала немного запутана. Метод AsyncIO 'create_task()' не запускает задачу, а только планирует ее выполнение. Главное помнить, что всегда нужно "ожидать" задачу перед проверкой ее методов 'done()' и 'cancelled()'. Где перехватывать ошибку CancelledError, в главном цикле или в задаче, или в обоих, зависит от вашего приложения. А чтобы получить причину отмены в главном цикле, вам нужно Python 3.11 вверх.

Ссылки / кредиты

asyncio.Task doesn't propagate CancelledError() exception correctly.
https://bugs.python.org/issue45390

Python - Asynchronous I/O - cancel
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel

Подробнее

Async

Оставить комментарий

Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

Комментарии

Оставьте ответ

Ответьте анонимно или войдите в систему, чтобы ответить.