angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

SQLAlchemy, PostgreSQL, nombre maximal de lignes par user

Utilisez un PostgreSQL function et un PostgreSQL trigger pour limiter le nombre maximal de lignes par user.

5 février 2024 Mise à jour 7 février 2024
post main image
https://pixabay.com/users/engin_akyurt-3656355

Vous avez une application multi-user utilisant SQLAlchemy et PostgreSQL et vous voulez limiter le nombre de lignes par user d'une certaine table. Par exemple, chaque user peut avoir un maximum de cinq messages.

Vous avez besoin d'une opération comme :

  • Verrouiller la table
  • Compter le nombre de messages du user
  • Si le nombre est inférieur à cinq :
    • Ajouter un nouveau message
    • Déverrouiller le tableau
  • Else :
    • Déverrouiller le tableau
    • Générer une exception

Nous pouvons faire cela en utilisant uniquement SQLAlchemy , mais cela représente beaucoup de code et n'est pas très rapide.

Une meilleure solution consiste à utiliser SQLAlchemy :

  • un PostgreSQL function, et
  • UN PostgreSQL trigger

Le PostgreSQL function est utilisé pour effectuer l'opération, et le PostgreSQL trigger appelle le PostgreSQL function sur une opération INSERT .

Les PostgreSQL function et PostgreSQL trigger se trouvent sur le serveur de base de données. Nous les téléchargeons sur le serveur de base de données une fois en utilisant SQLAlchemy. Cela signifie que les requêtes de PostgreSQL function s'exécutent sur le serveur de base de données.

Comme toujours, je fais cela sur Ubuntu 22.04.

Le code

Je ne vais pas expliquer grand chose ici, j'ai créé un exemple qui fonctionne, tout est commenté dans le code, voir ci-dessous.

Dans l'exemple, nous avons des citoyens et des voitures. Un citoyen peut avoir un nombre maximum de deux voitures. Ce nombre est stocké dans une autre table (de configuration).

Tout d'abord, nous ajoutons deux voitures à un citoyen. Ensuite, lorsque nous essayons d'ajouter une troisième voiture, une exception est générée. Notez que cette exception est une exception PostgreSQL . Dans SQLAlchemy, elle est disponible dans 'e.orig'.

...
# add a car to a citizen
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)

try:
    db.commit()
except Exception as e:
    e_orig = getattr(e.orig, None)
    if e_orig is not None:
        # postgresql exception

C'est vous qui décidez quelles informations doivent figurer dans l'exception. Ici, l'exception se présente comme suit :

CitizenCarsMaxRowsLimitReachedError|max_rows=2|row_count=2|citizen_id=7b38f947-de4e-4972-9e28-e17f510e87f4

Pour faciliter les tests, les tables et les PostgreSQL function et PostgreSQL trigger sont créées à chaque exécution.

Pour exécuter l'exemple, créez une base de données PostgreSQL . Ensuite, créez une base virtual environment et installez ce qui suit :

pip install SQLAlchemy
pip install psycopg2-binary

Voici le code, n'oubliez pas d'ajouter les paramètres de la base de données :

# max_num_rows.py
import logging
import sys
import uuid

from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as pg

# sqlalchemy logging
logging.basicConfig()
sqlalchemy_logging_level = logging.DEBUG
#sqlalchemy_logging_level = logging.INFO
#sqlalchemy_logging_level = logging.ERROR
logging.getLogger('sqlalchemy.engine').setLevel(sqlalchemy_logging_level)

# your test (!) database parameters
DB_HOST = ''
DB_PORT = 5432
DB_NAME = ''
DB_USER = ''
DB_PASSWORD = ''

connection_uri = ''.join([
    'postgresql+psycopg2://',
    DB_USER + ':' + DB_PASSWORD,
    '@',
    DB_HOST + ':' + str(DB_PORT),
    '/',
    DB_NAME,
])

engine = sa.create_engine(connection_uri)

# create the check function
# the function is called by the trigger
pg_crea_func___check_insert_row_limit_cars_per_citizen = """
CREATE OR REPLACE FUNCTION check_insert_row_limit_cars_per_citizen()
RETURNS TRIGGER AS $$
DECLARE
  max_rows INTEGER;
  row_count INTEGER;
  lock_key TEXT := 'insert_limit_car_' || NEW.citizen_id;
BEGIN
  -- get advisory lock to serialize access to the check
  PERFORM pg_advisory_xact_lock(hashtext(lock_key));

  -- get the maximum number of cars allowed per citizen
  SELECT COALESCE(
    (
    SELECT max_cars
    FROM config
    ),
    0) INTO max_rows;

  -- get current number of cars for this citizen
  SELECT COUNT(*) INTO row_count 
  FROM car
  WHERE citizen_id = NEW.citizen_id AND deleted_on IS NULL;

  -- we cannot insert if max_cars is reached
  IF row_count >= max_rows THEN
    -- release lock before raising an exception
    PERFORM pg_advisory_unlock(hashtext(lock_key));
    RAISE EXCEPTION 'CitizenCarsMaxRowsLimitReachedError|max_rows=%|row_count=%|citizen_id=%', max_rows, row_count, NEW.citizen_id;
  END IF;

  -- release lock
  PERFORM pg_advisory_unlock(hashtext(lock_key));
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""

# create the trigger:
# calls the check function, before (!) an insert into table car
pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger = """
CREATE TRIGGER check_insert_row_limit_cars_per_citizen_trigger
BEFORE INSERT ON car
FOR EACH ROW
EXECUTE FUNCTION check_insert_row_limit_cars_per_citizen();
"""

# drop the function
pg_drop_func___check_insert_row_limit_cars_per_citizen = """
DROP FUNCTION IF EXISTS check_insert_row_limit_cars_per_citizen;
"""

# drop the trigger
pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger = """
DROP TRIGGER IF EXISTS tenant_insert_limit_status_page_trigger ON car;
"""


# define tables
Base = declarative_base()

class CommonFields(object):
    # Base - start 
    id = sa.Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True)
    created_on = sa.Column(sa.DateTime, server_default=sa.func.now(), index=True)
    deleted_on = sa.Column(sa.DateTime)

class Config(CommonFields, Base):
    __tablename__ = 'config'

    max_cars = sa.Column(sa.Integer)

    def __repr__(self):
        return f'<Config: id = {self.id}, max_cars = {self.max_cars}>'

class Citizen(CommonFields, Base):
    __tablename__ = 'citizen'

    name = sa.Column(sa.String(100), nullable=False, index=True)

    # relationship: cars
    cars = relationship(
        'Car',
        back_populates='citizen',
    )

    def __repr__(self):
        return f'<Citizen: id = {self.id}, name = {self.name}, cars = {self.cars}>'

class Car(CommonFields, Base):
    __tablename__ = 'car'

    name = sa.Column(sa.String(100), nullable=False, index=True)

    # relationship: citizen
    citizen_id = sa.Column(pg.UUID(as_uuid=True), sa.ForeignKey('citizen.id'), nullable=False, index=True)
    citizen = relationship(
        'Citizen',
        back_populates='cars',
    )

    def __repr__(self):
        return f'<Car: id = {self.id}, name = {self.name}, citizen_id = {self.citizen_id}>'

print(f'create tables')
must_create = True
if must_create:
    print('WARNING: ONLY DO THIS ON A NEWLY CREATED TEST DATABASE')
    print('REMOVE THE NEXT LINE TO CONTINUE, SEE CODE')
    sys.exit() # remove this line to continue
    Base.metadata.drop_all(engine, checkfirst=True)
    Base.metadata.create_all(engine)

print('create session')
Session = sessionmaker(bind=engine)
db = Session()

def run_sql(sql):
    stmt = sa.text(sql)
    try:
        db.execute(stmt)
    except Exception as e:
        exception = type(e).__name__
        print(f'run_sql exception = {exception}, e.args = {e.args}')
        sys.exit()

if must_create:
    print('drop function & trigger')
    run_sql(pg_drop_func___check_insert_row_limit_cars_per_citizen)
    run_sql(pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger)
    db.commit()
    print('create function & trigger')
    run_sql(pg_crea_func___check_insert_row_limit_cars_per_citizen)
    run_sql(pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger)
    db.commit()

print('add config ...')
config = Config(max_cars=2)

print('add some citizens ...')
john = Citizen(name='John')
jane = Citizen(name='Jane')
bob = Citizen(name='Bob')
alice = Citizen(name='Alice')

db.add(config)
db.add_all([john, jane, bob, alice])

print('add some cars ...')
johns_ford = Car(name='Ford')
johns_toyota = Car(name='Toyota')
janes_volkswagen = Car(name='Volkswagen')
janes_kia = Car(name='Kia')
bobs_kia = Car(name='Kia')

# assign cars to citizens
john.cars = [johns_ford, johns_toyota]
jane.cars = [janes_volkswagen, janes_kia]
bob.cars = [bobs_kia]

db.commit()
db.flush()

print('show citizens and cars ...')
stmt = sa.select(Citizen).\
    order_by(Citizen.name)
citizens = db.execute(stmt).scalars().all()

for i, citizen in enumerate(citizens):
    print(f'citizen[{i}] {citizen}')

print('##################################################################')
print('add a third car to john, should generate a postgres exception ...')
print('##################################################################')

johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)

try:
    db.commit()
except Exception as e:
    # rollback 
    db.rollback()
    e_orig = getattr(e, 'orig', None)
    if e_orig is not None:
        print(f'postgres exception = {e_orig}')
    else:
        print(f'other exception = {e}, e.args = {e.args}')
else:
    print('ERROR: no exception on insert')
    sys.exit()

print('##################################################################')
print('remove a car from john ...')
print('##################################################################')

db.delete(johns_toyota)
db.commit()

print('##################################################################')
print('add two cars to john, should generate a postgres exception ...')
print('##################################################################')

johns_delorean = Car(name='DeLorean')
johns_suzuki = Car(name='Suzuki')
john.cars.extend([johns_delorean, johns_suzuki])

try:
    db.commit()
except Exception as e:
    # rollback 
    db.rollback()
    e_orig = getattr(e, 'orig', None)
    if e_orig is not None:
        print(f'postgres exception = {e_orig}')
    else:
        print(f'other exception = {e}, e.args = {e.args}')
else:
    print('ERROR: no exception on insert')
    sys.exit()

print('##################################################################')
print('number of johns cars still should be one ...')
print('##################################################################')

stmt = sa.select(sa.func.count(Car.id)).\
    where(Car.citizen_id == john.id)
n = db.execute(stmt).scalars().first()
print(f'n = {n}')

print('ready')

Lister les fonctions et les déclencheurs

Dans pgAdmin , c'est facile. Naviguez jusqu'à :

Schemas -> Tables -> car -> Triggers

À partir d'ici, vous pouvez également consulter le code source.

Vous pouvez également utiliser des requêtes. Pour obtenir une liste de toutes les fonctions, exécutez la requête :

SELECT proname, prosrc 
FROM pg_proc;

Ou, pour ne voir que vos propres fonctions contenant le mot "check" :

SELECT proname, prosrc 
FROM pg_proc
WHERE 
proname LIKE '%check%'

Pour obtenir une liste de tous les déclencheurs, lancez la requête :

SELECT tgname FROM pg_trigger;

Résumé

L'utilisation des PostgreSQL function et des déclencheurs n'est pas très difficile, mais vous devez vraiment prendre le temps de créer un script de test qui vous permette de jouer facilement avec. L'exception PostgreSQL peut être facilement extraite de l'exception Python . Enfin, SQLAlchemy est (la plupart du temps) un excellent outil. Nous vous souhaitons beaucoup de plaisir !

Liens / crédits

Limit the number of rows allowed in a table in PostgreSQL [closed]
https://gis.stackexchange.com/questions/261652/limit-the-number-of-rows-allowed-in-a-table-in-postgresql

PostgreSQL Triggers: Create, List & Drop with Example
https://www.guru99.com/postgresql-trigger-create-drop.html

En savoir plus...

PostgreSQL SQLAlchemy

Laissez un commentaire

Commentez anonymement ou connectez-vous pour commenter.

Commentaires

Laissez une réponse

Répondez de manière anonyme ou connectez-vous pour répondre.