angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Arrêt gracieux de Python Multiprocessing dans l'ordre approprié

D'abord le processus worker() est arrêté, puis les données du processus result_queue() sont vidées et arrêtées.

16 juin 2021 Mise à jour 17 juin 2021
post main image
https://unsplash.com/@marcella_mlk

Pour un nouveau projet, j'avais besoin d'un processus deamon qui doit exécuter de nombreuses opérations, plus ou moins identiques, sur différentes ressources. Dans ce cas, l'opération est liée aux entrées-sorties et j'ai résolu le problème en utilisant ThreadPoolExecutor. Jusqu'ici, tout va bien.

Ensuite, je voulais stocker les résultats dans des fichiers. Bien sûr, nous utilisons une file d'attente pour communiquer entre les processus. Le processus worker() utilise q.put() pour ajouter des éléments à la file d'attente et le processus result_queue() utilise q.get() pour récupérer les éléments de la file d'attente.

Le processus result_queue() met également en mémoire tampon les résultats reçus. Lorsqu'un seuil est atteint, tous les résultats sont écrits dans un fichier en une seule fois.

Et voici le problème. Lorsque vous appuyez sur CTRL-C ou envoyez un signal SIGTERM , les processus s'arrêtent brusquement. Cela signifie que :

  • Les ressources auxquelles accédait le worker() peuvent être laissées dans un mauvais état.
  • La result_queue() peut contenir de nombreux résultats qui ne seront pas sauvegardés.

Pas bon ! Je vous montre ci-dessous comment j'ai résolu ce problème. Comme toujours, j'utilise Ubuntu (20.04).

Utiliser les événements pour arrêter les processus enfants

Les pages sur internet concernant Python et Multiprocessing sont souvent confuses, principalement parce que beaucoup font référence à Python2. L'avantage est que vous êtes obligé de lire la documentation officielle...

Il existe deux façons pour les processus de communiquer (de manière fiable) entre eux :

  • les files d'attente
  • les événements

Dans ce cas, pour arrêter les processus, nous utilisons des événements. Nous appelons nos événements stop-events car ils demandent au processus fils de s'arrêter. Cela fonctionne comme suit :

  1. Créer un événement (objet)
  2. Passez l'événement en tant que paramètre lorsque le processus enfant est lancé.
  3. Dans le processus enfant : vérifier l'événement et arrêter si l'événement 'is_set()'.
  4. Dans le processus principal : set()' l'événement si l'enfant doit être arrêté.

Remplacer les handlers pour SIGINT / SIGTERM

Voici l'astuce qui fait que tout fonctionne :

Lorsque vous créez un processus enfant, ce processus hérite également des gestionnaires de signaux du parent.

Nous ne voulons pas que nos processus enfants répondent à CTRL-C (signal SIGINT ) ou au signal SIGTERM . Au lieu de cela, nous voulons que le processus principal décide comment les enfants sont arrêtés. Cela signifie que nous devons :

  1. Remplacer les handlers SIGINT et SIGTERM pour les processus enfants par des handlers factices. Ces gestionnaires ne font rien, ce qui signifie que les processus enfants ne sont pas interrompus par ces signaux.
  2. Remplacer les gestionnaires SIGINT et SIGTERM du processus principal par notre propre version. Ces gestionnaires indiquent au processus principal qu'il doit s'arrêter. Ceci est fait en utilisant un événement d'arrêt principal (objet).

Avec ce schéma, le processus principal vérifie l'événement d'arrêt principal. Si 'is_set()', il utilise 'set()' pour informer le processus enfant X de s'arrêter. Comme nous pouvons transmettre des événements d'arrêt à tous les processus enfants, le processus principal a un contrôle total. Il peut attendre qu'un processus enfant s'arrête, avant d'arrêter un autre processus enfant.

Le code

Il y a trois processus :

  • main()
  • (enfant) worker(), peut avoir plusieurs threads.
  • (enfant) result_queue()

Quand un thread worker() se termine, il envoie le résultat à result_queue(). Pour un arrêt en douceur, nous devons d'abord demander au processus worker() de s'arrêter, et une fois qu'il s'est arrêté, nous devons demander au processus result_queue() de s'arrêter.

Je pense que le code n'est pas difficile à lire. Une fois que les processus enfants ont été lancés, le processus principal vérifie dans une boucle si un arrêt est demandé.

La seule chose que je ne vous ai pas dit est que j'ai ajouté du code pour forcer un arrêt après un nombre de secondes spécifié en cas de problème.

# procsig.py

import logging
import multiprocessing
import os
import signal
import sys
import time

def init_logger():
    logger_format = "%(asctime)s %(levelname)-8.8s [%(funcName)24s():%(lineno)-3s] %(message)s"
    formatter = logging.Formatter(logger_format)

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

logger = init_logger()


# main sigint/sigterm handlers
main_stop_event = multiprocessing.Event()

def main_sigint_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()

def main_sigterm_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()


# children sigint/sigterm handlers, let main process handle this
def children_sigint_handler(signum, frame):
    logger.debug('')

def children_sigterm_handler(signum, frame):
    logger.debug('')


def worker(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('worker: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('worker STOPPING ...')
    time.sleep(1)
    logger.debug('worker STOPPED')


def result_queue(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('result_queue: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('result_queue: STOPPING')
    time.sleep(1)
    logger.debug('result_queue: STOPPED')


def main():

    # events
    global main_stop_event
    worker_stop_event = multiprocessing.Event()
    result_queue_stop_event = multiprocessing.Event()

    # setup result queue
    rq = multiprocessing.Queue()

    # children: capture sigint/sigterm
    signal.signal(signal.SIGINT, children_sigint_handler)
    signal.signal(signal.SIGTERM, children_sigterm_handler)

    # start worker
    worker_proc = multiprocessing.Process(
        name='worker',
        target=worker,
        kwargs={'rq': rq, 'stop_event': worker_stop_event},
    )
    worker_proc.daemon = True
    worker_proc.start()

    # start result_queue
    result_queue_proc = multiprocessing.Process(
        name='result_queue',
        target=result_queue,
        kwargs={'rq': rq, 'stop_event': result_queue_stop_event},
    )
    result_queue_proc.daemon = True
    result_queue_proc.start()

    # main: capture sigint/sigterm
    signal.signal(signal.SIGINT, main_sigint_handler)
    signal.signal(signal.SIGTERM, main_sigterm_handler)

    child_procs = [worker_proc, result_queue_proc]

    worker_stop_sent = False
    result_queue_stop_sent = False
    max_allowed_shutdown_seconds = 10
    shutdown_et = None

    keep_running = True
    while keep_running:

        if shutdown_et is not None and shutdown_et < int(time.time()):
            logger.debug('Forced shutdown ...')
            break

        run_count = 0
        for p in child_procs:
            logger.debug('[P> {: <12} - {} - {} - {}'.format(p.name, p.pid, p.is_alive(), p.exitcode))
            if p.is_alive():
                run_count += 1

            # send 'stop' to result_queue()?
            if p.pid == worker_proc.pid and not p.is_alive() and not result_queue_stop_sent:
                logger.debug('Sending stop to result_queue ...')
                result_queue_stop_event.set()
                result_queue_stop_sent = True

        # send 'stop' to worker()?
        if main_stop_event.is_set() and not worker_stop_sent:
            logger.debug('Sending stop to worker ...')
            worker_stop_event.set()
            worker_stop_sent = True
            shutdown_et = int(time.time()) + max_allowed_shutdown_seconds

        time.sleep(1)
        if run_count == 0:
            keep_running = False

    logger.debug('terminating children ...')
    try:
        worker_proc.terminate()
        result_queue_proc.terminate()
        worker_proc.kill()
        result_queue_proc.terminate()
    except Exception as e:
        # who cares
        logger.debug('Exception {}, e = {}'.format(type(e).__name__, e))

    logger.debug('Done')

if __name__=='__main__':
    main()

Pour tester cela, tapez CTRL-C, ou ouvrez une autre fenêtre de terminal et terminez ce processus :

pkill  -f  procsig

Résultat du journal

Vous pouvez essayer vous-même, ci-dessous je vous montre ce qui est imprimé dans la console. À '19:28:37', je frappe CTRL-C. Immédiatement, les gestionnaires de signaux sont appelés. Le processus main() envoie un message d'arrêt au processus worker() , en utilisant 'set()'. Une fois le processus worker() arrêté, en vérifiant 'is_alive()', le processus result_queue() reçoit un message d'arrêt.

2021-06-16 19:28:34,238 DEBUG    [                  worker():182] worker: 6
2021-06-16 19:28:35,239 DEBUG    [                  worker():182] worker: 7
2021-06-16 19:28:35,239 DEBUG    [            result_queue():197] result_queue: 7
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:36,239 DEBUG    [                  worker():182] worker: 8
2021-06-16 19:28:36,240 DEBUG    [            result_queue():197] result_queue: 8
2021-06-16 19:28:36,240 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:36,241 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:37,239 DEBUG    [                  worker():182] worker: 9
2021-06-16 19:28:37,241 DEBUG    [            result_queue():197] result_queue: 9
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
^C2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [     main_sigint_handler():160] 
2021-06-16 19:28:38,240 DEBUG    [                  worker():182] worker: 10
2021-06-16 19:28:38,242 DEBUG    [            result_queue():197] result_queue: 10
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():271] Sending stop to worker ...
2021-06-16 19:28:39,241 DEBUG    [                  worker():186] worker STOPPING ...
2021-06-16 19:28:39,243 DEBUG    [            result_queue():197] result_queue: 11
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:40,242 DEBUG    [                  worker():188] worker STOPPED
2021-06-16 19:28:40,244 DEBUG    [            result_queue():197] result_queue: 12
2021-06-16 19:28:40,245 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:40,245 DEBUG    [                    main():265] Sending stop to result_queue ...
2021-06-16 19:28:40,246 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:41,246 DEBUG    [            result_queue():201] result_queue: STOPPING
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:42,247 DEBUG    [            result_queue():203] result_queue: STOPPED
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> result_queue - 465190 - False - 0
2021-06-16 19:28:43,249 DEBUG    [                    main():280] terminating children ...
2021-06-16 19:28:43,249 DEBUG    [                    main():290] Done

Résumé

Je cherchais un moyen d'arrêter les processus en douceur avec Python Multiprocessing et je voulais également contrôler l'ordre dans lequel les processus étaient arrêtés. J'ai résolu ce problème en utilisant des événements et en remplaçant les gestionnaires de signaux. Cela me donne un contrôle total sur le processus principal. Je peux maintenant demander l'arrêt du processus B après l'arrêt du processus A.

Liens / crédits

concurrent.futures — Launching parallel tasks
https://docs.python.org/3/library/concurrent.futures.html

Passing Messages to Processes / Signaling between Processes
https://pymotw.com/3/multiprocessing/communication.html

Things I Wish They Told Me About Multiprocessing in Python
https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/

En savoir plus...

Multiprocessing

Laissez un commentaire

Commentez anonymement ou connectez-vous pour commenter.

Commentaires

Laissez une réponse

Répondez de manière anonyme ou connectez-vous pour répondre.