angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Multiprocessing, bloqueo de archivos, SQLite y pruebas

Las pruebas para detectar problemas de concurrencia son más difíciles y llevan más tiempo, pero no se puede prescindir de ellas.

30 marzo 2023
post main image

Estaba trabajando en un proyecto con SQLAlchemy y PostgreSQL. Para algunas tablas, quería limitar el número de filas por user, y lo hice añadiendo una función de comprobación y un trigger PostgreSQL .

Probando manualmente todo parecía estar funcionando bien, pero ¿qué pasaría si un user iniciara múltiples procesos y añadiera filas exactamente al mismo tiempo? He añadido el 'pg_advisory_xact_lock' pero ¿funcionará realmente? ¿He entendido bien la documentación?

En este post muestro una clase TaskRunner universal que puede ser usada para probar acciones simultáneas (concurrentes). Como caso de prueba, usamos una base de datos SQLite que escribimos con procesos separados.

Iniciamos todos los procesos desde un único proceso. En este caso, podemos usar Multiprocessing.Lock() para controlar el acceso a SQLite. Pero también implementé un casillero de archivos que puede ser usado cuando tenemos procesos totalmente independientes.

Como siempre estoy ejecutando esto en Ubuntu 22.04.

Iniciando acciones al mismo tiempo

En nuestra configuración de prueba usamos Multiprocessing.Event() para hacer que todos los procesos esperen en la misma línea en el código de la tarea, una línea antes de la 'acción crítica'. Entonces, cuando todos los procesos han llegado a este punto, 'liberamos' los procesos y vemos qué pasa.

                         stop & release
                              |
                              v

task1  |--------------------->|-------->
task2      |----------------->|-------->
task3        |--------------->|-------->
                              |
taskN               |-------->|-------->

        --------------------------------------> t

En la clase TaskRunner:

class TaskRunner:
    ...
    def run_parallel_tasks(self, parallel_tasks_count):
        ...
        self.mp_event = multiprocessing.Event()
        ...
        for task_no in range(parallel_tasks_count):
            p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
            ...

        # release waiting all processes
        time.sleep(self.release_time)
        self.mp_event.set()
        ...

En nuestra función de tarea:

def task(task_runner, task_no):
    ...
    # all tasks will wait here 
    task_runner.mp_event.wait()

    # critical action
    ...

Incrementar un campo de la tabla SQLite

En nuestra prueba, las tareas (procesos) intentan simultáneamente incrementar un campo de la tabla SQLite , 'counter',
mediante:

  • leyendo el valor del campo
  • incrementándolo
  • actualizando el campo

Si tenemos 100 tareas, entonces el resultado en el campo de la tabla debe ser 100. Cualquier otro valor es incorrecto. Cualquier otro valor es incorrecto.

Bloqueo de

Una tarea no puede realizar la operación de incremento de forma fiable sin obtener acceso exclusivo a SQLite. En este caso, utilizamos un bloqueo externo a SQLite.

Podemos dist distinguir lo siguiente:

  1. Las tareas (concurrentes) son iniciadas por un único proceso
  2. Las tareas (concurrentes) son independientes

En el primer caso, podemos utilizar Multiprocessing.Lock() y compartir este bloqueo entre todas nuestras tareas. Esto está bien para las pruebas.

El segundo caso es un escenario más real. Aquí no podemos usar Multiprocessing.Lock() pero podemos usar el bloqueo de ficheros Linux . Esto es rápido y fiable.

Bloqueo - Multiprocessing.Lock()

Quiero utilizar Multiprocessing.Lock() como gestor de contexto. Desafortunadamente, no podemos especificar un tiempo de espera. Esto significa que debemos escribir el gestor de contexto nosotros mismos:

# multiprocessing locker context manager with timeout
class mp_locker:
    def __init__(
        self,
        mp_lock=None,
        timeout=10,
    ):
        self.mp_lock = mp_lock
        self.timeout = timeout

    def __enter__(self):
        self.mp_lock.acquire(timeout=self.timeout)

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.mp_lock.release()

Bloqueo - Bloqueo de ficheros

Hay muchos ejemplos en Internet sobre cómo hacer esto. De nuevo quiero usar esto como gestor de contexto. Aquí sólo muestro el método '__enter__()'.

# file locker context manager
    ...
    def __enter__(self):
        while True:
            if (time.time() - ts) > self.timeout:
                raise Exception('pid = {}: acquire lock timeout')
            try:
                self.lock_file_fo = open(self.lock_file, 'a')
                fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
                break
            except BlockingIOError as e:
                # another process locked the file, keep trying
                time.sleep(self.wait_secs)
            # propagate other exceptions

Permanecemos en el 'while-loop' hasta que adquirimos el bloqueo o se produce un timeout.

La clase TaskRunner

La clase TaskRunner contiene toda la lógica para iniciar múltiples tareas (procesos).

Funciones:

  • before_tasks()
  • tarea()
  • después_tareas()
  • resultado_ok()
  • después_resultado()

Opciones:

  • Número de tareas concurrentes.
  • Número de veces a repetir.
  • Tiempo de liberación de las tareas en espera (después del inicio).
  • Nivel de registro.
  • Multiprocessing.Bloqueo(), o bloqueo de archivos
  • Tiempo de espera de bloqueo.

Importante: Todas sus funciones son llamadas con el objeto TaskRunner como primer parámetro. Esto significa que tiene acceso a los atributos y métodos de TaskRunner como:

  • get_lock()
  • get_logger()

El código

El código consta de las siguientes partes

  • Clase TaskRunner y clases de soporte
  • Sus funciones de tarea
  • TaskRunner instatiation con sus parámetros

Cuando ejecutas el código, la salida es algo así:

INFO     counter = 100 <- final value
INFO     ready in 2.0454471111297607 seconds

Aquí está el código por si quieres intentarlo tú mismo:

import fcntl
import logging
import multiprocessing
import os
import sys
import time

import sqlite3


class DummyLogger:
    def __getattr__(self, name):
        return lambda *args, **kwargs: None

# file locker context manager
class f_locker:
    def __init__(
        self,
        lock_file=None,
        timeout=10,
        logger=DummyLogger(),
        wait_secs=.01,
    ):
        self.lock_file = lock_file
        self.timeout = timeout
        self.logger = logger
        self.wait_secs = wait_secs
        # keep lock_file opened
        self.lock_file_fo = None

    def __enter__(self):
        pid = os.getpid()
        ts = time.time()
        while True:
            self.logger.debug('pid = {}: trying to acquire lock ...'.format(pid))
            if (time.time() - ts) > self.timeout:
                raise Exception('pid = {}: acquire lock timeout')
            # keep trying until lock or timeout
            try:
                self.lock_file_fo = open(self.lock_file, 'a')
                fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
                self.logger.debug('pid = {}: lock acquired'.format(pid))
                break
            except BlockingIOError as e:
                # another process locked the file, keep trying
                self.logger.debug('pid = {}: cannot acquire lock'.format(pid))
                time.sleep(self.wait_secs)
            # propagate other exceptions
        return True

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
        pid = os.getpid()
        self.logger.debug('pid = {}: trying to release lock ...'.format(pid))
        fcntl.flock(self.lock_file_fo, fcntl.LOCK_UN)
        self.logger.debug('pid = {}: lock released ...'.format(pid))


# multiprocessing locker context manager with timeout
class mp_locker:
    def __init__(
        self,
        mp_lock=None,
        timeout=10,
        logger=DummyLogger(),
    ):
        self.mp_lock = mp_lock
        self.timeout = timeout
        self.logger = logger

    def __enter__(self):
        self.pid = os.getpid()
        self.logger.debug('pid = {}: trying to acquire lock ...'.format(self.pid))
        self.mp_lock.acquire(timeout=self.timeout)
        self.logger.debug('pid = {}: lock acquired'.format(self.pid))

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
        self.logger.debug('pid = {}: trying to release lock ...'.format(self.pid))
        self.mp_lock.release()
        self.logger.debug('pid = {}: lock released ...'.format(self.pid))


class TaskRunner:
    def __init__(
        self,
        loop_count=1,
        parallel_tasks_count=1,
        release_time=1.,
        # functions
        func_before_tasks=None,
        func_task=None,
        func_after_tasks=None,
        func_result_ok=None,
        func_after_result=None,
        # logging
        logger_level=logging.DEBUG,
        # locking
        lock_timeout=10,
        use_file_locking=False,
        lock_file='./lock_file',
        lock_wait_secs=.01,
    ):
        self.loop_count = loop_count
        self.parallel_tasks_count = parallel_tasks_count
        self.release_time = release_time
        # functions
        self.func_before_tasks = func_before_tasks
        self.func_task = func_task
        self.func_after_tasks = func_after_tasks
        self.func_result_ok = func_result_ok
        self.func_after_result = func_after_result
        # logging
        self.logger_level = logger_level
        # locking
        self.lock_timeout = lock_timeout
        self.use_file_locking = use_file_locking
        self.lock_file = lock_file
        self.lock_wait_secs = lock_wait_secs

    def get_logger(self, proc_name, logger_level=None):
        if logger_level is None:
            logger_level = self.logger_level
        logger = logging.getLogger(proc_name)
        logger.setLevel(logging.DEBUG)
        console_handler = logging.StreamHandler()
        console_logger_format = '%(asctime)s %(proc_name)-8.8s %(levelname)-8.8s [%(filename)-20s%(funcName)20s():%(lineno)03s] %(message)s'
        console_handler.setFormatter(logging.Formatter(console_logger_format))
        logger.setLevel(logger_level)
        logger.addHandler(console_handler)
        logger = logging.LoggerAdapter(logger, {'proc_name': proc_name})
        return logger

    def get_lock(self, timeout=None):
        timeout = timeout or self.lock_timeout
        if not self.use_file_locking:
            return mp_locker(self.mp_lock, timeout=timeout, logger=self.logger)
        return f_locker(self.lock_file, timeout=timeout, wait_secs=self.lock_wait_secs)

    def run_parallel_tasks(self, parallel_tasks_count):
        # before tasks
        if self.func_before_tasks:
            self.func_before_tasks(self)

        self.mp_lock = multiprocessing.Lock()
        self.mp_event = multiprocessing.Event()
        tasks = []
        for task_no in range(parallel_tasks_count):
            p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
            p.start()
            tasks.append(p)

        # release waiting processes
        time.sleep(self.release_time)
        self.mp_event.set()

        # wait for all tasks to complete
        for p in tasks:
            p.join()

        # after tasks
        if self.func_after_tasks:
            return self.func_after_tasks(self)
        return None

    def run(
        self, 
        loop_count=None,
        parallel_tasks_count=None,
    ):
        self.logger = self.get_logger('main')
        if loop_count is not None:
            self.loop_count = loop_count
        if parallel_tasks_count is not None:
            self.parallel_tasks_count = parallel_tasks_count
    
        start_time = time.time()
        for loop_no in range(self.loop_count):
            self.logger.debug('loop_no = {}'.format(loop_no))

            result = self.run_parallel_tasks(self.parallel_tasks_count)
            if self.func_result_ok:
                if not self.func_result_ok(self, result):
                    self.logger.error('result = {}'.format(result))
                    break
                else:
                    self.logger.info('result ok')
                        
        if self.func_after_result:
            self.func_after_result(self)

        run_secs = time.time() - start_time
        self.logger.info('ready in {} seconds'.format(run_secs))

# ### YOUR CODE BELOW ### #

def before_tasks(task_runner):
    # create a table, insert row with counter = 0
    with sqlite3.connect('./test_tasks.db') as conn:
        cursor = conn.cursor()
        cursor.execute("""DROP TABLE IF EXISTS tasks""")
        cursor.execute("""CREATE TABLE tasks (counter INTEGER)""")
        cursor.execute("""INSERT INTO tasks (counter) VALUES (0)""")
        conn.commit()

def task(task_runner, task_no):
    logger = task_runner.get_logger('task' + str(task_no))
    pid = os.getpid()

    # wait for event
    logger.debug('pid = {} waiting for event at {}'.format(pid, time.time()))
    task_runner.mp_event.wait()

    # wait for lock
    lock = task_runner.get_lock()
    logger.debug('pid = {} waiting for lock at  {}'.format(pid, time.time()))
    with lock:
        # increment counter field
        with sqlite3.connect('./test_tasks.db', timeout=10) as conn:
            cursor = conn.cursor()
            counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
            logger.debug('counter = {}'.format(counter))
            counter += 1
            cursor.execute("""UPDATE tasks SET counter=?""", (counter,))
            conn.commit()

def after_tasks(task_runner):
    conn = sqlite3.connect('./test_tasks.db')
    cursor = conn.cursor()
    counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
    task_runner.logger.info('counter = {} <- final value'.format(counter))

def result_ok(task_runner, result):
    pass

def after_result(task_runner):
    pass

def main():
    tr = TaskRunner(
        # functions
        func_before_tasks=before_tasks,
        func_task=task,
        func_after_tasks=after_tasks,
        #func_result_ok=result_ok,
        func_after_result=after_result,
        # logging
        logger_level=logging.INFO,
        # locking
        use_file_locking=True,
    )
    tr.run(
        loop_count=1,
        parallel_tasks_count=100,
        #parallel_tasks_count=2,
    )

if __name__ == '__main__':
    main()

Resumen

Queríamos una manera fácil de probar las operaciones concurrentes. En el pasado usé el paquete Python 'Locust' para probar concurrencia, ver el post 'Using Locust to load test a FastAPI app with concurrent users'. Esta vez quería mantenerlo pequeño, flexible y extensible.
Además de eso, también quería un gestor de contexto de bloqueo de archivos de múltiples procesos. Implementamos ambos, las pruebas pasaron. Es hora de volver a mis otros proyectos.

Enlaces / créditos

Python - fcntl
https://docs.python.org/3/library/fcntl.html

Python - multiprocessing
https://docs.python.org/3/library/multiprocessing.html

Python - SQLite3
https://docs.python.org/3/library/sqlite3.html

Using Locust to load test a FastAPI app with concurrent users
https://www.peterspython.com/en/blog/using-locust-to-load-test-a-fastapi-app-with-concurrent-users

Deje un comentario

Comente de forma anónima o inicie sesión para comentar.

Comentarios

Deje una respuesta.

Responda de forma anónima o inicie sesión para responder.