SQLAlchemy, PostgreSQL, максимальное количество строк для user
Используйте PostgreSQL function и PostgreSQL trigger , чтобы ограничить максимальное количество строк в одном user.

У вас есть приложение с несколькими user , использующее SQLAlchemy и PostgreSQL , и вы хотите ограничить количество строк на user определенной таблицы. Например, в каждом user может быть не более пяти сообщений.
Вам нужна такая операция, как:
- Заблокировать таблицу
- Подсчитать количество сообщений в user
- Если число меньше пяти:
- Добавить новый пост
- Разблокировать таблицу
- Элс:
- Разблокировать таблицу
- Сгенерировать исключение
Мы можем сделать это, используя только SQLAlchemy , но это очень много кода, и не очень быстро.
Лучше реализовать это с помощью:
- A PostgreSQL function, и
- A PostgreSQL trigger
PostgreSQL function используется для выполнения операции, а PostgreSQL trigger вызывает PostgreSQL function на операции INSERT .
PostgreSQL function и PostgreSQL trigger находятся на сервере базы данных. Мы загружаем их на сервер базы данных один раз с помощью SQLAlchemy. Это означает, что запросы PostgreSQL function выполняются на сервере базы данных.
Как обычно, я делаю это на Ubuntu 22.04.
Код
Я не буду много объяснять, я создал рабочий пример, в коде все закомментировано, смотрите ниже.
В примере у нас есть граждане и автомобили. Гражданин может иметь максимум два автомобиля. Это количество хранится в другой таблице (configuarion).
Сначала мы добавляем гражданину два автомобиля. Затем, когда мы пытаемся добавить третий автомобиль, генерируется исключение. Обратите внимание, что это исключение является исключением PostgreSQL . В SQLAlchemy оно доступно в 'e.orig'.
...
# add a car to a citizen
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)
try:
db.commit()
except Exception as e:
e_orig = getattr(e.orig, None)
if e_orig is not None:
# postgresql exception
Вы сами решаете, какая информация будет содержаться в исключении. Здесь исключение выглядит так:
CitizenCarsMaxRowsLimitReachedError|max_rows=2|row_count=2|citizen_id=7b38f947-de4e-4972-9e28-e17f510e87f4
Чтобы упростить тестирование, таблицы PostgreSQL function и PostgreSQL trigger создаются при каждом запуске.
Для выполнения примера создайте базу данных PostgreSQL . Затем создайте virtual environment и установите следующее:
pip install SQLAlchemy
pip install psycopg2-binary
Вот код, не забудьте добавить параметры базы данных:
# max_num_rows.py
import logging
import sys
import uuid
from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as pg
# sqlalchemy logging
logging.basicConfig()
sqlalchemy_logging_level = logging.DEBUG
#sqlalchemy_logging_level = logging.INFO
#sqlalchemy_logging_level = logging.ERROR
logging.getLogger('sqlalchemy.engine').setLevel(sqlalchemy_logging_level)
# your test (!) database parameters
DB_HOST = ''
DB_PORT = 5432
DB_NAME = ''
DB_USER = ''
DB_PASSWORD = ''
connection_uri = ''.join([
'postgresql+psycopg2://',
DB_USER + ':' + DB_PASSWORD,
'@',
DB_HOST + ':' + str(DB_PORT),
'/',
DB_NAME,
])
engine = sa.create_engine(connection_uri)
# create the check function
# the function is called by the trigger
pg_crea_func___check_insert_row_limit_cars_per_citizen = """
CREATE OR REPLACE FUNCTION check_insert_row_limit_cars_per_citizen()
RETURNS TRIGGER AS $$
DECLARE
max_rows INTEGER;
row_count INTEGER;
lock_key TEXT := 'insert_limit_car_' || NEW.citizen_id;
BEGIN
-- get advisory lock to serialize access to the check
PERFORM pg_advisory_xact_lock(hashtext(lock_key));
-- get the maximum number of cars allowed per citizen
SELECT COALESCE(
(
SELECT max_cars
FROM config
),
0) INTO max_rows;
-- get current number of cars for this citizen
SELECT COUNT(*) INTO row_count
FROM car
WHERE citizen_id = NEW.citizen_id AND deleted_on IS NULL;
-- we cannot insert if max_cars is reached
IF row_count >= max_rows THEN
-- release lock before raising an exception
PERFORM pg_advisory_unlock(hashtext(lock_key));
RAISE EXCEPTION 'CitizenCarsMaxRowsLimitReachedError|max_rows=%|row_count=%|citizen_id=%', max_rows, row_count, NEW.citizen_id;
END IF;
-- release lock
PERFORM pg_advisory_unlock(hashtext(lock_key));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
# create the trigger:
# calls the check function, before (!) an insert into table car
pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger = """
CREATE TRIGGER check_insert_row_limit_cars_per_citizen_trigger
BEFORE INSERT ON car
FOR EACH ROW
EXECUTE FUNCTION check_insert_row_limit_cars_per_citizen();
"""
# drop the function
pg_drop_func___check_insert_row_limit_cars_per_citizen = """
DROP FUNCTION IF EXISTS check_insert_row_limit_cars_per_citizen;
"""
# drop the trigger
pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger = """
DROP TRIGGER IF EXISTS tenant_insert_limit_status_page_trigger ON car;
"""
# define tables
Base = declarative_base()
class CommonFields(object):
# Base - start
id = sa.Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True)
created_on = sa.Column(sa.DateTime, server_default=sa.func.now(), index=True)
deleted_on = sa.Column(sa.DateTime)
class Config(CommonFields, Base):
__tablename__ = 'config'
max_cars = sa.Column(sa.Integer)
def __repr__(self):
return f'<Config: id = {self.id}, max_cars = {self.max_cars}>'
class Citizen(CommonFields, Base):
__tablename__ = 'citizen'
name = sa.Column(sa.String(100), nullable=False, index=True)
# relationship: cars
cars = relationship(
'Car',
back_populates='citizen',
)
def __repr__(self):
return f'<Citizen: id = {self.id}, name = {self.name}, cars = {self.cars}>'
class Car(CommonFields, Base):
__tablename__ = 'car'
name = sa.Column(sa.String(100), nullable=False, index=True)
# relationship: citizen
citizen_id = sa.Column(pg.UUID(as_uuid=True), sa.ForeignKey('citizen.id'), nullable=False, index=True)
citizen = relationship(
'Citizen',
back_populates='cars',
)
def __repr__(self):
return f'<Car: id = {self.id}, name = {self.name}, citizen_id = {self.citizen_id}>'
print(f'create tables')
must_create = True
if must_create:
print('WARNING: ONLY DO THIS ON A NEWLY CREATED TEST DATABASE')
print('REMOVE THE NEXT LINE TO CONTINUE, SEE CODE')
sys.exit() # remove this line to continue
Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(engine)
print('create session')
Session = sessionmaker(bind=engine)
db = Session()
def run_sql(sql):
stmt = sa.text(sql)
try:
db.execute(stmt)
except Exception as e:
exception = type(e).__name__
print(f'run_sql exception = {exception}, e.args = {e.args}')
sys.exit()
if must_create:
print('drop function & trigger')
run_sql(pg_drop_func___check_insert_row_limit_cars_per_citizen)
run_sql(pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger)
db.commit()
print('create function & trigger')
run_sql(pg_crea_func___check_insert_row_limit_cars_per_citizen)
run_sql(pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger)
db.commit()
print('add config ...')
config = Config(max_cars=2)
print('add some citizens ...')
john = Citizen(name='John')
jane = Citizen(name='Jane')
bob = Citizen(name='Bob')
alice = Citizen(name='Alice')
db.add(config)
db.add_all([john, jane, bob, alice])
print('add some cars ...')
johns_ford = Car(name='Ford')
johns_toyota = Car(name='Toyota')
janes_volkswagen = Car(name='Volkswagen')
janes_kia = Car(name='Kia')
bobs_kia = Car(name='Kia')
# assign cars to citizens
john.cars = [johns_ford, johns_toyota]
jane.cars = [janes_volkswagen, janes_kia]
bob.cars = [bobs_kia]
db.commit()
db.flush()
print('show citizens and cars ...')
stmt = sa.select(Citizen).\
order_by(Citizen.name)
citizens = db.execute(stmt).scalars().all()
for i, citizen in enumerate(citizens):
print(f'citizen[{i}] {citizen}')
print('##################################################################')
print('add a third car to john, should generate a postgres exception ...')
print('##################################################################')
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)
try:
db.commit()
except Exception as e:
# rollback
db.rollback()
e_orig = getattr(e, 'orig', None)
if e_orig is not None:
print(f'postgres exception = {e_orig}')
else:
print(f'other exception = {e}, e.args = {e.args}')
else:
print('ERROR: no exception on insert')
sys.exit()
print('##################################################################')
print('remove a car from john ...')
print('##################################################################')
db.delete(johns_toyota)
db.commit()
print('##################################################################')
print('add two cars to john, should generate a postgres exception ...')
print('##################################################################')
johns_delorean = Car(name='DeLorean')
johns_suzuki = Car(name='Suzuki')
john.cars.extend([johns_delorean, johns_suzuki])
try:
db.commit()
except Exception as e:
# rollback
db.rollback()
e_orig = getattr(e, 'orig', None)
if e_orig is not None:
print(f'postgres exception = {e_orig}')
else:
print(f'other exception = {e}, e.args = {e.args}')
else:
print('ERROR: no exception on insert')
sys.exit()
print('##################################################################')
print('number of johns cars still should be one ...')
print('##################################################################')
stmt = sa.select(sa.func.count(Car.id)).\
where(Car.citizen_id == john.id)
n = db.execute(stmt).scalars().first()
print(f'n = {n}')
print('ready')
Листинг функций и триггеров
В pgAdmin это очень просто. Перейдите в:
Schemas -> Tables -> car -> Triggers
Отсюда вы также можете просмотреть исходный код.
Вы также можете использовать запросы. Чтобы получить список всех функций, выполните запрос:
SELECT proname, prosrc
FROM pg_proc;
Или, чтобы увидеть только свои собственные функции, содержащие слово 'check':
SELECT proname, prosrc
FROM pg_proc
WHERE
proname LIKE '%check%'
Чтобы получить список всех триггеров, выполните запрос:
SELECT tgname FROM pg_trigger;
Summary
Использование PostgreSQL functions и триггеров не так уж сложно, но вы должны потратить некоторое время на создание тестового сценария, с которым будет легко играть. Исключение PostgreSQL можно легко извлечь из исключения Python . Наконец, SQLAlchemy - это (в большинстве случаев) отличный инструмент. Наслаждайтесь!
Ссылки / кредиты
Limit the number of rows allowed in a table in PostgreSQL [closed]
https://gis.stackexchange.com/questions/261652/limit-the-number-of-rows-allowed-in-a-table-in-postgresql
PostgreSQL Triggers: Create, List & Drop with Example
https://www.guru99.com/postgresql-trigger-create-drop.html
Подробнее
PostgreSQL SQLAlchemy
Недавний
- График временного ряда с Flask, Bootstrap и Chart.js
- Использование IPv6 с Microk8s
- Использование Ingress для доступа к RabbitMQ на кластере Microk8s
- Простая видеогалерея с Flask, Jinja, Bootstrap и JQuery
- Базовое планирование заданий с помощью APScheduler
- Коммутатор базы данных с HAProxy и HAProxy Runtime API
Большинство просмотренных
- Использование PyInstaller и Cython для создания исполняемого файла Python
- Используя Python pyOpenSSL для проверки SSL-сертификатов, загруженных с хоста
- Подключение к службе на хосте Docker из контейнера Docker
- График временного ряда с Flask, Bootstrap и Chart.js
- SQLAlchemy: Использование Cascade Deletes для удаления связанных объектов
- Использование UUID вместо Integer Autoincrement Primary Keys с SQLAlchemy и MariaDb