angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Python Multiprocessing graceful shutdown en el orden correcto

Primero se detiene el proceso worker(), luego se vacían los datos del proceso result_queue() y se detiene.

16 junio 2021 Actualizado 17 junio 2021
post main image
https://unsplash.com/@marcella_mlk

Para un nuevo proyecto necesitaba un proceso deamon que debe ejecutar muchas operaciones, más o menos idénticas, sobre diferentes recursos. En este caso la operación está ligada al IO y lo resolví utilizando ThreadPoolExecutor. Hasta aquí todo bien.

A continuación quería almacenar los resultados en archivos. Por supuesto, usamos una cola para comunicarnos entre procesos. El proceso worker() utiliza q.put() para añadir elementos a la cola y el proceso result_queue() utiliza q.get() para obtener los elementos de la cola.

Mi proceso result_queue() también almacena en búfer los resultados recibidos. Una vez que se alcanza un umbral, todos los resultados se escriben en un archivo a la vez.

Y aquí está el problema. Cuando se pulsa CTRL-C o se envía una señal SIGTERM los procesos se terminan abruptamente. Esto significa:

  • Los recursos a los que se estaba accediendo por el worker() pueden quedar en mal estado
  • El result_queue() puede contener muchos resultados que no se guardarán

No es bueno! A continuación os muestro cómo he solucionado esto. Como siempre, estoy usando Ubuntu (20.04).

Usar eventos para detener los procesos hijos

Las páginas en internet sobre Python y Multiprocessing suelen ser confusas, principalmente porque muchas hacen referencia a Python2. Lo bueno es que se obliga a leer la documentación oficial...

Hay dos formas en que los procesos pueden comunicarse (de forma fiable) entre sí:

  • colas
  • eventos

En este caso, para detener procesos, utilizamos eventos. Llamamos a nuestros eventos stop-events porque solicitan al proceso hijo que se detenga. Funciona así

  1. Crear un evento (objeto)
  2. Pasar el evento como parámetro cuando se inicia el proceso hijo
  3. En el proceso hijo: comprueba el evento y se detiene si el evento 'is_set()'
  4. En el proceso principal: 'set()' el evento si el hijo debe ser detenido

Reemplazar los manejadores para SIGINT / SIGTERM

Aquí está el truco que hace que todo funcione:

Cuando generas un proceso hijo, este proceso hereda también los manejadores de señales del padre.

No queremos que nuestros procesos hijos respondan a la señal CTRL-C (SIGINT ) o a la señal SIGTERM . En cambio, queremos que el proceso principal decida cómo se detienen los hijos. Esto significa que debemos

  1. Sustituir los manejadores SIGINT y SIGTERM de los procesos hijos por manejadores ficticios. Estos manejadores no hacen nada, lo que significa que los procesos hijos no son terminados por estas señales.
  2. Reemplazar los manejadores SIGINT y SIGTERM del proceso principal con nuestra propia versión. Estos manejadores le dicen al proceso principal que debe cerrarse. Esto se hace utilizando un evento de parada principal (objeto)

Con este esquema, el proceso principal comprueba el evento de parada principal. Si es 'is_set()' utiliza 'set()' para informar al proceso hijo X que se detenga. Como podemos pasar eventos de parada a todos los procesos hijos, el proceso principal tiene el control total. Puede esperar a que un proceso hijo se detenga, antes de detener otro proceso hijo.

El código

Hay tres procesos:

  • main()
  • (hijo) worker(), puede tener múltiples hilos
  • (hijo) result_queue()

Cuando un hilo worker() termina, envía el resultado a result_queue(). Para que se detenga con gracia primero debemos pedirle al proceso worker() que se detenga, y una vez que se detuvo, debemos pedirle al proceso result_queue() que se detenga.

Creo que el código no es difícil de leer. Una vez iniciados los procesos hijos, el proceso principal comprueba en un bucle si se solicita la parada.

Lo único que no te he dicho es que he añadido algo de código para forzar un cierre después de un número especificado de segundos en caso de algún problema.

# procsig.py

import logging
import multiprocessing
import os
import signal
import sys
import time

def init_logger():
    logger_format = "%(asctime)s %(levelname)-8.8s [%(funcName)24s():%(lineno)-3s] %(message)s"
    formatter = logging.Formatter(logger_format)

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

logger = init_logger()


# main sigint/sigterm handlers
main_stop_event = multiprocessing.Event()

def main_sigint_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()

def main_sigterm_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()


# children sigint/sigterm handlers, let main process handle this
def children_sigint_handler(signum, frame):
    logger.debug('')

def children_sigterm_handler(signum, frame):
    logger.debug('')


def worker(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('worker: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('worker STOPPING ...')
    time.sleep(1)
    logger.debug('worker STOPPED')


def result_queue(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('result_queue: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('result_queue: STOPPING')
    time.sleep(1)
    logger.debug('result_queue: STOPPED')


def main():

    # events
    global main_stop_event
    worker_stop_event = multiprocessing.Event()
    result_queue_stop_event = multiprocessing.Event()

    # setup result queue
    rq = multiprocessing.Queue()

    # children: capture sigint/sigterm
    signal.signal(signal.SIGINT, children_sigint_handler)
    signal.signal(signal.SIGTERM, children_sigterm_handler)

    # start worker
    worker_proc = multiprocessing.Process(
        name='worker',
        target=worker,
        kwargs={'rq': rq, 'stop_event': worker_stop_event},
    )
    worker_proc.daemon = True
    worker_proc.start()

    # start result_queue
    result_queue_proc = multiprocessing.Process(
        name='result_queue',
        target=result_queue,
        kwargs={'rq': rq, 'stop_event': result_queue_stop_event},
    )
    result_queue_proc.daemon = True
    result_queue_proc.start()

    # main: capture sigint/sigterm
    signal.signal(signal.SIGINT, main_sigint_handler)
    signal.signal(signal.SIGTERM, main_sigterm_handler)

    child_procs = [worker_proc, result_queue_proc]

    worker_stop_sent = False
    result_queue_stop_sent = False
    max_allowed_shutdown_seconds = 10
    shutdown_et = None

    keep_running = True
    while keep_running:

        if shutdown_et is not None and shutdown_et < int(time.time()):
            logger.debug('Forced shutdown ...')
            break

        run_count = 0
        for p in child_procs:
            logger.debug('[P> {: <12} - {} - {} - {}'.format(p.name, p.pid, p.is_alive(), p.exitcode))
            if p.is_alive():
                run_count += 1

            # send 'stop' to result_queue()?
            if p.pid == worker_proc.pid and not p.is_alive() and not result_queue_stop_sent:
                logger.debug('Sending stop to result_queue ...')
                result_queue_stop_event.set()
                result_queue_stop_sent = True

        # send 'stop' to worker()?
        if main_stop_event.is_set() and not worker_stop_sent:
            logger.debug('Sending stop to worker ...')
            worker_stop_event.set()
            worker_stop_sent = True
            shutdown_et = int(time.time()) + max_allowed_shutdown_seconds

        time.sleep(1)
        if run_count == 0:
            keep_running = False

    logger.debug('terminating children ...')
    try:
        worker_proc.terminate()
        result_queue_proc.terminate()
        worker_proc.kill()
        result_queue_proc.terminate()
    except Exception as e:
        # who cares
        logger.debug('Exception {}, e = {}'.format(type(e).__name__, e))

    logger.debug('Done')

if __name__=='__main__':
    main()

Para probar esto, pulsa CTRL-C, o abre otra ventana de terminal y termina este proceso:

pkill  -f  procsig

Resultado del registro

Puedes probarlo tú mismo, a continuación te muestro lo que se imprime en la consola. A las '19:28:37' he pulsado CTRL-C. Inmediatamente, los manejadores de señales son llamados. El proceso main() envía un mensaje de parada al proceso worker() , utilizando 'set()'. Una vez que el proceso worker() se detiene, mediante la comprobación de 'is_alive()', el proceso result_queue() recibe un mensaje de parada.

2021-06-16 19:28:34,238 DEBUG    [                  worker():182] worker: 6
2021-06-16 19:28:35,239 DEBUG    [                  worker():182] worker: 7
2021-06-16 19:28:35,239 DEBUG    [            result_queue():197] result_queue: 7
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:36,239 DEBUG    [                  worker():182] worker: 8
2021-06-16 19:28:36,240 DEBUG    [            result_queue():197] result_queue: 8
2021-06-16 19:28:36,240 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:36,241 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:37,239 DEBUG    [                  worker():182] worker: 9
2021-06-16 19:28:37,241 DEBUG    [            result_queue():197] result_queue: 9
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
^C2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [     main_sigint_handler():160] 
2021-06-16 19:28:38,240 DEBUG    [                  worker():182] worker: 10
2021-06-16 19:28:38,242 DEBUG    [            result_queue():197] result_queue: 10
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():271] Sending stop to worker ...
2021-06-16 19:28:39,241 DEBUG    [                  worker():186] worker STOPPING ...
2021-06-16 19:28:39,243 DEBUG    [            result_queue():197] result_queue: 11
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:40,242 DEBUG    [                  worker():188] worker STOPPED
2021-06-16 19:28:40,244 DEBUG    [            result_queue():197] result_queue: 12
2021-06-16 19:28:40,245 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:40,245 DEBUG    [                    main():265] Sending stop to result_queue ...
2021-06-16 19:28:40,246 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:41,246 DEBUG    [            result_queue():201] result_queue: STOPPING
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:42,247 DEBUG    [            result_queue():203] result_queue: STOPPED
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> result_queue - 465190 - False - 0
2021-06-16 19:28:43,249 DEBUG    [                    main():280] terminating children ...
2021-06-16 19:28:43,249 DEBUG    [                    main():290] Done

Resumen

Estaba buscando una forma de detener procesos de forma elegante con Python Multiprocessing y también quería controlar el orden en el que se detenían los procesos. Esto lo resolví usando eventos y reemplazando los manejadores de señales. Esto me da un control total en el proceso principal. Ahora puedo pedir que se detenga el proceso B después de que el proceso A se haya detenido.

Enlaces / créditos

concurrent.futures — Launching parallel tasks
https://docs.python.org/3/library/concurrent.futures.html

Passing Messages to Processes / Signaling between Processes
https://pymotw.com/3/multiprocessing/communication.html

Things I Wish They Told Me About Multiprocessing in Python
https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/

Deje un comentario

Comente de forma anónima o inicie sesión para comentar.

Comentarios

Deje una respuesta.

Responda de forma anónima o inicie sesión para responder.