Как отменить задания с помощью Python Асинхронный ввод-вывод (AsyncIO)
AsyncIO 'create_task()' не запускает задачу, а планирует ее выполнение в цикле событий.
Для одного проекта я использовал AIOHTTP для проверки ответов многих удаленных веб-сайтов, URL. URL-адреса поступали из списка. Этот список может содержать дубликаты.
Все было хорошо, пока я не заметил, что некоторые ответы также имели код состояния: HTTP-429 'Слишком много запросов'. Какова бы ни была причина, перегрузка, безопасность, мы хотим вести себя дружелюбно и не хотим вызывать идентичные URL снова, по крайней мере, в течение минимального времени. Поскольку мы используем AIOHTTP, многие запросы будут ожидать подключения к интернету, мы не используем флаг для проверки возможности продолжения.
Этот пост посвящен прерыванию (отмене) этих ожидающих запросов с помощью метода 'cancel()'. Это не относится к AIOHTTP или ограничению скорости, это другая тема.
Пример: 3 задачи, без отмены
Ниже приведен код без отмены ожидающих заданий. Здесь я использую не AIOHTTP , а 'asyncio.sleep()'. Поскольку мы хотим получить детерминированное поведение, а не случайное, мы сами указываем время сна задачи.
# 3 tasks, no cancellation
import asyncio
async def mytask(task_num, sleep_num):
f_mytask = f'{"mytask" + str(task_num):10}'
print(f'{f_mytask} before sleep({sleep_num})')
await asyncio.sleep(sleep_num)
print(f'{f_mytask} after sleep({sleep_num}), returning ...')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create 3 tasks
tasks = []
sleep_secs = [2, 1.5, 0.8]
for task_id in range(3):
print(f'{f_main} create task{task_id}')
task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
task.id = task_id
tasks.append(task)
while True:
tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
for task in list(tasks_done):
print(f'{f_main} task{task.id} done')
result = task.result()
print(f'{f_main} task{task.id} result: {result}')
tasks.remove(task)
tasks_len = len(tasks)
if tasks_len == 0:
break
asyncio.run(main())
Здесь нет ничего особенного, как и ожидалось, первой завершилась задача2, за ней последовали задача1 и задача0. Результат:
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 done
main task2 result: normal return
main wait: 0 done, 2 pending
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 1 pending
main task1 done
main task1 result: normal return
main wait: 0 done, 1 pending
mytask0 after sleep(2), returning ...
main wait: 1 done, 0 pending
main task0 done
main task0 result: normal return
Создание задачи в AsyncIO не означает, что задача запущена.
Напротив, при использовании Multiprocessing или Treads, в AsyncIO нет метода 'start()'.
Multiprocessing:
p = multiprocessing.Process(target=task, args ...)
p.start()
Threading:
t = threading.Thread(target=task, args ...)
t.start()
AsyncIO:
t = asyncio.create_task(task, args ...)
В документации по AsyncIO не упоминается, когда задача запускается или выполняется в первый раз при использовании 'create_task()', если я ничего не упустил. Она запланирована на выполнение, что означает, что она может начать выполняться немедленно, но также и некоторое время спустя, когда цикл событий "ожидает" некоторую со-программу(ы).
Это важно, потому что когда вы отменяете задачу, она может быть еще не запущена, то есть со-программа задачи еще не была вызвана.
Различные состояния задачи при отмене задачи
С помощью метода 'cancel()' задачи мы можем отметить, что задача должна быть отменена. Из вышесказанного следует, что при применении метода 'cancel()' состояние задачи может быть следующим:
- Задача еще не начала выполняться, или,
- Задача начала выполняться и завершилась, или,
- Задача запущена и находится в ожидании.
Отмена задачи, которая еще не начала выполняться, означает: задача НЕ будет выполнена. Отмена задачи, которая начала выполняться и находится в ожидании, означает: прервать ожидающую задачу. Если задача уже завершена, то она не будет помечена как отмененная.
Эти три условия показаны ниже, я работаю с AsyncIO на Python 3.10. Мы должны "ожидать" задачу, иначе результаты будут некорректными! Когда задача отменяется, возникает ошибка asyncio.CancelledError. Параметр 'msg'-метода 'cancel()' передается в главный цикл только начиная с Python 3.11.
Отмена незапущенной задачи
# cancel a not started task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# cancel task before awaitable
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Возникла ошибка CancelledError. Важным в результате является то, что задача не запущена:
main create task
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Отмена завершенной задачи
Здесь мы добавляем 'awaitable', 'asyncio.sleep()' после создания задачи:
# cancel a completed task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 3 sec, allow task to complete
print(f'{f_main} before await sleep')
await asyncio.sleep(3)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Задача завершилась до того, как была отменена. Результат:
main create task
main before await sleep
mytask before await sleep
mytask after await sleep
main after await sleep
main task.done() = True
main task.cancelled() = False
main task.result(): normal return
Отмена незавершенной (ожидающей) задачи
Здесь мы изменяем время сна в главном цикле с 3 секунд на 1 секунду перед отменой задачи.
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Результат:
main create task
main before await sleep
mytask before await sleep
main after await sleep
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Перехват ошибки CancelledError в задаче
Также можно перехватить ошибку asyncio.CancelledError в задаче. Обратите внимание, что нельзя полагаться на возвращаемые значения из задачи, поскольку задача может вообще не запуститься, см. выше. Вот код. Используя последний пример выше, мы добавляем обработку исключений также в задачу.
# capture CancelledError in task
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
try:
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
except asyncio.CancelledError as e:
print(f'{f_mytask} CancelledError raised, {e}')
return f'CancelledError return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Результат. Обратите внимание, что параметр 'msg'-метода 'cancel()' передается в задачу.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main task.done() = True
main task.cancelled() = False
main task.result(): CancelledError return
Если вы также хотите, чтобы ошибка CancelledError передавалась в главный цикл, то вы можете снова поднять исключение в задаче:
# capture CancelledError in task and raise it again
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
try:
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
except asyncio.CancelledError as e:
print(f'{f_mytask} CancelledError raised, {e}')
# raise the exception again
raise
return f'CancelledError return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
В результате мы видим CancelledError сначала в задаче, а затем в главном цикле. Опять же, поскольку мы находимся на Python 3.10, 'msg'-параметр метода 'cancel()' передается только в задачу, а не в главный цикл.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Пример: 3 задачи с отменой
Ниже приведен код из начала этого поста, "Пример: 3 задачи, без отмены", измененный для отмены задач. В моем случае я не заинтересован в перехвате CancelledError в задаче, я просто хочу, чтобы задача(и) была отменена. Возвращаясь к исходному вопросу, отменяем задачу, когда определенная задача возвращает определенное значение (статус HTTP-429), мы отменяем задачу0 при получении результата задачи2.
Если мы хотим получить причину отмены, указанную в методе 'cancel()', мы можем поднять исключение в цикле, вызвав метод 'exception()'.
# 3 tasks, with cancellation
import asyncio
async def mytask(task_num, sleep_num):
f_mytask = f'{"mytask" + str(task_num):10}'
print(f'{f_mytask} before sleep({sleep_num})')
await asyncio.sleep(sleep_num)
print(f'{f_mytask} after sleep({sleep_num}), returning ...')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create 3 tasks
tasks = []
sleep_secs = [2, 1.5, 0.8]
for task_id in range(3):
print(f'{f_main} create task{task_id}')
task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
task.id = task_id
tasks.append(task)
while True:
tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
for task in list(tasks_done):
if task.cancelled():
print(f'{f_main} task{task.id} was cancelled')
# get cancel msg, python >= 3.11 only
cancel_msg = None
try:
task.exception()
except asyncio.CancelledError as e:
cancel_msg = e
print(f'{f_main} task{task.id} cancel_msg: {cancel_msg}')
else:
print(f'{f_main} task{task.id} was not cancelled, result: {task.result()}')
# cancel task0 if task2 completed
if task.id == 2:
print(f'{f_main} task{task.id} cancel task0')
tasks[0].cancel(msg='my cancel reason')
tasks.remove(task)
tasks_len = len(tasks)
if tasks_len == 0:
break
asyncio.run(main())
Результат:
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 was not cancelled, result: normal return
main task2 cancel task0
main wait: 1 done, 1 pending
main task0 was cancelled
main task0 cancel_msg:
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 0 pending
main task1 was not cancelled, result: normal return
Резюме
Отмена задачи с помощью AsyncIO сначала немного запутана. Метод AsyncIO 'create_task()' не запускает задачу, а только планирует ее выполнение. Главное помнить, что всегда нужно "ожидать" задачу перед проверкой ее методов 'done()' и 'cancelled()'. Где перехватывать ошибку CancelledError, в главном цикле или в задаче, или в обоих, зависит от вашего приложения. А чтобы получить причину отмены в главном цикле, вам нужно Python 3.11 вверх.
Ссылки / кредиты
asyncio.Task doesn't propagate CancelledError() exception correctly.
https://bugs.python.org/issue45390
Python - Asynchronous I/O - cancel
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel
Подробнее
Async
Недавний
- Скрытие первичных ключей базы данных UUID вашего веб-приложения
- Don't Repeat Yourself (DRY) с Jinja2
- SQLAlchemy, PostgreSQL, максимальное количество строк для user
- Показать значения в динамических фильтрах SQLAlchemy
- Безопасная передача данных с помощью шифрования Public Key и pyNaCl
- rqlite: альтернатива dist с высокой степенью готовности и SQLite
Большинство просмотренных
- Используя Python pyOpenSSL для проверки SSL-сертификатов, загруженных с хоста
- Использование UUID вместо Integer Autoincrement Primary Keys с SQLAlchemy и MariaDb
- Использование PyInstaller и Cython для создания исполняемого файла Python
- Подключение к службе на хосте Docker из контейнера Docker
- SQLAlchemy: Использование Cascade Deletes для удаления связанных объектов
- Flask Удовлетворительный запрос API проверка параметров запроса с помощью схем Маршмэллоу