angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Anzeige der Werte in den dynamischen Filtern SQLAlchemy

Erstellen Sie eine kleine Hilfsfunktion, um das Debuggen von dynamischen SQLAlchemy -Filtern zum Kinderspiel zu machen.

18 Januar 2024
post main image
https://unsplash.com/@dillonjshook

Bei der Verwendung von SQLAlchemy verwende ich oft dynamische Filter in meinen Abfragen. Das heißt, ich beginne mit einer Liste mit einigen Bedingungen und füge weitere Bedingungen hinzu, die von anderen Variablen abhängen.
Hier ist eine Abfrage mit einem statischen Filter:

# query with static filter

product_colors = ['white']

stmt = sa.select(Product).\
    where(sa.and(
        Product.category.in_(my_categories),
        Product.color.in_(product_colors),
        Product.price < 400,
        ...
    )).\
    order_by(Product.price)

Wir können dies mit einem dynamischen Filter ändern:

# query with dynamic filter

product_colors = ['white']

# start with:
filter_items = [
    Product.category.in_(my_categories)
]

# add somewhere else:
filter_items.extend([
    Product.color.in_(product_colors),
    Product.price < 400,
    ...
])
    
# construct query
filter_tuple = tuple(filter_items)
stmt = sa.select(Product).\
    where(sa.and_(*filter_tuple)).\
    )).\
    order_by(Product.price)

Der Filter ist eine Liste von BinaryExpression-Objekten, die sich leicht ausdrucken lassen:

print(f'filter_items:')
for i, filter_item in enumerate(filter_items):
    print(f'[{i}] {filter_item}')

Und das Ergebnis ist zum Beispiel:

filter_items:
[0] product.customer_id = customer.id
[1] product.category = :category_1
[2] product.color IN (__[POSTCOMPILE_color_1])
[3] product.price IS NOT NULL
[4] product.price < :price_1

Das ist sehr schön, aber nicht sehr nützlich für die Fehlersuche, da die Werte nicht angezeigt werden.

Ich habe eine kleine Hilfsfunktion 'dump_filter_items' erstellt, die die filter_items einschließlich ihrer Werte ausgibt. Diese Funktion erzeugt einen mehr detailed Dump der filter_items und enthält die Werte:

filter_items with values:
[0] product.customer_id = customer.id
[0] - left: product.customer_id
[0] - operator: <built-in function eq>
[0] - right: customer.id
[1] product.category = :category_1
[1] - left: product.category
[1] - operator: <built-in function eq>
[1] - right: shirt (value)
[2] product.color IN (__[POSTCOMPILE_color_1])
[2] - left: product.color
[2] - operator: <function in_op at 0x7f88efce5260>
[2] - right: ['white'] (value)
[3] product.price IS NOT NULL
[3] - left: product.price
[3] - operator: <function is_not at 0x7f88efce4cc0>
[3] - right: NULL
[4] product.price < :price_1
[4] - left: product.price
[4] - operator: <built-in function lt>
[4] - right: 400 (value)

Dies ist sehr einfach, es gibt viel mehr Informationen im BinaryExpression-Objekt.

Hier ist der Code, wenn Sie es selbst versuchen wollen.

# customer_product.py
import sys

import sqlalchemy as sa
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

from sqlalchemy.engine import Engine
from sqlalchemy import event
from sqlite3 import Connection as SQLite3Connection

# ENABLE FOREIGN KEYS
@event.listens_for(Engine, "connect")
def _set_sqlite_pragma(dbapi_connection, connection_record):
    if isinstance(dbapi_connection, SQLite3Connection):
        print('turning on foreign keys ...')
        cursor = dbapi_connection.cursor()
        cursor.execute('PRAGMA foreign_keys=ON;')
        cursor.close()

Base = declarative_base()

class Customer(Base):
    __tablename__ = 'customer'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String(100), nullable=False)
    city = sa.Column(sa.String(100), nullable=False)
    age = sa.Column(sa.Integer, nullable=False)

    # relationship: orders
    products = relationship(
        'Product',
        back_populates='customer',
    )

    def __repr__(self):
        return f'<Customer: id = {self.id}, name = {self.name}, city = {self.city}, age = {self.age}>'


class Product(Base):
    __tablename__ = 'product'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String(100), nullable=False)
    category = sa.Column(sa.String(100), nullable=False)
    color = sa.Column(sa.String(100), nullable=False)
    price = sa.Column(sa.Integer, nullable=False)

    # relationship: customer
    customer_id = sa.Column(sa.Integer, sa.ForeignKey('customer.id'), nullable=False, index=True)
    customer = relationship(
        'Customer',
        back_populates='products',
    )

    def __repr__(self):
        return f'<Product: id = {self.id}, name = {self.name}, color = {self.color}, price = {self.price}>'

# get engine and create all
engine_echo = True
engine = sa.create_engine('sqlite:///:memory:', echo=engine_echo)
Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(engine)

print('create session')
Session = sessionmaker(bind=engine)
session = Session()

print('add some customers ...')
customers = [
    Customer(name='John', city='Houston', age='45'), 
    Customer(name='Jane', city='red', age='44'), 
    Customer(name='Bob',  city='blue', age='60'), 
    Customer(name='Alice', city='white', age='55'), 
]
session.add_all(customers)

print('add some products ...')
products = [
    Product(name='Shirt1', category='shirt', color='white', price='100'), 
    Product(name='Dress1', category='dress', color='red', price='200'), 
    Product(name='Socks1', category='socks', color='blue', price='300'), 
    Product(name='Shirt2', category='shirt', color='white', price='300'), 
]
session.add_all(products)

print('add some products to some customers ...')
customers[0].products = [products[0], products[1]]
customers[1].products = [products[1], products[2]]
customers[2].products = [products[1], products[2], products[3]]

# stuff
session.commit()
session.flush()

product_colors = ['white']

# query1: select all customers with products: white shirts with price below 400, sort by highest price
stmt = sa.select(Customer, Product).\
    where(sa.and_(
        Product.customer_id == Customer.id,
        Product.category == 'shirt',
        Product.color.in_(product_colors),
        Product.price != None,
        Product.price < 400,
    )).\
    order_by(Product.price.desc())
customer_product = session.execute(stmt).all()

print(f'query1 result:')
for customer, product in customer_product:
    print(f'{customer}: {product}')

# now use filter_items

# start with:
filter_items = [
    Product.customer_id == Customer.id,
    Product.category == 'shirt',
]

# add somewhere else:
filter_items.extend([
    Product.color.in_(product_colors),
    Product.price != None,
    Product.price < 400,
])

print(f'filter_items:')
for i, filter_item in enumerate(filter_items):
    print(f'[{i}] {filter_item}')

filter_tuple = tuple(filter_items)
stmt = sa.select(Customer, Product).\
    where(sa.and_(*filter_tuple)).\
    order_by(Product.price.desc())
customer_product = session.execute(stmt).all()

print(f'query2 result:')
for customer, product in customer_product:
    print(f'{customer}: {product}')

# at a certain moment you want to see the values in the filter_items
# every filter_item is a sqlalchemy.sql.elements.BinaryExpression object
def dump_filter_items(filter_items):
    for i, filter_item in enumerate(filter_items):
        print(f'[{i}] {filter_item}')
        #print(f'[{i}] dict = {filter_item.__dict__}')
        if isinstance(filter_item, sa.sql.elements.BinaryExpression):
            print(f'[{i}] - left: {filter_item.left}')
            print(f'[{i}] - operator: {filter_item.operator}')
            if not hasattr(filter_item, 'right'):
                print(f'[{i}] - filter_item.right not present')
            else:
                filter_item_right = getattr(filter_item, 'right')
                if not hasattr(filter_item_right, 'value'):
                    print(f'[{i}] - right: {filter_item_right}')
                else:
                    value = getattr(filter_item_right, 'value')
                    print(f'[{i}] - right: {filter_item.right.value} (value)')
        else:
            print(f'[{i}] ?')

print(f'filter_items with values:')
dump_filter_items(filter_items)

Zusammenfassung

Ich wollte die Werte der dynamischen Filter SQLAlchemy in meinem Code überprüfen. Ich habe eine kleine Hilfsfunktion erstellt, die mir das Debuggen erleichtert.

Links / Impressum

Get filtered values from SQL Alchemy ORM Query
https://stackoverflow.com/questions/47478837/get-filtered-values-from-sql-alchemy-orm-query

SQLAlchemy - Column Elements and Expressions
https://docs.sqlalchemy.org/en/20/core/sqlelement.html

Mehr erfahren

SQLAlchemy

Einen Kommentar hinterlassen

Kommentieren Sie anonym oder melden Sie sich zum Kommentieren an.

Kommentare

Eine Antwort hinterlassen

Antworten Sie anonym oder melden Sie sich an, um zu antworten.