angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

SQLAlchemy, PostgreSQL, maximale Anzahl von Zeilen pro user

Verwenden Sie eine PostgreSQL function und eine PostgreSQL trigger , um die maximale Anzahl der Zeilen pro user zu begrenzen.

5 Februar 2024 Aktualisiert 7 Februar 2024
post main image
https://pixabay.com/users/engin_akyurt-3656355

Sie haben eine Multi-user -Anwendung mit SQLAlchemy und PostgreSQL und möchten die Anzahl der Zeilen pro user einer bestimmten Tabelle begrenzen. Zum Beispiel kann jede user maximal fünf Einträge haben.

Sie benötigen eine Operation wie:

  • Sperren Sie die Tabelle
  • Zählen Sie die Anzahl der Beiträge der user
  • Wenn die Anzahl kleiner als fünf ist:
    • Neuen Beitrag hinzufügen
    • Entsperren Sie die Tabelle
  • Else:
    • Entsperre die Tabelle
    • Ausnahme generieren

Wir können dies nur mit SQLAlchemy tun, aber das ist eine Menge Code und nicht sehr schnell.

Ein besserer Weg ist es, dies mit zu implementieren:

  • A PostgreSQL function, und
  • A PostgreSQL trigger

Der PostgreSQL function wird zur Durchführung der Operation verwendet, und der PostgreSQL trigger ruft den PostgreSQL function bei einer INSERT -Operation auf.

Die PostgreSQL function und PostgreSQL trigger befinden sich auf dem Datenbankserver. Wir laden sie einmal mit SQLAlchemy auf den Datenbankserver hoch. Das bedeutet, dass die Abfragen des PostgreSQL function auf dem Datenbankserver laufen.

Wie immer mache ich das auf Ubuntu 22.04.

Der Code

Ich werde hier nicht viel erklären, ich habe ein funktionierendes Beispiel erstellt, es ist alles im Code kommentiert, siehe unten.

In dem Beispiel haben wir Bürger und Autos. Ein Bürger kann eine maximale Anzahl von zwei Autos haben. Diese Anzahl wird in einer anderen (Konfigurations-)Tabelle gespeichert.

Zuerst fügen wir einem Bürger zwei Autos hinzu. Wenn wir dann versuchen, ein drittes Auto hinzuzufügen, wird eine Ausnahme erzeugt. Beachten Sie, dass diese Ausnahme eine PostgreSQL -Ausnahme ist. In SQLAlchemy ist sie in 'e.orig' verfügbar.

...
# add a car to a citizen
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)

try:
    db.commit()
except Exception as e:
    e_orig = getattr(e.orig, None)
    if e_orig is not None:
        # postgresql exception

Sie entscheiden, welche Informationen in der Ausnahme enthalten sind. Hier sieht die Ausnahme wie folgt aus:

CitizenCarsMaxRowsLimitReachedError|max_rows=2|row_count=2|citizen_id=7b38f947-de4e-4972-9e28-e17f510e87f4

Um das Testen zu vereinfachen, werden die Tabellen und die PostgreSQL function und PostgreSQL trigger bei jedem Lauf erstellt.

Um das Beispiel auszuführen, erstellen Sie eine Datenbank PostgreSQL . Erstellen Sie dann eine virtual environment und installieren Sie das Folgende:

pip install SQLAlchemy
pip install psycopg2-binary

Hier ist der Code, vergessen Sie nicht, die Datenbankparameter hinzuzufügen:

# max_num_rows.py
import logging
import sys
import uuid

from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as pg

# sqlalchemy logging
logging.basicConfig()
sqlalchemy_logging_level = logging.DEBUG
#sqlalchemy_logging_level = logging.INFO
#sqlalchemy_logging_level = logging.ERROR
logging.getLogger('sqlalchemy.engine').setLevel(sqlalchemy_logging_level)

# your test (!) database parameters
DB_HOST = ''
DB_PORT = 5432
DB_NAME = ''
DB_USER = ''
DB_PASSWORD = ''

connection_uri = ''.join([
    'postgresql+psycopg2://',
    DB_USER + ':' + DB_PASSWORD,
    '@',
    DB_HOST + ':' + str(DB_PORT),
    '/',
    DB_NAME,
])

engine = sa.create_engine(connection_uri)

# create the check function
# the function is called by the trigger
pg_crea_func___check_insert_row_limit_cars_per_citizen = """
CREATE OR REPLACE FUNCTION check_insert_row_limit_cars_per_citizen()
RETURNS TRIGGER AS $$
DECLARE
  max_rows INTEGER;
  row_count INTEGER;
  lock_key TEXT := 'insert_limit_car_' || NEW.citizen_id;
BEGIN
  -- get advisory lock to serialize access to the check
  PERFORM pg_advisory_xact_lock(hashtext(lock_key));

  -- get the maximum number of cars allowed per citizen
  SELECT COALESCE(
    (
    SELECT max_cars
    FROM config
    ),
    0) INTO max_rows;

  -- get current number of cars for this citizen
  SELECT COUNT(*) INTO row_count 
  FROM car
  WHERE citizen_id = NEW.citizen_id AND deleted_on IS NULL;

  -- we cannot insert if max_cars is reached
  IF row_count >= max_rows THEN
    -- release lock before raising an exception
    PERFORM pg_advisory_unlock(hashtext(lock_key));
    RAISE EXCEPTION 'CitizenCarsMaxRowsLimitReachedError|max_rows=%|row_count=%|citizen_id=%', max_rows, row_count, NEW.citizen_id;
  END IF;

  -- release lock
  PERFORM pg_advisory_unlock(hashtext(lock_key));
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""

# create the trigger:
# calls the check function, before (!) an insert into table car
pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger = """
CREATE TRIGGER check_insert_row_limit_cars_per_citizen_trigger
BEFORE INSERT ON car
FOR EACH ROW
EXECUTE FUNCTION check_insert_row_limit_cars_per_citizen();
"""

# drop the function
pg_drop_func___check_insert_row_limit_cars_per_citizen = """
DROP FUNCTION IF EXISTS check_insert_row_limit_cars_per_citizen;
"""

# drop the trigger
pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger = """
DROP TRIGGER IF EXISTS tenant_insert_limit_status_page_trigger ON car;
"""


# define tables
Base = declarative_base()

class CommonFields(object):
    # Base - start 
    id = sa.Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True)
    created_on = sa.Column(sa.DateTime, server_default=sa.func.now(), index=True)
    deleted_on = sa.Column(sa.DateTime)

class Config(CommonFields, Base):
    __tablename__ = 'config'

    max_cars = sa.Column(sa.Integer)

    def __repr__(self):
        return f'<Config: id = {self.id}, max_cars = {self.max_cars}>'

class Citizen(CommonFields, Base):
    __tablename__ = 'citizen'

    name = sa.Column(sa.String(100), nullable=False, index=True)

    # relationship: cars
    cars = relationship(
        'Car',
        back_populates='citizen',
    )

    def __repr__(self):
        return f'<Citizen: id = {self.id}, name = {self.name}, cars = {self.cars}>'

class Car(CommonFields, Base):
    __tablename__ = 'car'

    name = sa.Column(sa.String(100), nullable=False, index=True)

    # relationship: citizen
    citizen_id = sa.Column(pg.UUID(as_uuid=True), sa.ForeignKey('citizen.id'), nullable=False, index=True)
    citizen = relationship(
        'Citizen',
        back_populates='cars',
    )

    def __repr__(self):
        return f'<Car: id = {self.id}, name = {self.name}, citizen_id = {self.citizen_id}>'

print(f'create tables')
must_create = True
if must_create:
    print('WARNING: ONLY DO THIS ON A NEWLY CREATED TEST DATABASE')
    print('REMOVE THE NEXT LINE TO CONTINUE, SEE CODE')
    sys.exit() # remove this line to continue
    Base.metadata.drop_all(engine, checkfirst=True)
    Base.metadata.create_all(engine)

print('create session')
Session = sessionmaker(bind=engine)
db = Session()

def run_sql(sql):
    stmt = sa.text(sql)
    try:
        db.execute(stmt)
    except Exception as e:
        exception = type(e).__name__
        print(f'run_sql exception = {exception}, e.args = {e.args}')
        sys.exit()

if must_create:
    print('drop function & trigger')
    run_sql(pg_drop_func___check_insert_row_limit_cars_per_citizen)
    run_sql(pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger)
    db.commit()
    print('create function & trigger')
    run_sql(pg_crea_func___check_insert_row_limit_cars_per_citizen)
    run_sql(pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger)
    db.commit()

print('add config ...')
config = Config(max_cars=2)

print('add some citizens ...')
john = Citizen(name='John')
jane = Citizen(name='Jane')
bob = Citizen(name='Bob')
alice = Citizen(name='Alice')

db.add(config)
db.add_all([john, jane, bob, alice])

print('add some cars ...')
johns_ford = Car(name='Ford')
johns_toyota = Car(name='Toyota')
janes_volkswagen = Car(name='Volkswagen')
janes_kia = Car(name='Kia')
bobs_kia = Car(name='Kia')

# assign cars to citizens
john.cars = [johns_ford, johns_toyota]
jane.cars = [janes_volkswagen, janes_kia]
bob.cars = [bobs_kia]

db.commit()
db.flush()

print('show citizens and cars ...')
stmt = sa.select(Citizen).\
    order_by(Citizen.name)
citizens = db.execute(stmt).scalars().all()

for i, citizen in enumerate(citizens):
    print(f'citizen[{i}] {citizen}')

print('##################################################################')
print('add a third car to john, should generate a postgres exception ...')
print('##################################################################')

johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)

try:
    db.commit()
except Exception as e:
    # rollback 
    db.rollback()
    e_orig = getattr(e, 'orig', None)
    if e_orig is not None:
        print(f'postgres exception = {e_orig}')
    else:
        print(f'other exception = {e}, e.args = {e.args}')
else:
    print('ERROR: no exception on insert')
    sys.exit()

print('##################################################################')
print('remove a car from john ...')
print('##################################################################')

db.delete(johns_toyota)
db.commit()

print('##################################################################')
print('add two cars to john, should generate a postgres exception ...')
print('##################################################################')

johns_delorean = Car(name='DeLorean')
johns_suzuki = Car(name='Suzuki')
john.cars.extend([johns_delorean, johns_suzuki])

try:
    db.commit()
except Exception as e:
    # rollback 
    db.rollback()
    e_orig = getattr(e, 'orig', None)
    if e_orig is not None:
        print(f'postgres exception = {e_orig}')
    else:
        print(f'other exception = {e}, e.args = {e.args}')
else:
    print('ERROR: no exception on insert')
    sys.exit()

print('##################################################################')
print('number of johns cars still should be one ...')
print('##################################################################')

stmt = sa.select(sa.func.count(Car.id)).\
    where(Car.citizen_id == john.id)
n = db.execute(stmt).scalars().first()
print(f'n = {n}')

print('ready')

Auflisten von Funktionen und Triggern

In pgAdmin ist das ganz einfach. Navigieren Sie zu:

Schemas -> Tables -> car -> Triggers

Von hier aus können Sie auch den Quellcode einsehen.

Sie können auch Abfragen verwenden. Um eine Liste aller Funktionen zu erhalten, führen Sie die Abfrage aus:

SELECT proname, prosrc 
FROM pg_proc;

Oder, um nur Ihre eigenen Funktionen zu sehen, die das Wort "check" enthalten:

SELECT proname, prosrc 
FROM pg_proc
WHERE 
proname LIKE '%check%'

Um eine Liste aller Auslöser zu erhalten, führen Sie die Abfrage aus:

SELECT tgname FROM pg_trigger;

Zusammenfassung

Die Verwendung von PostgreSQL functions und Triggern ist nicht besonders schwierig, aber man muss sich wirklich etwas Zeit nehmen, um ein Testskript zu erstellen, mit dem man leicht spielen kann. Die Ausnahme PostgreSQL lässt sich leicht aus der Ausnahme Python extrahieren. Schließlich ist SQLAlchemy ein (meistens) großartiges Werkzeug. Viel Spaß damit!

Links / Impressum

Limit the number of rows allowed in a table in PostgreSQL [closed]
https://gis.stackexchange.com/questions/261652/limit-the-number-of-rows-allowed-in-a-table-in-postgresql

PostgreSQL Triggers: Create, List & Drop with Example
https://www.guru99.com/postgresql-trigger-create-drop.html

Einen Kommentar hinterlassen

Kommentieren Sie anonym oder melden Sie sich zum Kommentieren an.

Kommentare

Eine Antwort hinterlassen

Antworten Sie anonym oder melden Sie sich an, um zu antworten.