SQLAlchemy, PostgreSQL, número máximo de filas por user
Utilice un PostgreSQL function y un PostgreSQL trigger para limitar el número máximo de filas por user.
Tiene una aplicación multi-user que utiliza SQLAlchemy y PostgreSQL y desea limitar el número de filas por user de una tabla determinada. Por ejemplo, cada user puede tener un máximo de cinco entradas.
Necesitas una operación como
- Bloquear la tabla
- Contar el número de entradas del user
- Si el número es inferior a cinco:
- Añadir un nuevo mensaje
- Desbloquear la tabla
- Else:
- Desbloquear la tabla
- Generar excepción
Podemos hacer esto usando SQLAlchemy solamente pero esto es mucho código, y no muy rápido.
Una mejor manera es implementar esto usando:
- A PostgreSQL function, y
- A PostgreSQL trigger
El PostgreSQL function se utiliza para realizar la operación, y el PostgreSQL trigger llama al PostgreSQL function en una operación INSERT .
El PostgreSQL function y el PostgreSQL trigger están en el servidor de base de datos. Los cargamos en el servidor de base de datos una vez utilizando SQLAlchemy. Esto significa que las consultas del PostgreSQL function, se ejecutan en el servidor de base de datos.
Como siempre estoy haciendo esto en Ubuntu 22.04.
El código
No voy a explicar mucho aquí, he creado un ejemplo de trabajo, todo está comentado en el código, ver más abajo.
En el ejemplo tenemos ciudadanos y coches. Un ciudadano puede tener un número máximo de dos coches. Este número se almacena en otra tabla (configuarion).
Primero, añadimos dos coches a un ciudadano. Luego, cuando intentamos añadir un tercer coche, se genera una excepción. Observe que esta excepción es una excepción PostgreSQL . En SQLAlchemy, está disponible en 'e.orig'.
...
# add a car to a citizen
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)
try:
db.commit()
except Exception as e:
e_orig = getattr(e.orig, None)
if e_orig is not None:
# postgresql exception
Usted decide qué información contiene la excepción. Aquí se ve la excepción:
CitizenCarsMaxRowsLimitReachedError|max_rows=2|row_count=2|citizen_id=7b38f947-de4e-4972-9e28-e17f510e87f4
Para facilitar las pruebas, las tablas y los PostgreSQL function y PostgreSQL trigger se crean en cada ejecución.
Para ejecutar el ejemplo cree una base de datos PostgreSQL . A continuación, cree una virtual environment e instale lo siguiente:
pip install SQLAlchemy
pip install psycopg2-binary
Aquí está el código, no olvide añadir los parámetros de la base de datos:
# max_num_rows.py
import logging
import sys
import uuid
from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as pg
# sqlalchemy logging
logging.basicConfig()
sqlalchemy_logging_level = logging.DEBUG
#sqlalchemy_logging_level = logging.INFO
#sqlalchemy_logging_level = logging.ERROR
logging.getLogger('sqlalchemy.engine').setLevel(sqlalchemy_logging_level)
# your test (!) database parameters
DB_HOST = ''
DB_PORT = 5432
DB_NAME = ''
DB_USER = ''
DB_PASSWORD = ''
connection_uri = ''.join([
'postgresql+psycopg2://',
DB_USER + ':' + DB_PASSWORD,
'@',
DB_HOST + ':' + str(DB_PORT),
'/',
DB_NAME,
])
engine = sa.create_engine(connection_uri)
# create the check function
# the function is called by the trigger
pg_crea_func___check_insert_row_limit_cars_per_citizen = """
CREATE OR REPLACE FUNCTION check_insert_row_limit_cars_per_citizen()
RETURNS TRIGGER AS $$
DECLARE
max_rows INTEGER;
row_count INTEGER;
lock_key TEXT := 'insert_limit_car_' || NEW.citizen_id;
BEGIN
-- get advisory lock to serialize access to the check
PERFORM pg_advisory_xact_lock(hashtext(lock_key));
-- get the maximum number of cars allowed per citizen
SELECT COALESCE(
(
SELECT max_cars
FROM config
),
0) INTO max_rows;
-- get current number of cars for this citizen
SELECT COUNT(*) INTO row_count
FROM car
WHERE citizen_id = NEW.citizen_id AND deleted_on IS NULL;
-- we cannot insert if max_cars is reached
IF row_count >= max_rows THEN
-- release lock before raising an exception
PERFORM pg_advisory_unlock(hashtext(lock_key));
RAISE EXCEPTION 'CitizenCarsMaxRowsLimitReachedError|max_rows=%|row_count=%|citizen_id=%', max_rows, row_count, NEW.citizen_id;
END IF;
-- release lock
PERFORM pg_advisory_unlock(hashtext(lock_key));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
# create the trigger:
# calls the check function, before (!) an insert into table car
pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger = """
CREATE TRIGGER check_insert_row_limit_cars_per_citizen_trigger
BEFORE INSERT ON car
FOR EACH ROW
EXECUTE FUNCTION check_insert_row_limit_cars_per_citizen();
"""
# drop the function
pg_drop_func___check_insert_row_limit_cars_per_citizen = """
DROP FUNCTION IF EXISTS check_insert_row_limit_cars_per_citizen;
"""
# drop the trigger
pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger = """
DROP TRIGGER IF EXISTS tenant_insert_limit_status_page_trigger ON car;
"""
# define tables
Base = declarative_base()
class CommonFields(object):
# Base - start
id = sa.Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True)
created_on = sa.Column(sa.DateTime, server_default=sa.func.now(), index=True)
deleted_on = sa.Column(sa.DateTime)
class Config(CommonFields, Base):
__tablename__ = 'config'
max_cars = sa.Column(sa.Integer)
def __repr__(self):
return f'<Config: id = {self.id}, max_cars = {self.max_cars}>'
class Citizen(CommonFields, Base):
__tablename__ = 'citizen'
name = sa.Column(sa.String(100), nullable=False, index=True)
# relationship: cars
cars = relationship(
'Car',
back_populates='citizen',
)
def __repr__(self):
return f'<Citizen: id = {self.id}, name = {self.name}, cars = {self.cars}>'
class Car(CommonFields, Base):
__tablename__ = 'car'
name = sa.Column(sa.String(100), nullable=False, index=True)
# relationship: citizen
citizen_id = sa.Column(pg.UUID(as_uuid=True), sa.ForeignKey('citizen.id'), nullable=False, index=True)
citizen = relationship(
'Citizen',
back_populates='cars',
)
def __repr__(self):
return f'<Car: id = {self.id}, name = {self.name}, citizen_id = {self.citizen_id}>'
print(f'create tables')
must_create = True
if must_create:
print('WARNING: ONLY DO THIS ON A NEWLY CREATED TEST DATABASE')
print('REMOVE THE NEXT LINE TO CONTINUE, SEE CODE')
sys.exit() # remove this line to continue
Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(engine)
print('create session')
Session = sessionmaker(bind=engine)
db = Session()
def run_sql(sql):
stmt = sa.text(sql)
try:
db.execute(stmt)
except Exception as e:
exception = type(e).__name__
print(f'run_sql exception = {exception}, e.args = {e.args}')
sys.exit()
if must_create:
print('drop function & trigger')
run_sql(pg_drop_func___check_insert_row_limit_cars_per_citizen)
run_sql(pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger)
db.commit()
print('create function & trigger')
run_sql(pg_crea_func___check_insert_row_limit_cars_per_citizen)
run_sql(pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger)
db.commit()
print('add config ...')
config = Config(max_cars=2)
print('add some citizens ...')
john = Citizen(name='John')
jane = Citizen(name='Jane')
bob = Citizen(name='Bob')
alice = Citizen(name='Alice')
db.add(config)
db.add_all([john, jane, bob, alice])
print('add some cars ...')
johns_ford = Car(name='Ford')
johns_toyota = Car(name='Toyota')
janes_volkswagen = Car(name='Volkswagen')
janes_kia = Car(name='Kia')
bobs_kia = Car(name='Kia')
# assign cars to citizens
john.cars = [johns_ford, johns_toyota]
jane.cars = [janes_volkswagen, janes_kia]
bob.cars = [bobs_kia]
db.commit()
db.flush()
print('show citizens and cars ...')
stmt = sa.select(Citizen).\
order_by(Citizen.name)
citizens = db.execute(stmt).scalars().all()
for i, citizen in enumerate(citizens):
print(f'citizen[{i}] {citizen}')
print('##################################################################')
print('add a third car to john, should generate a postgres exception ...')
print('##################################################################')
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)
try:
db.commit()
except Exception as e:
# rollback
db.rollback()
e_orig = getattr(e, 'orig', None)
if e_orig is not None:
print(f'postgres exception = {e_orig}')
else:
print(f'other exception = {e}, e.args = {e.args}')
else:
print('ERROR: no exception on insert')
sys.exit()
print('##################################################################')
print('remove a car from john ...')
print('##################################################################')
db.delete(johns_toyota)
db.commit()
print('##################################################################')
print('add two cars to john, should generate a postgres exception ...')
print('##################################################################')
johns_delorean = Car(name='DeLorean')
johns_suzuki = Car(name='Suzuki')
john.cars.extend([johns_delorean, johns_suzuki])
try:
db.commit()
except Exception as e:
# rollback
db.rollback()
e_orig = getattr(e, 'orig', None)
if e_orig is not None:
print(f'postgres exception = {e_orig}')
else:
print(f'other exception = {e}, e.args = {e.args}')
else:
print('ERROR: no exception on insert')
sys.exit()
print('##################################################################')
print('number of johns cars still should be one ...')
print('##################################################################')
stmt = sa.select(sa.func.count(Car.id)).\
where(Car.citizen_id == john.id)
n = db.execute(stmt).scalars().first()
print(f'n = {n}')
print('ready')
Listado de funciones y triggers
En pgAdmin esto es fácil. Navega hasta:
Schemas -> Tables -> car -> Triggers
Desde aquí también puedes ver el código fuente.
También puede utilizar consultas. Para obtener una lista de todas las funciones, ejecute la consulta:
SELECT proname, prosrc
FROM pg_proc;
O, para ver sólo tus propias funciones que contengan la palabra 'check':
SELECT proname, prosrc
FROM pg_proc
WHERE
proname LIKE '%check%'
Para obtener una lista de todos los activadores, ejecute la consulta:
SELECT tgname FROM pg_trigger;
Resumen
Utilizar PostgreSQL functions y triggers no es tan difícil, pero hay que dedicar algo de tiempo a crear un script de prueba que facilite el juego. La excepción PostgreSQL puede extraerse fácilmente de la excepción Python . Finalmente, SQLAlchemy es (la mayoría de las veces) una gran herramienta. ¡Que la disfrutes!
Enlaces / créditos
Limit the number of rows allowed in a table in PostgreSQL [closed]
https://gis.stackexchange.com/questions/261652/limit-the-number-of-rows-allowed-in-a-table-in-postgresql
PostgreSQL Triggers: Create, List & Drop with Example
https://www.guru99.com/postgresql-trigger-create-drop.html
Leer más
PostgreSQL SQLAlchemy
Recientes
- Un conmutador de base de datos con HAProxy y el HAProxy Runtime API
- Docker Swarm rolling updates
- Cómo ocultar las claves primarias de la base de datos UUID de su aplicación web
- Don't Repeat Yourself (DRY) con Jinja2
- SQLAlchemy, PostgreSQL, número máximo de filas por user
- Mostrar los valores en filtros dinámicos SQLAlchemy
Más vistos
- Usando Python's pyOpenSSL para verificar los certificados SSL descargados de un host
- Usando PyInstaller y Cython para crear un ejecutable de Python
- Reducir los tiempos de respuesta de las páginas de un sitio Flask SQLAlchemy web
- Conectarse a un servicio en un host Docker desde un contenedor Docker
- Usando UUIDs en lugar de Integer Autoincrement Primary Keys con SQLAlchemy y MariaDb
- SQLAlchemy: Uso de Cascade Deletes para eliminar objetos relacionados