angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Python Multiprocessing graceful shutdown in der richtigen Reihenfolge

Zuerst wird der worker()-Prozess gestoppt, dann werden die Daten des result_queue()-Prozesses gespült und gestoppt.

16 Juni 2021
post main image
https://unsplash.com/@marcella_mlk

Für ein neues Projekt benötigte ich einen Deamon-Prozess, der viele, mehr oder weniger identische, Operationen auf verschiedenen Ressourcen ausführen muss. In diesem Fall ist die Operation IO-gebunden und ich habe es mit ThreadPoolExecutor gelöst. So weit, so gut.

Als nächstes wollte ich die Ergebnisse in Dateien speichern. Natürlich benutzen wir eine Warteschlange, um zwischen den Prozessen zu kommunizieren. Der Prozess worker() verwendet q.put() , um Elemente zur Warteschlange hinzuzufügen und der Prozess result_queue() verwendet q.get() , um die Elemente aus der Warteschlange zu holen.

Mein result_queue() -Prozess puffert auch die empfangenen Ergebnisse. Sobald ein Schwellenwert erreicht ist, werden alle Ergebnisse auf einmal in eine Datei geschrieben.

Und hier ist das Problem. Wenn Sie CTRL-C drücken oder ein SIGTERM -Signal senden, werden die Prozesse abrupt beendet. Dies bedeutet:

  • Die Ressourcen, auf die der worker() zugegriffen hat, werden möglicherweise in einem schlechten Zustand zurückgelassen
  • Der result_queue() kann viele Ergebnisse enthalten, die nicht gespeichert werden

Nicht gut! Im Folgenden zeige ich Ihnen, wie ich dies gelöst habe. Wie immer verwende ich Ubuntu (20.04).

Ereignisse verwenden, um die Kindprozesse zu stoppen

Seiten im Internet über Python und Multiprocessing sind oft verwirrend, vor allem weil viele auf Python2 verweisen. Das Gute daran ist, dass man gezwungen ist, die offizielle Dokumentation zu lesen ...

Es gibt zwei Möglichkeiten, wie Prozesse (zuverlässig) miteinander kommunizieren können:

  • Warteschlangen
  • Ereignisse

In diesem Fall, um Prozesse zu stoppen, verwenden wir Ereignisse. Wir nennen unsere Ereignisse stop-events, weil sie den Kindprozess zum Anhalten auffordern. Das funktioniert folgendermaßen:

  1. Erzeugen Sie ein Ereignis (Objekt)
  2. Übergeben Sie das Ereignis als Parameter, wenn der Kindprozess gestartet wird
  3. Im Kindprozess: Prüfen Sie das Ereignis und stoppen Sie, wenn das Ereignis 'is_set()'
  4. Im Hauptprozess: 'set()' das Ereignis, wenn der Kindprozess gestoppt werden muss

Ersetzen der Handler für SIGINT / SIGTERM

Hier ist der Trick, der alles zum Laufen bringt:

Wenn Sie einen Kindprozess spawnen, erbt dieser Prozess auch die Signalhandler des Elternprozesses.

Wir wollen nicht, dass unsere Kindprozesse auf das Signal CTRL-C (SIGINT ) oder das Signal SIGTERM reagieren. Stattdessen wollen wir, dass der Hauptprozess entscheidet, wie die Kinder gestoppt werden. Das bedeutet, wir müssen:

  1. Ersetzen Sie die Handler SIGINT und SIGTERM für die Kindprozesse durch Dummy-Handler. Diese Handler bewirken nichts, d. h. die Kindprozesse werden durch diese Signale nicht beendet.
  2. Ersetzen Sie die Handler SIGINT und SIGTERM des Hauptprozesses durch unsere eigene Version. Diese Handler teilen dem Hauptprozess mit, dass er herunterfahren muss. Dies geschieht über ein Hauptstopp-Ereignis (Objekt)

Bei diesem Schema prüft der Hauptprozess das Hauptstopp-Ereignis. Wenn es 'is_set()' ist, verwendet er 'set()', um dem Kindprozess X mitzuteilen, dass er anhalten soll. Da wir Stop-Ereignisse an alle Kindprozesse weitergeben können, hat der Hauptprozess die volle Kontrolle. Er kann warten, bis ein Kindprozess angehalten hat, bevor er einen anderen Kindprozess anhält.

Der Code

Es gibt drei Prozesse:

  • main()
  • (Kind) worker(), kann mehrere Threads haben
  • (untergeordnet) result_queue()

Wenn ein worker() -Thread fertig ist, sendet er das Ergebnis an result_queue(). Um gracefully zu stoppen, müssen wir zuerst den Prozess worker() auffordern, zu stoppen, und wenn er gestoppt hat, müssen wir den Prozess result_queue() auffordern, zu stoppen.

Ich denke, der Code ist nicht schwer zu lesen. Nachdem die Kindprozesse gestartet wurden, prüft der Hauptprozess in einer Schleife, ob ein Herunterfahren angefordert wird.

Das einzige, was ich Ihnen nicht gesagt habe, ist, dass ich etwas Code hinzugefügt habe, um ein Herunterfahren nach einer bestimmten Anzahl von Sekunden zu erzwingen, falls ein Problem auftritt.

# procsig.py

import logging
import multiprocessing
import os
import signal
import sys
import time

def init_logger():
    logger_format = "%(asctime)s %(levelname)-8.8s [%(funcName)24s():%(lineno)-3s] %(message)s"
    formatter = logging.Formatter(logger_format)

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

logger = init_logger()


# main sigint/sigterm handlers
main_stop_event = multiprocessing.Event()

def main_sigint_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()

def main_sigterm_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()


# children sigint/sigterm handlers, let main process handle this
def children_sigint_handler(signum, frame):
    logger.debug('')

def children_sigterm_handler(signum, frame):
    logger.debug('')


def worker(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('worker: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('worker STOPPING ...')
    time.sleep(1)
    logger.debug('worker STOPPED')


def result_queue(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('result_queue: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('result_queue: STOPPING')
    time.sleep(1)
    logger.debug('result_queue: STOPPED')


def main():

    # events
    global main_stop_event
    worker_stop_event = multiprocessing.Event()
    result_queue_stop_event = multiprocessing.Event()

    # setup result queue
    rq = multiprocessing.Queue()

    # children: capture sigint/sigterm
    signal.signal(signal.SIGINT, children_sigint_handler)
    signal.signal(signal.SIGTERM, children_sigterm_handler)

    # start worker
    worker_proc = multiprocessing.Process(
        name='worker',
        target=worker,
        kwargs={'rq': rq, 'stop_event': worker_stop_event},
    )
    worker_proc.daemon = True
    worker_proc.start()

    # start result_queue
    result_queue_proc = multiprocessing.Process(
        name='result_queue',
        target=result_queue,
        kwargs={'rq': rq, 'stop_event': result_queue_stop_event},
    )
    result_queue_proc.daemon = True
    result_queue_proc.start()

    # main: capture sigint/sigterm
    signal.signal(signal.SIGINT, main_sigint_handler)
    signal.signal(signal.SIGTERM, main_sigterm_handler)

    child_procs = [worker_proc, result_queue_proc]

    worker_stop_sent = False
    result_queue_stop_sent = False
    max_allowed_shutdown_seconds = 10
    shutdown_et = None

    keep_running = True
    while keep_running:

        if shutdown_et is not None and shutdown_et < int(time.time()):
            logger.debug('Forced shutdown ...')
            break

        run_count = 0
        for p in child_procs:
            logger.debug('[P> {: <12} - {} - {} - {}'.format(p.name, p.pid, p.is_alive(), p.exitcode))
            if p.is_alive():
                run_count += 1

            # send 'stop' to result_queue()?
            if p.pid == worker_proc.pid and not p.is_alive() and not result_queue_stop_sent:
                logger.debug('Sending stop to result_queue ...')
                result_queue_stop_event.set()
                result_queue_stop_sent = True

        # send 'stop' to worker()?
        if main_stop_event.is_set() and not worker_stop_sent:
            logger.debug('Sending stop to worker ...')
            worker_stop_event.set()
            worker_stop_sent = True
            shutdown_et = int(time.time()) + max_allowed_shutdown_seconds

        time.sleep(1)
        if run_count == 0:
            keep_running = False

    logger.debug('terminating children ...')
    try:
        worker_proc.terminate()
        result_queue_proc.terminate()
        worker_proc.kill()
        result_queue_proc.terminate()
    except Exception as e:
        # who cares
        logger.debug('Exception {}, e = {}'.format(type(e).__name__, e))

    logger.debug('Done')

if __name__=='__main__':
    main()

Um dies zu testen, drücken Sie CTRL-C, oder öffnen Sie ein anderes Terminalfenster und beenden Sie diesen Prozess:

pkill  -f  procsig

Ergebnis protokollieren

Sie können dies selbst ausprobieren, unten zeige ich Ihnen, was auf der Konsole ausgegeben wird. Um '19:28:37' drücke ich CTRL-C. Sofort werden die Signalhandler aufgerufen. Der main()-Prozess sendet eine Stop-Nachricht an den worker() -Prozess, indem er 'set()' verwendet. Sobald der worker() -Prozess gestoppt ist, wird mittels 'is_alive()' eine Stop-Nachricht an den result_queue() -Prozess gesendet.

2021-06-16 19:28:34,238 DEBUG    [                  worker():182] worker: 6
2021-06-16 19:28:35,239 DEBUG    [                  worker():182] worker: 7
2021-06-16 19:28:35,239 DEBUG    [            result_queue():197] result_queue: 7
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:36,239 DEBUG    [                  worker():182] worker: 8
2021-06-16 19:28:36,240 DEBUG    [            result_queue():197] result_queue: 8
2021-06-16 19:28:36,240 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:36,241 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:37,239 DEBUG    [                  worker():182] worker: 9
2021-06-16 19:28:37,241 DEBUG    [            result_queue():197] result_queue: 9
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
^C2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [     main_sigint_handler():160] 
2021-06-16 19:28:38,240 DEBUG    [                  worker():182] worker: 10
2021-06-16 19:28:38,242 DEBUG    [            result_queue():197] result_queue: 10
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():271] Sending stop to worker ...
2021-06-16 19:28:39,241 DEBUG    [                  worker():186] worker STOPPING ...
2021-06-16 19:28:39,243 DEBUG    [            result_queue():197] result_queue: 11
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:40,242 DEBUG    [                  worker():188] worker STOPPED
2021-06-16 19:28:40,244 DEBUG    [            result_queue():197] result_queue: 12
2021-06-16 19:28:40,245 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:40,245 DEBUG    [                    main():265] Sending stop to result_queue ...
2021-06-16 19:28:40,246 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:41,246 DEBUG    [            result_queue():201] result_queue: STOPPING
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:42,247 DEBUG    [            result_queue():203] result_queue: STOPPED
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> result_queue - 465190 - False - 0
2021-06-16 19:28:43,249 DEBUG    [                    main():280] terminating children ...
2021-06-16 19:28:43,249 DEBUG    [                    main():290] Done

Zusammenfassung

Ich suchte nach einer Möglichkeit, Prozesse mit Python Multiprocessing elegant zu stoppen und wollte auch die Reihenfolge kontrollieren, in der die Prozesse gestoppt werden. Ich löste dies, indem ich Ereignisse verwendete und die Signalhandler ersetzte. Dies gibt mir die totale Kontrolle über den Hauptprozess. Ich kann jetzt darum bitten, Prozess B zu stoppen, nachdem Prozess A gestoppt wurde.

Links / Impressum

concurrent.futures — Launching parallel tasks
https://docs.python.org/3/library/concurrent.futures.html

Passing Messages to Processes / Signaling between Processes
https://pymotw.com/3/multiprocessing/communication.html

Things I Wish They Told Me About Multiprocessing in Python
https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/

Mehr erfahren

Multiprocessing

Einen Kommentar hinterlassen

Kommentieren Sie anonym oder melden Sie sich zum Kommentieren an.

Kommentare

Eine Antwort hinterlassen

Antworten Sie anonym oder melden Sie sich an, um zu antworten.