Multiprocessing, блокировка файлов, SQLite и тестирование
Тестирование на проблемы параллелизма сложнее и занимает больше времени, но без него не обойтись.
Я работал над проектом с SQLAlchemy и PostgreSQL. Для нескольких таблиц я хотел ограничить количество строк на user, и сделал это, добавив функцию проверки PostgreSQL и триггер.
Ручное тестирование показало, что все работает нормально, но что если user запустит несколько процессов и добавит строки в одно и то же время? Я добавил 'pg_advisory_xact_lock', но будет ли это действительно работать? Действительно ли я правильно понял документацию?
В этой заметке я показываю универсальный класс TaskRunner, который можно использовать для тестирования одновременных (параллельных) действий. В качестве тестового примера мы используем базу данных SQLite , которую мы пишем с помощью отдельных процессов.
Мы запускаем все процессы из одного процесса. В этом случае мы можем использовать Multiprocessing.Lock() для контроля доступа к SQLite. Но я также реализовал блокировщик файлов, который можно использовать, когда у нас есть полностью независимые процессы.
Как всегда, я запускаю это на Ubuntu 22.04.
Одновременный запуск действий
В нашей тестовой установке мы используем Multiprocessing.Event(), чтобы заставить все процессы ждать на одной и той же строке в коде задачи, за одну строку до "критического действия". Затем, когда все процессы достигли этой точки, мы "отпускаем" процессы и смотрим, что произойдет.
stop & release
|
v
task1 |--------------------->|-------->
task2 |----------------->|-------->
task3 |--------------->|-------->
|
taskN |-------->|-------->
--------------------------------------> t
В классе TaskRunner:
class TaskRunner:
...
def run_parallel_tasks(self, parallel_tasks_count):
...
self.mp_event = multiprocessing.Event()
...
for task_no in range(parallel_tasks_count):
p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
...
# release waiting all processes
time.sleep(self.release_time)
self.mp_event.set()
...
В нашей функции задачи:
def task(task_runner, task_no):
...
# all tasks will wait here
task_runner.mp_event.wait()
# critical action
...
Увеличение поля таблицы SQLite
В нашем тесте задачи (процессы) одновременно пытаются увеличить поле таблицы SQLite , 'counter',
путем:
- считывания значения поля
- увеличивая его
- обновление поля
Если у нас 100 задач, то результат в поле таблицы должен быть 100. Любое другое значение неверно.
Блокировка
Задача не может надежно выполнить операцию инкремента без получения эксклюзивного доступа к SQLite. Здесь мы используем блокировку, внешнюю по отношению к SQLite.
Мы можем distразличить следующее:
- (параллельные) задачи запускаются одним процессом
- (параллельные) задачи являются независимыми
В первом случае мы можем использовать Multiprocessing.Lock() и разделить эту блокировку между всеми нашими задачами. Для целей тестирования это вполне подходит.
Второй случай - это более реальный сценарий. Здесь мы не можем использовать Multiprocessing.Lock(), но мы можем использовать Linux блокировку файлов. Это быстро и надежно.
Блокировка - Multiprocessing.Lock()
Я хочу использовать Multiprocessing.Lock() в качестве менеджера контекста. К сожалению, мы не можем указать таймаут. Это означает, что мы должны написать контекстный менеджер самостоятельно:
# multiprocessing locker context manager with timeout
class mp_locker:
def __init__(
self,
mp_lock=None,
timeout=10,
):
self.mp_lock = mp_lock
self.timeout = timeout
def __enter__(self):
self.mp_lock.acquire(timeout=self.timeout)
def __exit__(self, exc_type, exc_value, exc_tb):
self.mp_lock.release()
Блокировка - Блокировка файлов
В интернете есть много примеров, как это сделать. И снова я хочу использовать это в качестве контекстного менеджера. Здесь я показываю только метод '__enter__()'.
# file locker context manager
...
def __enter__(self):
while True:
if (time.time() - ts) > self.timeout:
raise Exception('pid = {}: acquire lock timeout')
try:
self.lock_file_fo = open(self.lock_file, 'a')
fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError as e:
# another process locked the file, keep trying
time.sleep(self.wait_secs)
# propagate other exceptions
Мы остаемся в 'while-loop', пока не получим блокировку или не произойдет таймаут.
Класс TaskRunner
TaskRunner содержит всю логику для запуска нескольких задач (процессов).
Функции:
- before_tasks()
- задача()
- after_tasks()
- result_ok()
- после_результата()
Параметры:
- Количество одновременных задач.
- Количество повторов.
- Время освобождения ожидающих задач (после запуска).
- Уровень протоколирования.
- Multiprocessing.Lock() блокировка, или блокировка файла
- Таймаут блокировки.
Важно: Все ваши функции вызываются с объектом TaskRunner в качестве первого параметра. Это означает, что вы имеете доступ к атрибутам и методам TaskRunner, таким как:
- get_lock()
- get_logger()
Код
Код состоит из следующих частей:
- Класс TaskRunner и вспомогательные классы
- Ваши функции задачи
- Инстатация TaskRunner с вашими параметрами.
Когда вы запускаете код, на выходе получается что-то вроде:
INFO counter = 100 <- final value
INFO ready in 2.0454471111297607 seconds
Вот код на случай, если вы захотите попробовать сами:
import fcntl
import logging
import multiprocessing
import os
import sys
import time
import sqlite3
class DummyLogger:
def __getattr__(self, name):
return lambda *args, **kwargs: None
# file locker context manager
class f_locker:
def __init__(
self,
lock_file=None,
timeout=10,
logger=DummyLogger(),
wait_secs=.01,
):
self.lock_file = lock_file
self.timeout = timeout
self.logger = logger
self.wait_secs = wait_secs
# keep lock_file opened
self.lock_file_fo = None
def __enter__(self):
pid = os.getpid()
ts = time.time()
while True:
self.logger.debug('pid = {}: trying to acquire lock ...'.format(pid))
if (time.time() - ts) > self.timeout:
raise Exception('pid = {}: acquire lock timeout')
# keep trying until lock or timeout
try:
self.lock_file_fo = open(self.lock_file, 'a')
fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.logger.debug('pid = {}: lock acquired'.format(pid))
break
except BlockingIOError as e:
# another process locked the file, keep trying
self.logger.debug('pid = {}: cannot acquire lock'.format(pid))
time.sleep(self.wait_secs)
# propagate other exceptions
return True
def __exit__(self, exc_type, exc_value, exc_tb):
self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
pid = os.getpid()
self.logger.debug('pid = {}: trying to release lock ...'.format(pid))
fcntl.flock(self.lock_file_fo, fcntl.LOCK_UN)
self.logger.debug('pid = {}: lock released ...'.format(pid))
# multiprocessing locker context manager with timeout
class mp_locker:
def __init__(
self,
mp_lock=None,
timeout=10,
logger=DummyLogger(),
):
self.mp_lock = mp_lock
self.timeout = timeout
self.logger = logger
def __enter__(self):
self.pid = os.getpid()
self.logger.debug('pid = {}: trying to acquire lock ...'.format(self.pid))
self.mp_lock.acquire(timeout=self.timeout)
self.logger.debug('pid = {}: lock acquired'.format(self.pid))
def __exit__(self, exc_type, exc_value, exc_tb):
self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
self.logger.debug('pid = {}: trying to release lock ...'.format(self.pid))
self.mp_lock.release()
self.logger.debug('pid = {}: lock released ...'.format(self.pid))
class TaskRunner:
def __init__(
self,
loop_count=1,
parallel_tasks_count=1,
release_time=1.,
# functions
func_before_tasks=None,
func_task=None,
func_after_tasks=None,
func_result_ok=None,
func_after_result=None,
# logging
logger_level=logging.DEBUG,
# locking
lock_timeout=10,
use_file_locking=False,
lock_file='./lock_file',
lock_wait_secs=.01,
):
self.loop_count = loop_count
self.parallel_tasks_count = parallel_tasks_count
self.release_time = release_time
# functions
self.func_before_tasks = func_before_tasks
self.func_task = func_task
self.func_after_tasks = func_after_tasks
self.func_result_ok = func_result_ok
self.func_after_result = func_after_result
# logging
self.logger_level = logger_level
# locking
self.lock_timeout = lock_timeout
self.use_file_locking = use_file_locking
self.lock_file = lock_file
self.lock_wait_secs = lock_wait_secs
def get_logger(self, proc_name, logger_level=None):
if logger_level is None:
logger_level = self.logger_level
logger = logging.getLogger(proc_name)
logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_logger_format = '%(asctime)s %(proc_name)-8.8s %(levelname)-8.8s [%(filename)-20s%(funcName)20s():%(lineno)03s] %(message)s'
console_handler.setFormatter(logging.Formatter(console_logger_format))
logger.setLevel(logger_level)
logger.addHandler(console_handler)
logger = logging.LoggerAdapter(logger, {'proc_name': proc_name})
return logger
def get_lock(self, timeout=None):
timeout = timeout or self.lock_timeout
if not self.use_file_locking:
return mp_locker(self.mp_lock, timeout=timeout, logger=self.logger)
return f_locker(self.lock_file, timeout=timeout, wait_secs=self.lock_wait_secs)
def run_parallel_tasks(self, parallel_tasks_count):
# before tasks
if self.func_before_tasks:
self.func_before_tasks(self)
self.mp_lock = multiprocessing.Lock()
self.mp_event = multiprocessing.Event()
tasks = []
for task_no in range(parallel_tasks_count):
p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
p.start()
tasks.append(p)
# release waiting processes
time.sleep(self.release_time)
self.mp_event.set()
# wait for all tasks to complete
for p in tasks:
p.join()
# after tasks
if self.func_after_tasks:
return self.func_after_tasks(self)
return None
def run(
self,
loop_count=None,
parallel_tasks_count=None,
):
self.logger = self.get_logger('main')
if loop_count is not None:
self.loop_count = loop_count
if parallel_tasks_count is not None:
self.parallel_tasks_count = parallel_tasks_count
start_time = time.time()
for loop_no in range(self.loop_count):
self.logger.debug('loop_no = {}'.format(loop_no))
result = self.run_parallel_tasks(self.parallel_tasks_count)
if self.func_result_ok:
if not self.func_result_ok(self, result):
self.logger.error('result = {}'.format(result))
break
else:
self.logger.info('result ok')
if self.func_after_result:
self.func_after_result(self)
run_secs = time.time() - start_time
self.logger.info('ready in {} seconds'.format(run_secs))
# ### YOUR CODE BELOW ### #
def before_tasks(task_runner):
# create a table, insert row with counter = 0
with sqlite3.connect('./test_tasks.db') as conn:
cursor = conn.cursor()
cursor.execute("""DROP TABLE IF EXISTS tasks""")
cursor.execute("""CREATE TABLE tasks (counter INTEGER)""")
cursor.execute("""INSERT INTO tasks (counter) VALUES (0)""")
conn.commit()
def task(task_runner, task_no):
logger = task_runner.get_logger('task' + str(task_no))
pid = os.getpid()
# wait for event
logger.debug('pid = {} waiting for event at {}'.format(pid, time.time()))
task_runner.mp_event.wait()
# wait for lock
lock = task_runner.get_lock()
logger.debug('pid = {} waiting for lock at {}'.format(pid, time.time()))
with lock:
# increment counter field
with sqlite3.connect('./test_tasks.db', timeout=10) as conn:
cursor = conn.cursor()
counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
logger.debug('counter = {}'.format(counter))
counter += 1
cursor.execute("""UPDATE tasks SET counter=?""", (counter,))
conn.commit()
def after_tasks(task_runner):
conn = sqlite3.connect('./test_tasks.db')
cursor = conn.cursor()
counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
task_runner.logger.info('counter = {} <- final value'.format(counter))
def result_ok(task_runner, result):
pass
def after_result(task_runner):
pass
def main():
tr = TaskRunner(
# functions
func_before_tasks=before_tasks,
func_task=task,
func_after_tasks=after_tasks,
#func_result_ok=result_ok,
func_after_result=after_result,
# logging
logger_level=logging.INFO,
# locking
use_file_locking=True,
)
tr.run(
loop_count=1,
parallel_tasks_count=100,
#parallel_tasks_count=2,
)
if __name__ == '__main__':
main()
Резюме
Нам нужен был простой способ тестирования параллельных операций. В прошлом я использовал пакет Python 'Locust' для тестирования параллелизма, см. пост "Использование Locust для нагрузочного тестирования приложения FastAPI с параллельными user". На этот раз я хотел сохранить его небольшим, гибким и расширяемым.
Кроме этого, мне также нужен был контекстный менеджер блокировки файлов для нескольких процессов. Мы реализовали оба варианта, тесты прошли. Пора вернуться к другим проектам.
Ссылки / кредиты
Python - fcntl
https://docs.python.org/3/library/fcntl.html
Python - multiprocessing
https://docs.python.org/3/library/multiprocessing.html
Python - SQLite3
https://docs.python.org/3/library/sqlite3.html
Using Locust to load test a FastAPI app with concurrent users
https://www.peterspython.com/en/blog/using-locust-to-load-test-a-fastapi-app-with-concurrent-users
Подробнее
Multiprocessing Testing
Недавний
- Don't Repeat Yourself (DRY) с Jinja2
- SQLAlchemy, PostgreSQL, максимальное количество строк для user
- Показать значения в динамических фильтрах SQLAlchemy
- Безопасная передача данных с помощью шифрования Public Key и pyNaCl
- rqlite: альтернатива dist с высокой степенью готовности и SQLite
- Нужно ли переносить Docker Swarm на Kubernetes?
Большинство просмотренных
- Используя Python pyOpenSSL для проверки SSL-сертификатов, загруженных с хоста
- Использование UUID вместо Integer Autoincrement Primary Keys с SQLAlchemy и MariaDb
- Подключение к службе на хосте Docker из контейнера Docker
- SQLAlchemy: Использование Cascade Deletes для удаления связанных объектов
- Использование PyInstaller и Cython для создания исполняемого файла Python
- Flask Удовлетворительный запрос API проверка параметров запроса с помощью схем Маршмэллоу