angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Multiprocessing, блокировка файлов, SQLite и тестирование

Тестирование на проблемы параллелизма сложнее и занимает больше времени, но без него не обойтись.

30 марта 2023
post main image

Я работал над проектом с SQLAlchemy и PostgreSQL. Для нескольких таблиц я хотел ограничить количество строк на user, и сделал это, добавив функцию проверки PostgreSQL и триггер.

Ручное тестирование показало, что все работает нормально, но что если user запустит несколько процессов и добавит строки в одно и то же время? Я добавил 'pg_advisory_xact_lock', но будет ли это действительно работать? Действительно ли я правильно понял документацию?

В этой заметке я показываю универсальный класс TaskRunner, который можно использовать для тестирования одновременных (параллельных) действий. В качестве тестового примера мы используем базу данных SQLite , которую мы пишем с помощью отдельных процессов.

Мы запускаем все процессы из одного процесса. В этом случае мы можем использовать Multiprocessing.Lock() для контроля доступа к SQLite. Но я также реализовал блокировщик файлов, который можно использовать, когда у нас есть полностью независимые процессы.

Как всегда, я запускаю это на Ubuntu 22.04.

Одновременный запуск действий

В нашей тестовой установке мы используем Multiprocessing.Event(), чтобы заставить все процессы ждать на одной и той же строке в коде задачи, за одну строку до "критического действия". Затем, когда все процессы достигли этой точки, мы "отпускаем" процессы и смотрим, что произойдет.

                         stop & release
                              |
                              v

task1  |--------------------->|-------->
task2      |----------------->|-------->
task3        |--------------->|-------->
                              |
taskN               |-------->|-------->

        --------------------------------------> t

В классе TaskRunner:

class TaskRunner:
    ...
    def run_parallel_tasks(self, parallel_tasks_count):
        ...
        self.mp_event = multiprocessing.Event()
        ...
        for task_no in range(parallel_tasks_count):
            p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
            ...

        # release waiting all processes
        time.sleep(self.release_time)
        self.mp_event.set()
        ...

В нашей функции задачи:

def task(task_runner, task_no):
    ...
    # all tasks will wait here 
    task_runner.mp_event.wait()

    # critical action
    ...

Увеличение поля таблицы SQLite

В нашем тесте задачи (процессы) одновременно пытаются увеличить поле таблицы SQLite , 'counter',
путем:

  • считывания значения поля
  • увеличивая его
  • обновление поля

Если у нас 100 задач, то результат в поле таблицы должен быть 100. Любое другое значение неверно.

Блокировка

Задача не может надежно выполнить операцию инкремента без получения эксклюзивного доступа к SQLite. Здесь мы используем блокировку, внешнюю по отношению к SQLite.

Мы можем distразличить следующее:

  1. (параллельные) задачи запускаются одним процессом
  2. (параллельные) задачи являются независимыми

В первом случае мы можем использовать Multiprocessing.Lock() и разделить эту блокировку между всеми нашими задачами. Для целей тестирования это вполне подходит.

Второй случай - это более реальный сценарий. Здесь мы не можем использовать Multiprocessing.Lock(), но мы можем использовать Linux блокировку файлов. Это быстро и надежно.

Блокировка - Multiprocessing.Lock()

Я хочу использовать Multiprocessing.Lock() в качестве менеджера контекста. К сожалению, мы не можем указать таймаут. Это означает, что мы должны написать контекстный менеджер самостоятельно:

# multiprocessing locker context manager with timeout
class mp_locker:
    def __init__(
        self,
        mp_lock=None,
        timeout=10,
    ):
        self.mp_lock = mp_lock
        self.timeout = timeout

    def __enter__(self):
        self.mp_lock.acquire(timeout=self.timeout)

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.mp_lock.release()

Блокировка - Блокировка файлов

В интернете есть много примеров, как это сделать. И снова я хочу использовать это в качестве контекстного менеджера. Здесь я показываю только метод '__enter__()'.

# file locker context manager
    ...
    def __enter__(self):
        while True:
            if (time.time() - ts) > self.timeout:
                raise Exception('pid = {}: acquire lock timeout')
            try:
                self.lock_file_fo = open(self.lock_file, 'a')
                fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
                break
            except BlockingIOError as e:
                # another process locked the file, keep trying
                time.sleep(self.wait_secs)
            # propagate other exceptions

Мы остаемся в 'while-loop', пока не получим блокировку или не произойдет таймаут.

Класс TaskRunner

TaskRunner содержит всю логику для запуска нескольких задач (процессов).

Функции:

  • before_tasks()
  • задача()
  • after_tasks()
  • result_ok()
  • после_результата()

Параметры:

  • Количество одновременных задач.
  • Количество повторов.
  • Время освобождения ожидающих задач (после запуска).
  • Уровень протоколирования.
  • Multiprocessing.Lock() блокировка, или блокировка файла
  • Таймаут блокировки.

Важно: Все ваши функции вызываются с объектом TaskRunner в качестве первого параметра. Это означает, что вы имеете доступ к атрибутам и методам TaskRunner, таким как:

  • get_lock()
  • get_logger()

Код

Код состоит из следующих частей:

  • Класс TaskRunner и вспомогательные классы
  • Ваши функции задачи
  • Инстатация TaskRunner с вашими параметрами.

Когда вы запускаете код, на выходе получается что-то вроде:

INFO     counter = 100 <- final value
INFO     ready in 2.0454471111297607 seconds

Вот код на случай, если вы захотите попробовать сами:

import fcntl
import logging
import multiprocessing
import os
import sys
import time

import sqlite3


class DummyLogger:
    def __getattr__(self, name):
        return lambda *args, **kwargs: None

# file locker context manager
class f_locker:
    def __init__(
        self,
        lock_file=None,
        timeout=10,
        logger=DummyLogger(),
        wait_secs=.01,
    ):
        self.lock_file = lock_file
        self.timeout = timeout
        self.logger = logger
        self.wait_secs = wait_secs
        # keep lock_file opened
        self.lock_file_fo = None

    def __enter__(self):
        pid = os.getpid()
        ts = time.time()
        while True:
            self.logger.debug('pid = {}: trying to acquire lock ...'.format(pid))
            if (time.time() - ts) > self.timeout:
                raise Exception('pid = {}: acquire lock timeout')
            # keep trying until lock or timeout
            try:
                self.lock_file_fo = open(self.lock_file, 'a')
                fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
                self.logger.debug('pid = {}: lock acquired'.format(pid))
                break
            except BlockingIOError as e:
                # another process locked the file, keep trying
                self.logger.debug('pid = {}: cannot acquire lock'.format(pid))
                time.sleep(self.wait_secs)
            # propagate other exceptions
        return True

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
        pid = os.getpid()
        self.logger.debug('pid = {}: trying to release lock ...'.format(pid))
        fcntl.flock(self.lock_file_fo, fcntl.LOCK_UN)
        self.logger.debug('pid = {}: lock released ...'.format(pid))


# multiprocessing locker context manager with timeout
class mp_locker:
    def __init__(
        self,
        mp_lock=None,
        timeout=10,
        logger=DummyLogger(),
    ):
        self.mp_lock = mp_lock
        self.timeout = timeout
        self.logger = logger

    def __enter__(self):
        self.pid = os.getpid()
        self.logger.debug('pid = {}: trying to acquire lock ...'.format(self.pid))
        self.mp_lock.acquire(timeout=self.timeout)
        self.logger.debug('pid = {}: lock acquired'.format(self.pid))

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
        self.logger.debug('pid = {}: trying to release lock ...'.format(self.pid))
        self.mp_lock.release()
        self.logger.debug('pid = {}: lock released ...'.format(self.pid))


class TaskRunner:
    def __init__(
        self,
        loop_count=1,
        parallel_tasks_count=1,
        release_time=1.,
        # functions
        func_before_tasks=None,
        func_task=None,
        func_after_tasks=None,
        func_result_ok=None,
        func_after_result=None,
        # logging
        logger_level=logging.DEBUG,
        # locking
        lock_timeout=10,
        use_file_locking=False,
        lock_file='./lock_file',
        lock_wait_secs=.01,
    ):
        self.loop_count = loop_count
        self.parallel_tasks_count = parallel_tasks_count
        self.release_time = release_time
        # functions
        self.func_before_tasks = func_before_tasks
        self.func_task = func_task
        self.func_after_tasks = func_after_tasks
        self.func_result_ok = func_result_ok
        self.func_after_result = func_after_result
        # logging
        self.logger_level = logger_level
        # locking
        self.lock_timeout = lock_timeout
        self.use_file_locking = use_file_locking
        self.lock_file = lock_file
        self.lock_wait_secs = lock_wait_secs

    def get_logger(self, proc_name, logger_level=None):
        if logger_level is None:
            logger_level = self.logger_level
        logger = logging.getLogger(proc_name)
        logger.setLevel(logging.DEBUG)
        console_handler = logging.StreamHandler()
        console_logger_format = '%(asctime)s %(proc_name)-8.8s %(levelname)-8.8s [%(filename)-20s%(funcName)20s():%(lineno)03s] %(message)s'
        console_handler.setFormatter(logging.Formatter(console_logger_format))
        logger.setLevel(logger_level)
        logger.addHandler(console_handler)
        logger = logging.LoggerAdapter(logger, {'proc_name': proc_name})
        return logger

    def get_lock(self, timeout=None):
        timeout = timeout or self.lock_timeout
        if not self.use_file_locking:
            return mp_locker(self.mp_lock, timeout=timeout, logger=self.logger)
        return f_locker(self.lock_file, timeout=timeout, wait_secs=self.lock_wait_secs)

    def run_parallel_tasks(self, parallel_tasks_count):
        # before tasks
        if self.func_before_tasks:
            self.func_before_tasks(self)

        self.mp_lock = multiprocessing.Lock()
        self.mp_event = multiprocessing.Event()
        tasks = []
        for task_no in range(parallel_tasks_count):
            p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
            p.start()
            tasks.append(p)

        # release waiting processes
        time.sleep(self.release_time)
        self.mp_event.set()

        # wait for all tasks to complete
        for p in tasks:
            p.join()

        # after tasks
        if self.func_after_tasks:
            return self.func_after_tasks(self)
        return None

    def run(
        self, 
        loop_count=None,
        parallel_tasks_count=None,
    ):
        self.logger = self.get_logger('main')
        if loop_count is not None:
            self.loop_count = loop_count
        if parallel_tasks_count is not None:
            self.parallel_tasks_count = parallel_tasks_count
    
        start_time = time.time()
        for loop_no in range(self.loop_count):
            self.logger.debug('loop_no = {}'.format(loop_no))

            result = self.run_parallel_tasks(self.parallel_tasks_count)
            if self.func_result_ok:
                if not self.func_result_ok(self, result):
                    self.logger.error('result = {}'.format(result))
                    break
                else:
                    self.logger.info('result ok')
                        
        if self.func_after_result:
            self.func_after_result(self)

        run_secs = time.time() - start_time
        self.logger.info('ready in {} seconds'.format(run_secs))

# ### YOUR CODE BELOW ### #

def before_tasks(task_runner):
    # create a table, insert row with counter = 0
    with sqlite3.connect('./test_tasks.db') as conn:
        cursor = conn.cursor()
        cursor.execute("""DROP TABLE IF EXISTS tasks""")
        cursor.execute("""CREATE TABLE tasks (counter INTEGER)""")
        cursor.execute("""INSERT INTO tasks (counter) VALUES (0)""")
        conn.commit()

def task(task_runner, task_no):
    logger = task_runner.get_logger('task' + str(task_no))
    pid = os.getpid()

    # wait for event
    logger.debug('pid = {} waiting for event at {}'.format(pid, time.time()))
    task_runner.mp_event.wait()

    # wait for lock
    lock = task_runner.get_lock()
    logger.debug('pid = {} waiting for lock at  {}'.format(pid, time.time()))
    with lock:
        # increment counter field
        with sqlite3.connect('./test_tasks.db', timeout=10) as conn:
            cursor = conn.cursor()
            counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
            logger.debug('counter = {}'.format(counter))
            counter += 1
            cursor.execute("""UPDATE tasks SET counter=?""", (counter,))
            conn.commit()

def after_tasks(task_runner):
    conn = sqlite3.connect('./test_tasks.db')
    cursor = conn.cursor()
    counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
    task_runner.logger.info('counter = {} <- final value'.format(counter))

def result_ok(task_runner, result):
    pass

def after_result(task_runner):
    pass

def main():
    tr = TaskRunner(
        # functions
        func_before_tasks=before_tasks,
        func_task=task,
        func_after_tasks=after_tasks,
        #func_result_ok=result_ok,
        func_after_result=after_result,
        # logging
        logger_level=logging.INFO,
        # locking
        use_file_locking=True,
    )
    tr.run(
        loop_count=1,
        parallel_tasks_count=100,
        #parallel_tasks_count=2,
    )

if __name__ == '__main__':
    main()

Резюме

Нам нужен был простой способ тестирования параллельных операций. В прошлом я использовал пакет Python 'Locust' для тестирования параллелизма, см. пост "Использование Locust для нагрузочного тестирования приложения FastAPI с параллельными user". На этот раз я хотел сохранить его небольшим, гибким и расширяемым.
Кроме этого, мне также нужен был контекстный менеджер блокировки файлов для нескольких процессов. Мы реализовали оба варианта, тесты прошли. Пора вернуться к другим проектам.

Ссылки / кредиты

Python - fcntl
https://docs.python.org/3/library/fcntl.html

Python - multiprocessing
https://docs.python.org/3/library/multiprocessing.html

Python - SQLite3
https://docs.python.org/3/library/sqlite3.html

Using Locust to load test a FastAPI app with concurrent users
https://www.peterspython.com/en/blog/using-locust-to-load-test-a-fastapi-app-with-concurrent-users

Подробнее

Multiprocessing Testing

Оставить комментарий

Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

Комментарии

Оставьте ответ

Ответьте анонимно или войдите в систему, чтобы ответить.