angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Abbrechen von Aufgaben mit Python Asynchronous IO (AsyncIO)

AsyncIO 'create_task()' startet keine Aufgabe, sondern plant sie für die Ausführung in der Ereignisschleife ein.

2 Mai 2023
In Async
post main image
https://pixabay.com/users/stocksnap-894430

Für ein Projekt habe ich AIOHTTP verwendet, um die Antworten vieler entfernter Websites, URLs, zu überprüfen. Die URLs stammten aus einer Liste. Diese Liste kann Duplikate enthalten.

Alles war in Ordnung, bis ich bemerkte, dass einige Antworten auch einen Statuscode hatten: HTTP-429 'Zu viele Anfragen'. Was auch immer der Grund sein mag, Überlastung, Sicherheit, wir wollen uns freundlich verhalten und keine identischen URLs erneut aufrufen, zumindest für eine Mindestzeit. Da wir AIOHTTP verwenden, werden viele Anfragen auf die Internetverbindung warten, wir verwenden kein Flag, um zu prüfen, ob wir fortfahren können.

In diesem Beitrag geht es darum, diese wartenden Anfragen mit der Methode 'cancel()' abzubrechen (zu canceln). Es geht nicht um AIOHTTP oder Ratenbegrenzung, das ist ein anderes Thema.

Beispiel: 3 Aufgaben, kein Abbruch

Unten sehen Sie den Code ohne Abbruch wartender Aufgaben. Ich verwende hier nicht AIOHTTP , sondern 'asyncio.sleep()'. Da wir ein deterministisches Verhalten wollen, im Gegensatz zu einem zufälligen, legen wir die Schlafzeit der Aufgaben selbst fest.

# 3 tasks, no cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'

    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)

    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            print(f'{f_main} task{task.id} done')
            result = task.result()
            print(f'{f_main} task{task.id} result: {result}')    
            tasks.remove(task)

        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())

Hier gibt es nichts Besonderes, wie erwartet wurde Task2 zuerst beendet, gefolgt von Task1 und Task0. Das Ergebnis:

main       create task0
main       create task1
main       create task2
mytask0    before sleep(2)
mytask1    before sleep(1.5)
mytask2    before sleep(0.8)
main       wait: 0 done, 3 pending
main       wait: 0 done, 3 pending
mytask2    after sleep(0.8), returning ...
main       wait: 1 done, 2 pending
main       task2 done
main       task2 result: normal return
main       wait: 0 done, 2 pending
mytask1    after sleep(1.5), returning ...
main       wait: 1 done, 1 pending
main       task1 done
main       task1 result: normal return
main       wait: 0 done, 1 pending
mytask0    after sleep(2), returning ...
main       wait: 1 done, 0 pending
main       task0 done
main       task0 result: normal return

Das Erstellen einer Aufgabe in AsyncIO bedeutet nicht, dass die Aufgabe gestartet wird

Im Gegensatz zur Verwendung von Multiprocessing oder Treads gibt es in AsyncIO keine 'start()'-Methode.

Multiprocessing:

p = multiprocessing.Process(target=task, args ...)
p.start()

Threading:

t = threading.Thread(target=task, args ...)
t.start()

AsyncIO:

t = asyncio.create_task(task, args ...)

In der AsyncIO-Dokumentation wird nicht erwähnt, wann eine Aufgabe gestartet oder zum ersten Mal ausgeführt wird, wenn 'create_task()' verwendet wird, es sei denn, ich habe etwas übersehen. Sie wird für die Ausführung geplant, d.h. sie kann sofort starten, aber auch einige Zeit später, wenn die Ereignisschleife auf eine oder mehrere Co-Routinen "wartet".

Dies ist wichtig, denn wenn Sie eine Aufgabe abbrechen, ist sie möglicherweise noch nicht ausgeführt worden, was bedeutet, dass die Co-Routine der Aufgabe noch nicht aufgerufen wurde.

Verschiedene Zustände einer Aufgabe beim Abbrechen einer Aufgabe

Mit der Methode 'cancel()' einer Aufgabe können wir kennzeichnen, dass eine Aufgabe abgebrochen werden muss. Wenn die Methode 'cancel()' angewendet wird, kann der Zustand der Aufgabe wie folgt aussehen:

  • Die Aufgabe hat noch nicht begonnen zu laufen, oder,
  • Die Aufgabe hat begonnen zu laufen und ist abgeschlossen, oder,
  • Die Aufgabe hat begonnen zu laufen und wartet irgendwo

Das Abbrechen einer Aufgabe, deren Ausführung noch nicht begonnen hat, bedeutet: Die Aufgabe wird NICHT ausgeführt. Das Abbrechen einer Aufgabe, deren Ausführung begonnen hat und die noch wartet, bedeutet: Abbruch der wartenden Aufgabe. Wenn die Aufgabe bereits abgeschlossen ist, wird die Aufgabe nicht als abgebrochen gekennzeichnet.

Diese drei Bedingungen sind unten dargestellt, ich arbeite mit AsyncIO auf Python 3.10. Wir müssen die Aufgabe "abwarten", sonst sind die Ergebnisse nicht korrekt! Wenn eine Aufgabe abgebrochen wird, wird ein asyncio.CancelledError ausgelöst. Der 'msg'-Parameter der 'cancel()'-Methode wird erst ab Python 3.11 an die Hauptschleife weitergegeben.

Abbrechen einer nicht gestarteten Aufgabe

# cancel a not started task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # cancel task before awaitable
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Es wurde ein CancelledError ausgelöst. Wichtig bei dem Ergebnis ist, dass die Aufgabe nicht gestartet ist:

main       create task
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Abbrechen einer abgeschlossenen Aufgabe

Hier fügen wir ein 'awaitable', 'asyncio.sleep()' nach dem Erstellen der Aufgabe hinzu:

# cancel a completed task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 3 sec, allow task to complete
    print(f'{f_main} before await sleep')
    await asyncio.sleep(3)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Die Aufgabe wurde beendet, bevor sie abgebrochen wurde. Das Ergebnis:

main       create task
main       before await sleep
mytask     before await sleep
mytask     after await sleep
main       after await sleep
main       task.done() = True
main       task.cancelled() = False
main       task.result(): normal return

Abbrechen einer nicht abgeschlossenen (wartenden) Aufgabe

Hier ändern wir die Sleep-Zeit in der Hauptschleife von 3 Sekunden auf 1 Sekunde, bevor die Aufgabe abgebrochen wird.

# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Das Ergebnis:

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Erfassen des CancelledError in der Aufgabe

Es ist auch möglich, den asyncio.CancelledError in der Aufgabe zu erfassen. Beachten Sie, dass Sie sich nicht auf die Rückgabewerte einer Aufgabe verlassen können, da die Aufgabe möglicherweise gar nicht startet (siehe oben). Hier ist der Code. In Anlehnung an das letzte Beispiel oben fügen wir die Ausnahmebehandlung auch in die Aufgabe ein.

# capture CancelledError in task
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        return f'CancelledError return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Das Ergebnis. Beachten Sie, dass der 'msg'-Parameter der 'cancel()'-Methode an die Aufgabe weitergegeben wird.

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
mytask     CancelledError raised, my cancel reason
main       task.done() = True
main       task.cancelled() = False
main       task.result(): CancelledError return

Wenn Sie möchten, dass der CancelledError auch an die Hauptschleife weitergegeben wird, dann können Sie die Ausnahme in der Aufgabe erneut auslösen:

# capture CancelledError in task and raise it again
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        # raise the exception again
        raise
        return f'CancelledError return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Im Ergebnis sehen wir den CancelledError zuerst in der Aufgabe und dann in der Hauptschleife. Da wir uns auf Python 3.10 befinden, wird der 'msg'-Parameter der 'cancel()'-Methode nur an die Task weitergegeben, nicht an die Hauptschleife.

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
mytask     CancelledError raised, my cancel reason
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Beispiel: 3 Aufgaben, mit Abbruch

Nachfolgend ist der Code vom Anfang dieses Beitrags, 'Beispiel: 3 Aufgaben, ohne Abbruch", geändert für den Abbruch von Aufgaben. In meinem Fall bin ich nicht daran interessiert, den CancelledError in einer Aufgabe zu erfassen, ich möchte nur die Aufgabe(n) abbrechen. Um auf die ursprüngliche Frage zurückzukommen, brechen Sie eine Aufgabe ab, wenn eine bestimmte Aufgabe einen bestimmten Wert zurückgibt (HTTP-429-Status), wir brechen Aufgabe0 ab, wenn wir das Ergebnis von Aufgabe2 erhalten.

Wenn wir den Grund für den Abbruch erhalten möchten, wie in der Methode 'cancel()' angegeben, können wir die Ausnahme in der Schleife durch Aufruf der Methode 'exception()' auslösen.

# 3 tasks, with cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'

    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)

    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            if task.cancelled():
                print(f'{f_main} task{task.id} was cancelled')
                # get cancel msg, python >= 3.11 only
                cancel_msg = None
                try:
                    task.exception()
                except asyncio.CancelledError as e:
                    cancel_msg = e
                print(f'{f_main} task{task.id} cancel_msg: {cancel_msg}')
            else:
                print(f'{f_main} task{task.id} was not cancelled, result: {task.result()}')    

                # cancel task0 if task2 completed
                if task.id == 2:
                    print(f'{f_main} task{task.id} cancel task0')    
                    tasks[0].cancel(msg='my cancel reason')

            tasks.remove(task)

        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())

Das Ergebnis:

main       create task0
main       create task1
main       create task2
mytask0    before sleep(2)
mytask1    before sleep(1.5)
mytask2    before sleep(0.8)
main       wait: 0 done, 3 pending
main       wait: 0 done, 3 pending
mytask2    after sleep(0.8), returning ...
main       wait: 1 done, 2 pending
main       task2 was not cancelled, result: normal return
main       task2 cancel task0
main       wait: 1 done, 1 pending
main       task0 was cancelled
main       task0 cancel_msg: 
mytask1    after sleep(1.5), returning ...
main       wait: 1 done, 0 pending
main       task1 was not cancelled, result: normal return

Zusammenfassung

Das Abbrechen einer Aufgabe mit AsyncIO ist anfangs etwas verwirrend. Die AsyncIO-Methode 'create_task()' startet keine Aufgabe, sondern plant sie nur für die Ausführung ein. Das Wichtigste ist, dass man eine Aufgabe immer 'await', bevor man ihre Methoden 'done()' und 'cancelled()' überprüft. Wo der CancelledError zu erfassen ist, in der Hauptschleife oder in der Aufgabe, oder beides, hängt von Ihrer Anwendung ab. Und um den Grund des Abbruchs in der Hauptschleife zu erhalten, benötigen Sie Python 3.11 up.

Links / Impressum

asyncio.Task doesn't propagate CancelledError() exception correctly.
https://bugs.python.org/issue45390

Python - Asynchronous I/O - cancel
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel

Mehr erfahren

Async

Einen Kommentar hinterlassen

Kommentieren Sie anonym oder melden Sie sich zum Kommentieren an.

Kommentare

Eine Antwort hinterlassen

Antworten Sie anonym oder melden Sie sich an, um zu antworten.