angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Taken annuleren met Python Asynchrone IO (AsyncIO)

AsyncIO 'create_task()' start geen taak, maar plant deze voor uitvoering op de gebeurtenislus.

2 mei 2023
In Async
post main image
https://pixabay.com/users/stocksnap-894430

Voor een project gebruikte ik AIOHTTP om de antwoorden van vele websites op afstand, URL's, te controleren. De URL's kwamen uit een lijst. Deze lijst kan duplicaten bevatten.

Alles in orde totdat ik merkte dat sommige antwoorden ook status code hadden: HTTP-429 'Too Many Requests'. Wat de reden ook is, overbelasting, veiligheid, we willen ons vriendelijk gedragen en willen identieke URL's niet opnieuw aanroepen, althans voor een minimale tijd. Omdat we AIOHTTP gebruiken, zullen veel verzoeken wachten op de internetverbinding, we gebruiken geen vlag om te controleren of we door kunnen gaan.

Dit bericht gaat over het afbreken (annuleren) van deze wachtende verzoeken met behulp van de methode 'cancel()'. Het gaat niet over AIOHTTP of rate limiting, dat is een ander onderwerp.

Voorbeeld: 3 taken, geen annulering

Hieronder staat de code zonder annulering van wachtende taken. Ik gebruik hier niet AIOHTTP maar 'asyncio.sleep()'. Omdat we deterministisch gedrag willen, in tegenstelling tot willekeurig, geven we de slaaptijd van de taken zelf aan.

# 3 tasks, no cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'

    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)

    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            print(f'{f_main} task{task.id} done')
            result = task.result()
            print(f'{f_main} task{task.id} result: {result}')    
            tasks.remove(task)

        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())

Niets bijzonders hier, zoals verwacht voltooide taak2 als eerste, gevolgd door taak1 en taak0. Het resultaat:

main       create task0
main       create task1
main       create task2
mytask0    before sleep(2)
mytask1    before sleep(1.5)
mytask2    before sleep(0.8)
main       wait: 0 done, 3 pending
main       wait: 0 done, 3 pending
mytask2    after sleep(0.8), returning ...
main       wait: 1 done, 2 pending
main       task2 done
main       task2 result: normal return
main       wait: 0 done, 2 pending
mytask1    after sleep(1.5), returning ...
main       wait: 1 done, 1 pending
main       task1 done
main       task1 result: normal return
main       wait: 0 done, 1 pending
mytask0    after sleep(2), returning ...
main       wait: 1 done, 0 pending
main       task0 done
main       task0 result: normal return

Een taak aanmaken in AsyncIO betekent niet dat de taak wordt gestart

In tegenstelling tot Multiprocessing of Treads, is er in AsyncIO geen 'start()' methode.

Multiprocessing:

p = multiprocessing.Process(target=task, args ...)
p.start()

Threading:

t = threading.Thread(target=task, args ...)
t.start()

AsyncIO:

t = asyncio.create_task(task, args ...)

De AsyncIO documentatie vermeldt niet wanneer een taak wordt gestart, of voor het eerst wordt uitgevoerd, bij gebruik van 'create_task()', tenzij ik iets gemist heb. Hij wordt gepland voor uitvoering, wat betekent dat hij onmiddellijk kan beginnen te lopen, maar ook enige tijd later, wanneer de gebeurtenislus 'wacht' op een of andere co-routine(s).

Dit is belangrijk omdat wanneer u een taak annuleert, deze misschien nog niet is uitgevoerd, wat betekent dat de taak-co-routine nog niet is aangeroepen.

Verschillende toestanden van een taak bij het annuleren van een taak

Met de 'cancel()' methode van een taak kunnen we markeren dat een taak geannuleerd moet worden. Uit het bovenstaande volgt dat wanneer de methode 'cancel()' wordt toegepast, de toestand van de taak kan zijn:

  • De taak is nog niet begonnen met lopen, of,
  • De taak is begonnen met lopen en voltooid, of,
  • De taak begon te lopen en wacht ergens op

Een taak annuleren die nog niet is begonnen met lopen betekent: de taak zal NIET lopen. Een taak annuleren die begonnen is met lopen en wacht betekent: de wachtende taak afbreken. Als de taak al is voltooid, dan wordt de taak niet gemarkeerd als geannuleerd.

Deze drie voorwaarden staan hieronder, ik draai met AsyncIO op Python 3.10. We moeten de taak 'afwachten', anders kloppen de resultaten niet! Wanneer een taak wordt geannuleerd, wordt een asyncio.CancelledError opgeworpen. De 'msg'-parameter van de methode 'cancel()' wordt pas doorgegeven aan de hoofdlus vanaf Python 3.11.

Een niet gestarte taak annuleren

# cancel a not started task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # cancel task before awaitable
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Er is een CancelledError opgetreden. Belangrijk in het resultaat is dat de taak niet is gestart:

main       create task
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Een voltooide taak annuleren

Hier voegen we een 'awaitable' toe, 'asyncio.sleep()' na het aanmaken van de taak:

# cancel a completed task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 3 sec, allow task to complete
    print(f'{f_main} before await sleep')
    await asyncio.sleep(3)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

De taak eindigde voordat hij werd geannuleerd. Het resultaat:

main       create task
main       before await sleep
mytask     before await sleep
mytask     after await sleep
main       after await sleep
main       task.done() = True
main       task.cancelled() = False
main       task.result(): normal return

Een niet voltooide (wachtende) taak annuleren

Hier wijzigen we de slaaptijd in de hoofdlus van 3 seconden naar 1 seconde voor het annuleren van de taak.

# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    print(f'{f_mytask} before await sleep')
    await asyncio.sleep(2)
    print(f'{f_mytask} after await sleep')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Het resultaat:

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Vastleggen van de CancelledError in de taak

Het is ook mogelijk om de asyncio.CancelledError vast te leggen in de taak. Merk op dat u niet kunt vertrouwen op retourwaarden van een taak, omdat de taak mogelijk helemaal niet start, zie hierboven. Hier is de code. Met het laatste voorbeeld hierboven voegen we de exception handling ook toe aan de taak.

# capture CancelledError in task
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        return f'CancelledError return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

Het resultaat. Merk op dat de 'msg'-parameter van de 'cancel()' methode wordt doorgegeven aan de taak.

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
mytask     CancelledError raised, my cancel reason
main       task.done() = True
main       task.cancelled() = False
main       task.result(): CancelledError return

Als je wilt dat de CancelledError ook wordt doorgegeven aan de hoofdlus, dan kun je de exception opnieuw oproepen in de taak:

# capture CancelledError in task and raise it again
# cancel a not completed (waiting) task
import asyncio

async def mytask():
    f_mytask = f'{"mytask":10}'

    try:
        print(f'{f_mytask} before await sleep')
        await asyncio.sleep(2)
        print(f'{f_mytask} after await sleep')
        return f'normal return'
    except asyncio.CancelledError as e:
        print(f'{f_mytask} CancelledError raised, {e}')
        # raise the exception again
        raise
        return f'CancelledError return'

async def main():
    f_main = f'{"main":10}'

    # create task
    print(f'{f_main} create task')
    task = asyncio.create_task(mytask())

    # wait 1 sec, cancel task before completion
    print(f'{f_main} before await sleep')
    await asyncio.sleep(1)
    print(f'{f_main} after await sleep')

    # cancel task
    task.cancel(msg='my cancel reason')

    try:
        await task
    except asyncio.CancelledError as e:
        print(f'{f_main} CancelledError raised, {e}')

    print(f'{f_main} task.done() = {task.done()}')
    print(f'{f_main} task.cancelled() = {task.cancelled()}')
    if task.done() and not task.cancelled():
        print(f'{f_main} task.result(): {task.result()}')    

asyncio.run(main())

In het resultaat zien we de CancelledError eerst in de taak en daarna in de hoofdlus. Nogmaals, omdat we op Python 3.10 zitten, wordt de 'msg'-parameter van de 'cancel()' methode alleen doorgegeven aan de taak, niet aan de hoofdlus.

main       create task
main       before await sleep
mytask     before await sleep
main       after await sleep
mytask     CancelledError raised, my cancel reason
main       CancelledError raised, 
main       task.done() = True
main       task.cancelled() = True

Voorbeeld: 3 taken, met annulering

Hieronder staat de code uit het begin van deze post, 'Voorbeeld: 3 taken, geen annulering', aangepast voor het annuleren van taken. In mijn geval ben ik niet geïnteresseerd in het afvangen van de CancelledError in een taak, ik wil gewoon de taak of taken geannuleerd hebben. Terug naar de oorspronkelijke vraag, een taak annuleren wanneer een bepaalde taak een bepaalde waarde retourneert (HTTP-429 status), we annuleren taak0 wanneer we het resultaat van taak2 ontvangen.

Als we de reden van annulering willen krijgen, zoals opgegeven in de 'cancel()' methode, kunnen we de uitzondering in de lus oproepen door de 'exception()' methode aan te roepen.

# 3 tasks, with cancellation
import asyncio

async def mytask(task_num, sleep_num):
    f_mytask = f'{"mytask" + str(task_num):10}'

    print(f'{f_mytask} before sleep({sleep_num})')
    await asyncio.sleep(sleep_num)
    print(f'{f_mytask} after sleep({sleep_num}), returning ...')
    return f'normal return'

async def main():
    f_main = f'{"main":10}'

    # create 3 tasks
    tasks = []
    sleep_secs = [2, 1.5, 0.8]
    for task_id in range(3):
        print(f'{f_main} create task{task_id}')
        task = asyncio.create_task(mytask(task_id, sleep_secs[task_id]))
        task.id = task_id
        tasks.append(task)

    while True:
        tasks_done, tasks_pending = await asyncio.wait(tasks, timeout=0.3)
        print(f'{f_main} wait: {len(tasks_done)} done, {len(tasks_pending)} pending')
        for task in list(tasks_done):
            if task.cancelled():
                print(f'{f_main} task{task.id} was cancelled')
                # get cancel msg, python >= 3.11 only
                cancel_msg = None
                try:
                    task.exception()
                except asyncio.CancelledError as e:
                    cancel_msg = e
                print(f'{f_main} task{task.id} cancel_msg: {cancel_msg}')
            else:
                print(f'{f_main} task{task.id} was not cancelled, result: {task.result()}')    

                # cancel task0 if task2 completed
                if task.id == 2:
                    print(f'{f_main} task{task.id} cancel task0')    
                    tasks[0].cancel(msg='my cancel reason')

            tasks.remove(task)

        tasks_len = len(tasks)
        if tasks_len == 0:
            break

asyncio.run(main())

Het resultaat:

main       create task0
main       create task1
main       create task2
mytask0    before sleep(2)
mytask1    before sleep(1.5)
mytask2    before sleep(0.8)
main       wait: 0 done, 3 pending
main       wait: 0 done, 3 pending
mytask2    after sleep(0.8), returning ...
main       wait: 1 done, 2 pending
main       task2 was not cancelled, result: normal return
main       task2 cancel task0
main       wait: 1 done, 1 pending
main       task0 was cancelled
main       task0 cancel_msg: 
mytask1    after sleep(1.5), returning ...
main       wait: 1 done, 0 pending
main       task1 was not cancelled, result: normal return

Samenvatting

Het annuleren van een taak met AsyncIO is in het begin een beetje verwarrend. De AsyncIO methode 'create_task()' start geen taak, maar plant hem alleen voor uitvoering. Het belangrijkste om te onthouden is om een taak altijd te 'awaiten' voordat je de methoden 'done()' en 'cancelled()' controleert. Waar je de CancelledError opvangt, in de hoofdlus of in de taak, of beide, hangt af van je toepassing. En om de reden van annulering in de hoofdlus te krijgen moet je Python 3.11 omhoog.

Links / credits

asyncio.Task doesn't propagate CancelledError() exception correctly.
https://bugs.python.org/issue45390

Python - Asynchronous I/O - cancel
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel

Lees meer

Async

Laat een reactie achter

Reageer anoniem of log in om commentaar te geven.

Opmerkingen

Laat een antwoord achter

Antwoord anoniem of log in om te antwoorden.