How to cancel tasks with Python Asynchronous IO (AsyncIO)
AsyncIO 'create_task()' does not start a task but schedules it for execution on the event loop.

For a project I was using AIOHTTP to check the responses of many remote websites, URLs. The URLs were coming from a list. This list can contain duplicates.
Everything was fine until I noticed that some responses had status code HTTP-429 'Too Many Requests'. Whatever the reason (overload, security), we want to behave in a friendly way and not call identical URLs again, at least for a minimum time. Because we are using AIOHTTP, many requests are already waiting for the internet connection at that point, so a flag that is checked before a request continues is not what we use here.
This post is about aborting (cancelling) these waiting requests using the 'cancel()' method. It is not about AIOHTTP or rate limiting, that's another subject.
Example: 3 tasks, no cancellation
Below is the code without cancelling waiting tasks. I do not use AIOHTTP here but 'asyncio.sleep()'. Because we want deterministic rather than random behavior, we specify each task's sleep time ourselves.
# 3 tasks, no cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'
    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'
    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)
    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            print(f'{f_main} task{task.id} done')
            result = task.result()
            print(f'{f_main} task{task.id} result: {result}')
            tasks.remove(task)
        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())
Nothing special here: as expected, task2 completes first, followed by task1 and task0. The result:
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 done
main task2 result: normal return
main wait: 0 done, 2 pending
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 1 pending
main task1 done
main task1 result: normal return
main wait: 0 done, 1 pending
mytask0 after sleep(2), returning ...
main wait: 1 done, 0 pending
main task0 done
main task0 result: normal return
Creating a task in AsyncIO does not mean the task is started
In contrast to Multiprocessing or Threads, in AsyncIO there is no 'start()' method.
Multiprocessing:
p = multiprocessing.Process(target=task, args ...)
p.start()
Threading:
t = threading.Thread(target=task, args ...)
t.start()
AsyncIO:
t = asyncio.create_task(task(args ...))
The AsyncIO documentation does not say exactly when a task runs for the first time after 'create_task()', unless I missed something. The task is scheduled for execution: with the default event loop setup it does not run inside 'create_task()' itself, but only once the current co-routine gives control back to the event loop by 'awaiting' something. That can be at the very next 'await' or some time later.
This is important because when you cancel a task, it may not have run yet, meaning that the task co-routine was not yet called.
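The scheduling order can be made visible with a minimal sketch. It assumes the default (non-eager) task factory; the names 'worker' and 'events' are made up for this illustration.

```python
import asyncio

events = []  # records the order in which things happen

async def worker():
    events.append('worker started')

async def main():
    task = asyncio.create_task(worker())
    # the task exists now, but its co-routine has not run yet
    events.append('after create_task')
    # give control back to the event loop once, so the task gets a turn
    await asyncio.sleep(0)
    events.append('after first await')
    await task

asyncio.run(main())
print(events)
# ['after create_task', 'worker started', 'after first await']
```

The worker only starts after 'main()' reaches its first 'await'.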
Different states of a task when cancelling a task
With the 'cancel()' method of a task we can flag that a task must be cancelled. From the above this means that when the 'cancel()' method is applied, the state of the task can be:
- The task has not started running yet, or,
- The task started running and completed, or,
- The task started running and is waiting somewhere
Cancelling a task that has not started running yet means: the task will NOT run. Cancelling a task that started running and is waiting means: abort the waiting task. If the task already completed, then the task will not be flagged cancelled.
These three conditions are shown below. I am running with AsyncIO on Python 3.10. We must 'await' the task, otherwise the results are not correct! When a task is cancelled, an asyncio.CancelledError will be raised. The 'msg'-parameter of the 'cancel()' method is only propagated to the main loop starting from Python 3.11.
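Why the 'await' matters can be sketched as follows: 'cancel()' only requests the cancellation, and 'done()' and 'cancelled()' stay False until the event loop has had a chance to process the request. The names 'worker' and 'check_cancel_states' are hypothetical and only used for this illustration.

```python
import asyncio

async def worker():
    await asyncio.sleep(10)

async def check_cancel_states():
    task = asyncio.create_task(worker())
    # request cancellation; returns True because the task is not done yet
    requested = task.cancel()
    # the request has not been processed yet: both are still False
    before = (task.done(), task.cancelled())
    try:
        await task
    except asyncio.CancelledError:
        pass
    # after awaiting, the task is done and flagged as cancelled
    after = (task.done(), task.cancelled())
    # calling cancel() on a task that is already done returns False
    again = task.cancel()
    return requested, before, after, again

states = asyncio.run(check_cancel_states())
print(states)
# (True, (False, False), (True, True), False)
```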
Cancel a not started task
# cancel a not started task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'
    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'
    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())
    # cancel task before awaitable
    task.cancel(msg='my cancel reason')
    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')
    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')

asyncio.run(main())
A CancelledError was raised. The important point in the result is that the task never started:
main create task
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Cancel a completed task
Here we add an 'awaitable', 'asyncio.sleep()' after creating the task:
# cancel a completed task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'
    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'
    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())
    # wait 3 sec, allow task to complete
    print(f'{f_main} before await sleep')
    await asyncio.sleep(3)
    print(f'{f_main} after await sleep')
    # cancel task
    task.cancel(msg='my cancel reason')
    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')
    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')

asyncio.run(main())
The task finished before it was cancelled. The result:
main create task
main before await sleep
mytask before await sleep
mytask after await sleep
main after await sleep
main task.done() = True
main task.cancelled() = False
main task.result(): normal return
Cancel a not completed (waiting) task
Here we change the sleep time in the main loop from 3 seconds to 1 second before cancelling the task.
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'
    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'
    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())
    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')
    # cancel task
    task.cancel(msg='my cancel reason')
    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')
    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')

asyncio.run(main())
The result:
main create task
main before await sleep
mytask before await sleep
main after await sleep
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Capture the CancelledError in the task
It is also possible to capture the asyncio.CancelledError in the task. Note that you cannot rely on return values from a task, because the task may not start at all, see above. Here is the code. Using the last example above, we add the exception handling also to the task.
# capture CancelledError in task
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'
    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        return f'CancelledError return'

async def main():
    f_main = f'{"main":10}'
    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())
    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')
    # cancel task
    task.cancel(msg='my cancel reason')
    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')
    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')

asyncio.run(main())
The result. Note that the 'msg'-parameter of the 'cancel()' method is propagated to the task.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main task.done() = True
main task.cancelled() = False
main task.result(): CancelledError return
If you also want the CancelledError to propagate to the main loop, then you can raise the exception again in the task:
# capture CancelledError in task and raise it again
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'
    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        # raise the exception again; nothing after this line runs
        raise

async def main():
    f_main = f'{"main":10}'
    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())
    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')
    # cancel task
    task.cancel(msg='my cancel reason')
    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')
    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')

asyncio.run(main())
In the result we see the CancelledError first in the task and then in the main loop. Again, because we are on Python 3.10, the 'msg'-parameter of the 'cancel()' method is only propagated to the task, not to the main loop.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Example: 3 tasks, with cancellation
Below is the code from the start of this post, 'Example: 3 tasks, no cancellation', modified for cancelling tasks. In my case I am not interested in capturing the CancelledError in a task, I just want the task(s) cancelled. Back to the original question: how to cancel a task when a certain other task returns a certain value (HTTP-429 status). Here we cancel task0 when receiving the result of task2.
If we want to get the reason of cancellation, as specified in the 'cancel()' method, we can raise the exception in the loop by calling the 'exception()' method.
# 3 tasks, with cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'
    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'
    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)
    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            if task.cancelled():
                print(f'{f_main} task{task.id} was cancelled')
                # get cancel msg, python >= 3.11 only
                cancel_msg = None
                try:
                    task.exception()
                except asyncio.CancelledError as e:
                    cancel_msg = e
                print(f'{f_main} task{task.id} cancel_msg: {cancel_msg}')
            else:
                print(f'{f_main} task{task.id} was not cancelled, result: {task.result()}')
                # cancel task0 if task2 completed
                if task.id == 2:
                    print(f'{f_main} task{task.id} cancel task0')
                    tasks[0].cancel(msg='my cancel reason')
            tasks.remove(task)
        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())
The result:
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 was not cancelled, result: normal return
main task2 cancel task0
main wait: 1 done, 1 pending
main task0 was cancelled
main task0 cancel_msg:
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 0 pending
main task1 was not cancelled, result: normal return
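Applied to the original problem, the same pattern could look roughly like the sketch below. This is not the code from my project: 'fetch_status()' is a hypothetical stand-in for the AIOHTTP request that simulates the network with 'asyncio.sleep()', and the URLs, delays and status codes in 'FAKE_STATUS' and 'jobs' are made up. When a URL returns HTTP-429, all still pending tasks for that same URL are cancelled.

```python
import asyncio

# made-up status codes per URL, standing in for real HTTP responses
FAKE_STATUS = {'https://a.example': 429, 'https://b.example': 200}

async def fetch_status(url, delay):
    # hypothetical replacement for the real request
    await asyncio.sleep(delay)
    return FAKE_STATUS[url]

async def check_urls(jobs):
    # jobs is a list of (url, simulated delay in seconds)
    url_of = {}
    for url, delay in jobs:
        task = asyncio.create_task(fetch_status(url, delay))
        url_of[task] = url
    pending = set(url_of)
    results = []
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            url = url_of[task]
            if task.cancelled():
                results.append((url, 'cancelled'))
                continue
            status = task.result()
            results.append((url, status))
            if status == 429:
                # cancel all waiting requests for the same URL
                for other in pending:
                    if url_of[other] == url:
                        other.cancel()
    return results

jobs = [
    ('https://a.example', 0.05),
    ('https://b.example', 0.1),
    ('https://a.example', 0.3),
    ('https://a.example', 0.4),
]
results = asyncio.run(check_urls(jobs))
print(results)
```

Here 'asyncio.wait()' is used with 'return_when=asyncio.FIRST_COMPLETED' instead of a timeout, so the loop reacts as soon as any task finishes. The cancelled tasks show up in a later 'done' set, where 'cancelled()' is checked before calling 'result()'.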
Summary
Cancelling a task with AsyncIO is a bit confusing at first. The AsyncIO 'create_task()' function does not start a task, it only schedules it for execution. The main thing to remember is to always 'await' a task before checking its methods 'done()' and 'cancelled()'. Where to capture the CancelledError, in the main loop, in the task, or both, depends on your application. And to get the reason of cancellation in the main loop you need Python 3.11 or later.
Links / credits
asyncio.Task doesn't propagate CancelledError() exception correctly.
https://bugs.python.org/issue45390
Python - Asynchronous I/O - cancel
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel