angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Python Multiprocessing "graceful" uitschakeling in de juiste volgorde

Eerst wordt het worker() proces gestopt, vervolgens worden de gegevens van het result_queue() proces gespoeld en gestopt.

16 juni 2021
post main image
https://unsplash.com/@marcella_mlk

Voor een nieuw project had ik een deamon proces nodig dat veel, min of meer identieke, operaties moet uitvoeren op verschillende bronnen. In dit geval is de operatie IO gebonden en ik heb het opgelost door ThreadPoolExecutor te gebruiken. Zo ver, zo goed.

Vervolgens wilde ik de resultaten opslaan in bestanden. Natuurlijk gebruiken we een wachtrij om te communiceren tussen processen. Het worker() proces gebruikt q.put() om items aan de wachtrij toe te voegen en het result_queue() proces gebruikt q.get() om de items uit de wachtrij te halen.

Mijn result_queue() proces buffert ook de ontvangen resultaten. Zodra een drempel is bereikt, worden alle resultaten in een keer naar een bestand geschreven.

En hier is het probleem. Als je op CTRL-C drukt of een SIGTERM signaal stuurt dan worden de processen abrupt beëindigd. Dit betekent:

  • De middelen die door de worker() werden benaderd, kunnen in een slechte toestand worden achtergelaten
  • De result_queue() kan veel resultaten bevatten die niet bewaard zullen worden

Niet goed! Hieronder laat ik zien hoe ik dit heb opgelost. Zoals altijd gebruik ik Ubuntu (20.04).

Gebruik gebeurtenissen om de kind-processen te stoppen

Pagina's op het internet over Python en Multiprocessing zijn vaak verwarrend, vooral omdat velen verwijzen naar Python2. Het goede is dat je gedwongen wordt om de officiële documentatie te lezen ...

Er zijn twee manieren waarop processen (betrouwbaar) met elkaar kunnen communiceren:

  • wachtrijen
  • gebeurtenissen

In dit geval, om processen te stoppen, gebruiken we events. We noemen onze events stop-events omdat ze het kindproces vragen te stoppen. Het werkt als volgt:

  1. Maak een gebeurtenis (object)
  2. Geef de gebeurtenis door als een parameter wanneer het kind-proces wordt gestart
  3. In het kind proces: controleer de gebeurtenis en stop als de gebeurtenis 'is_set()'
  4. In het hoofdproces: 'set()' de gebeurtenis als het kind moet worden gestopt

Vervangen handlers voor SIGINT / SIGTERM

Hier is de truc die alles laat werken:

Wanneer je een kindproces spawnt, erft dit proces ook de signaalhandlers van de ouder.

We willen niet dat onze kind-processen reageren op CTRL-C (SIGINT signaal) of het SIGTERM signaal. In plaats daarvan willen we dat het hoofdproces beslist hoe de kinderen gestopt worden. Dit betekent dat we:

  1. De SIGINT en SIGTERM handlers voor de kind processen vervangen door dummy handlers. Deze handlers doen niets, wat betekent dat de kind-processen niet beëindigd worden door deze signalen.
  2. Vervang de SIGINT en SIGTERM handlers van het hoofdproces door onze eigen versie. Deze handlers vertellen het hoofdproces dat het moet afsluiten. Dit wordt gedaan met behulp van een hoofd stop gebeurtenis (object)

Met dit schema controleert het hoofdproces de main stop event. Indien 'is_set()' gebruikt het 'set()' om kind-proces X te informeren om te stoppen. Omdat we stop gebeurtenissen kunnen doorgeven aan alle kind-processen, heeft het hoofd-proces volledige controle. Het kan wachten tot een kind proces is gestopt, voordat het een ander kind proces stopt.

De code

Er zijn drie processen:

  • main()
  • (kind) worker(), kan meerdere threads hebben
  • (kind) result_queue()

Als een worker() thread klaar is, stuurt hij het resultaat naar result_queue(). Om sierlijk te stoppen moeten we eerst het worker() proces vragen te stoppen, en zodra het gestopt is, moeten we het result_queue() proces vragen te stoppen.

Ik denk dat de code niet moeilijk te lezen is. Nadat de kind-processen zijn gestart, controleert het hoofd-proces in een lus of er wordt gevraagd om te stoppen.

Het enige dat ik je niet verteld heb is dat ik wat code heb toegevoegd om een shutdown te forceren na een gespecificeerd aantal seconden in geval van een of ander probleem.

# procsig.py

import logging
import multiprocessing
import os
import signal
import sys
import time

def init_logger():
    logger_format = "%(asctime)s %(levelname)-8.8s [%(funcName)24s():%(lineno)-3s] %(message)s"
    formatter = logging.Formatter(logger_format)

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

logger = init_logger()


# main sigint/sigterm handlers
main_stop_event = multiprocessing.Event()

def main_sigint_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()

def main_sigterm_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()


# children sigint/sigterm handlers, let main process handle this
def children_sigint_handler(signum, frame):
    logger.debug('')

def children_sigterm_handler(signum, frame):
    logger.debug('')


def worker(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('worker: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('worker STOPPING ...')
    time.sleep(1)
    logger.debug('worker STOPPED')


def result_queue(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('result_queue: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('result_queue: STOPPING')
    time.sleep(1)
    logger.debug('result_queue: STOPPED')


def main():

    # events
    global main_stop_event
    worker_stop_event = multiprocessing.Event()
    result_queue_stop_event = multiprocessing.Event()

    # setup result queue
    rq = multiprocessing.Queue()

    # children: capture sigint/sigterm
    signal.signal(signal.SIGINT, children_sigint_handler)
    signal.signal(signal.SIGTERM, children_sigterm_handler)

    # start worker
    worker_proc = multiprocessing.Process(
        name='worker',
        target=worker,
        kwargs={'rq': rq, 'stop_event': worker_stop_event},
    )
    worker_proc.daemon = True
    worker_proc.start()

    # start result_queue
    result_queue_proc = multiprocessing.Process(
        name='result_queue',
        target=result_queue,
        kwargs={'rq': rq, 'stop_event': result_queue_stop_event},
    )
    result_queue_proc.daemon = True
    result_queue_proc.start()

    # main: capture sigint/sigterm
    signal.signal(signal.SIGINT, main_sigint_handler)
    signal.signal(signal.SIGTERM, main_sigterm_handler)

    child_procs = [worker_proc, result_queue_proc]

    worker_stop_sent = False
    result_queue_stop_sent = False
    max_allowed_shutdown_seconds = 10
    shutdown_et = None

    keep_running = True
    while keep_running:

        if shutdown_et is not None and shutdown_et < int(time.time()):
            logger.debug('Forced shutdown ...')
            break

        run_count = 0
        for p in child_procs:
            logger.debug('[P> {: <12} - {} - {} - {}'.format(p.name, p.pid, p.is_alive(), p.exitcode))
            if p.is_alive():
                run_count += 1

            # send 'stop' to result_queue()?
            if p.pid == worker_proc.pid and not p.is_alive() and not result_queue_stop_sent:
                logger.debug('Sending stop to result_queue ...')
                result_queue_stop_event.set()
                result_queue_stop_sent = True

        # send 'stop' to worker()?
        if main_stop_event.is_set() and not worker_stop_sent:
            logger.debug('Sending stop to worker ...')
            worker_stop_event.set()
            worker_stop_sent = True
            shutdown_et = int(time.time()) + max_allowed_shutdown_seconds

        time.sleep(1)
        if run_count == 0:
            keep_running = False

    logger.debug('terminating children ...')
    try:
        worker_proc.terminate()
        result_queue_proc.terminate()
        worker_proc.kill()
        result_queue_proc.terminate()
    except Exception as e:
        # who cares
        logger.debug('Exception {}, e = {}'.format(type(e).__name__, e))

    logger.debug('Done')

if __name__=='__main__':
    main()

Om dit te testen, druk op CTRL-C, of open een ander terminal venster en beëindig dit proces:

pkill  -f  procsig

Log resultaat

Je kunt dit zelf uitproberen, hieronder laat ik je zien wat er op de console wordt afgedrukt. Om '19:28:37' druk ik op CTRL-C. Onmiddellijk worden de signal handlers aangeroepen. Het main() proces stuurt een stop bericht naar het worker() proces, door 'set()' te gebruiken. Zodra het worker() proces gestopt is, door het controleren van 'is_alive()', wordt het result_queue() proces een stop-bericht gezonden.

2021-06-16 19:28:34,238 DEBUG    [                  worker():182] worker: 6
2021-06-16 19:28:35,239 DEBUG    [                  worker():182] worker: 7
2021-06-16 19:28:35,239 DEBUG    [            result_queue():197] result_queue: 7
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:36,239 DEBUG    [                  worker():182] worker: 8
2021-06-16 19:28:36,240 DEBUG    [            result_queue():197] result_queue: 8
2021-06-16 19:28:36,240 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:36,241 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:37,239 DEBUG    [                  worker():182] worker: 9
2021-06-16 19:28:37,241 DEBUG    [            result_queue():197] result_queue: 9
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
^C2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [     main_sigint_handler():160] 
2021-06-16 19:28:38,240 DEBUG    [                  worker():182] worker: 10
2021-06-16 19:28:38,242 DEBUG    [            result_queue():197] result_queue: 10
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():271] Sending stop to worker ...
2021-06-16 19:28:39,241 DEBUG    [                  worker():186] worker STOPPING ...
2021-06-16 19:28:39,243 DEBUG    [            result_queue():197] result_queue: 11
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:40,242 DEBUG    [                  worker():188] worker STOPPED
2021-06-16 19:28:40,244 DEBUG    [            result_queue():197] result_queue: 12
2021-06-16 19:28:40,245 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:40,245 DEBUG    [                    main():265] Sending stop to result_queue ...
2021-06-16 19:28:40,246 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:41,246 DEBUG    [            result_queue():201] result_queue: STOPPING
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:42,247 DEBUG    [            result_queue():203] result_queue: STOPPED
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> result_queue - 465190 - False - 0
2021-06-16 19:28:43,249 DEBUG    [                    main():280] terminating children ...
2021-06-16 19:28:43,249 DEBUG    [                    main():290] Done

Samenvatting

Ik was op zoek naar een manier om processen sierlijk te stoppen met Python Multiprocessing en wilde ook de volgorde controleren waarin processen gestopt werden. Ik heb dit opgelost door gebruik te maken van events en de signal handlers te vervangen. Dit geeft me totale controle in het hoofdproces. Ik kan nu vragen om proces B te stoppen nadat proces A gestopt is.

Links / credits

concurrent.futures — Launching parallel tasks
https://docs.python.org/3/library/concurrent.futures.html

Passing Messages to Processes / Signaling between Processes
https://pymotw.com/3/multiprocessing/communication.html

Things I Wish They Told Me About Multiprocessing in Python
https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/

Laat een reactie achter

Reageer anoniem of log in om commentaar te geven.

Opmerkingen

Laat een antwoord achter

Antwoord anoniem of log in om te antwoorden.