SQLAlchemy, PostgreSQL, maximum aantal rijen per user
Gebruik een PostgreSQL function en een PostgreSQL trigger om het maximum aantal rijen per user te beperken.
Je hebt een multi-user applicatie die SQLAlchemy en PostgreSQL gebruikt en je wilt het aantal rijen per user van een bepaalde tabel beperken. Bijvoorbeeld, elke user kan maximaal vijf rijen hebben.
Je hebt een bewerking nodig zoals:
- Vergrendel de tabel
- Tel het aantal berichten van de user
- Als het aantal minder dan vijf is:
- Nieuwe post toevoegen
- De tabel ontgrendelen
- Else:
- Ontgrendel de tabel
- Uitzondering genereren
We kunnen dit alleen doen met SQLAlchemy , maar dit is veel code en niet erg snel.
Een betere manier is om dit te implementeren met:
- Een PostgreSQL function, en
- EEN PostgreSQL trigger
De PostgreSQL function wordt gebruikt om de operatie uit te voeren, en de PostgreSQL trigger roept de PostgreSQL function aan op een INSERT operatie.
De PostgreSQL function en PostgreSQL trigger staan op de databaseserver. We uploaden ze eenmalig naar de databaseserver met SQLAlchemy. Dit betekent dat de query's van de PostgreSQL function op de databaseserver draaien.
Zoals altijd doe ik dit op Ubuntu 22.04.
De code
Ik ga hier niet veel uitleggen, ik heb een werkend voorbeeld gemaakt, het is allemaal becommentarieerd in de code, zie hieronder.
In het voorbeeld hebben we burgers en auto's. Een burger kan maximaal twee auto's hebben. Dit aantal is opgeslagen in een andere (configuarion) tabel.
Eerst voegen we twee auto's toe aan een burger. Wanneer we vervolgens een derde auto proberen toe te voegen, wordt er een uitzondering gegenereerd. Merk op dat deze uitzondering een PostgreSQL uitzondering is. In SQLAlchemy is deze beschikbaar in 'e.orig'.
...
# add a car to a citizen
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)
try:
db.commit()
except Exception as e:
e_orig = getattr(e.orig, None)
if e_orig is not None:
# postgresql exception
Jij bepaalt welke informatie in de uitzondering staat. Hier ziet de uitzondering er zo uit:
CitizenCarsMaxRowsLimitReachedError|max_rows=2|row_count=2|citizen_id=7b38f947-de4e-4972-9e28-e17f510e87f4
Om het testen gemakkelijk te maken, worden de tabellen en de PostgreSQL function en PostgreSQL trigger bij elke run aangemaakt.
Om het voorbeeld uit te voeren, maak je een PostgreSQL database. Maak vervolgens een virtual environment en installeer het volgende:
pip install SQLAlchemy
pip install psycopg2-binary
Hier is de code, vergeet niet de databaseparameters toe te voegen:
# max_num_rows.py
import logging
import sys
import uuid
from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as pg
# sqlalchemy logging
logging.basicConfig()
sqlalchemy_logging_level = logging.DEBUG
#sqlalchemy_logging_level = logging.INFO
#sqlalchemy_logging_level = logging.ERROR
logging.getLogger('sqlalchemy.engine').setLevel(sqlalchemy_logging_level)
# your test (!) database parameters
DB_HOST = ''
DB_PORT = 5432
DB_NAME = ''
DB_USER = ''
DB_PASSWORD = ''
connection_uri = ''.join([
'postgresql+psycopg2://',
DB_USER + ':' + DB_PASSWORD,
'@',
DB_HOST + ':' + str(DB_PORT),
'/',
DB_NAME,
])
engine = sa.create_engine(connection_uri)
# create the check function
# the function is called by the trigger
pg_crea_func___check_insert_row_limit_cars_per_citizen = """
CREATE OR REPLACE FUNCTION check_insert_row_limit_cars_per_citizen()
RETURNS TRIGGER AS $$
DECLARE
max_rows INTEGER;
row_count INTEGER;
lock_key TEXT := 'insert_limit_car_' || NEW.citizen_id;
BEGIN
-- get advisory lock to serialize access to the check
PERFORM pg_advisory_xact_lock(hashtext(lock_key));
-- get the maximum number of cars allowed per citizen
SELECT COALESCE(
(
SELECT max_cars
FROM config
),
0) INTO max_rows;
-- get current number of cars for this citizen
SELECT COUNT(*) INTO row_count
FROM car
WHERE citizen_id = NEW.citizen_id AND deleted_on IS NULL;
-- we cannot insert if max_cars is reached
IF row_count >= max_rows THEN
-- release lock before raising an exception
PERFORM pg_advisory_unlock(hashtext(lock_key));
RAISE EXCEPTION 'CitizenCarsMaxRowsLimitReachedError|max_rows=%|row_count=%|citizen_id=%', max_rows, row_count, NEW.citizen_id;
END IF;
-- release lock
PERFORM pg_advisory_unlock(hashtext(lock_key));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
# create the trigger:
# calls the check function, before (!) an insert into table car
pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger = """
CREATE TRIGGER check_insert_row_limit_cars_per_citizen_trigger
BEFORE INSERT ON car
FOR EACH ROW
EXECUTE FUNCTION check_insert_row_limit_cars_per_citizen();
"""
# drop the function
pg_drop_func___check_insert_row_limit_cars_per_citizen = """
DROP FUNCTION IF EXISTS check_insert_row_limit_cars_per_citizen;
"""
# drop the trigger
pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger = """
DROP TRIGGER IF EXISTS tenant_insert_limit_status_page_trigger ON car;
"""
# define tables
Base = declarative_base()
class CommonFields(object):
# Base - start
id = sa.Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True)
created_on = sa.Column(sa.DateTime, server_default=sa.func.now(), index=True)
deleted_on = sa.Column(sa.DateTime)
class Config(CommonFields, Base):
__tablename__ = 'config'
max_cars = sa.Column(sa.Integer)
def __repr__(self):
return f'<Config: id = {self.id}, max_cars = {self.max_cars}>'
class Citizen(CommonFields, Base):
__tablename__ = 'citizen'
name = sa.Column(sa.String(100), nullable=False, index=True)
# relationship: cars
cars = relationship(
'Car',
back_populates='citizen',
)
def __repr__(self):
return f'<Citizen: id = {self.id}, name = {self.name}, cars = {self.cars}>'
class Car(CommonFields, Base):
__tablename__ = 'car'
name = sa.Column(sa.String(100), nullable=False, index=True)
# relationship: citizen
citizen_id = sa.Column(pg.UUID(as_uuid=True), sa.ForeignKey('citizen.id'), nullable=False, index=True)
citizen = relationship(
'Citizen',
back_populates='cars',
)
def __repr__(self):
return f'<Car: id = {self.id}, name = {self.name}, citizen_id = {self.citizen_id}>'
print(f'create tables')
must_create = True
if must_create:
print('WARNING: ONLY DO THIS ON A NEWLY CREATED TEST DATABASE')
print('REMOVE THE NEXT LINE TO CONTINUE, SEE CODE')
sys.exit() # remove this line to continue
Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(engine)
print('create session')
Session = sessionmaker(bind=engine)
db = Session()
def run_sql(sql):
stmt = sa.text(sql)
try:
db.execute(stmt)
except Exception as e:
exception = type(e).__name__
print(f'run_sql exception = {exception}, e.args = {e.args}')
sys.exit()
if must_create:
print('drop function & trigger')
run_sql(pg_drop_func___check_insert_row_limit_cars_per_citizen)
run_sql(pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger)
db.commit()
print('create function & trigger')
run_sql(pg_crea_func___check_insert_row_limit_cars_per_citizen)
run_sql(pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger)
db.commit()
print('add config ...')
config = Config(max_cars=2)
print('add some citizens ...')
john = Citizen(name='John')
jane = Citizen(name='Jane')
bob = Citizen(name='Bob')
alice = Citizen(name='Alice')
db.add(config)
db.add_all([john, jane, bob, alice])
print('add some cars ...')
johns_ford = Car(name='Ford')
johns_toyota = Car(name='Toyota')
janes_volkswagen = Car(name='Volkswagen')
janes_kia = Car(name='Kia')
bobs_kia = Car(name='Kia')
# assign cars to citizens
john.cars = [johns_ford, johns_toyota]
jane.cars = [janes_volkswagen, janes_kia]
bob.cars = [bobs_kia]
db.commit()
db.flush()
print('show citizens and cars ...')
stmt = sa.select(Citizen).\
order_by(Citizen.name)
citizens = db.execute(stmt).scalars().all()
for i, citizen in enumerate(citizens):
print(f'citizen[{i}] {citizen}')
print('##################################################################')
print('add a third car to john, should generate a postgres exception ...')
print('##################################################################')
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)
try:
db.commit()
except Exception as e:
# rollback
db.rollback()
e_orig = getattr(e, 'orig', None)
if e_orig is not None:
print(f'postgres exception = {e_orig}')
else:
print(f'other exception = {e}, e.args = {e.args}')
else:
print('ERROR: no exception on insert')
sys.exit()
print('##################################################################')
print('remove a car from john ...')
print('##################################################################')
db.delete(johns_toyota)
db.commit()
print('##################################################################')
print('add two cars to john, should generate a postgres exception ...')
print('##################################################################')
johns_delorean = Car(name='DeLorean')
johns_suzuki = Car(name='Suzuki')
john.cars.extend([johns_delorean, johns_suzuki])
try:
db.commit()
except Exception as e:
# rollback
db.rollback()
e_orig = getattr(e, 'orig', None)
if e_orig is not None:
print(f'postgres exception = {e_orig}')
else:
print(f'other exception = {e}, e.args = {e.args}')
else:
print('ERROR: no exception on insert')
sys.exit()
print('##################################################################')
print('number of johns cars still should be one ...')
print('##################################################################')
stmt = sa.select(sa.func.count(Car.id)).\
where(Car.citizen_id == john.id)
n = db.execute(stmt).scalars().first()
print(f'n = {n}')
print('ready')
Listing functies en triggers
In pgAdmin is dit eenvoudig. Navigeer naar:
Schemas -> Tables -> car -> Triggers
Vanaf hier kun je ook de broncode bekijken.
Je kunt ook query's gebruiken. Voer de query uit om een lijst met alle functies te krijgen:
SELECT proname, prosrc
FROM pg_proc;
Of, om alleen je eigen functies te zien die het woord 'check' bevatten:
SELECT proname, prosrc
FROM pg_proc
WHERE
proname LIKE '%check%'
Om een lijst van alle triggers te krijgen, voer je de query uit:
SELECT tgname FROM pg_trigger;
Samenvatting
Het gebruik van PostgreSQL functions en triggers is niet zo moeilijk, maar je moet echt even de tijd nemen om een testscript te maken waarmee je gemakkelijk kunt spelen. De PostgreSQL uitzondering kan eenvoudig worden gehaald uit de Python uitzondering. Tot slot is SQLAlchemy (meestal) een geweldig hulpmiddel. Veel plezier!
Links / credits
Limit the number of rows allowed in a table in PostgreSQL [closed]
https://gis.stackexchange.com/questions/261652/limit-the-number-of-rows-allowed-in-a-table-in-postgresql
PostgreSQL Triggers: Create, List & Drop with Example
https://www.guru99.com/postgresql-trigger-create-drop.html
Lees meer
PostgreSQL SQLAlchemy
Recent
- Database UUID primaire sleutels van je webapplicatie verbergen
- Don't Repeat Yourself (DRY) met Jinja2
- SQLAlchemy, PostgreSQL, maximum aantal rijen per user
- Toon de waarden in SQLAlchemy dynamische filters
- Veilige gegevensoverdracht met Public Key versleuteling en pyNaCl
- rqlite: een alternatief voor SQLite met hoge beschikbaarheid en distributed
Meest bekeken
- Met behulp van Python's pyOpenSSL om SSL-certificaten die van een host zijn gedownload te controleren
- Gebruik van UUIDs in plaats van Integer Autoincrement Primary Keys met SQLAlchemy en MariaDb
- Maak verbinding met een dienst op een Docker host vanaf een Docker container
- PyInstaller en Cython gebruiken om een Python executable te maken
- SQLAlchemy: Gebruik van Cascade Deletes om verwante objecten te verwijderen
- Flask RESTful API verzoekparametervalidatie met Marshmallow-schema's