angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Multiprocessing, Dateisperren, SQLite und Prüfung

Das Testen auf Gleichzeitigkeitsprobleme ist schwieriger und nimmt mehr Zeit in Anspruch, aber man kann nicht darauf verzichten.

30 März 2023
post main image

Ich habe an einem Projekt mit SQLAlchemy und PostgreSQL gearbeitet. Für einige Tabellen wollte ich die Anzahl der Zeilen pro user begrenzen und habe dies durch Hinzufügen einer PostgreSQL -Prüffunktion und eines Triggers erreicht.

Beim manuellen Testen schien alles gut zu funktionieren, aber was, wenn ein user mehrere Prozesse starten und Zeilen genau zur gleichen Zeit hinzufügen würde? Ich habe das "pg_advisory_xact_lock" hinzugefügt, aber wird das wirklich funktionieren? Habe ich die Dokumentation richtig verstanden?

In diesem Beitrag zeige ich eine universelle TaskRunner-Klasse, die zum Testen von gleichzeitigen (concurrent) Aktionen verwendet werden kann. Als Testfall verwenden wir eine SQLite Datenbank, die wir mit separaten Prozessen schreiben.

Wir starten alle Prozesse von einem einzigen Prozess aus. In diesem Fall können wir Multiprocessing.Lock() verwenden, um den Zugriff auf SQLite zu kontrollieren. Ich habe aber auch einen File Locker implementiert, der verwendet werden kann, wenn wir völlig unabhängige Prozesse haben.

Wie immer habe ich dies auf Ubuntu 22.04 ausgeführt.

Gleichzeitiger Start von Aktionen

In unserem Testaufbau verwenden wir Multiprocessing.Event(), um alle Prozesse an der gleichen Zeile im Taskcode warten zu lassen, eine Zeile vor der "kritischen Aktion". Wenn alle Prozesse diesen Punkt erreicht haben, geben wir die Prozesse "frei" und sehen, was passiert.

                         stop & release
                              |
                              v

task1  |--------------------->|-------->
task2      |----------------->|-------->
task3        |--------------->|-------->
                              |
taskN               |-------->|-------->

        --------------------------------------> t

In der TaskRunner-Klasse:

class TaskRunner:
    ...
    def run_parallel_tasks(self, parallel_tasks_count):
        ...
        self.mp_event = multiprocessing.Event()
        ...
        for task_no in range(parallel_tasks_count):
            p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
            ...

        # release waiting all processes
        time.sleep(self.release_time)
        self.mp_event.set()
        ...

In unserer Task-Funktion:

def task(task_runner, task_no):
    ...
    # all tasks will wait here 
    task_runner.mp_event.wait()

    # critical action
    ...

Inkrementieren eines Tabellenfeldes SQLite

In unserem Test versuchen die Tasks (Prozesse) gleichzeitig, ein SQLite -Tabellenfeld, 'counter',
, zu inkrementieren, indem sie

  • den Wert des Feldes zu lesen
  • Inkrementieren
  • das Feld zu aktualisieren

Wenn wir 100 Aufgaben haben, dann muss das Ergebnis im Tabellenfeld 100 sein. Jeder andere Wert ist falsch.

Sperren von

Eine Aufgabe kann die Inkrementierungsoperation nicht zuverlässig durchführen, ohne exklusiven Zugriff auf SQLite zu erhalten. Hier verwenden wir eine Sperre außerhalb von SQLite.

Wir können distdas Folgende unterscheiden:

  1. Die (konkurrierenden) Aufgaben werden von einem einzigen Prozess gestartet
  2. Die (konkurrierenden) Aufgaben sind unabhängig

Im ersten Fall können wir Multiprocessing.Lock() verwenden und diese Sperre für alle unsere Aufgaben freigeben. Für Testzwecke ist das in Ordnung.

Der zweite Fall ist eher ein Szenario aus der Praxis. Hier können wir Multiprocessing.Lock() nicht verwenden, aber wir können Linux file locking verwenden. Dies ist schnell und zuverlässig.

Sperren - Multiprocessing.Lock()

Ich möchte Multiprocessing.Lock() als Kontextmanager verwenden. Leider können wir dann keinen Timeout angeben. Das heißt, wir müssen den Kontextmanager selbst schreiben:

# multiprocessing locker context manager with timeout
class mp_locker:
    def __init__(
        self,
        mp_lock=None,
        timeout=10,
    ):
        self.mp_lock = mp_lock
        self.timeout = timeout

    def __enter__(self):
        self.mp_lock.acquire(timeout=self.timeout)

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.mp_lock.release()

Sperren - Dateisperren

Es gibt viele Beispiele im Internet, wie man das machen kann. Auch hier möchte ich dies als Kontextmanager verwenden. Hier zeige ich nur die '__enter__()' Methode.

# file locker context manager
    ...
    def __enter__(self):
        while True:
            if (time.time() - ts) > self.timeout:
                raise Exception('pid = {}: acquire lock timeout')
            try:
                self.lock_file_fo = open(self.lock_file, 'a')
                fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
                break
            except BlockingIOError as e:
                # another process locked the file, keep trying
                time.sleep(self.wait_secs)
            # propagate other exceptions

Wir bleiben in der 'while-Schleife', bis wir die Sperre erhalten oder ein Timeout auftritt.

Die TaskRunner-Klasse

Der TaskRunner enthält die gesamte Logik zum Starten mehrerer Tasks (Prozesse).

Funktionen:

  • before_tasks()
  • task()
  • after_tasks()
  • result_ok()
  • after_result()

Optionen:

  • Anzahl der gleichzeitigen Aufgaben.
  • Anzahl der Wiederholungen.
  • Freigabezeit der wartenden Aufgaben (nach dem Start).
  • Protokollierungsebene.
  • Multiprocessing.Lock() Sperren, oder Dateisperren
  • Zeitüberschreitung beim Sperren.

Wichtig: Alle Ihre Funktionen werden mit dem TaskRunner-Objekt als erstem Parameter aufgerufen. Das bedeutet, dass Sie Zugriff auf TaskRunner-Attribute und -Methoden haben wie:

  • get_lock()
  • get_logger()

Der Code

Der Code besteht aus den folgenden Teilen:

  • TaskRunner-Klasse und unterstützende Klassen
  • Ihre Task-Funktionen
  • TaskRunner-Initiation mit Ihren Parametern

Wenn Sie den Code ausführen, sieht die Ausgabe ungefähr so aus:

INFO     counter = 100 <- final value
INFO     ready in 2.0454471111297607 seconds

Hier ist der Code, falls Sie es selbst versuchen wollen:

import fcntl
import logging
import multiprocessing
import os
import sys
import time

import sqlite3


class DummyLogger:
    def __getattr__(self, name):
        return lambda *args, **kwargs: None

# file locker context manager
class f_locker:
    def __init__(
        self,
        lock_file=None,
        timeout=10,
        logger=DummyLogger(),
        wait_secs=.01,
    ):
        self.lock_file = lock_file
        self.timeout = timeout
        self.logger = logger
        self.wait_secs = wait_secs
        # keep lock_file opened
        self.lock_file_fo = None

    def __enter__(self):
        pid = os.getpid()
        ts = time.time()
        while True:
            self.logger.debug('pid = {}: trying to acquire lock ...'.format(pid))
            if (time.time() - ts) > self.timeout:
                raise Exception('pid = {}: acquire lock timeout')
            # keep trying until lock or timeout
            try:
                self.lock_file_fo = open(self.lock_file, 'a')
                fcntl.flock(self.lock_file_fo, fcntl.LOCK_EX | fcntl.LOCK_NB)
                self.logger.debug('pid = {}: lock acquired'.format(pid))
                break
            except BlockingIOError as e:
                # another process locked the file, keep trying
                self.logger.debug('pid = {}: cannot acquire lock'.format(pid))
                time.sleep(self.wait_secs)
            # propagate other exceptions
        return True

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
        pid = os.getpid()
        self.logger.debug('pid = {}: trying to release lock ...'.format(pid))
        fcntl.flock(self.lock_file_fo, fcntl.LOCK_UN)
        self.logger.debug('pid = {}: lock released ...'.format(pid))


# multiprocessing locker context manager with timeout
class mp_locker:
    def __init__(
        self,
        mp_lock=None,
        timeout=10,
        logger=DummyLogger(),
    ):
        self.mp_lock = mp_lock
        self.timeout = timeout
        self.logger = logger

    def __enter__(self):
        self.pid = os.getpid()
        self.logger.debug('pid = {}: trying to acquire lock ...'.format(self.pid))
        self.mp_lock.acquire(timeout=self.timeout)
        self.logger.debug('pid = {}: lock acquired'.format(self.pid))

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.logger.debug('exc_type = {}, exc_value = {}, exc_tb = {}'.format(exc_type, exc_value, exc_tb))
        self.logger.debug('pid = {}: trying to release lock ...'.format(self.pid))
        self.mp_lock.release()
        self.logger.debug('pid = {}: lock released ...'.format(self.pid))


class TaskRunner:
    def __init__(
        self,
        loop_count=1,
        parallel_tasks_count=1,
        release_time=1.,
        # functions
        func_before_tasks=None,
        func_task=None,
        func_after_tasks=None,
        func_result_ok=None,
        func_after_result=None,
        # logging
        logger_level=logging.DEBUG,
        # locking
        lock_timeout=10,
        use_file_locking=False,
        lock_file='./lock_file',
        lock_wait_secs=.01,
    ):
        self.loop_count = loop_count
        self.parallel_tasks_count = parallel_tasks_count
        self.release_time = release_time
        # functions
        self.func_before_tasks = func_before_tasks
        self.func_task = func_task
        self.func_after_tasks = func_after_tasks
        self.func_result_ok = func_result_ok
        self.func_after_result = func_after_result
        # logging
        self.logger_level = logger_level
        # locking
        self.lock_timeout = lock_timeout
        self.use_file_locking = use_file_locking
        self.lock_file = lock_file
        self.lock_wait_secs = lock_wait_secs

    def get_logger(self, proc_name, logger_level=None):
        if logger_level is None:
            logger_level = self.logger_level
        logger = logging.getLogger(proc_name)
        logger.setLevel(logging.DEBUG)
        console_handler = logging.StreamHandler()
        console_logger_format = '%(asctime)s %(proc_name)-8.8s %(levelname)-8.8s [%(filename)-20s%(funcName)20s():%(lineno)03s] %(message)s'
        console_handler.setFormatter(logging.Formatter(console_logger_format))
        logger.setLevel(logger_level)
        logger.addHandler(console_handler)
        logger = logging.LoggerAdapter(logger, {'proc_name': proc_name})
        return logger

    def get_lock(self, timeout=None):
        timeout = timeout or self.lock_timeout
        if not self.use_file_locking:
            return mp_locker(self.mp_lock, timeout=timeout, logger=self.logger)
        return f_locker(self.lock_file, timeout=timeout, wait_secs=self.lock_wait_secs)

    def run_parallel_tasks(self, parallel_tasks_count):
        # before tasks
        if self.func_before_tasks:
            self.func_before_tasks(self)

        self.mp_lock = multiprocessing.Lock()
        self.mp_event = multiprocessing.Event()
        tasks = []
        for task_no in range(parallel_tasks_count):
            p = multiprocessing.Process(target=self.func_task, args=(self, task_no))
            p.start()
            tasks.append(p)

        # release waiting processes
        time.sleep(self.release_time)
        self.mp_event.set()

        # wait for all tasks to complete
        for p in tasks:
            p.join()

        # after tasks
        if self.func_after_tasks:
            return self.func_after_tasks(self)
        return None

    def run(
        self, 
        loop_count=None,
        parallel_tasks_count=None,
    ):
        self.logger = self.get_logger('main')
        if loop_count is not None:
            self.loop_count = loop_count
        if parallel_tasks_count is not None:
            self.parallel_tasks_count = parallel_tasks_count
    
        start_time = time.time()
        for loop_no in range(self.loop_count):
            self.logger.debug('loop_no = {}'.format(loop_no))

            result = self.run_parallel_tasks(self.parallel_tasks_count)
            if self.func_result_ok:
                if not self.func_result_ok(self, result):
                    self.logger.error('result = {}'.format(result))
                    break
                else:
                    self.logger.info('result ok')
                        
        if self.func_after_result:
            self.func_after_result(self)

        run_secs = time.time() - start_time
        self.logger.info('ready in {} seconds'.format(run_secs))

# ### YOUR CODE BELOW ### #

def before_tasks(task_runner):
    # create a table, insert row with counter = 0
    with sqlite3.connect('./test_tasks.db') as conn:
        cursor = conn.cursor()
        cursor.execute("""DROP TABLE IF EXISTS tasks""")
        cursor.execute("""CREATE TABLE tasks (counter INTEGER)""")
        cursor.execute("""INSERT INTO tasks (counter) VALUES (0)""")
        conn.commit()

def task(task_runner, task_no):
    logger = task_runner.get_logger('task' + str(task_no))
    pid = os.getpid()

    # wait for event
    logger.debug('pid = {} waiting for event at {}'.format(pid, time.time()))
    task_runner.mp_event.wait()

    # wait for lock
    lock = task_runner.get_lock()
    logger.debug('pid = {} waiting for lock at  {}'.format(pid, time.time()))
    with lock:
        # increment counter field
        with sqlite3.connect('./test_tasks.db', timeout=10) as conn:
            cursor = conn.cursor()
            counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
            logger.debug('counter = {}'.format(counter))
            counter += 1
            cursor.execute("""UPDATE tasks SET counter=?""", (counter,))
            conn.commit()

def after_tasks(task_runner):
    conn = sqlite3.connect('./test_tasks.db')
    cursor = conn.cursor()
    counter = cursor.execute('SELECT counter FROM tasks').fetchone()[0]
    task_runner.logger.info('counter = {} <- final value'.format(counter))

def result_ok(task_runner, result):
    pass

def after_result(task_runner):
    pass

def main():
    tr = TaskRunner(
        # functions
        func_before_tasks=before_tasks,
        func_task=task,
        func_after_tasks=after_tasks,
        #func_result_ok=result_ok,
        func_after_result=after_result,
        # logging
        logger_level=logging.INFO,
        # locking
        use_file_locking=True,
    )
    tr.run(
        loop_count=1,
        parallel_tasks_count=100,
        #parallel_tasks_count=2,
    )

if __name__ == '__main__':
    main()

Zusammenfassung

Wir wollten einen einfachen Weg, um gleichzeitige Operationen zu testen. In der Vergangenheit habe ich das Python -Paket 'Locust' verwendet, um Gleichzeitigkeit zu testen, siehe den Beitrag 'Using Locust to load test a FastAPI app with concurrent users'. Diesmal wollte ich es klein, flexibel und erweiterbar halten.
Außerdem wollte ich auch einen Dateisperrkontextmanager für mehrere Prozesse. Wir haben beides implementiert, die Tests haben bestanden. Zeit, sich wieder meinen anderen Projekten zuzuwenden.

Links / Impressum

Python - fcntl
https://docs.python.org/3/library/fcntl.html

Python - multiprocessing
https://docs.python.org/3/library/multiprocessing.html

Python - SQLite3
https://docs.python.org/3/library/sqlite3.html

Using Locust to load test a FastAPI app with concurrent users
https://www.peterspython.com/en/blog/using-locust-to-load-test-a-fastapi-app-with-concurrent-users

Einen Kommentar hinterlassen

Kommentieren Sie anonym oder melden Sie sich zum Kommentieren an.

Kommentare

Eine Antwort hinterlassen

Antworten Sie anonym oder melden Sie sich an, um zu antworten.