angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

SQLAlchemy, PostgreSQL, максимальное количество строк для user

Используйте PostgreSQL function и PostgreSQL trigger , чтобы ограничить максимальное количество строк в одном user.

5 февраля 2024 Обновленный 7 февраля 2024
post main image
https://pixabay.com/users/engin_akyurt-3656355

У вас есть приложение с несколькими user , использующее SQLAlchemy и PostgreSQL , и вы хотите ограничить количество строк на user определенной таблицы. Например, в каждом user может быть не более пяти сообщений.

Вам нужна такая операция, как:

  • Заблокировать таблицу
  • Подсчитать количество сообщений в user
  • Если число меньше пяти:
    • Добавить новый пост
    • Разблокировать таблицу
  • Элс:
    • Разблокировать таблицу
    • Сгенерировать исключение

Мы можем сделать это, используя только SQLAlchemy , но это очень много кода, и не очень быстро.

Лучше реализовать это с помощью:

  • A PostgreSQL function, и
  • A PostgreSQL trigger

PostgreSQL function используется для выполнения операции, а PostgreSQL trigger вызывает PostgreSQL function на операции INSERT .

PostgreSQL function и PostgreSQL trigger находятся на сервере базы данных. Мы загружаем их на сервер базы данных один раз с помощью SQLAlchemy. Это означает, что запросы PostgreSQL function выполняются на сервере базы данных.

Как обычно, я делаю это на Ubuntu 22.04.

Код

Я не буду много объяснять, я создал рабочий пример, в коде все закомментировано, смотрите ниже.

В примере у нас есть граждане и автомобили. Гражданин может иметь максимум два автомобиля. Это количество хранится в другой таблице (configuarion).

Сначала мы добавляем гражданину два автомобиля. Затем, когда мы пытаемся добавить третий автомобиль, генерируется исключение. Обратите внимание, что это исключение является исключением PostgreSQL . В SQLAlchemy оно доступно в 'e.orig'.

...
# add a car to a citizen
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)

try:
    db.commit()
except Exception as e:
    e_orig = getattr(e.orig, None)
    if e_orig is not None:
        # postgresql exception

Вы сами решаете, какая информация будет содержаться в исключении. Здесь исключение выглядит так:

CitizenCarsMaxRowsLimitReachedError|max_rows=2|row_count=2|citizen_id=7b38f947-de4e-4972-9e28-e17f510e87f4

Чтобы упростить тестирование, таблицы PostgreSQL function и PostgreSQL trigger создаются при каждом запуске.

Для выполнения примера создайте базу данных PostgreSQL . Затем создайте virtual environment и установите следующее:

pip install SQLAlchemy
pip install psycopg2-binary

Вот код, не забудьте добавить параметры базы данных:

# max_num_rows.py
import logging
import sys
import uuid

from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as pg

# sqlalchemy logging
logging.basicConfig()
sqlalchemy_logging_level = logging.DEBUG
#sqlalchemy_logging_level = logging.INFO
#sqlalchemy_logging_level = logging.ERROR
logging.getLogger('sqlalchemy.engine').setLevel(sqlalchemy_logging_level)

# your test (!) database parameters
DB_HOST = ''
DB_PORT = 5432
DB_NAME = ''
DB_USER = ''
DB_PASSWORD = ''

connection_uri = ''.join([
    'postgresql+psycopg2://',
    DB_USER + ':' + DB_PASSWORD,
    '@',
    DB_HOST + ':' + str(DB_PORT),
    '/',
    DB_NAME,
])

engine = sa.create_engine(connection_uri)

# create the check function
# the function is called by the trigger
pg_crea_func___check_insert_row_limit_cars_per_citizen = """
CREATE OR REPLACE FUNCTION check_insert_row_limit_cars_per_citizen()
RETURNS TRIGGER AS $$
DECLARE
  max_rows INTEGER;
  row_count INTEGER;
  lock_key TEXT := 'insert_limit_car_' || NEW.citizen_id;
BEGIN
  -- get advisory lock to serialize access to the check
  PERFORM pg_advisory_xact_lock(hashtext(lock_key));

  -- get the maximum number of cars allowed per citizen
  SELECT COALESCE(
    (
    SELECT max_cars
    FROM config
    ),
    0) INTO max_rows;

  -- get current number of cars for this citizen
  SELECT COUNT(*) INTO row_count 
  FROM car
  WHERE citizen_id = NEW.citizen_id AND deleted_on IS NULL;

  -- we cannot insert if max_cars is reached
  IF row_count >= max_rows THEN
    -- release lock before raising an exception
    PERFORM pg_advisory_unlock(hashtext(lock_key));
    RAISE EXCEPTION 'CitizenCarsMaxRowsLimitReachedError|max_rows=%|row_count=%|citizen_id=%', max_rows, row_count, NEW.citizen_id;
  END IF;

  -- release lock
  PERFORM pg_advisory_unlock(hashtext(lock_key));
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""

# create the trigger:
# calls the check function, before (!) an insert into table car
pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger = """
CREATE TRIGGER check_insert_row_limit_cars_per_citizen_trigger
BEFORE INSERT ON car
FOR EACH ROW
EXECUTE FUNCTION check_insert_row_limit_cars_per_citizen();
"""

# drop the function
pg_drop_func___check_insert_row_limit_cars_per_citizen = """
DROP FUNCTION IF EXISTS check_insert_row_limit_cars_per_citizen;
"""

# drop the trigger
pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger = """
DROP TRIGGER IF EXISTS tenant_insert_limit_status_page_trigger ON car;
"""


# define tables
Base = declarative_base()

class CommonFields(object):
    # Base - start 
    id = sa.Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True)
    created_on = sa.Column(sa.DateTime, server_default=sa.func.now(), index=True)
    deleted_on = sa.Column(sa.DateTime)

class Config(CommonFields, Base):
    __tablename__ = 'config'

    max_cars = sa.Column(sa.Integer)

    def __repr__(self):
        return f'<Config: id = {self.id}, max_cars = {self.max_cars}>'

class Citizen(CommonFields, Base):
    __tablename__ = 'citizen'

    name = sa.Column(sa.String(100), nullable=False, index=True)

    # relationship: cars
    cars = relationship(
        'Car',
        back_populates='citizen',
    )

    def __repr__(self):
        return f'<Citizen: id = {self.id}, name = {self.name}, cars = {self.cars}>'

class Car(CommonFields, Base):
    __tablename__ = 'car'

    name = sa.Column(sa.String(100), nullable=False, index=True)

    # relationship: citizen
    citizen_id = sa.Column(pg.UUID(as_uuid=True), sa.ForeignKey('citizen.id'), nullable=False, index=True)
    citizen = relationship(
        'Citizen',
        back_populates='cars',
    )

    def __repr__(self):
        return f'<Car: id = {self.id}, name = {self.name}, citizen_id = {self.citizen_id}>'

print(f'create tables')
must_create = True
if must_create:
    print('WARNING: ONLY DO THIS ON A NEWLY CREATED TEST DATABASE')
    print('REMOVE THE NEXT LINE TO CONTINUE, SEE CODE')
    sys.exit() # remove this line to continue
    Base.metadata.drop_all(engine, checkfirst=True)
    Base.metadata.create_all(engine)

print('create session')
Session = sessionmaker(bind=engine)
db = Session()

def run_sql(sql):
    stmt = sa.text(sql)
    try:
        db.execute(stmt)
    except Exception as e:
        exception = type(e).__name__
        print(f'run_sql exception = {exception}, e.args = {e.args}')
        sys.exit()

if must_create:
    print('drop function & trigger')
    run_sql(pg_drop_func___check_insert_row_limit_cars_per_citizen)
    run_sql(pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger)
    db.commit()
    print('create function & trigger')
    run_sql(pg_crea_func___check_insert_row_limit_cars_per_citizen)
    run_sql(pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger)
    db.commit()

print('add config ...')
config = Config(max_cars=2)

print('add some citizens ...')
john = Citizen(name='John')
jane = Citizen(name='Jane')
bob = Citizen(name='Bob')
alice = Citizen(name='Alice')

db.add(config)
db.add_all([john, jane, bob, alice])

print('add some cars ...')
johns_ford = Car(name='Ford')
johns_toyota = Car(name='Toyota')
janes_volkswagen = Car(name='Volkswagen')
janes_kia = Car(name='Kia')
bobs_kia = Car(name='Kia')

# assign cars to citizens
john.cars = [johns_ford, johns_toyota]
jane.cars = [janes_volkswagen, janes_kia]
bob.cars = [bobs_kia]

db.commit()
db.flush()

print('show citizens and cars ...')
stmt = sa.select(Citizen).\
    order_by(Citizen.name)
citizens = db.execute(stmt).scalars().all()

for i, citizen in enumerate(citizens):
    print(f'citizen[{i}] {citizen}')

print('##################################################################')
print('add a third car to john, should generate a postgres exception ...')
print('##################################################################')

johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)

try:
    db.commit()
except Exception as e:
    # rollback 
    db.rollback()
    e_orig = getattr(e, 'orig', None)
    if e_orig is not None:
        print(f'postgres exception = {e_orig}')
    else:
        print(f'other exception = {e}, e.args = {e.args}')
else:
    print('ERROR: no exception on insert')
    sys.exit()

print('##################################################################')
print('remove a car from john ...')
print('##################################################################')

db.delete(johns_toyota)
db.commit()

print('##################################################################')
print('add two cars to john, should generate a postgres exception ...')
print('##################################################################')

johns_delorean = Car(name='DeLorean')
johns_suzuki = Car(name='Suzuki')
john.cars.extend([johns_delorean, johns_suzuki])

try:
    db.commit()
except Exception as e:
    # rollback 
    db.rollback()
    e_orig = getattr(e, 'orig', None)
    if e_orig is not None:
        print(f'postgres exception = {e_orig}')
    else:
        print(f'other exception = {e}, e.args = {e.args}')
else:
    print('ERROR: no exception on insert')
    sys.exit()

print('##################################################################')
print('number of johns cars still should be one ...')
print('##################################################################')

stmt = sa.select(sa.func.count(Car.id)).\
    where(Car.citizen_id == john.id)
n = db.execute(stmt).scalars().first()
print(f'n = {n}')

print('ready')

Листинг функций и триггеров

В pgAdmin это очень просто. Перейдите в:

Schemas -> Tables -> car -> Triggers

Отсюда вы также можете просмотреть исходный код.

Вы также можете использовать запросы. Чтобы получить список всех функций, выполните запрос:

SELECT proname, prosrc 
FROM pg_proc;

Или, чтобы увидеть только свои собственные функции, содержащие слово 'check':

SELECT proname, prosrc 
FROM pg_proc
WHERE 
proname LIKE '%check%'

Чтобы получить список всех триггеров, выполните запрос:

SELECT tgname FROM pg_trigger;

Summary

Использование PostgreSQL functions и триггеров не так уж сложно, но вы должны потратить некоторое время на создание тестового сценария, с которым будет легко играть. Исключение PostgreSQL можно легко извлечь из исключения Python . Наконец, SQLAlchemy - это (в большинстве случаев) отличный инструмент. Наслаждайтесь!

Ссылки / кредиты

Limit the number of rows allowed in a table in PostgreSQL [closed]
https://gis.stackexchange.com/questions/261652/limit-the-number-of-rows-allowed-in-a-table-in-postgresql

PostgreSQL Triggers: Create, List & Drop with Example
https://www.guru99.com/postgresql-trigger-create-drop.html

Подробнее

PostgreSQL SQLAlchemy

Оставить комментарий

Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

Комментарии

Оставьте ответ

Ответьте анонимно или войдите в систему, чтобы ответить.