是否有一种简单的方法来遍历列名和值对?

我的SQLAlchemy版本是0.5.6

下面是我尝试使用dict(row)的示例代码:

import sqlalchemy
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

print "sqlalchemy version:",sqlalchemy.__version__ 

engine = create_engine('sqlite:///:memory:', echo=False)
metadata = MetaData()
users_table = Table('users', metadata,
     Column('id', Integer, primary_key=True),
     Column('name', String),
)
metadata.create_all(engine) 

class User(declarative_base()):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    
    def __init__(self, name):
        self.name = name

Session = sessionmaker(bind=engine)
session = Session()

user1 = User("anurag")
session.add(user1)
session.commit()

# uncommenting next line throws exception 'TypeError: 'User' object is not iterable'
#print dict(user1)
# this one also throws 'TypeError: 'User' object is not iterable'
for u in session.query(User).all():
    print dict(u)

在我的系统输出上运行这段代码:

Traceback (most recent call last):
  File "untitled-1.py", line 37, in <module>
    print dict(u)
TypeError: 'User' object is not iterable

当前回答

一个适用于继承类的解决方案:

from itertools import chain
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()


class Mixin(object):
    def as_dict(self):
        tables = [base.__table__ for base in self.__class__.__bases__ if base not in [Base, Mixin]]
        tables.append(self.__table__)
        return {c.name: getattr(self, c.name) for c in chain.from_iterable([x.columns for x in tables])}

其他回答

行有一个_asdict()函数,它给出一个字典

In [8]: r1 = db.session.query(Topic.name).first()

In [9]: r1
Out[9]: (u'blah')

In [10]: r1.name
Out[10]: u'blah'

In [11]: r1._asdict()
Out[11]: {'name': u'blah'}
from copy import copy

def to_record(row):
    record = copy(row.__dict__)
    del record["_sa_instance_state"]
    return record

如果不使用复制,可能会遇到错误。

使用以下SQLAlchemy代码查询数据库后:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


SQLALCHEMY_DATABASE_URL = 'sqlite:///./examples/sql_app.db'
engine = create_engine(SQLALCHEMY_DATABASE_URL, echo=True)
query = sqlalchemy.select(TABLE)
result = engine.execute(query).fetchall()

你可以使用下面的一行代码:

query_dict = [record._mapping for record in results]

如OP所述,调用dict初始化器会引发一个异常,消息为“User”对象不可迭代。所以真正的问题是如何使一个SQLAlchemy模型可迭代?

We'll have to implement the special methods __iter__ and __next__, but if we inherit directly from the declarative_base model, we would still run into the undesirable "_sa_instance_state" key. What's worse, is we would have to loop through __dict__.keys() for every call to __next__ because the keys() method returns a View -- an iterable that is not indexed. This would increase the time complexity by a factor of N, where N is the number of keys in __dict__. Generating the dict would cost O(N^2). We can do better.

我们可以实现自己的基类,它实现所需的特殊方法,并存储可以通过索引访问的列名列表,从而降低生成O(N)字典的时间复杂性。这有一个额外的好处,我们可以定义一次逻辑,并在任何时候从基类继承,我们希望我们的模型类是可迭代的。

class IterableBase(declarative_base()):
    __abstract__ = True

    def _init_keys(self):
        self._keys = [c.name for c in self.__table__.columns]
        self._dict = {c.name: getattr(self, c.name) for c in self.__table__.columns}

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._init_keys()

    def __setattr__(self, name, value):
        super().__setattr__(name, value)
        if name not in ('_dict', '_keys', '_n') and '_dict' in self.__dict__:
            self._dict[name] = value

    def __iter__(self):
        self._n = 0
        return self

    def __next__(self):
        if self._n >= len(self._keys):
            raise StopIteration
        self._n += 1
        key = self._keys[self._n-1]
        return (key, self._dict[key])

现在User类可以直接从IterableBase类继承。

class User(IterableBase):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)

您可以确认,以User实例作为参数调用dict函数将返回所需的字典,没有"_sa_instance_state"。你可能已经注意到在IterableBase类中声明的__setattr__方法。这确保在初始化后属性发生变化或设置时更新_dict。

def main():
    user1 = User('Bob')
    print(dict(user1))
    # outputs {'id': None, 'name': 'Bob'}
    user1.id = 42
    print(dict(user1))
    # outputs {'id': 42, 'name': 'Bob'}

if __name__ == '__main__':
    main()

使用字典推导式

for u in session.query(User).all():
    print ({column.name: str(getattr(row, column.name)) for column in row.__table__.columns})