
Python Multiprocessing graceful shutdown in the proper order

First the worker() process is stopped, then the result_queue() process flushes its data and is stopped.

16 June 2021 Updated 16 June 2021

For a new project I needed a daemon process that executes many more or less identical operations on different resources. In this case the operations are IO-bound, and I solved that part by using ThreadPoolExecutor. So far, so good.

Next I wanted to store the results in files. Of course we use a queue to communicate between processes. The worker() process uses q.put() to add items to the queue and the result_queue() process uses q.get() to get the items from the queue.

My result_queue() process also buffers the received results. Once a threshold is reached, all results are written to a file at once.
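The buffering can be sketched roughly as follows. This is only an illustration: the class name ResultBuffer, the JSON-lines file format and the default threshold are assumptions made for this example.

```python
import json


class ResultBuffer:
    """Collect results in memory and append them to a file in batches."""

    def __init__(self, path, threshold=100):
        self.path = path            # hypothetical output file
        self.threshold = threshold  # flush once this many items are buffered
        self.items = []

    def add(self, item):
        self.items.append(item)
        if len(self.items) >= self.threshold:
            self.flush()

    def flush(self):
        # Write everything buffered so far, one JSON document per line.
        if not self.items:
            return
        with open(self.path, "a", encoding="utf-8") as fh:
            for item in self.items:
                fh.write(json.dumps(item) + "\n")
        self.items = []
```

The important detail for a graceful shutdown is the explicit flush(): anything still below the threshold is lost unless flush() is called one last time before the process exits.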

And here is the problem. When you press CTRL-C or send a SIGTERM signal then the processes are abruptly terminated. This means:

  • The resources that were being accessed by the worker() may be left behind in a bad state
  • The result_queue() can contain many results that will not be saved

Not good! Below I show you how I solved this. As always, I am using Ubuntu (20.04).

Use events to stop the child processes

Pages on the internet about Python and Multiprocessing are often confusing, mainly because many of them refer to Python 2. The good thing is that you are forced to read the official documentation ...

Two of the ways processes can communicate reliably with each other are:

  • queues
  • events

In this case, to stop processes, we use events. We call our events stop-events because they request the child process to stop. It works like this:

  1. Create an event (object)
  2. Pass the event as a parameter when the child process is started
  3. In the child process: check the event and stop if the event 'is_set()'
  4. In the main process: 'set()' the event if the child must be stopped
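The four steps above can be sketched like this. It is a minimal example, not the full program: the function names child() and demo() and the poll interval are assumptions for illustration.

```python
import multiprocessing
import time


def child(stop_event, poll_seconds=0.05):
    # Step 3: check the event and stop once it is set.
    iterations = 0
    while not stop_event.is_set():
        iterations += 1
        # wait() doubles as a sleep that ends early when the event is set.
        stop_event.wait(poll_seconds)
    # The return value is ignored by Process; it only makes the
    # function easy to call directly.
    return iterations


def demo():
    stop_event = multiprocessing.Event()                      # step 1
    proc = multiprocessing.Process(target=child,
                                   args=(stop_event,))        # step 2
    proc.start()
    time.sleep(0.2)
    stop_event.set()                                          # step 4
    proc.join(timeout=5)
    return proc.exitcode


if __name__ == "__main__":
    print("child exit code:", demo())
```

Using stop_event.wait() instead of time.sleep() in the child is a design choice: the child reacts to the stop request right away instead of finishing its sleep first.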

Replace handlers for SIGINT / SIGTERM

Here is the trick that makes everything work:

When you start a child process with the 'fork' start method (the default on Linux at the time of writing), the child also inherits the signal handlers of the parent.

We do not want our child processes to respond to CTRL-C (SIGINT signal) or the SIGTERM signal. Instead, we want the main process to decide how the children are stopped. This means we must:

  1. Replace the SIGINT and SIGTERM handlers for the child processes with dummy handlers. These handlers do nothing, so the child processes are not terminated by these signals.
  2. Replace the SIGINT and SIGTERM handlers of the main process with our own version. These handlers tell the main process that it must shut down. This is done using a main stop event (object).

With this scheme, the main process checks the main stop event. If 'is_set()' it uses 'set()' to inform child process X to stop. Because we can pass stop events to all child processes, the main process has full control. It can wait for a child process to stop, before stopping another child process.
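A minimal sketch of the handler setup, assuming the 'fork' start method. Note that this is a variant: instead of dummy handlers that only log, it uses signal.SIG_IGN so the children ignore the signals entirely. The function names are assumptions for illustration.

```python
import multiprocessing
import signal

main_stop_event = multiprocessing.Event()


def request_shutdown(signum, frame):
    # Main process: do not exit here, only record that a shutdown
    # was requested. The main loop decides what happens next.
    main_stop_event.set()


def ignore_in_children():
    # Call this BEFORE starting the children, so that forked children
    # inherit "ignore" for both signals.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)


def install_main_handlers():
    # Call this AFTER the children have been started, so only the
    # main process reacts to CTRL-C and SIGTERM.
    signal.signal(signal.SIGINT, request_shutdown)
    signal.signal(signal.SIGTERM, request_shutdown)
```

The order matters: ignore_in_children(), then Process.start() for every child, then install_main_handlers(). Both functions must be called from the main thread, because Python only allows signal.signal() there.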

The code

There are three processes:

  • main()
  • (child) worker(), can have multiple threads
  • (child) result_queue()

When a worker() thread finishes, it sends the result to result_queue(). To stop gracefully we must first ask the worker() process to stop, and once it has stopped, we must ask the result_queue() process to stop.
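Why this order matters can be sketched with a consumer that only exits when the stop event is set and the queue is empty. This is a minimal sketch; the function name drain_results() and the poll interval are assumptions for illustration.

```python
import queue


def drain_results(rq, stop_event, poll_seconds=0.1):
    """Read from rq until a stop is requested and nothing is left."""
    collected = []
    while True:
        try:
            # multiprocessing.Queue.get() raises queue.Empty on timeout.
            collected.append(rq.get(timeout=poll_seconds))
        except queue.Empty:
            # Only leave when the queue is empty AND a stop was requested.
            if stop_event.is_set():
                break
    return collected
```

Because the stop event for result_queue() is only set after the worker() process has exited, no new items can arrive once the queue has been found empty, so nothing is left behind.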

I think the code is not difficult to read. After the child processes have been started, the main process checks in a loop if a shutdown is requested.

The only thing I did not tell you is that I added some code to force a shutdown after a specified number of seconds in case of some problem.
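The forced shutdown boils down to a deadline check. A minimal sketch, assuming a hypothetical helper wait_until(); it uses time.monotonic() rather than the wall-clock time.time() used in the full script, so clock adjustments cannot shorten or stretch the deadline.

```python
import time


def wait_until(predicate, timeout_seconds, poll_seconds=0.05):
    """Poll predicate() until it is true or the deadline has passed."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(poll_seconds)
    # One last check, so a result right at the deadline is not missed.
    return predicate()
```

With a Process object proc, a call such as wait_until(lambda: not proc.is_alive(), 10) returns False when the child is still running after 10 seconds, and the caller can then fall back to proc.terminate().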

# procsig.py

import logging
import multiprocessing
import os
import signal
import sys
import time

def init_logger():
    logger_format = "%(asctime)s %(levelname)-8.8s [%(funcName)24s():%(lineno)-3s] %(message)s"
    formatter = logging.Formatter(logger_format)

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

logger = init_logger()


# main sigint/sigterm handlers
main_stop_event = multiprocessing.Event()

def main_sigint_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()

def main_sigterm_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()


# children sigint/sigterm handlers, let main process handle this
def children_sigint_handler(signum, frame):
    logger.debug('')

def children_sigterm_handler(signum, frame):
    logger.debug('')


def worker(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('worker: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('worker STOPPING ...')
    time.sleep(1)
    logger.debug('worker STOPPED')


def result_queue(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('result_queue: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('result_queue: STOPPING')
    time.sleep(1)
    logger.debug('result_queue: STOPPED')


def main():

    # events
    global main_stop_event
    worker_stop_event = multiprocessing.Event()
    result_queue_stop_event = multiprocessing.Event()

    # setup result queue
    rq = multiprocessing.Queue()

    # children: capture sigint/sigterm
    signal.signal(signal.SIGINT, children_sigint_handler)
    signal.signal(signal.SIGTERM, children_sigterm_handler)

    # start worker
    worker_proc = multiprocessing.Process(
        name='worker',
        target=worker,
        kwargs={'rq': rq, 'stop_event': worker_stop_event},
    )
    worker_proc.daemon = True
    worker_proc.start()

    # start result_queue
    result_queue_proc = multiprocessing.Process(
        name='result_queue',
        target=result_queue,
        kwargs={'rq': rq, 'stop_event': result_queue_stop_event},
    )
    result_queue_proc.daemon = True
    result_queue_proc.start()

    # main: capture sigint/sigterm
    signal.signal(signal.SIGINT, main_sigint_handler)
    signal.signal(signal.SIGTERM, main_sigterm_handler)

    child_procs = [worker_proc, result_queue_proc]

    worker_stop_sent = False
    result_queue_stop_sent = False
    max_allowed_shutdown_seconds = 10
    shutdown_et = None

    keep_running = True
    while keep_running:

        if shutdown_et is not None and shutdown_et < int(time.time()):
            logger.debug('Forced shutdown ...')
            break

        run_count = 0
        for p in child_procs:
            logger.debug('[P> {: <12} - {} - {} - {}'.format(p.name, p.pid, p.is_alive(), p.exitcode))
            if p.is_alive():
                run_count += 1

            # send 'stop' to result_queue()?
            if p.pid == worker_proc.pid and not p.is_alive() and not result_queue_stop_sent:
                logger.debug('Sending stop to result_queue ...')
                result_queue_stop_event.set()
                result_queue_stop_sent = True

        # send 'stop' to worker()?
        if main_stop_event.is_set() and not worker_stop_sent:
            logger.debug('Sending stop to worker ...')
            worker_stop_event.set()
            worker_stop_sent = True
            shutdown_et = int(time.time()) + max_allowed_shutdown_seconds

        time.sleep(1)
        if run_count == 0:
            keep_running = False

    logger.debug('terminating children ...')
    try:
        worker_proc.terminate()
        result_queue_proc.terminate()
        worker_proc.kill()
        result_queue_proc.kill()
    except Exception as e:
        # who cares
        logger.debug('Exception {}, e = {}'.format(type(e).__name__, e))

    logger.debug('Done')

if __name__=='__main__':
    main()

To test this, hit CTRL-C, or open another terminal window and terminate this process:

pkill -f procsig

Log result

You can try this yourself. Below is what gets printed to the console. At '19:28:37' I hit CTRL-C and the signal handlers are called immediately. The main() process then asks the worker() process to stop by calling 'set()' on its stop event. Once main() sees, via 'is_alive()', that the worker() process has stopped, it sends the stop request to the result_queue() process.

2021-06-16 19:28:34,238 DEBUG    [                  worker():182] worker: 6
2021-06-16 19:28:35,239 DEBUG    [                  worker():182] worker: 7
2021-06-16 19:28:35,239 DEBUG    [            result_queue():197] result_queue: 7
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:36,239 DEBUG    [                  worker():182] worker: 8
2021-06-16 19:28:36,240 DEBUG    [            result_queue():197] result_queue: 8
2021-06-16 19:28:36,240 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:36,241 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:37,239 DEBUG    [                  worker():182] worker: 9
2021-06-16 19:28:37,241 DEBUG    [            result_queue():197] result_queue: 9
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
^C2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [     main_sigint_handler():160] 
2021-06-16 19:28:38,240 DEBUG    [                  worker():182] worker: 10
2021-06-16 19:28:38,242 DEBUG    [            result_queue():197] result_queue: 10
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():271] Sending stop to worker ...
2021-06-16 19:28:39,241 DEBUG    [                  worker():186] worker STOPPING ...
2021-06-16 19:28:39,243 DEBUG    [            result_queue():197] result_queue: 11
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:40,242 DEBUG    [                  worker():188] worker STOPPED
2021-06-16 19:28:40,244 DEBUG    [            result_queue():197] result_queue: 12
2021-06-16 19:28:40,245 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:40,245 DEBUG    [                    main():265] Sending stop to result_queue ...
2021-06-16 19:28:40,246 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:41,246 DEBUG    [            result_queue():201] result_queue: STOPPING
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:42,247 DEBUG    [            result_queue():203] result_queue: STOPPED
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> result_queue - 465190 - False - 0
2021-06-16 19:28:43,249 DEBUG    [                    main():280] terminating children ...
2021-06-16 19:28:43,249 DEBUG    [                    main():290] Done

Summary

I was looking for a way to stop processes gracefully with Python Multiprocessing and also wanted to control the order in which processes are stopped. I solved this by using events and replacing the signal handlers. This gives me total control in the main process. I can now ask process B to stop after process A has stopped.

Links / credits

concurrent.futures — Launching parallel tasks
https://docs.python.org/3/library/concurrent.futures.html

Passing Messages to Processes / Signaling between Processes
https://pymotw.com/3/multiprocessing/communication.html

Things I Wish They Told Me About Multiprocessing in Python
https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/
