angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Python Multiprocessing изящное завершение работы в правильном порядке

Сначала останавливается процесс worker(), затем данные процесса result_queue() промываются и останавливаются.

16 июня 2021 Обновленный 17 июня 2021
post main image
https://unsplash.com/@marcella_mlk

Для нового проекта мне понадобился процесс deamon, который должен выполнять множество более или менее одинаковых операций на различных ресурсах. В данном случае операция связана с IO, и я решил эту проблему с помощью ThreadPoolExecutor. Пока все хорошо.

Далее я хотел хранить результаты в файлах. Конечно, мы используем очередь для связи между процессами. Процесс worker() использует q.put() для добавления элементов в очередь, а процесс result_queue() использует q.get() для получения элементов из очереди.

Мой процесс result_queue() также буферизирует полученные результаты. Как только достигается порог, все результаты сразу записываются в файл.

И вот в чем проблема. Когда вы нажимаете CTRL-C или посылаете сигнал SIGTERM , то процессы резко завершаются. Это означает:

  • Ресурсы, к которым обращался worker() , могут быть оставлены в плохом состоянии.
  • result_queue() может содержать много результатов, которые не будут сохранены.

Нехорошо! Ниже я покажу, как я решил эту проблему. Как всегда, я использую Ubuntu (20.04).

Использование событий для остановки дочерних процессов

Страницы в интернете о Python и Multiprocessing часто сбивают с толку, в основном потому, что многие ссылаются на Python2. Хорошо то, что вы вынуждены читать официальную документацию ...

Существует два способа, с помощью которых процессы могут взаимодействовать (надежно) друг с другом:

  • очереди
  • события

В данном случае, чтобы остановить процессы, мы используем события. Мы называем наши события stop-events, потому что они требуют остановки дочернего процесса. Это работает следующим образом:

  1. Создайте событие (объект)
  2. Передайте событие в качестве параметра при запуске дочернего процесса.
  3. В дочернем процессе: проверьте событие и остановитесь, если событие 'is_set()'
  4. В главном процессе: 'set()' событие, если дочерний процесс должен быть остановлен.

Замените обработчики для SIGINT / SIGTERM

Вот трюк, который заставляет все работать:

Когда вы порождаете дочерний процесс, этот процесс также наследует обработчики сигналов от родительского.

Мы не хотим, чтобы наши дочерние процессы реагировали на CTRL-C (сигнал SIGINT ) или SIGTERM . Вместо этого мы хотим, чтобы главный процесс решал, как будут остановлены дочерние процессы. Это означает, что мы должны:

  1. Заменить обработчики SIGINT и SIGTERM для дочерних процессов на фиктивные обработчики. Эти обработчики ничего не делают, что означает, что дочерние процессы не завершаются по этим сигналам.
  2. Замените обработчики SIGINT и SIGTERM главного процесса на нашу собственную версию. Эти обработчики сообщают главному процессу, что он должен выключиться. Это делается с помощью события остановки главного процесса (объект)

При такой схеме главный процесс проверяет событие main stop. Если 'is_set()', он использует 'set()', чтобы сообщить дочернему процессу X о необходимости остановиться. Поскольку мы можем передавать события остановки всем дочерним процессам, главный процесс имеет полный контроль. Он может дождаться остановки дочернего процесса, прежде чем остановить другой дочерний процесс.

Код

Есть три процесса:

  • main()
  • (дочерний) worker(), может иметь несколько потоков
  • (дочерний) result_queue()

Когда поток worker() завершает работу, он отправляет результат в result_queue(). Для изящной остановки мы должны сначала попросить процесс worker() остановиться, а как только он остановился, мы должны попросить остановиться процесс result_queue() .

Думаю, код несложно прочитать. После запуска дочерних процессов главный процесс в цикле проверяет, не запрошена ли остановка.

Единственное, что я не сказал, это то, что я добавил код для принудительного выключения через определенное количество секунд в случае возникновения какой-либо проблемы.

# procsig.py

import logging
import multiprocessing
import os
import signal
import sys
import time

def init_logger():
    logger_format = "%(asctime)s %(levelname)-8.8s [%(funcName)24s():%(lineno)-3s] %(message)s"
    formatter = logging.Formatter(logger_format)

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

logger = init_logger()


# main sigint/sigterm handlers
main_stop_event = multiprocessing.Event()

def main_sigint_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()

def main_sigterm_handler(signum, frame):
    logger.debug('')
    main_stop_event.set()


# children sigint/sigterm handlers, let main process handle this
def children_sigint_handler(signum, frame):
    logger.debug('')

def children_sigterm_handler(signum, frame):
    logger.debug('')


def worker(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('worker: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('worker STOPPING ...')
    time.sleep(1)
    logger.debug('worker STOPPED')


def result_queue(rq, stop_event):

    i = 0
    while True:
        if stop_event.is_set():
            break
        logger.debug('result_queue: {}'.format(i))
        time.sleep(1)
        i += 1

    logger.debug('result_queue: STOPPING')
    time.sleep(1)
    logger.debug('result_queue: STOPPED')


def main():

    # events
    global main_stop_event
    worker_stop_event = multiprocessing.Event()
    result_queue_stop_event = multiprocessing.Event()

    # setup result queue
    rq = multiprocessing.Queue()

    # children: capture sigint/sigterm
    signal.signal(signal.SIGINT, children_sigint_handler)
    signal.signal(signal.SIGTERM, children_sigterm_handler)

    # start worker
    worker_proc = multiprocessing.Process(
        name='worker',
        target=worker,
        kwargs={'rq': rq, 'stop_event': worker_stop_event},
    )
    worker_proc.daemon = True
    worker_proc.start()

    # start result_queue
    result_queue_proc = multiprocessing.Process(
        name='result_queue',
        target=result_queue,
        kwargs={'rq': rq, 'stop_event': result_queue_stop_event},
    )
    result_queue_proc.daemon = True
    result_queue_proc.start()

    # main: capture sigint/sigterm
    signal.signal(signal.SIGINT, main_sigint_handler)
    signal.signal(signal.SIGTERM, main_sigterm_handler)

    child_procs = [worker_proc, result_queue_proc]

    worker_stop_sent = False
    result_queue_stop_sent = False
    max_allowed_shutdown_seconds = 10
    shutdown_et = None

    keep_running = True
    while keep_running:

        if shutdown_et is not None and shutdown_et < int(time.time()):
            logger.debug('Forced shutdown ...')
            break

        run_count = 0
        for p in child_procs:
            logger.debug('[P> {: <12} - {} - {} - {}'.format(p.name, p.pid, p.is_alive(), p.exitcode))
            if p.is_alive():
                run_count += 1

            # send 'stop' to result_queue()?
            if p.pid == worker_proc.pid and not p.is_alive() and not result_queue_stop_sent:
                logger.debug('Sending stop to result_queue ...')
                result_queue_stop_event.set()
                result_queue_stop_sent = True

        # send 'stop' to worker()?
        if main_stop_event.is_set() and not worker_stop_sent:
            logger.debug('Sending stop to worker ...')
            worker_stop_event.set()
            worker_stop_sent = True
            shutdown_et = int(time.time()) + max_allowed_shutdown_seconds

        time.sleep(1)
        if run_count == 0:
            keep_running = False

    logger.debug('terminating children ...')
    try:
        worker_proc.terminate()
        result_queue_proc.terminate()
        worker_proc.kill()
        result_queue_proc.terminate()
    except Exception as e:
        # who cares
        logger.debug('Exception {}, e = {}'.format(type(e).__name__, e))

    logger.debug('Done')

if __name__=='__main__':
    main()

Чтобы проверить это, нажмите CTRL-C, или откройте другое окно терминала и завершите этот процесс:

pkill  -f  procsig

Результат журнала

Вы можете попробовать это сами, ниже я покажу, что будет выведено в консоль. В '19:28:37' я нажал CTRL-C. Сразу же вызываются обработчики сигналов. Процесс main() посылает сообщение об остановке процессу worker() , используя 'set()'. Как только процесс worker() остановился, проверив 'is_alive()', процессу result_queue() посылается сообщение об остановке.

2021-06-16 19:28:34,238 DEBUG    [                  worker():182] worker: 6
2021-06-16 19:28:35,239 DEBUG    [                  worker():182] worker: 7
2021-06-16 19:28:35,239 DEBUG    [            result_queue():197] result_queue: 7
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:35,239 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:36,239 DEBUG    [                  worker():182] worker: 8
2021-06-16 19:28:36,240 DEBUG    [            result_queue():197] result_queue: 8
2021-06-16 19:28:36,240 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:36,241 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:37,239 DEBUG    [                  worker():182] worker: 9
2021-06-16 19:28:37,241 DEBUG    [            result_queue():197] result_queue: 9
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:37,242 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
^C2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [ children_sigint_handler():170] 
2021-06-16 19:28:37,598 DEBUG    [     main_sigint_handler():160] 
2021-06-16 19:28:38,240 DEBUG    [                  worker():182] worker: 10
2021-06-16 19:28:38,242 DEBUG    [            result_queue():197] result_queue: 10
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:38,243 DEBUG    [                    main():271] Sending stop to worker ...
2021-06-16 19:28:39,241 DEBUG    [                  worker():186] worker STOPPING ...
2021-06-16 19:28:39,243 DEBUG    [            result_queue():197] result_queue: 11
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> worker       - 465189 - True - None
2021-06-16 19:28:39,244 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:40,242 DEBUG    [                  worker():188] worker STOPPED
2021-06-16 19:28:40,244 DEBUG    [            result_queue():197] result_queue: 12
2021-06-16 19:28:40,245 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:40,245 DEBUG    [                    main():265] Sending stop to result_queue ...
2021-06-16 19:28:40,246 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:41,246 DEBUG    [            result_queue():201] result_queue: STOPPING
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:41,247 DEBUG    [                    main():259] [P> result_queue - 465190 - True - None
2021-06-16 19:28:42,247 DEBUG    [            result_queue():203] result_queue: STOPPED
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> worker       - 465189 - False - 0
2021-06-16 19:28:42,248 DEBUG    [                    main():259] [P> result_queue - 465190 - False - 0
2021-06-16 19:28:43,249 DEBUG    [                    main():280] terminating children ...
2021-06-16 19:28:43,249 DEBUG    [                    main():290] Done

Резюме

Я искал способ изящной остановки процессов с помощью Python Multiprocessing , а также хотел контролировать порядок остановки процессов. Я решил эту проблему, используя события и заменив обработчики сигналов. Это дает мне полный контроль над главным процессом. Теперь я могу попросить остановить процесс B после остановки процесса A.

Ссылки / кредиты

concurrent.futures — Launching parallel tasks
https://docs.python.org/3/library/concurrent.futures.html

Passing Messages to Processes / Signaling between Processes
https://pymotw.com/3/multiprocessing/communication.html

Things I Wish They Told Me About Multiprocessing in Python
https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/

Подробнее

Multiprocessing

Оставить комментарий

Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

Комментарии

Оставьте ответ

Ответьте анонимно или войдите в систему, чтобы ответить.