angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

SQLAlchemy, PostgreSQL, número máximo de filas por user

Utilice un PostgreSQL function y un PostgreSQL trigger para limitar el número máximo de filas por user.

5 febrero 2024 Actualizado 7 febrero 2024
post main image
https://pixabay.com/users/engin_akyurt-3656355

Tiene una aplicación multi-user que utiliza SQLAlchemy y PostgreSQL y desea limitar el número de filas por user de una tabla determinada. Por ejemplo, cada user puede tener un máximo de cinco entradas.

Necesitas una operación como

  • Bloquear la tabla
  • Contar el número de entradas del user
  • Si el número es inferior a cinco:
    • Añadir un nuevo mensaje
    • Desbloquear la tabla
  • Else:
    • Desbloquear la tabla
    • Generar excepción

Podemos hacer esto usando SQLAlchemy solamente pero esto es mucho código, y no muy rápido.

Una mejor manera es implementar esto usando:

  • A PostgreSQL function, y
  • A PostgreSQL trigger

El PostgreSQL function se utiliza para realizar la operación, y el PostgreSQL trigger llama al PostgreSQL function en una operación INSERT .

El PostgreSQL function y el PostgreSQL trigger están en el servidor de base de datos. Los cargamos en el servidor de base de datos una vez utilizando SQLAlchemy. Esto significa que las consultas del PostgreSQL function, se ejecutan en el servidor de base de datos.

Como siempre estoy haciendo esto en Ubuntu 22.04.

El código

No voy a explicar mucho aquí, he creado un ejemplo de trabajo, todo está comentado en el código, ver más abajo.

En el ejemplo tenemos ciudadanos y coches. Un ciudadano puede tener un número máximo de dos coches. Este número se almacena en otra tabla (configuarion).

Primero, añadimos dos coches a un ciudadano. Luego, cuando intentamos añadir un tercer coche, se genera una excepción. Observe que esta excepción es una excepción PostgreSQL . En SQLAlchemy, está disponible en 'e.orig'.

...
# add a car to a citizen
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)

try:
    db.commit()
except Exception as e:
    e_orig = getattr(e.orig, None)
    if e_orig is not None:
        # postgresql exception

Usted decide qué información contiene la excepción. Aquí se ve la excepción:

CitizenCarsMaxRowsLimitReachedError|max_rows=2|row_count=2|citizen_id=7b38f947-de4e-4972-9e28-e17f510e87f4

Para facilitar las pruebas, las tablas y los PostgreSQL function y PostgreSQL trigger se crean en cada ejecución.

Para ejecutar el ejemplo cree una base de datos PostgreSQL . A continuación, cree una virtual environment e instale lo siguiente:

pip install SQLAlchemy
pip install psycopg2-binary

Aquí está el código, no olvide añadir los parámetros de la base de datos:

# max_num_rows.py
import logging
import sys
import uuid

from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as pg

# sqlalchemy logging
logging.basicConfig()
sqlalchemy_logging_level = logging.DEBUG
#sqlalchemy_logging_level = logging.INFO
#sqlalchemy_logging_level = logging.ERROR
logging.getLogger('sqlalchemy.engine').setLevel(sqlalchemy_logging_level)

# your test (!) database parameters
DB_HOST = ''
DB_PORT = 5432
DB_NAME = ''
DB_USER = ''
DB_PASSWORD = ''

connection_uri = ''.join([
    'postgresql+psycopg2://',
    DB_USER + ':' + DB_PASSWORD,
    '@',
    DB_HOST + ':' + str(DB_PORT),
    '/',
    DB_NAME,
])

engine = sa.create_engine(connection_uri)

# create the check function
# the function is called by the trigger
pg_crea_func___check_insert_row_limit_cars_per_citizen = """
CREATE OR REPLACE FUNCTION check_insert_row_limit_cars_per_citizen()
RETURNS TRIGGER AS $$
DECLARE
  max_rows INTEGER;
  row_count INTEGER;
  lock_key TEXT := 'insert_limit_car_' || NEW.citizen_id;
BEGIN
  -- get advisory lock to serialize access to the check
  PERFORM pg_advisory_xact_lock(hashtext(lock_key));

  -- get the maximum number of cars allowed per citizen
  SELECT COALESCE(
    (
    SELECT max_cars
    FROM config
    ),
    0) INTO max_rows;

  -- get current number of cars for this citizen
  SELECT COUNT(*) INTO row_count 
  FROM car
  WHERE citizen_id = NEW.citizen_id AND deleted_on IS NULL;

  -- we cannot insert if max_cars is reached
  IF row_count >= max_rows THEN
    -- release lock before raising an exception
    PERFORM pg_advisory_unlock(hashtext(lock_key));
    RAISE EXCEPTION 'CitizenCarsMaxRowsLimitReachedError|max_rows=%|row_count=%|citizen_id=%', max_rows, row_count, NEW.citizen_id;
  END IF;

  -- release lock
  PERFORM pg_advisory_unlock(hashtext(lock_key));
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""

# create the trigger:
# calls the check function, before (!) an insert into table car
pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger = """
CREATE TRIGGER check_insert_row_limit_cars_per_citizen_trigger
BEFORE INSERT ON car
FOR EACH ROW
EXECUTE FUNCTION check_insert_row_limit_cars_per_citizen();
"""

# drop the function
pg_drop_func___check_insert_row_limit_cars_per_citizen = """
DROP FUNCTION IF EXISTS check_insert_row_limit_cars_per_citizen;
"""

# drop the trigger
pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger = """
DROP TRIGGER IF EXISTS tenant_insert_limit_status_page_trigger ON car;
"""


# define tables
Base = declarative_base()

class CommonFields(object):
    # Base - start 
    id = sa.Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True)
    created_on = sa.Column(sa.DateTime, server_default=sa.func.now(), index=True)
    deleted_on = sa.Column(sa.DateTime)

class Config(CommonFields, Base):
    __tablename__ = 'config'

    max_cars = sa.Column(sa.Integer)

    def __repr__(self):
        return f'<Config: id = {self.id}, max_cars = {self.max_cars}>'

class Citizen(CommonFields, Base):
    __tablename__ = 'citizen'

    name = sa.Column(sa.String(100), nullable=False, index=True)

    # relationship: cars
    cars = relationship(
        'Car',
        back_populates='citizen',
    )

    def __repr__(self):
        return f'<Citizen: id = {self.id}, name = {self.name}, cars = {self.cars}>'

class Car(CommonFields, Base):
    __tablename__ = 'car'

    name = sa.Column(sa.String(100), nullable=False, index=True)

    # relationship: citizen
    citizen_id = sa.Column(pg.UUID(as_uuid=True), sa.ForeignKey('citizen.id'), nullable=False, index=True)
    citizen = relationship(
        'Citizen',
        back_populates='cars',
    )

    def __repr__(self):
        return f'<Car: id = {self.id}, name = {self.name}, citizen_id = {self.citizen_id}>'

print(f'create tables')
must_create = True
if must_create:
    print('WARNING: ONLY DO THIS ON A NEWLY CREATED TEST DATABASE')
    print('REMOVE THE NEXT LINE TO CONTINUE, SEE CODE')
    sys.exit() # remove this line to continue
    Base.metadata.drop_all(engine, checkfirst=True)
    Base.metadata.create_all(engine)

print('create session')
Session = sessionmaker(bind=engine)
db = Session()

def run_sql(sql):
    stmt = sa.text(sql)
    try:
        db.execute(stmt)
    except Exception as e:
        exception = type(e).__name__
        print(f'run_sql exception = {exception}, e.args = {e.args}')
        sys.exit()

if must_create:
    print('drop function & trigger')
    run_sql(pg_drop_func___check_insert_row_limit_cars_per_citizen)
    run_sql(pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger)
    db.commit()
    print('create function & trigger')
    run_sql(pg_crea_func___check_insert_row_limit_cars_per_citizen)
    run_sql(pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger)
    db.commit()

print('add config ...')
config = Config(max_cars=2)

print('add some citizens ...')
john = Citizen(name='John')
jane = Citizen(name='Jane')
bob = Citizen(name='Bob')
alice = Citizen(name='Alice')

db.add(config)
db.add_all([john, jane, bob, alice])

print('add some cars ...')
johns_ford = Car(name='Ford')
johns_toyota = Car(name='Toyota')
janes_volkswagen = Car(name='Volkswagen')
janes_kia = Car(name='Kia')
bobs_kia = Car(name='Kia')

# assign cars to citizens
john.cars = [johns_ford, johns_toyota]
jane.cars = [janes_volkswagen, janes_kia]
bob.cars = [bobs_kia]

db.commit()
db.flush()

print('show citizens and cars ...')
stmt = sa.select(Citizen).\
    order_by(Citizen.name)
citizens = db.execute(stmt).scalars().all()

for i, citizen in enumerate(citizens):
    print(f'citizen[{i}] {citizen}')

print('##################################################################')
print('add a third car to john, should generate a postgres exception ...')
print('##################################################################')

johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)

try:
    db.commit()
except Exception as e:
    # rollback 
    db.rollback()
    e_orig = getattr(e, 'orig', None)
    if e_orig is not None:
        print(f'postgres exception = {e_orig}')
    else:
        print(f'other exception = {e}, e.args = {e.args}')
else:
    print('ERROR: no exception on insert')
    sys.exit()

print('##################################################################')
print('remove a car from john ...')
print('##################################################################')

db.delete(johns_toyota)
db.commit()

print('##################################################################')
print('add two cars to john, should generate a postgres exception ...')
print('##################################################################')

johns_delorean = Car(name='DeLorean')
johns_suzuki = Car(name='Suzuki')
john.cars.extend([johns_delorean, johns_suzuki])

try:
    db.commit()
except Exception as e:
    # rollback 
    db.rollback()
    e_orig = getattr(e, 'orig', None)
    if e_orig is not None:
        print(f'postgres exception = {e_orig}')
    else:
        print(f'other exception = {e}, e.args = {e.args}')
else:
    print('ERROR: no exception on insert')
    sys.exit()

print('##################################################################')
print('number of johns cars still should be one ...')
print('##################################################################')

stmt = sa.select(sa.func.count(Car.id)).\
    where(Car.citizen_id == john.id)
n = db.execute(stmt).scalars().first()
print(f'n = {n}')

print('ready')

Listado de funciones y triggers

En pgAdmin esto es fácil. Navega hasta:

Schemas -> Tables -> car -> Triggers

Desde aquí también puedes ver el código fuente.

También puede utilizar consultas. Para obtener una lista de todas las funciones, ejecute la consulta:

SELECT proname, prosrc 
FROM pg_proc;

O, para ver sólo tus propias funciones que contengan la palabra 'check':

SELECT proname, prosrc 
FROM pg_proc
WHERE 
proname LIKE '%check%'

Para obtener una lista de todos los activadores, ejecute la consulta:

SELECT tgname FROM pg_trigger;

Resumen

Utilizar PostgreSQL functions y triggers no es tan difícil, pero hay que dedicar algo de tiempo a crear un script de prueba que facilite el juego. La excepción PostgreSQL puede extraerse fácilmente de la excepción Python . Finalmente, SQLAlchemy es (la mayoría de las veces) una gran herramienta. ¡Que la disfrutes!

Enlaces / créditos

Limit the number of rows allowed in a table in PostgreSQL [closed]
https://gis.stackexchange.com/questions/261652/limit-the-number-of-rows-allowed-in-a-table-in-postgresql

PostgreSQL Triggers: Create, List & Drop with Example
https://www.guru99.com/postgresql-trigger-create-drop.html

Deje un comentario

Comente de forma anónima o inicie sesión para comentar.

Comentarios

Deje una respuesta.

Responda de forma anónima o inicie sesión para responder.