angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Multiprocessing, verrouillage des fichiers, SQLite et tests

Il est plus difficile et plus long de tester les problèmes de concurrence, mais on ne peut s'en passer.

30 mars 2023
post main image

Je travaillais sur un projet avec SQLAlchemy et PostgreSQL. Pour quelques tables, je voulais limiter le nombre de lignes par user, et je l'ai fait en ajoutant une fonction de contrôle et un déclencheur PostgreSQL .

Lors des tests manuels, tout semblait fonctionner correctement, mais que se passerait-il si un user démarrait plusieurs processus et ajoutait des lignes exactement au même moment ? J'ai ajouté le 'pg_advisory_xact_lock' mais cela fonctionnera-t-il vraiment ? Ai-je vraiment compris la documentation ?

Dans ce billet, je montre une classe universelle TaskRunner qui peut être utilisée pour tester des actions simultanées (concurrentes). Comme cas de test, nous utilisons une base de données SQLite que nous écrivons avec des processus séparés.

Nous démarrons tous les processus à partir d'un processus unique. Dans ce cas, nous pouvons utiliser Multiprocessing.Lock() pour contrôler l'accès à SQLite. Mais j'ai également implémenté un casier de fichiers qui peut être utilisé lorsque nous avons des processus totalement indépendants.

Comme toujours, j'utilise Ubuntu 22.04.

Démarrer des actions en même temps

Dans notre configuration de test, nous utilisons Multiprocessing.Event() pour faire attendre tous les processus à la même ligne dans le code de la tâche, une ligne avant l'"action critique". Ensuite, lorsque tous les processus ont atteint ce point, nous les "libérons" et voyons ce qui se passe.

                         stop & release
                              |
                              v

task1  |--------------------->|-------->
task2      |----------------->|-------->
task3        |--------------->|-------->
                              |
taskN               |-------->|-------->

        --------------------------------------> t

Dans la classe TaskRunner :

class TaskRunner:
    ...
    def run_parallel_tasks(self, parallel_tasks_count):
        ...
        self.mp_event = multiprocessing.Event()
        ...
        for task_no in range(parallel_tasks_count):
            p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
            ...

        # release waiting all processes
        time.sleep(self.release_time)
        self.mp_event.set()
        ...

Dans notre fonction de tâche :

def task(task_runner, task_no):
    ...
    # all tasks will wait here 
    task_runner.mp_event.wait()

    # critical action
    ...

Incrémentation d'un champ de la table SQLite

Dans notre test, les tâches (processus) essaient simultanément d'incrémenter un champ de la table SQLite , 'counter',
en :

  • en lisant la valeur du champ
  • en l'incrémentant
  • en mettant à jour le champ

Si nous avons 100 tâches, le résultat dans le champ de la table doit être 100. Toute autre valeur est erronée.

Verrouillage

Une tâche ne peut pas effectuer l'opération d'incrémentation de manière fiable sans obtenir un accès exclusif à SQLite. Ici, nous utilisons un verrou externe à SQLite.

Nous pouvons dist distinguer ce qui suit :

  1. les tâches (concurrentes) sont lancées par un seul processus
  2. Les tâches (concurrentes) sont indépendantes

Dans le premier cas, nous pouvons utiliser Multiprocessing.Lock() et partager ce verrou entre toutes nos tâches. Pour les tests, c'est parfait.

Le deuxième cas est un scénario plus réaliste. Nous ne pouvons pas utiliser Multiprocessing.Lock() ici mais nous pouvons utiliser Linux file locking. Cette méthode est rapide et fiable.

Verrouillage - Multiprocessing.Lock()

Je veux utiliser Multiprocessing.Lock() comme gestionnaire de contexte. Malheureusement, il n'est pas possible de spécifier un délai d'attente. Cela signifie que nous devons écrire le gestionnaire de contexte nous-mêmes :

# multiprocessing locker context manager with timeout
class mp_locker:
    def __init__(
        self,
        mp_lock=None,
        timeout=10,
    ):
        self.mp_lock = mp_lock
        self.timeout = timeout

    def __enter__(self):
        self.mp_lock.acquire(timeout=self.timeout)

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.mp_lock.release()

Verrouillage - Verrouillage de fichiers

Il existe de nombreux exemples sur Internet sur la manière de procéder. Une fois de plus, je veux utiliser ceci comme gestionnaire de contexte. Ici, je ne montre que la méthode '__enter__()'.

# file locker context manager
    ...
    def __enter__(self):
        while True:
            if (time.time() - ts) > self.timeout:
                raise Exception('pid = {}: acquire lock timeout')
            try:
                self.lock_file_fo = open(self.lock_file, 'a')
                fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
                break
            except BlockingIOError as e:
                # another process locked the file, keep trying
                time.sleep(self.wait_secs)
            # propagate other exceptions

Nous restons dans la boucle "while" jusqu'à ce que nous obtenions le verrou ou qu'un dépassement de délai se produise.

La classe TaskRunner

La classe TaskRunner contient toute la logique nécessaire pour lancer plusieurs tâches (processus).

Fonctions :

  • before_tasks()
  • tâche()
  • after_tasks()
  • result_ok()
  • après_résultat()

Options :

  • Nombre de tâches simultanées.
  • Nombre de répétitions.
  • Délai de libération des tâches en attente (après le démarrage).
  • Niveau de journalisation.
  • Multiprocessing.Lock() verrouillage, ou verrouillage de fichier
  • Délai de verrouillage.

Important : toutes vos fonctions sont appelées avec l'objet TaskRunner comme premier paramètre. Cela signifie que vous avez accès aux attributs et aux méthodes de l'objet TaskRunner, tels que :

  • get_lock()
  • get_logger()

Le code

Le code se compose des parties suivantes :

  • la classe TaskRunner et les classes de support
  • Les fonctions de votre tâche
  • Instauration du TaskRunner avec vos paramètres

Lorsque vous exécutez le code, le résultat ressemble à quelque chose comme :

INFO     counter = 100 <- final value
INFO     ready in 2.0454471111297607 seconds

Voici le code au cas où vous voudriez l'essayer vous-même :

import fcntl
import logging
import multiprocessing
import os
import sys
import time

import sqlite3


class DummyLogger:
    def __getattr__(self, name):
        return lambda *args, **kwargs: None

# file locker context manager
class f_locker:
    def __init__(
        self,
        lock_file=None,
        timeout=10,
        logger=DummyLogger(),
        wait_secs=.01,
    ):
        self.lock_file = lock_file
        self.timeout = timeout
        self.logger = logger
        self.wait_secs = wait_secs
        # keep lock_file opened
        self.lock_file_fo = None

    def __enter__(self):
        pid = os.getpid()
        ts = time.time()
        while True:
            self.logger.debug('pid = {}: trying to acquire lock ...'.format(pid))
            if (time.time() - ts) > self.timeout:
                raise Exception('pid = {}: acquire lock timeout')
            # keep trying until lock or timeout
            try:
                self.lock_file_fo = open(self.lock_file, 'a')
                fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
                self.logger.debug('pid = {}: lock acquired'.format(pid))
                break
            except BlockingIOError as e:
                # another process locked the file, keep trying
                self.logger.debug('pid = {}: cannot acquire lock'.format(pid))
                time.sleep(self.wait_secs)
            # propagate other exceptions
        return True

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
        pid = os.getpid()
        self.logger.debug('pid = {}: trying to release lock ...'.format(pid))
        fcntl.flock(self.lock_file_fo, fcntl.LOCK_UN)
        self.logger.debug('pid = {}: lock released ...'.format(pid))


# multiprocessing locker context manager with timeout
class mp_locker:
    def __init__(
        self,
        mp_lock=None,
        timeout=10,
        logger=DummyLogger(),
    ):
        self.mp_lock = mp_lock
        self.timeout = timeout
        self.logger = logger

    def __enter__(self):
        self.pid = os.getpid()
        self.logger.debug('pid = {}: trying to acquire lock ...'.format(self.pid))
        self.mp_lock.acquire(timeout=self.timeout)
        self.logger.debug('pid = {}: lock acquired'.format(self.pid))

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
        self.logger.debug('pid = {}: trying to release lock ...'.format(self.pid))
        self.mp_lock.release()
        self.logger.debug('pid = {}: lock released ...'.format(self.pid))


class TaskRunner:
    def __init__(
        self,
        loop_count=1,
        parallel_tasks_count=1,
        release_time=1.,
        # functions
        func_before_tasks=None,
        func_task=None,
        func_after_tasks=None,
        func_result_ok=None,
        func_after_result=None,
        # logging
        logger_level=logging.DEBUG,
        # locking
        lock_timeout=10,
        use_file_locking=False,
        lock_file='./lock_file',
        lock_wait_secs=.01,
    ):
        self.loop_count = loop_count
        self.parallel_tasks_count = parallel_tasks_count
        self.release_time = release_time
        # functions
        self.func_before_tasks = func_before_tasks
        self.func_task = func_task
        self.func_after_tasks = func_after_tasks
        self.func_result_ok = func_result_ok
        self.func_after_result = func_after_result
        # logging
        self.logger_level = logger_level
        # locking
        self.lock_timeout = lock_timeout
        self.use_file_locking = use_file_locking
        self.lock_file = lock_file
        self.lock_wait_secs = lock_wait_secs

    def get_logger(self, proc_name, logger_level=None):
        if logger_level is None:
            logger_level = self.logger_level
        logger = logging.getLogger(proc_name)
        logger.setLevel(logging.DEBUG)
        console_handler = logging.StreamHandler()
        console_logger_format = '%(asctime)s %(proc_name)-8.8s %(levelname)-8.8s [%(filename)-20s%(funcName)20s():%(lineno)03s] %(message)s'
        console_handler.setFormatter(logging.Formatter(console_logger_format))
        logger.setLevel(logger_level)
        logger.addHandler(console_handler)
        logger = logging.LoggerAdapter(logger, {'proc_name': proc_name})
        return logger

    def get_lock(self, timeout=None):
        timeout = timeout or self.lock_timeout
        if not self.use_file_locking:
            return mp_locker(self.mp_lock, timeout=timeout, logger=self.logger)
        return f_locker(self.lock_file, timeout=timeout, wait_secs=self.lock_wait_secs)

    def run_parallel_tasks(self, parallel_tasks_count):
        # before tasks
        if self.func_before_tasks:
            self.func_before_tasks(self)

        self.mp_lock = multiprocessing.Lock()
        self.mp_event = multiprocessing.Event()
        tasks = []
        for task_no in range(parallel_tasks_count):
            p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
            p.start()
            tasks.append(p)

        # release waiting processes
        time.sleep(self.release_time)
        self.mp_event.set()

        # wait for all tasks to complete
        for p in tasks:
            p.join()

        # after tasks
        if self.func_after_tasks:
            return self.func_after_tasks(self)
        return None

    def run(
        self, 
        loop_count=None,
        parallel_tasks_count=None,
    ):
        self.logger = self.get_logger('main')
        if loop_count is not None:
            self.loop_count = loop_count
        if parallel_tasks_count is not None:
            self.parallel_tasks_count = parallel_tasks_count
    
        start_time = time.time()
        for loop_no in range(self.loop_count):
            self.logger.debug('loop_no = {}'.format(loop_no))

            result = self.run_parallel_tasks(self.parallel_tasks_count)
            if self.func_result_ok:
                if not self.func_result_ok(self, result):
                    self.logger.error('result = {}'.format(result))
                    break
                else:
                    self.logger.info('result ok')
                        
        if self.func_after_result:
            self.func_after_result(self)

        run_secs = time.time() - start_time
        self.logger.info('ready in {} seconds'.format(run_secs))

# ### YOUR CODE BELOW ### #

def before_tasks(task_runner):
    # create a table, insert row with counter = 0
    with sqlite3.connect('./test_tasks.db') as conn:
        cursor = conn.cursor()
        cursor.execute("""DROP TABLE IF EXISTS tasks""")
        cursor.execute("""CREATE TABLE tasks (counter INTEGER)""")
        cursor.execute("""INSERT INTO tasks (counter) VALUES (0)""")
        conn.commit()

def task(task_runner, task_no):
    logger = task_runner.get_logger('task' + str(task_no))
    pid = os.getpid()

    # wait for event
    logger.debug('pid = {} waiting for event at {}'.format(pid, time.time()))
    task_runner.mp_event.wait()

    # wait for lock
    lock = task_runner.get_lock()
    logger.debug('pid = {} waiting for lock at  {}'.format(pid, time.time()))
    with lock:
        # increment counter field
        with sqlite3.connect('./test_tasks.db', timeout=10) as conn:
            cursor = conn.cursor()
            counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
            logger.debug('counter = {}'.format(counter))
            counter += 1
            cursor.execute("""UPDATE tasks SET counter=?""", (counter,))
            conn.commit()

def after_tasks(task_runner):
    conn = sqlite3.connect('./test_tasks.db')
    cursor = conn.cursor()
    counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
    task_runner.logger.info('counter = {} <- final value'.format(counter))

def result_ok(task_runner, result):
    pass

def after_result(task_runner):
    pass

def main():
    tr = TaskRunner(
        # functions
        func_before_tasks=before_tasks,
        func_task=task,
        func_after_tasks=after_tasks,
        #func_result_ok=result_ok,
        func_after_result=after_result,
        # logging
        logger_level=logging.INFO,
        # locking
        use_file_locking=True,
    )
    tr.run(
        loop_count=1,
        parallel_tasks_count=100,
        #parallel_tasks_count=2,
    )

if __name__ == '__main__':
    main()

Résumé

Nous voulions un moyen facile de tester les opérations concurrentes. Dans le passé, j'ai utilisé le package Python 'Locust' pour tester la concurrence, voir l'article 'Using Locust to load test a FastAPI app with concurrent users'. Cette fois-ci, je voulais le garder petit, flexible et extensible.
En plus de cela, je voulais aussi un gestionnaire de contexte de verrouillage de fichiers pour plusieurs processus. Nous avons implémenté les deux, les tests ont été réussis. Il est temps de retourner à mes autres projets.

Liens / crédits

Python - fcntl
https://docs.python.org/3/library/fcntl.html

Python - multiprocessing
https://docs.python.org/3/library/multiprocessing.html

Python - SQLite3
https://docs.python.org/3/library/sqlite3.html

Using Locust to load test a FastAPI app with concurrent users
https://www.peterspython.com/en/blog/using-locust-to-load-test-a-fastapi-app-with-concurrent-users

Laissez un commentaire

Commentez anonymement ou connectez-vous pour commenter.

Commentaires

Laissez une réponse

Répondez de manière anonyme ou connectez-vous pour répondre.