SQLAlchemy, PostgreSQL, maximum number of rows per user
Use a PostgreSQL function and a PostgreSQL trigger to limit the maximum number of rows per user.
You have a multi-user application using SQLAlchemy and PostgreSQL and want to limit the number of rows per user of a certain table. For example, every user can have a maximum of five posts.
You need an operation like:
- Lock the table
- Count the number of posts of the user
- If the number less than five:
- Add new post
- Unlock the table
- Else:
- Unlock the table
- Generate exception
We can do this using SQLAlchemy only but this is a lot of code, and not very fast.
A better way is to implement this using:
- A PostgreSQL function, and
- A PostgreSQL trigger
The PostgreSQL function is used to perform the operation, and the PostgreSQL trigger calls the PostgreSQL function on an INSERT operation.
The PostgreSQL function and PostgreSQL trigger are on the database server. We upload them to the database server once using SQLAlchemy. This means that the queries of the PostgreSQL function, run on the database server.
As always I am doing this on Ubuntu 22.04.
The code
I am not going explain a lot here, I created a working example, it's all commented in the code, see below.
In the example we have citizens and cars. A citizen can have a maximum number of two cars. This number is stored in another (configuarion) table.
First, we add two cars to a citizen. Then, when we try to add a third car, an exception is generated. Note that this exception is a PostgreSQL exception. In SQLAlchemy, it is available in 'e.orig'.
...
# add a car to a citizen
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)
try:
db.commit()
except Exception as e:
e_orig = getattr(e.orig, None)
if e_orig is not None:
# postgresql exception
You decide which information is in the exception. Here the exception looks like:
CitizenCarsMaxRowsLimitReachedError|max_rows=2|row_count=2|citizen_id=7b38f947-de4e-4972-9e28-e17f510e87f4
To make testing easy, the tables and the PostgreSQL function and PostgreSQL trigger are created on every run.
To run the example create a PostgreSQL database. Then create a virtual environment and install the following:
pip install SQLAlchemy
pip install psycopg2-binary
Here is the code, dont forget to add the database parameters:
# max_num_rows.py
import logging
import sys
import uuid
from sqlalchemy.engine import Engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as pg
# sqlalchemy logging
logging.basicConfig()
sqlalchemy_logging_level = logging.DEBUG
#sqlalchemy_logging_level = logging.INFO
#sqlalchemy_logging_level = logging.ERROR
logging.getLogger('sqlalchemy.engine').setLevel(sqlalchemy_logging_level)
# your test (!) database parameters
DB_HOST = ''
DB_PORT = 5432
DB_NAME = ''
DB_USER = ''
DB_PASSWORD = ''
connection_uri = ''.join([
'postgresql+psycopg2://',
DB_USER + ':' + DB_PASSWORD,
'@',
DB_HOST + ':' + str(DB_PORT),
'/',
DB_NAME,
])
engine = sa.create_engine(connection_uri)
# create the check function
# the function is called by the trigger
pg_crea_func___check_insert_row_limit_cars_per_citizen = """
CREATE OR REPLACE FUNCTION check_insert_row_limit_cars_per_citizen()
RETURNS TRIGGER AS $$
DECLARE
max_rows INTEGER;
row_count INTEGER;
lock_key TEXT := 'insert_limit_car_' || NEW.citizen_id;
BEGIN
-- get advisory lock to serialize access to the check
PERFORM pg_advisory_xact_lock(hashtext(lock_key));
-- get the maximum number of cars allowed per citizen
SELECT COALESCE(
(
SELECT max_cars
FROM config
),
0) INTO max_rows;
-- get current number of cars for this citizen
SELECT COUNT(*) INTO row_count
FROM car
WHERE citizen_id = NEW.citizen_id AND deleted_on IS NULL;
-- we cannot insert if max_cars is reached
IF row_count >= max_rows THEN
-- release lock before raising an exception
PERFORM pg_advisory_unlock(hashtext(lock_key));
RAISE EXCEPTION 'CitizenCarsMaxRowsLimitReachedError|max_rows=%|row_count=%|citizen_id=%', max_rows, row_count, NEW.citizen_id;
END IF;
-- release lock
PERFORM pg_advisory_unlock(hashtext(lock_key));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
# create the trigger:
# calls the check function, before (!) an insert into table car
pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger = """
CREATE TRIGGER check_insert_row_limit_cars_per_citizen_trigger
BEFORE INSERT ON car
FOR EACH ROW
EXECUTE FUNCTION check_insert_row_limit_cars_per_citizen();
"""
# drop the function
pg_drop_func___check_insert_row_limit_cars_per_citizen = """
DROP FUNCTION IF EXISTS check_insert_row_limit_cars_per_citizen;
"""
# drop the trigger
pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger = """
DROP TRIGGER IF EXISTS tenant_insert_limit_status_page_trigger ON car;
"""
# define tables
Base = declarative_base()
class CommonFields(object):
# Base - start
id = sa.Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True)
created_on = sa.Column(sa.DateTime, server_default=sa.func.now(), index=True)
deleted_on = sa.Column(sa.DateTime)
class Config(CommonFields, Base):
__tablename__ = 'config'
max_cars = sa.Column(sa.Integer)
def __repr__(self):
return f'<Config: id = {self.id}, max_cars = {self.max_cars}>'
class Citizen(CommonFields, Base):
__tablename__ = 'citizen'
name = sa.Column(sa.String(100), nullable=False, index=True)
# relationship: cars
cars = relationship(
'Car',
back_populates='citizen',
)
def __repr__(self):
return f'<Citizen: id = {self.id}, name = {self.name}, cars = {self.cars}>'
class Car(CommonFields, Base):
__tablename__ = 'car'
name = sa.Column(sa.String(100), nullable=False, index=True)
# relationship: citizen
citizen_id = sa.Column(pg.UUID(as_uuid=True), sa.ForeignKey('citizen.id'), nullable=False, index=True)
citizen = relationship(
'Citizen',
back_populates='cars',
)
def __repr__(self):
return f'<Car: id = {self.id}, name = {self.name}, citizen_id = {self.citizen_id}>'
print(f'create tables')
must_create = True
if must_create:
print('WARNING: ONLY DO THIS ON A NEWLY CREATED TEST DATABASE')
print('REMOVE THE NEXT LINE TO CONTINUE, SEE CODE')
sys.exit() # remove this line to continue
Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(engine)
print('create session')
Session = sessionmaker(bind=engine)
db = Session()
def run_sql(sql):
stmt = sa.text(sql)
try:
db.execute(stmt)
except Exception as e:
exception = type(e).__name__
print(f'run_sql exception = {exception}, e.args = {e.args}')
sys.exit()
if must_create:
print('drop function & trigger')
run_sql(pg_drop_func___check_insert_row_limit_cars_per_citizen)
run_sql(pg_drop_trig___check_insert_row_limit_cars_per_citizen_trigger)
db.commit()
print('create function & trigger')
run_sql(pg_crea_func___check_insert_row_limit_cars_per_citizen)
run_sql(pg_crea_trig___check_insert_row_limit_cars_per_citizen_trigger)
db.commit()
print('add config ...')
config = Config(max_cars=2)
print('add some citizens ...')
john = Citizen(name='John')
jane = Citizen(name='Jane')
bob = Citizen(name='Bob')
alice = Citizen(name='Alice')
db.add(config)
db.add_all([john, jane, bob, alice])
print('add some cars ...')
johns_ford = Car(name='Ford')
johns_toyota = Car(name='Toyota')
janes_volkswagen = Car(name='Volkswagen')
janes_kia = Car(name='Kia')
bobs_kia = Car(name='Kia')
# assign cars to citizens
john.cars = [johns_ford, johns_toyota]
jane.cars = [janes_volkswagen, janes_kia]
bob.cars = [bobs_kia]
db.commit()
db.flush()
print('show citizens and cars ...')
stmt = sa.select(Citizen).\
order_by(Citizen.name)
citizens = db.execute(stmt).scalars().all()
for i, citizen in enumerate(citizens):
print(f'citizen[{i}] {citizen}')
print('##################################################################')
print('add a third car to john, should generate a postgres exception ...')
print('##################################################################')
johns_delorean = Car(name='DeLorean')
john.cars.append(johns_delorean)
try:
db.commit()
except Exception as e:
# rollback
db.rollback()
e_orig = getattr(e, 'orig', None)
if e_orig is not None:
print(f'postgres exception = {e_orig}')
else:
print(f'other exception = {e}, e.args = {e.args}')
else:
print('ERROR: no exception on insert')
sys.exit()
print('##################################################################')
print('remove a car from john ...')
print('##################################################################')
db.delete(johns_toyota)
db.commit()
print('##################################################################')
print('add two cars to john, should generate a postgres exception ...')
print('##################################################################')
johns_delorean = Car(name='DeLorean')
johns_suzuki = Car(name='Suzuki')
john.cars.extend([johns_delorean, johns_suzuki])
try:
db.commit()
except Exception as e:
# rollback
db.rollback()
e_orig = getattr(e, 'orig', None)
if e_orig is not None:
print(f'postgres exception = {e_orig}')
else:
print(f'other exception = {e}, e.args = {e.args}')
else:
print('ERROR: no exception on insert')
sys.exit()
print('##################################################################')
print('number of johns cars still should be one ...')
print('##################################################################')
stmt = sa.select(sa.func.count(Car.id)).\
where(Car.citizen_id == john.id)
n = db.execute(stmt).scalars().first()
print(f'n = {n}')
print('ready')
Listing functions and triggers
In pgAdmin this is easy. Navigate to:
Schemas -> Tables -> car -> Triggers
From here, you can also view the source code.
You can also use queries. To get a list of all functions, run the query:
SELECT proname, prosrc
FROM pg_proc;
Or, to see only your own functions containg the word 'check':
SELECT proname, prosrc
FROM pg_proc
WHERE
proname LIKE '%check%'
To get a list of all triggers, run the query:
SELECT tgname FROM pg_trigger;
Summary
Using PostgreSQL functions and triggers is not that difficult but you really must take some time to create a test script that makes it easy to play with. The PostgreSQL exception can be easily extracted from the Python exception. Finally, SQLAlchemy is a (most of the time) a great tool. Enjoy!
Links / credits
Limit the number of rows allowed in a table in PostgreSQL [closed]
https://gis.stackexchange.com/questions/261652/limit-the-number-of-rows-allowed-in-a-table-in-postgresql
PostgreSQL Triggers: Create, List & Drop with Example
https://www.guru99.com/postgresql-trigger-create-drop.html
Read more
PostgreSQL SQLAlchemy
Most viewed
- Using Python's pyOpenSSL to verify SSL certificates downloaded from a host
- Using PyInstaller and Cython to create a Python executable
- Reducing page response times of a Flask SQLAlchemy website
- Connect to a service on a Docker host from a Docker container
- SQLAlchemy: Using Cascade Deletes to delete related objects
- Using UUIDs instead of Integer Autoincrement Primary Keys with SQLAlchemy and MariaDb