Abbrechen von Aufgaben mit Python Asynchronous IO (AsyncIO)
AsyncIO 'create_task()' startet keine Aufgabe, sondern plant sie für die Ausführung in der Ereignisschleife ein.
Für ein Projekt habe ich AIOHTTP verwendet, um die Antworten vieler entfernter Websites, URLs, zu überprüfen. Die URLs stammten aus einer Liste. Diese Liste kann Duplikate enthalten.
Alles war in Ordnung, bis ich bemerkte, dass einige Antworten auch einen Statuscode hatten: HTTP-429 'Zu viele Anfragen'. Was auch immer der Grund sein mag, Überlastung, Sicherheit, wir wollen uns freundlich verhalten und keine identischen URLs erneut aufrufen, zumindest für eine Mindestzeit. Da wir AIOHTTP verwenden, werden viele Anfragen auf die Internetverbindung warten, wir verwenden kein Flag, um zu prüfen, ob wir fortfahren können.
In diesem Beitrag geht es darum, diese wartenden Anfragen mit der Methode 'cancel()' abzubrechen (zu canceln). Es geht nicht um AIOHTTP oder Ratenbegrenzung, das ist ein anderes Thema.
Beispiel: 3 Aufgaben, kein Abbruch
Unten sehen Sie den Code ohne Abbruch wartender Aufgaben. Ich verwende hier nicht AIOHTTP , sondern 'asyncio.sleep()'. Da wir ein deterministisches Verhalten wollen, im Gegensatz zu einem zufälligen, legen wir die Schlafzeit der Aufgaben selbst fest.
# 3 tasks, no cancellation
import asyncio
async def mytask(task_num, sleep_num):
f_mytask = f'{"mytask" + str(task_num):10}'
print(f'{f_mytask} before sleep({sleep_num})')
await asyncio.sleep(sleep_num)
print(f'{f_mytask} after sleep({sleep_num}), returning ...')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create 3 tasks
tasks = []
sleep_secs = [2, 1.5, 0.8]
for task_id in range(3):
print(f'{f_main} create task{task_id}')
task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
task.id = task_id
tasks.append(task)
while True:
tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
for task in list(tasks_done):
print(f'{f_main} task{task.id} done')
result = task.result()
print(f'{f_main} task{task.id} result: {result}')
tasks.remove(task)
tasks_len = len(tasks)
if tasks_len == 0:
break
asyncio.run(main())
Hier gibt es nichts Besonderes, wie erwartet wurde Task2 zuerst beendet, gefolgt von Task1 und Task0. Das Ergebnis:
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 done
main task2 result: normal return
main wait: 0 done, 2 pending
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 1 pending
main task1 done
main task1 result: normal return
main wait: 0 done, 1 pending
mytask0 after sleep(2), returning ...
main wait: 1 done, 0 pending
main task0 done
main task0 result: normal return
Das Erstellen einer Aufgabe in AsyncIO bedeutet nicht, dass die Aufgabe gestartet wird
Im Gegensatz zur Verwendung von Multiprocessing oder Treads gibt es in AsyncIO keine 'start()'-Methode.
Multiprocessing:
p = multiprocessing.Process(target=task, args ...)
p.start()
Threading:
t = threading.Thread(target=task, args ...)
t.start()
AsyncIO:
t = asyncio.create_task(task, args ...)
In der AsyncIO-Dokumentation wird nicht erwähnt, wann eine Aufgabe gestartet oder zum ersten Mal ausgeführt wird, wenn 'create_task()' verwendet wird, es sei denn, ich habe etwas übersehen. Sie wird für die Ausführung geplant, d.h. sie kann sofort starten, aber auch einige Zeit später, wenn die Ereignisschleife auf eine oder mehrere Co-Routinen "wartet".
Dies ist wichtig, denn wenn Sie eine Aufgabe abbrechen, ist sie möglicherweise noch nicht ausgeführt worden, was bedeutet, dass die Co-Routine der Aufgabe noch nicht aufgerufen wurde.
Verschiedene Zustände einer Aufgabe beim Abbrechen einer Aufgabe
Mit der Methode 'cancel()' einer Aufgabe können wir kennzeichnen, dass eine Aufgabe abgebrochen werden muss. Wenn die Methode 'cancel()' angewendet wird, kann der Zustand der Aufgabe wie folgt aussehen:
- Die Aufgabe hat noch nicht begonnen zu laufen, oder,
- Die Aufgabe hat begonnen zu laufen und ist abgeschlossen, oder,
- Die Aufgabe hat begonnen zu laufen und wartet irgendwo
Das Abbrechen einer Aufgabe, deren Ausführung noch nicht begonnen hat, bedeutet: Die Aufgabe wird NICHT ausgeführt. Das Abbrechen einer Aufgabe, deren Ausführung begonnen hat und die noch wartet, bedeutet: Abbruch der wartenden Aufgabe. Wenn die Aufgabe bereits abgeschlossen ist, wird die Aufgabe nicht als abgebrochen gekennzeichnet.
Diese drei Bedingungen sind unten dargestellt, ich arbeite mit AsyncIO auf Python 3.10. Wir müssen die Aufgabe "abwarten", sonst sind die Ergebnisse nicht korrekt! Wenn eine Aufgabe abgebrochen wird, wird ein asyncio.CancelledError ausgelöst. Der 'msg'-Parameter der 'cancel()'-Methode wird erst ab Python 3.11 an die Hauptschleife weitergegeben.
Abbrechen einer nicht gestarteten Aufgabe
# cancel a not started task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# cancel task before awaitable
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Es wurde ein CancelledError ausgelöst. Wichtig bei dem Ergebnis ist, dass die Aufgabe nicht gestartet ist:
main create task
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Abbrechen einer abgeschlossenen Aufgabe
Hier fügen wir ein 'awaitable', 'asyncio.sleep()' nach dem Erstellen der Aufgabe hinzu:
# cancel a completed task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 3 sec, allow task to complete
print(f'{f_main} before await sleep')
await asyncio.sleep(3)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Die Aufgabe wurde beendet, bevor sie abgebrochen wurde. Das Ergebnis:
main create task
main before await sleep
mytask before await sleep
mytask after await sleep
main after await sleep
main task.done() = True
main task.cancelled() = False
main task.result(): normal return
Abbrechen einer nicht abgeschlossenen (wartenden) Aufgabe
Hier ändern wir die Sleep-Zeit in der Hauptschleife von 3 Sekunden auf 1 Sekunde, bevor die Aufgabe abgebrochen wird.
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Das Ergebnis:
main create task
main before await sleep
mytask before await sleep
main after await sleep
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Erfassen des CancelledError in der Aufgabe
Es ist auch möglich, den asyncio.CancelledError in der Aufgabe zu erfassen. Beachten Sie, dass Sie sich nicht auf die Rückgabewerte einer Aufgabe verlassen können, da die Aufgabe möglicherweise gar nicht startet (siehe oben). Hier ist der Code. In Anlehnung an das letzte Beispiel oben fügen wir die Ausnahmebehandlung auch in die Aufgabe ein.
# capture CancelledError in task
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
try:
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
except asyncio.CancelledError as e:
print(f'{f_mytask} CancelledError raised, {e}')
return f'CancelledError return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Das Ergebnis. Beachten Sie, dass der 'msg'-Parameter der 'cancel()'-Methode an die Aufgabe weitergegeben wird.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main task.done() = True
main task.cancelled() = False
main task.result(): CancelledError return
Wenn Sie möchten, dass der CancelledError auch an die Hauptschleife weitergegeben wird, dann können Sie die Ausnahme in der Aufgabe erneut auslösen:
# capture CancelledError in task and raise it again
# cancel a not completed (waiting) task
import asyncio
async def mytask():
f_mytask = f'{"mytask":10}'
try:
print(f'{f_mytask} before await sleep')
await asyncio.sleep(2)
print(f'{f_mytask} after await sleep')
return f'normal return'
except asyncio.CancelledError as e:
print(f'{f_mytask} CancelledError raised, {e}')
# raise the exception again
raise
return f'CancelledError return'
async def main():
f_main = f'{"main":10}'
# create task
print(f'{f_main} create task')
task = asyncio.create_task(mytask())
# wait 1 sec, cancel task before completion
print(f'{f_main} before await sleep')
await asyncio.sleep(1)
print(f'{f_main} after await sleep')
# cancel task
task.cancel(msg='my cancel reason')
try:
await task
except asyncio.CancelledError as e:
print(f'{f_main} CancelledError raised, {e}')
print(f'{f_main} task.done() = {task.done()}')
print(f'{f_main} task.cancelled() = {task.cancelled()}')
if task.done() and not task.cancelled():
print(f'{f_main} task.result(): {task.result()}')
asyncio.run(main())
Im Ergebnis sehen wir den CancelledError zuerst in der Aufgabe und dann in der Hauptschleife. Da wir uns auf Python 3.10 befinden, wird der 'msg'-Parameter der 'cancel()'-Methode nur an die Task weitergegeben, nicht an die Hauptschleife.
main create task
main before await sleep
mytask before await sleep
main after await sleep
mytask CancelledError raised, my cancel reason
main CancelledError raised,
main task.done() = True
main task.cancelled() = True
Beispiel: 3 Aufgaben, mit Abbruch
Nachfolgend ist der Code vom Anfang dieses Beitrags, 'Beispiel: 3 Aufgaben, ohne Abbruch", geändert für den Abbruch von Aufgaben. In meinem Fall bin ich nicht daran interessiert, den CancelledError in einer Aufgabe zu erfassen, ich möchte nur die Aufgabe(n) abbrechen. Um auf die ursprüngliche Frage zurückzukommen, brechen Sie eine Aufgabe ab, wenn eine bestimmte Aufgabe einen bestimmten Wert zurückgibt (HTTP-429-Status), wir brechen Aufgabe0 ab, wenn wir das Ergebnis von Aufgabe2 erhalten.
Wenn wir den Grund für den Abbruch erhalten möchten, wie in der Methode 'cancel()' angegeben, können wir die Ausnahme in der Schleife durch Aufruf der Methode 'exception()' auslösen.
# 3 tasks, with cancellation
import asyncio
async def mytask(task_num, sleep_num):
f_mytask = f'{"mytask" + str(task_num):10}'
print(f'{f_mytask} before sleep({sleep_num})')
await asyncio.sleep(sleep_num)
print(f'{f_mytask} after sleep({sleep_num}), returning ...')
return f'normal return'
async def main():
f_main = f'{"main":10}'
# create 3 tasks
tasks = []
sleep_secs = [2, 1.5, 0.8]
for task_id in range(3):
print(f'{f_main} create task{task_id}')
task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
task.id = task_id
tasks.append(task)
while True:
tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
for task in list(tasks_done):
if task.cancelled():
print(f'{f_main} task{task.id} was cancelled')
# get cancel msg, python >= 3.11 only
cancel_msg = None
try:
task.exception()
except asyncio.CancelledError as e:
cancel_msg = e
print(f'{f_main} task{task.id} cancel_msg: {cancel_msg}')
else:
print(f'{f_main} task{task.id} was not cancelled, result: {task.result()}')
# cancel task0 if task2 completed
if task.id == 2:
print(f'{f_main} task{task.id} cancel task0')
tasks[0].cancel(msg='my cancel reason')
tasks.remove(task)
tasks_len = len(tasks)
if tasks_len == 0:
break
asyncio.run(main())
Das Ergebnis:
main create task0
main create task1
main create task2
mytask0 before sleep(2)
mytask1 before sleep(1.5)
mytask2 before sleep(0.8)
main wait: 0 done, 3 pending
main wait: 0 done, 3 pending
mytask2 after sleep(0.8), returning ...
main wait: 1 done, 2 pending
main task2 was not cancelled, result: normal return
main task2 cancel task0
main wait: 1 done, 1 pending
main task0 was cancelled
main task0 cancel_msg:
mytask1 after sleep(1.5), returning ...
main wait: 1 done, 0 pending
main task1 was not cancelled, result: normal return
Zusammenfassung
Das Abbrechen einer Aufgabe mit AsyncIO ist anfangs etwas verwirrend. Die AsyncIO-Methode 'create_task()' startet keine Aufgabe, sondern plant sie nur für die Ausführung ein. Das Wichtigste ist, dass man eine Aufgabe immer 'await', bevor man ihre Methoden 'done()' und 'cancelled()' überprüft. Wo der CancelledError zu erfassen ist, in der Hauptschleife oder in der Aufgabe, oder beides, hängt von Ihrer Anwendung ab. Und um den Grund des Abbruchs in der Hauptschleife zu erhalten, benötigen Sie Python 3.11 up.
Links / Impressum
asyncio.Task doesn't propagate CancelledError() exception correctly.
https://bugs.python.org/issue45390
Python - Asynchronous I/O - cancel
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel
Mehr erfahren
Async
Neueste
- Ausblenden der Primärschlüssel der Datenbank UUID Ihrer Webanwendung
- Don't Repeat Yourself (DRY) mit Jinja2
- SQLAlchemy, PostgreSQL, maximale Anzahl von Zeilen pro user
- Anzeige der Werte in den dynamischen Filtern SQLAlchemy
- Sichere Datenübertragung mit Public Key Verschlüsselung und pyNaCl
- rqlite: eine hochverfügbare und distverteilte SQLite -Alternative
Meistgesehen
- Verwendung von Pythons pyOpenSSL zur Überprüfung von SSL-Zertifikaten, die von einem Host heruntergeladen wurden
- Verwendung von UUIDs anstelle von Integer Autoincrement Primary Keys mit SQLAlchemy und MariaDb
- PyInstaller und Cython verwenden, um eine ausführbare Python-Datei zu erstellen
- Verbindung zu einem Dienst auf einem Docker -Host von einem Docker -Container aus
- SQLAlchemy: Verwendung von Cascade Deletes zum Löschen verwandter Objekte
- Flask RESTful API Validierung von Anfrageparametern mit Marshmallow-Schemas