angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

SQLAlchemy, PostgreSQL, maximum aantal rijen per user

Gebruik een PostgreSQL function en een PostgreSQL trigger om het maximum aantal rijen per user te beperken.

5 februari 2024 Bijgewerkt 7 februari 2024
post main image
https://pixabay.com/users/engin_akyurt-3656355

Je hebt een multi-user applicatie die SQLAlchemy en PostgreSQL gebruikt en je wilt het aantal rijen per user van een bepaalde tabel beperken. Bijvoorbeeld, elke user kan maximaal vijf rijen hebben.

Je hebt een bewerking nodig zoals:

  • Vergrendel de tabel
  • Tel het aantal berichten van de user
  • Als het aantal minder dan vijf is:
    • Nieuwe post toevoegen
    • De tabel ontgrendelen
  • Else:
    • Ontgrendel de tabel
    • Uitzondering genereren

We kunnen dit alleen doen met SQLAlchemy , maar dit is veel code en niet erg snel.

Een betere manier is om dit te implementeren met:

  • Een PostgreSQL function, en
  • EEN PostgreSQL trigger

De PostgreSQL function wordt gebruikt om de operatie uit te voeren, en de PostgreSQL trigger roept de PostgreSQL function aan op een INSERT operatie.

De PostgreSQL function en PostgreSQL trigger staan op de databaseserver. We uploaden ze eenmalig naar de databaseserver met SQLAlchemy. Dit betekent dat de query's van de PostgreSQL function op de databaseserver draaien.

Zoals altijd doe ik dit op Ubuntu 22.04.

De code

Ik ga hier niet veel uitleggen, ik heb een werkend voorbeeld gemaakt, het is allemaal becommentarieerd in de code, zie hieronder.

In het voorbeeld hebben we burgers en auto's. Een burger kan maximaal twee auto's hebben. Dit aantal is opgeslagen in een andere (configuarion) tabel.

Eerst voegen we twee auto's toe aan een burger. Wanneer we vervolgens een derde auto proberen toe te voegen, wordt er een uitzondering gegenereerd. Merk op dat deze uitzondering een PostgreSQL uitzondering is. In SQLAlchemy is deze beschikbaar in 'e.orig'.

...
# add a car to a citizen
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)

try:
    db.commit()
except Exception as e:
    e_orig = getattr(e.orig, None)
    if e_orig is not None:
        # postgresql exception

Jij bepaalt welke informatie in de uitzondering staat. Hier ziet de uitzondering er zo uit:

CitizenCarsMaxRowsLimitReachedError|max_rows=2|row_count=2|citizen_id=7b38f947-de4e-4972-9e28-e17f510e87f4

Om het testen gemakkelijk te maken, worden de tabellen en de PostgreSQL function en PostgreSQL trigger bij elke run aangemaakt.

Om het voorbeeld uit te voeren, maak je een PostgreSQL database. Maak vervolgens een virtual environment en installeer het volgende:

pip install SQLAlchemy
pip install psycopg2-binary

Hier is de code, vergeet niet de databaseparameters toe te voegen:

# max_num_rows.py
import logging
import sys
import uuid

from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as pg

# sqlalchemy logging
logging.basicConfig()
sqlalchemy_logging_level = logging.DEBUG
#sqlalchemy_logging_level = logging.INFO
#sqlalchemy_logging_level = logging.ERROR
logging.getLogger('sqlalchemy.engine').setLevel(sqlalchemy_logging_level)

# your test (!) database parameters
DB_HOST = ''
DB_PORT = 5432
DB_NAME = ''
DB_USER = ''
DB_PASSWORD = ''

connection_uri = ''.join([
    'postgresql+psycopg2://',
    DB_USER + ':' + DB_PASSWORD,
    '@',
    DB_HOST + ':' + str(DB_PORT),
    '/',
    DB_NAME,
])

engine = sa.create_engine(connection_uri)

# create the check function
# the function is called by the trigger
pg_crea_func___check_insert_row_limit_cars_per_citizen = """
CREATE OR REPLACE FUNCTION check_insert_row_limit_cars_per_citizen()
RETURNS TRIGGER AS $$
DECLARE
  max_rows INTEGER;
  row_count INTEGER;
  lock_key TEXT := 'insert_limit_car_' || NEW.citizen_id;
BEGIN
  -- get advisory lock to serialize access to the check
  PERFORM pg_advisory_xact_lock(hashtext(lock_key));

  -- get the maximum number of cars allowed per citizen
  SELECT COALESCE(
    (
    SELECT max_cars
    FROM config
    ),
    0) INTO max_rows;

  -- get current number of cars for this citizen
  SELECT COUNT(*) INTO row_count 
  FROM car
  WHERE citizen_id = NEW.citizen_id AND deleted_on IS NULL;

  -- we cannot insert if max_cars is reached
  IF row_count >= max_rows THEN
    -- release lock before raising an exception
    PERFORM pg_advisory_unlock(hashtext(lock_key));
    RAISE EXCEPTION 'CitizenCarsMaxRowsLimitReachedError|max_rows=%|row_count=%|citizen_id=%', max_rows, row_count, NEW.citizen_id;
  END IF;

  -- release lock
  PERFORM pg_advisory_unlock(hashtext(lock_key));
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""

# create the trigger:
# calls the check function, before (!) an insert into table car
pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger = """
CREATE TRIGGER check_insert_row_limit_cars_per_citizen_trigger
BEFORE INSERT ON car
FOR EACH ROW
EXECUTE FUNCTION check_insert_row_limit_cars_per_citizen();
"""

# drop the function
pg_drop_func___check_insert_row_limit_cars_per_citizen = """
DROP FUNCTION IF EXISTS check_insert_row_limit_cars_per_citizen;
"""

# drop the trigger
pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger = """
DROP TRIGGER IF EXISTS tenant_insert_limit_status_page_trigger ON car;
"""


# define tables
Base = declarative_base()

class CommonFields(object):
    # Base - start 
    id = sa.Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True)
    created_on = sa.Column(sa.DateTime, server_default=sa.func.now(), index=True)
    deleted_on = sa.Column(sa.DateTime)

class Config(CommonFields, Base):
    __tablename__ = 'config'

    max_cars = sa.Column(sa.Integer)

    def __repr__(self):
        return f'<Config: id = {self.id}, max_cars = {self.max_cars}>'

class Citizen(CommonFields, Base):
    __tablename__ = 'citizen'

    name = sa.Column(sa.String(100), nullable=False, index=True)

    # relationship: cars
    cars = relationship(
        'Car',
        back_populates='citizen',
    )

    def __repr__(self):
        return f'<Citizen: id = {self.id}, name = {self.name}, cars = {self.cars}>'

class Car(CommonFields, Base):
    __tablename__ = 'car'

    name = sa.Column(sa.String(100), nullable=False, index=True)

    # relationship: citizen
    citizen_id = sa.Column(pg.UUID(as_uuid=True), sa.ForeignKey('citizen.id'), nullable=False, index=True)
    citizen = relationship(
        'Citizen',
        back_populates='cars',
    )

    def __repr__(self):
        return f'<Car: id = {self.id}, name = {self.name}, citizen_id = {self.citizen_id}>'

print(f'create tables')
must_create = True
if must_create:
    print('WARNING: ONLY DO THIS ON A NEWLY CREATED TEST DATABASE')
    print('REMOVE THE NEXT LINE TO CONTINUE, SEE CODE')
    sys.exit() # remove this line to continue
    Base.metadata.drop_all(engine, checkfirst=True)
    Base.metadata.create_all(engine)

print('create session')
Session = sessionmaker(bind=engine)
db = Session()

def run_sql(sql):
    stmt = sa.text(sql)
    try:
        db.execute(stmt)
    except Exception as e:
        exception = type(e).__name__
        print(f'run_sql exception = {exception}, e.args = {e.args}')
        sys.exit()

if must_create:
    print('drop function & trigger')
    run_sql(pg_drop_func___check_insert_row_limit_cars_per_citizen)
    run_sql(pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger)
    db.commit()
    print('create function & trigger')
    run_sql(pg_crea_func___check_insert_row_limit_cars_per_citizen)
    run_sql(pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger)
    db.commit()

print('add config ...')
config = Config(max_cars=2)

print('add some citizens ...')
john = Citizen(name='John')
jane = Citizen(name='Jane')
bob = Citizen(name='Bob')
alice = Citizen(name='Alice')

db.add(config)
db.add_all([john, jane, bob, alice])

print('add some cars ...')
johns_ford = Car(name='Ford')
johns_toyota = Car(name='Toyota')
janes_volkswagen = Car(name='Volkswagen')
janes_kia = Car(name='Kia')
bobs_kia = Car(name='Kia')

# assign cars to citizens
john.cars = [johns_ford, johns_toyota]
jane.cars = [janes_volkswagen, janes_kia]
bob.cars = [bobs_kia]

db.commit()
db.flush()

print('show citizens and cars ...')
stmt = sa.select(Citizen).\
    order_by(Citizen.name)
citizens = db.execute(stmt).scalars().all()

for i, citizen in enumerate(citizens):
    print(f'citizen[{i}] {citizen}')

print('##################################################################')
print('add a third car to john, should generate a postgres exception ...')
print('##################################################################')

johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)

try:
    db.commit()
except Exception as e:
    # rollback 
    db.rollback()
    e_orig = getattr(e, 'orig', None)
    if e_orig is not None:
        print(f'postgres exception = {e_orig}')
    else:
        print(f'other exception = {e}, e.args = {e.args}')
else:
    print('ERROR: no exception on insert')
    sys.exit()

print('##################################################################')
print('remove a car from john ...')
print('##################################################################')

db.delete(johns_toyota)
db.commit()

print('##################################################################')
print('add two cars to john, should generate a postgres exception ...')
print('##################################################################')

johns_delorean = Car(name='DeLorean')
johns_suzuki = Car(name='Suzuki')
john.cars.extend([johns_delorean, johns_suzuki])

try:
    db.commit()
except Exception as e:
    # rollback 
    db.rollback()
    e_orig = getattr(e, 'orig', None)
    if e_orig is not None:
        print(f'postgres exception = {e_orig}')
    else:
        print(f'other exception = {e}, e.args = {e.args}')
else:
    print('ERROR: no exception on insert')
    sys.exit()

print('##################################################################')
print('number of johns cars still should be one ...')
print('##################################################################')

stmt = sa.select(sa.func.count(Car.id)).\
    where(Car.citizen_id == john.id)
n = db.execute(stmt).scalars().first()
print(f'n = {n}')

print('ready')

Listing functies en triggers

In pgAdmin is dit eenvoudig. Navigeer naar:

Schemas -> Tables -> car -> Triggers

Vanaf hier kun je ook de broncode bekijken.

Je kunt ook query's gebruiken. Voer de query uit om een lijst met alle functies te krijgen:

SELECT proname, prosrc 
FROM pg_proc;

Of, om alleen je eigen functies te zien die het woord 'check' bevatten:

SELECT proname, prosrc 
FROM pg_proc
WHERE 
proname LIKE '%check%'

Om een lijst van alle triggers te krijgen, voer je de query uit:

SELECT tgname FROM pg_trigger;

Samenvatting

Het gebruik van PostgreSQL functions en triggers is niet zo moeilijk, maar je moet echt even de tijd nemen om een testscript te maken waarmee je gemakkelijk kunt spelen. De PostgreSQL uitzondering kan eenvoudig worden gehaald uit de Python uitzondering. Tot slot is SQLAlchemy (meestal) een geweldig hulpmiddel. Veel plezier!

Links / credits

Limit the number of rows allowed in a table in PostgreSQL [closed]
https://gis.stackexchange.com/questions/261652/limit-the-number-of-rows-allowed-in-a-table-in-postgresql

PostgreSQL Triggers: Create, List & Drop with Example
https://www.guru99.com/postgresql-trigger-create-drop.html

Laat een reactie achter

Reageer anoniem of log in om commentaar te geven.

Opmerkingen

Laat een antwoord achter

Antwoord anoniem of log in om te antwoorden.