Taken annuleren met Python Asynchrone IO (AsyncIO)
AsyncIO 'create_task()' start geen taak, maar plant deze voor uitvoering op de gebeurtenislus.
Voor een project gebruikte ik AIOHTTP om de antwoorden van vele websites op afstand, URL's, te controleren. De URL's kwamen uit een lijst. Deze lijst kan duplicaten bevatten.
Alles in orde totdat ik merkte dat sommige antwoorden ook status code hadden: HTTP-429 'Too Many Requests'. Wat de reden ook is, overbelasting, veiligheid, we willen ons vriendelijk gedragen en willen identieke URL's niet opnieuw aanroepen, althans voor een minimale tijd. Omdat we AIOHTTP gebruiken, zullen veel verzoeken wachten op de internetverbinding, we gebruiken geen vlag om te controleren of we door kunnen gaan.
Dit bericht gaat over het afbreken (annuleren) van deze wachtende verzoeken met behulp van de methode 'cancel()'. Het gaat niet over AIOHTTP of rate limiting, dat is een ander onderwerp.
Voorbeeld: 3 taken, geen annulering
Hieronder staat de code zonder annulering van wachtende taken. Ik gebruik hier niet AIOHTTP maar 'asyncio.sleep()'. Omdat we deterministisch gedrag willen, in tegenstelling tot willekeurig, geven we de slaaptijd van de taken zelf aan.
# 3 tasks, no cancellation
import asyncio
async def mytask(task_num, sleep_num):
f_mytask = f'{"mytask" + str(task_num):10}'
print(f'{f_mytask} before sleep({sleep_num})')
await asyncio.sleep(sleep_num)
print(f'{f_mytask} after sleep({sleep_num}), returning ...')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create 3 tasks
tasks = []
sleep_secs = [2, 1.5, 0.8]
for task_id in range(3):
print(f'{f_main} create task{task_id}')
task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
task.id = task_id
tasks.append(task)
while True:
tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
for task in list(tasks_done):
print(f'{f_main} task{task.id} done')
result = task.result()
print(f'{f_main} task{task.id} result: {result}')
tasks.remove(task)
tasks_len = len(tasks)
if tasks_len == 0:
break
asyncio.run(main())
Niets bijzonders hier, zoals verwacht voltooide taak2 als eerste, gevolgd door taak1 en taak0. Het resultaat:
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 done
main task2 result: normal return
main wait: 0 done, 2 pending
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 1 pending
main task1 done
main task1 result: normal return
main wait: 0 done, 1 pending
mytask0 after sleep(2), returning ...
main wait: 1 done, 0 pending
main task0 done
main task0 result: normal return
Een taak aanmaken in AsyncIO betekent niet dat de taak wordt gestart
In tegenstelling tot Multiprocessing of Treads, is er in AsyncIO geen 'start()' methode.
Multiprocessing:
p = multiprocessing.Process(target=task, args ...)
p.start()
Threading:
t = threading.Thread(target=task, args ...)
t.start()
AsyncIO:
t = asyncio.create_task(task, args ...)
De AsyncIO documentatie vermeldt niet wanneer een taak wordt gestart, of voor het eerst wordt uitgevoerd, bij gebruik van 'create_task()', tenzij ik iets gemist heb. Hij wordt gepland voor uitvoering, wat betekent dat hij onmiddellijk kan beginnen te lopen, maar ook enige tijd later, wanneer de gebeurtenislus 'wacht' op een of andere co-routine(s).
Dit is belangrijk omdat wanneer u een taak annuleert, deze misschien nog niet is uitgevoerd, wat betekent dat de taak-co-routine nog niet is aangeroepen.
Verschillende toestanden van een taak bij het annuleren van een taak
Met de 'cancel()' methode van een taak kunnen we markeren dat een taak geannuleerd moet worden. Uit het bovenstaande volgt dat wanneer de methode 'cancel()' wordt toegepast, de toestand van de taak kan zijn:
- De taak is nog niet begonnen met lopen, of,
- De taak is begonnen met lopen en voltooid, of,
- De taak begon te lopen en wacht ergens op
Een taak annuleren die nog niet is begonnen met lopen betekent: de taak zal NIET lopen. Een taak annuleren die begonnen is met lopen en wacht betekent: de wachtende taak afbreken. Als de taak al is voltooid, dan wordt de taak niet gemarkeerd als geannuleerd.
Deze drie voorwaarden staan hieronder, ik draai met AsyncIO op Python 3.10. We moeten de taak 'afwachten', anders kloppen de resultaten niet! Wanneer een taak wordt geannuleerd, wordt een asyncio.CancelledError opgeworpen. De 'msg'-parameter van de methode 'cancel()' wordt pas doorgegeven aan de hoofdlus vanaf Python 3.11.
Een niet gestarte taak annuleren
# cancel a not started task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# cancel task before awaitable
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Er is een CancelledError opgetreden. Belangrijk in het resultaat is dat de taak niet is gestart:
main create task
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Een voltooide taak annuleren
Hier voegen we een 'awaitable' toe, 'asyncio.sleep()' na het aanmaken van de taak:
# cancel a completed task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 3 sec, allow task to complete
print(f'{f_main} before await sleep')
await asyncio.sleep(3)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
De taak eindigde voordat hij werd geannuleerd. Het resultaat:
main create task
main before await sleep
mytask before await sleep
mytask after await sleep
main after await sleep
main task.done() = True
main task.cancelled() = False
main task.result(): normal return
Een niet voltooide (wachtende) taak annuleren
Hier wijzigen we de slaaptijd in de hoofdlus van 3 seconden naar 1 seconde voor het annuleren van de taak.
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Het resultaat:
main create task
main before await sleep
mytask before await sleep
main after await sleep
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Vastleggen van de CancelledError in de taak
Het is ook mogelijk om de asyncio.CancelledError vast te leggen in de taak. Merk op dat u niet kunt vertrouwen op retourwaarden van een taak, omdat de taak mogelijk helemaal niet start, zie hierboven. Hier is de code. Met het laatste voorbeeld hierboven voegen we de exception handling ook toe aan de taak.
# capture CancelledError in task
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
try:
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
except asyncio.CancelledError as e:
print(f'{f_mytask} CancelledError raised, {e}')
return f'CancelledError return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Het resultaat. Merk op dat de 'msg'-parameter van de 'cancel()' methode wordt doorgegeven aan de taak.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main task.done() = True
main task.cancelled() = False
main task.result(): CancelledError return
Als je wilt dat de CancelledError ook wordt doorgegeven aan de hoofdlus, dan kun je de exception opnieuw oproepen in de taak:
# capture CancelledError in task and raise it again
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
try:
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
except asyncio.CancelledError as e:
print(f'{f_mytask} CancelledError raised, {e}')
# raise the exception again
raise
return f'CancelledError return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
In het resultaat zien we de CancelledError eerst in de taak en daarna in de hoofdlus. Nogmaals, omdat we op Python 3.10 zitten, wordt de 'msg'-parameter van de 'cancel()' methode alleen doorgegeven aan de taak, niet aan de hoofdlus.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Voorbeeld: 3 taken, met annulering
Hieronder staat de code uit het begin van deze post, 'Voorbeeld: 3 taken, geen annulering', aangepast voor het annuleren van taken. In mijn geval ben ik niet geïnteresseerd in het afvangen van de CancelledError in een taak, ik wil gewoon de taak of taken geannuleerd hebben. Terug naar de oorspronkelijke vraag, een taak annuleren wanneer een bepaalde taak een bepaalde waarde retourneert (HTTP-429 status), we annuleren taak0 wanneer we het resultaat van taak2 ontvangen.
Als we de reden van annulering willen krijgen, zoals opgegeven in de 'cancel()' methode, kunnen we de uitzondering in de lus oproepen door de 'exception()' methode aan te roepen.
# 3 tasks, with cancellation
import asyncio
async def mytask(task_num, sleep_num):
f_mytask = f'{"mytask" + str(task_num):10}'
print(f'{f_mytask} before sleep({sleep_num})')
await asyncio.sleep(sleep_num)
print(f'{f_mytask} after sleep({sleep_num}), returning ...')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create 3 tasks
tasks = []
sleep_secs = [2, 1.5, 0.8]
for task_id in range(3):
print(f'{f_main} create task{task_id}')
task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
task.id = task_id
tasks.append(task)
while True:
tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
for task in list(tasks_done):
if task.cancelled():
print(f'{f_main} task{task.id} was cancelled')
# get cancel msg, python >= 3.11 only
cancel_msg = None
try:
task.exception()
except asyncio.CancelledError as e:
cancel_msg = e
print(f'{f_main} task{task.id} cancel_msg: {cancel_msg}')
else:
print(f'{f_main} task{task.id} was not cancelled, result: {task.result()}')
# cancel task0 if task2 completed
if task.id == 2:
print(f'{f_main} task{task.id} cancel task0')
tasks[0].cancel(msg='my cancel reason')
tasks.remove(task)
tasks_len = len(tasks)
if tasks_len == 0:
break
asyncio.run(main())
Het resultaat:
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 was not cancelled, result: normal return
main task2 cancel task0
main wait: 1 done, 1 pending
main task0 was cancelled
main task0 cancel_msg:
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 0 pending
main task1 was not cancelled, result: normal return
Samenvatting
Het annuleren van een taak met AsyncIO is in het begin een beetje verwarrend. De AsyncIO methode 'create_task()' start geen taak, maar plant hem alleen voor uitvoering. Het belangrijkste om te onthouden is om een taak altijd te 'awaiten' voordat je de methoden 'done()' en 'cancelled()' controleert. Waar je de CancelledError opvangt, in de hoofdlus of in de taak, of beide, hangt af van je toepassing. En om de reden van annulering in de hoofdlus te krijgen moet je Python 3.11 omhoog.
Links / credits
asyncio.Task doesn't propagate CancelledError() exception correctly.
https://bugs.python.org/issue45390
Python - Asynchronous I/O - cancel
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel
Lees meer
Async
Recent
- Database UUID primaire sleutels van je webapplicatie verbergen
- Don't Repeat Yourself (DRY) met Jinja2
- SQLAlchemy, PostgreSQL, maximum aantal rijen per user
- Toon de waarden in SQLAlchemy dynamische filters
- Veilige gegevensoverdracht met Public Key versleuteling en pyNaCl
- rqlite: een alternatief voor SQLite met hoge beschikbaarheid en distributed
Meest bekeken
- Met behulp van Python's pyOpenSSL om SSL-certificaten die van een host zijn gedownload te controleren
- Gebruik van UUIDs in plaats van Integer Autoincrement Primary Keys met SQLAlchemy en MariaDb
- PyInstaller en Cython gebruiken om een Python executable te maken
- Maak verbinding met een dienst op een Docker host vanaf een Docker container
- SQLAlchemy: Gebruik van Cascade Deletes om verwante objecten te verwijderen
- Flask RESTful API verzoekparametervalidatie met Marshmallow-schema's