SQLAlchemy dynamic query building and filtering including soft deletes

This post shows how you can create a query builder for all your select queries.

21 June 2019
Original photo unsplash.com/@grohsfabian.

Building on the previous post 'Flask, Jinja2 and SQLAlchemy many-to-many relationship with conditions', I was looking for a way to add filter conditions dynamically and, if possible, also to find a solution for the soft delete pattern.

With soft delete, records are not deleted from a table but marked as deleted. This means that every table must have a deleted flag and that all queries must exclude records that are marked as deleted. With an ORM such as SQLAlchemy this gets more complex, because you are dealing with objects rather than records. Implementing soft delete is difficult, and my problem is not limited to soft deletes: every class in my model also has a status field. This can be used to temporarily hide an object from visitors who do not have administrator status.
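
To make the pattern concrete, here is a minimal sketch, assuming SQLAlchemy 1.4 or newer. The Article model is hypothetical and not part of the code in this post. The row is flagged instead of removed, and every select has to repeat the filter by hand:

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Article(Base):
    # hypothetical model, used only for this illustration
    __tablename__ = 'article'

    id = Column(Integer, primary_key=True)
    title = Column(String)
    deleted = Column(Boolean, default=False, nullable=False)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add_all([Article(title='first'), Article(title='second')])
session.commit()

# "delete" a row by flagging it; the record stays in the table
second = session.query(Article).filter(Article.title == 'second').one()
second.deleted = True
session.commit()

# every select must remember to exclude the flagged rows
visible = session.query(Article).filter(Article.deleted.is_(False)).all()
all_rows = session.query(Article).all()

print([article.title for article in visible])
print(len(all_rows))
```

Forgetting the filter in a single query is enough to show "deleted" objects again, which is why an automatic solution is attractive.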

I believe the best way to implement soft delete and/or status would be for SQLAlchemy itself to integrate this and offer it as a utility function. That is not the case, but some recipes are available (using the before_compile event).
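
For reference, such a recipe can look roughly like this. It is a sketch modelled on the example in the SQLAlchemy documentation for the before_compile query event, assuming SQLAlchemy 1.4 or newer; the Note model and the listener name exclude_deleted are hypothetical. The listener is registered globally on Query and adds the deleted filter to every Query that selects the model:

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine, event
from sqlalchemy.orm import Query, declarative_base, sessionmaker

Base = declarative_base()


class Note(Base):
    # hypothetical model, used only for this illustration
    __tablename__ = 'note'

    id = Column(Integer, primary_key=True)
    text = Column(String)
    deleted = Column(Boolean, default=False, nullable=False)


@event.listens_for(Query, 'before_compile', retval=True)
def exclude_deleted(query):
    # called just before a Query is compiled to SQL
    for desc in query.column_descriptions:
        if desc['type'] is Note:
            entity = desc['entity']
            query = query.filter(entity.deleted.is_(False))
    return query


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add_all([Note(text='keep'), Note(text='drop', deleted=True)])
session.commit()

# no explicit filter here; the listener adds it
notes = session.query(Note).all()
print([note.text for note in notes])
```

The filter is now applied without the calling code knowing about it, which is convenient but also makes it harder to see what a query really does.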

Since I only needed this for select queries, I decided to develop my own query builder for select statements. The requirements were that more than one class, e.g. [Parent, Child], and/or columns, e.g. [Parent.id, Child], can be selected (in the code below a column is passed as a tuple such as (Parent, 'id')), and that filter conditions can be added dynamically, including automatically adding the filters on the deleted column and the status column. Below you will find some references on dynamic query building.
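
The core idea can be sketched in a few lines: a filter is described as a tuple of class, attribute name, operator name and value, and getattr turns it into a SQLAlchemy expression. The Item model and the build_filter helper are hypothetical names for this illustration, assuming SQLAlchemy 1.4 or newer; the complete version is the db_select function in the code further down.

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Item(Base):
    # hypothetical model, used only for this illustration
    __tablename__ = 'item'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)


def build_filter(model_class, key, op, value):
    """Turn e.g. (Item, 'age', 'ge', 8) into the expression Item.age >= 8."""
    column = getattr(model_class, key, None)
    if column is None:
        raise ValueError('Invalid filter column: %s' % key)
    # try the method names 'like', 'in_' and '__ge__' style, in that order
    for pattern in ('%s', '%s_', '__%s__'):
        method = getattr(column, pattern % op, None)
        if method is not None:
            return method(value)
    raise ValueError('Invalid filter operator: %s' % op)


conditions = [
    build_filter(Item, 'age', 'ge', 8),
    build_filter(Item, 'name', 'like', 'A%'),
    build_filter(Item, 'id', 'in', [1, 2, 3]),
]
for condition in conditions:
    print(condition)
```

Each resulting expression can be passed to query.filter(), so the list of tuples can come from request parameters or any other runtime input.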

Of course I also ran into other problems, such as: AttributeError: 'scoped_session' object has no attribute '_autoflush'. Fortunately someone found a solution for this, see the references.
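
The workaround from the referenced Stack Overflow question is to attach the pre-built Query to a real Session obtained from the scoped_session registry, not to the registry object itself. A minimal sketch, assuming SQLAlchemy 1.4 or newer and a hypothetical Thing model:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Query, declarative_base, scoped_session, sessionmaker

Base = declarative_base()


class Thing(Base):
    # hypothetical model, used only for this illustration
    __tablename__ = 'thing'

    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

registry = scoped_session(sessionmaker(bind=engine))
registry.add(Thing(name='example'))
registry.commit()

# build the query without any session ...
query = Query([Thing]).filter(Thing.name == 'example')

# ... then attach it to the real Session behind the registry,
# not to the scoped_session object itself
real_session = registry()
things = query.with_session(real_session).all()
print([thing.name for thing in things])
```

This is the same step as the db_local = db() line in the code below.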

Next I made a backup and then started using the query builder. If I run into problems, I can always add the FilteredQuery recipe.

Of course this takes away some of the "fun" of writing SQLAlchemy queries, but hey, let's build working applications!

In case you want to try this:

from sqlalchemy import Table, Column, Integer, String, Boolean, ForeignKey, create_engine, inspect
from sqlalchemy.orm import relationship, sessionmaker, scoped_session, Query
from sqlalchemy.ext.declarative import declarative_base


Base = declarative_base()

# many-to-many link table: parent - child
parent_mtm_child_table = Table('parent_mtm_child', Base.metadata,
    Column('parent_id', Integer, ForeignKey('parent.id')),
    Column('child_id', Integer, ForeignKey('child.id'))
)


class Parent(Base):
    __tablename__ = 'parent'

    id = Column(Integer, primary_key=True)
    deleted = Column(Boolean, default=False)
    status = Column(Integer, server_default='0', index=True)
    name = Column(String)
    # many-to-many relationship with child
    children = relationship(
        'Child', 
        secondary=parent_mtm_child_table)

    def __repr__(self):
        return "%s(name=%r)" % \
            (self.__class__.__name__, self.name) 


class Child(Base):
    __tablename__ = 'child'

    id = Column(Integer, primary_key=True)
    deleted = Column(Boolean, default=False)
    status = Column(Integer, server_default='0', index=True)
    name = Column(String)
    age = Column(Integer)
    hair_color = Column(String)
    # many-to-many relationship with parent
    parents = relationship(
        'Parent',
        secondary=parent_mtm_child_table)

    def __repr__(self):
        return "%s(name=%r, age=%r, hair_color=%r)" % \
            (self.__class__.__name__, self.name,
            self.age, self.hair_color) 


# show/hide sql
#engine = create_engine('sqlite://', echo=True)
engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

#session = sessionmaker()
#session.configure(bind=engine)
#db = session()
# use scoped_session
db = scoped_session(sessionmaker(bind=engine))

# Attaching a pre-built query to a scoped_session in SQLAlchemy
# https://stackoverflow.com/questions/43685758/attaching-a-pre-built-query-to-a-scoped-session-in-sqlalchemy
db_local = db()


STATUS_ENABLED = 1

# parents
john = Parent(name='John', status=STATUS_ENABLED)
mary = Parent(name='Mary', status=STATUS_ENABLED)
gina = Parent(name='Gina', status=STATUS_ENABLED)
ryan = Parent(name='Ryan', status=STATUS_ENABLED)
eric = Parent(name='Eric', status=STATUS_ENABLED)
# children
liam = Child(name='Liam', age=6, hair_color='brown', status=STATUS_ENABLED)
emma = Child(name='Emma', age=8, hair_color='blond', status=STATUS_ENABLED)
alex = Child(name='Alex', age=10, hair_color='blond', status=STATUS_ENABLED)
sara = Child(name='Sara', age=9, hair_color='blond', status=STATUS_ENABLED)
rose = Child(name='Rose', age=9, hair_color='blond', status=STATUS_ENABLED)
# assign children to parents
john.children.append(liam)
john.children.append(emma)
john.children.append(alex)
mary.children.append(liam)
gina.children.append(sara)
eric.children.append(rose)
db.add_all([john, mary, gina, ryan, eric, liam, emma, alex, sara, rose])
db.commit()

# delete some
sara.deleted = True
db.commit()

''' 

db_select()
description: build a select query based on input,
also filter on deleted and status attributes


example 1: single table/object select
-------------------------------------------------------

qry = db_select(
    model_class_list=[Parent], 
    order_by_list=[(Parent, 'name', 'asc')]
).first()


example 2: two table with many-to-many select
-------------------------------------------------------

qry = db_select(
   model_class_list=[Parent, Child], 
   filter_by_list=[
       (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
       (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
       (Child, 'age', 'ge', 6),
       (Parent, 'id', 'in', [3, 4, 5]),
   ],
   order_by_list=[
       (Child, 'name', 'asc'), 
   ],
   limit=10, offset=4,
).all()


example 3: select column attribute instead of object
-------------------------------------------------------

qry = db_select(
   model_class_list=[(Parent, 'id'), Child],
   filter_by_list=[
       (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
       (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
       (Child, 'age', 'ge', 6),
       (Parent, 'id', 'in', [3, 4, 5]),
   ],
   order_by_list=[
       (Child, 'name', 'asc'), 
   ],
   limit=10, offset=4,
).all()


''' 

def db_select(model_class_list=None, filter_by_list=None, order_by_list=None, limit=None, offset=None, filter_deleted=False, filter_status=STATUS_ENABLED):
    fname = 'db_select'
    dbg_print = False

    if dbg_print:
        print(fname + ": len(model_class_list) = {}".format(len(model_class_list)))

    if filter_by_list is None:
        filter_by_list = []
    if order_by_list is None:
        order_by_list = []

    if not isinstance(model_class_list, list):
        raise Exception('model_class_list not list')
    if not isinstance(filter_by_list, list):
        raise Exception('filter_by_list not list')
    if not isinstance(order_by_list, list):
        raise Exception('order_by_list not list')

    # work on a copy: the deleted/status filters appended below
    # must not end up in the list the caller passed in
    filter_by_list = list(filter_by_list)

    # collector for model_classes
    mcs = []
    # collector for columns
    columns = []
    for model_class_item in model_class_list:
        if isinstance(model_class_item, tuple):
            m, key = model_class_item
            column = getattr(m, key, None)
            if column is None:
                raise Exception('Invalid select column: %s' % key)
            columns.append(column)
            mcs.append(m)
        else:
            columns.append(model_class_item)
            mcs.append(model_class_item)
    query = Query(columns)

    if dbg_print:
        print(fname + ": after creating query, query = {}".format(query))

    # add deleted filter if column deleted exists
    if filter_deleted is not None:
        for model_class in mcs:
            if 'deleted' in inspect(model_class).columns.keys():
                filter_by_list.append( (model_class, 'deleted', 'eq', filter_deleted) )

    # add status filter if column status exists
    if filter_status is not None:
        for model_class in mcs:
            if 'status' in inspect(model_class).columns.keys():
                filter_by_list.append( (model_class, 'status', 'eq', filter_status) )

    if dbg_print:
        # filter_by_items
        for filter_by_item in filter_by_list:
            print(fname + ": filter_by_item = {}".format(filter_by_item))
        # order_by_items
        for order_by_item in order_by_list:
            print(fname + ": order_by_item = {}".format(order_by_item))

    for filter_by_item in filter_by_list:
        if dbg_print:
            print(fname + ": processing filter_by_item = {}".format(filter_by_item))
        try:
            model_class, key, op, value = filter_by_item
        except ValueError:
            raise Exception('Invalid filter_by_item: %s' % filter_by_item)

        if dbg_print:
           print(fname + ": processing key, op, value = {}, {}, {}".format(key, op, value))

        column = getattr(model_class, key, None)
        # compare with None explicitly instead of relying on the
        # truth value of a column attribute
        if column is None:
            raise Exception('Invalid filter column: %s' % key)

        if op == 'in':
            if isinstance(value, list):
                filt = column.in_(value)
            else:
                filt = column.in_(value.split(','))

            if dbg_print:
                print(fname + ": if, filt = {}".format(filt))
        else:
            try:
                attr = list(filter(
                    lambda e: hasattr(column, e % op),
                    ['%s', '%s_', '__%s__']
                ))[0] % op
            except IndexError:
                raise Exception('Invalid filter operator: %s' % op)

            if dbg_print:
                print(fname + ": processing filter_cond, attr = {}".format(attr))

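            # only a plain string can be the 'null' marker; value may also
            # be a column, where == builds an SQL expression instead
            if isinstance(value, str) and value == 'null':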
                value = None
            filt = getattr(column, attr)(value)

            if dbg_print:
                print(fname + ": else, filt = {}".format(filt))

        if dbg_print:
            print(fname + ": adding filt")
        query = query.filter(filt)

    for order_by_item in order_by_list:
        if dbg_print:
            print(fname + ": processing order_by_item = {}".format(order_by_item))

        try:
            model_class, key, op = order_by_item
        except ValueError:
            raise Exception('Invalid order_by_item: %s' % order_by_item)

        if dbg_print:
            print(fname + ": processing model_class = {}, key = {}, op = {}".format(model_class, key, op))

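        column = getattr(model_class, key, None)
        if column is None:
            raise Exception('Invalid order_by column: %s' % key)
        # op is the name of a sort method on the column, e.g. 'asc' or 'desc'
        sorter = getattr(column, op, None)
        if not callable(sorter):
            raise Exception('Invalid order_by operator: %s' % op)
        column_sorted = sorter()
        query = query.order_by(column_sorted)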

    if limit:
        if dbg_print:
            print(fname + ": processing limit = {}".format(limit))
        query = query.limit(limit)

    if offset:
        if dbg_print:
            print(fname + ": processing offset = {}".format(offset))
        query = query.offset(offset)

    if dbg_print:
        print(fname + ": after building query, query = {}".format(query))
    return query.with_session(db_local)


print("\nPARENTS\n")

parents = db_select(
    model_class_list=[Parent], 
    order_by_list=[
        (Parent, 'name', 'asc'), 
    ],
    limit=3,
).all()

for parent in parents:
    print("parent.name = {}".format(parent.name))


# get parent_ids for next query
parent_ids = [parent.id for parent in parents]
print("parent_ids = {}".format(parent_ids))


print("\nPARENTS-CHILDREN\n")

parent_child_tuples = db_select(
    model_class_list=[Parent, Child], 
    filter_by_list=[
        (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
        (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
        (Child, 'age', 'ge', 8),
        (Parent, 'id', 'in', parent_ids),
    ],
    order_by_list=[
        (Parent, 'name', 'asc'), 
        (Child, 'name', 'asc'), 
    ],
    limit=10, offset=0,
).all()

# show tuples
print("parent_child_tuples: {}".format(parent_child_tuples))

# build list parent_id-children
from collections import defaultdict

parent_id2children = defaultdict(list)
for parent, child in parent_child_tuples:
    parent_id2children[parent.id].append(child)

# show parent_id2children
print("parent_id2children:")
for parent_id in parent_id2children:
    print("parent: {}, children: {}".format(parent_id, parent_id2children[parent_id]))


print("\nPARENT_IDS-CHILDREN\n")

parent_id_child_tuples = db_select(
    model_class_list=[(Parent, 'id'), Child], 
    filter_by_list=[
        (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
        (Child, 'id', 'eq', parent_mtm_child_table.c.child_id), 
        (Child, 'age', 'ge', 8),
        (Parent, 'id', 'in', parent_ids),
    ],
    order_by_list=[
        (Parent, 'name', 'asc'), 
        (Child, 'name', 'asc'), 
    ],
    limit=10, offset=0,
).all()

# show tuples
print("parent_id_child_tuples: {}".format(parent_id_child_tuples))

# build list parent_id-children

parent_id2children = defaultdict(list)
for parent_id, child in parent_id_child_tuples:
    parent_id2children[parent_id].append(child)

# show parent_id2children
print("\nparent_id2children:")
for parent_id in parent_id2children:
    print("parent: {}, children: {}".format(parent_id, parent_id2children[parent_id]))


print("\nshow columns:")

# debug: show columns in parent 
for c in Parent.__table__.columns:
    print("parent table column c = {}".format(c))

# debug: show columns in parent using inspect
mapper = inspect(Parent)
for column in mapper.attrs:
    print("column.key = {}".format(column.key))

for key in inspect(Parent).columns.keys():
    print("key = {}".format(key))

if 'deleted' in inspect(Parent).columns.keys():
    print("deleted found")
else:
    print("deleted NOT found")

Links / credits

Attaching a pre-built query to a scoped_session in SQLAlchemy
https://stackoverflow.com/questions/43685758/attaching-a-pre-built-query-to-a-scoped-session-in-sqlalchemy

Dynamically constructing filters based on string input using SQLAlchemy
https://ruddra.com/posts/dynamically-constructing-filters-based-on-string-input-using-sqlalchemy/

Dynamically constructing filters in SQLAlchemy
https://ruddra.com/posts/dynamically-constructing-filters-based-on-string-input-using-sqlalchemy/

FilteredQuery
https://github.com/sqlalchemy/sqlalchemy/wiki/FilteredQuery

Implementing the "Soft Delete" Pattern with Flask and SQLAlchemy
https://blog.miguelgrinberg.com/post/implementing-the-soft-delete-pattern-with-flask-and-sqlalchemy

method of iterating over sqlalchemy model's defined columns?
https://stackoverflow.com/questions/2537471/method-of-iterating-over-sqlalchemy-models-defined-columns

Python - SqlAlchemy: convert lists of tuples to list of atomic values [duplicate]
https://stackoverflow.com/questions/44355850/python-sqlalchemy-convert-lists-of-tuples-to-list-of-atomic-values

Comments (1)

how does one do or_ with this?
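
One possible way to support this, as a sketch that is not part of the db_select function above: allow a nested tuple whose first element is the marker 'or' and combine the resulting expressions with or_(). The Person model, the ('or', ...) convention and the helpers to_expression and build_condition are assumptions made for this illustration, assuming SQLAlchemy 1.4 or newer.

```python
from sqlalchemy import Column, Integer, String, or_
from sqlalchemy.orm import Query, declarative_base

Base = declarative_base()


class Person(Base):
    # hypothetical model, used only for this illustration
    __tablename__ = 'person'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)


def to_expression(model_class, key, op, value):
    # same operator lookup idea as in db_select
    column = getattr(model_class, key)
    for pattern in ('%s', '%s_', '__%s__'):
        method = getattr(column, pattern % op, None)
        if method is not None:
            return method(value)
    raise ValueError('Invalid filter operator: %s' % op)


def build_condition(item):
    # ('or', item, item, ...) becomes or_(...); anything else is a plain filter
    if isinstance(item[0], str) and item[0] == 'or':
        return or_(*[build_condition(sub_item) for sub_item in item[1:]])
    return to_expression(*item)


filter_by_list = [
    (Person, 'age', 'ge', 8),
    ('or',
        (Person, 'name', 'eq', 'Emma'),
        (Person, 'name', 'eq', 'Alex')),
]

query = Query([Person])
for item in filter_by_list:
    # top-level items are still combined with AND
    query = query.filter(build_condition(item))

print(query)
```

Because build_condition calls itself for the nested items, an 'or' group can contain further groups; an 'and' marker could be added the same way with and_().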