Is there a simple way to iterate over column name and value pairs?

My SQLAlchemy version is 0.5.6.

Here is sample code where I tried using dict(row):

import sqlalchemy
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

print "sqlalchemy version:",sqlalchemy.__version__ 

engine = create_engine('sqlite:///:memory:', echo=False)
metadata = MetaData()
users_table = Table('users', metadata,
     Column('id', Integer, primary_key=True),
     Column('name', String),
)
metadata.create_all(engine) 

class User(declarative_base()):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    
    def __init__(self, name):
        self.name = name

Session = sessionmaker(bind=engine)
session = Session()

user1 = User("anurag")
session.add(user1)
session.commit()

# uncommenting next line throws exception 'TypeError: 'User' object is not iterable'
#print dict(user1)
# this one also throws 'TypeError: 'User' object is not iterable'
for u in session.query(User).all():
    print dict(u)

Running this code on my system outputs:

Traceback (most recent call last):
  File "untitled-1.py", line 37, in <module>
    print dict(u)
TypeError: 'User' object is not iterable

Current answer

Use a dict comprehension:

for u in session.query(User).all():
    print({column.name: str(getattr(u, column.name)) for column in u.__table__.columns})
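A self-contained version of that comprehension, as a minimal sketch. It assumes SQLAlchemy 1.4 or newer (for `sqlalchemy.orm.declarative_base`) and Python 3. The helper name `row_to_dict` is made up for illustration and is not part of SQLAlchemy, and unlike the snippet above it keeps the raw values instead of converting them with `str()`.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


def row_to_dict(obj):
    """Map each column name of a mapped instance to its current value."""
    return {column.name: getattr(obj, column.name) for column in obj.__table__.columns}


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add(User(name="anurag"))
session.commit()

rows = [row_to_dict(u) for u in session.query(User).all()]
print(rows)  # [{'id': 1, 'name': 'anurag'}]
```

Because it walks `__table__.columns`, the result contains only mapped columns, so the internal `_sa_instance_state` attribute never shows up.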

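Other answers

As the OP stated, calling the dict initializer raises an exception with the message "'User' object is not iterable". So the real question is: how do we make a SQLAlchemy model iterable?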

We'll have to implement the special methods __iter__ and __next__, but if we inherit directly from the declarative_base model, we would still run into the undesirable "_sa_instance_state" key. Worse, we would have to loop through __dict__.keys() on every call to __next__, because keys() returns a view, an iterable that cannot be indexed. That multiplies the cost by N, where N is the number of keys in __dict__, so generating the dict would cost O(N^2). We can do better.

We can implement our own base class that provides the required special methods and stores a list of column names that can be accessed by index, which brings the cost of generating the dict down to O(N). This has the added benefit that we define the logic once and inherit from the base class whenever we want a model class to be iterable.

class IterableBase(declarative_base()):
    __abstract__ = True

    def _init_keys(self):
        self._keys = [c.name for c in self.__table__.columns]
        self._dict = {c.name: getattr(self, c.name) for c in self.__table__.columns}

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._init_keys()

    def __setattr__(self, name, value):
        super().__setattr__(name, value)
        if name not in ('_dict', '_keys', '_n') and '_dict' in self.__dict__:
            self._dict[name] = value

    def __iter__(self):
        self._n = 0
        return self

    def __next__(self):
        if self._n >= len(self._keys):
            raise StopIteration
        self._n += 1
        key = self._keys[self._n-1]
        return (key, self._dict[key])

Now the User class can inherit directly from the IterableBase class.

class User(IterableBase):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)

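You can confirm that calling the dict function with a User instance as its argument returns the desired dictionary, without "_sa_instance_state". You may have noticed the __setattr__ method declared in the IterableBase class. It ensures that _dict is updated whenever an attribute is changed or set after initialization.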

def main():
    # The default declarative constructor only accepts keyword arguments
    user1 = User(name='Bob')
    print(dict(user1))
    # outputs {'id': None, 'name': 'Bob'}
    user1.id = 42
    print(dict(user1))
    # outputs {'id': 42, 'name': 'Bob'}

if __name__ == '__main__':
    main()
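For comparison, here is a shorter alternative, not the approach above: a generator-based `__iter__` that reads the values at iteration time instead of caching them. SQLAlchemy does not call `__init__` when it loads an instance from the database, so reading lazily avoids depending on state set up in the constructor. This is a sketch under assumptions: SQLAlchemy 1.4 or newer, and a hypothetical mixin name `PairIterMixin` that is not part of any library.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class PairIterMixin:
    """Hypothetical mixin: iterating an instance yields (column name, value) pairs."""

    def __iter__(self):
        for column in self.__table__.columns:
            yield column.name, getattr(self, column.name)


class User(PairIterMixin, Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add(User(name="Bob"))
session.commit()

# dict() accepts any iterable of key/value pairs, which __iter__ now provides
loaded = session.query(User).first()
as_dict = dict(loaded)
print(as_dict)  # {'id': 1, 'name': 'Bob'}

loaded.name = "Alice"
updated = dict(loaded)
print(updated)  # {'id': 1, 'name': 'Alice'}
```

The trade-off is that each `dict()` call walks the column list again, which is still O(N) and needs no `__setattr__` override to stay in sync.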

@zzzeek wrote in the comments:

Note that this is the correct answer for modern versions of SQLAlchemy, assuming "row" is a Core row object, not an ORM-mapped instance.

for row in resultproxy:
    row_as_dict = row._mapping  # SQLAlchemy 1.4 and greater
    # row_as_dict = dict(row)  # SQLAlchemy 1.3 and earlier

Background on row._mapping, new in SQLAlchemy 1.4: https://docs.sqlalchemy.org/en/stable/core/connections.html#sqlalchemy.engine.Row._mapping
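To see `row._mapping` end to end, here is a minimal sketch that assumes SQLAlchemy 1.4 or newer and an in-memory SQLite database. The `users` table and its rows are invented for the example. Wrapping the mapping in `dict()` gives a plain, mutable dictionary.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
)
metadata.create_all(engine)

# engine.begin() commits automatically when the block exits without error
with engine.begin() as conn:
    conn.execute(users.insert(), [{"name": "anurag"}, {"name": "bob"}])

with engine.connect() as conn:
    result = conn.execute(select(users).order_by(users.c.id))
    rows_as_dicts = [dict(row._mapping) for row in result]

print(rows_as_dicts)
# [{'id': 1, 'name': 'anurag'}, {'id': 2, 'name': 'bob'}]
```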

To complete @Anurag Uniyal's answer, here is a method that recursively follows relationships:

from sqlalchemy.inspection import inspect

def to_dict(obj, with_relationships=True):
    d = {}
    for column in obj.__table__.columns:
        if with_relationships and len(column.foreign_keys) > 0:
            # Skip foreign keys
            continue
        d[column.name] = getattr(obj, column.name)

    if with_relationships:
        for relationship in inspect(type(obj)).relationships:
            val = getattr(obj, relationship.key)
            d[relationship.key] = to_dict(val) if val else None
    return d

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    first_name = Column(TEXT)
    address_id = Column(Integer, ForeignKey('addresses.id'))
    address = relationship('Address')

class Address(Base):
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    city = Column(TEXT)


user = User(first_name='Nathan', address=Address(city='Lyon'))
# Add and commit user to session to create ids

to_dict(user)
# {'id': 1, 'first_name': 'Nathan', 'address': {'id': 1, 'city': 'Lyon'}}
to_dict(user, with_relationships=False)
# {'id': 1, 'first_name': 'Nathan', 'address_id': 1}
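A runnable version of the same idea, as a sketch under assumptions: SQLAlchemy 1.4 or newer, an in-memory SQLite database, and `Text` in place of `TEXT`. The `uselist` branch is my addition rather than part of the answer above; it handles one-to-many relationships, which return a list instead of a single object.

```python
from sqlalchemy import Column, ForeignKey, Integer, Text, create_engine
from sqlalchemy.inspection import inspect
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()


class Address(Base):
    __tablename__ = "addresses"
    id = Column(Integer, primary_key=True)
    city = Column(Text)


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    first_name = Column(Text)
    address_id = Column(Integer, ForeignKey("addresses.id"))
    address = relationship("Address")


def to_dict(obj, with_relationships=True):
    d = {}
    for column in obj.__table__.columns:
        if with_relationships and column.foreign_keys:
            continue  # the related object is emitted instead of the raw key
        d[column.name] = getattr(obj, column.name)
    if with_relationships:
        for rel in inspect(type(obj)).relationships:
            val = getattr(obj, rel.key)
            if rel.uselist:
                # Addition: collection-valued relationships become lists of dicts
                d[rel.key] = [to_dict(item) for item in val]
            else:
                d[rel.key] = to_dict(val) if val is not None else None
    return d


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

user = User(first_name="Nathan", address=Address(city="Lyon"))
session.add(user)
session.commit()  # assigns the primary keys

nested = to_dict(user)
flat = to_dict(user, with_relationships=False)
print(nested)  # {'id': 1, 'first_name': 'Nathan', 'address': {'id': 1, 'city': 'Lyon'}}
print(flat)    # {'id': 1, 'first_name': 'Nathan', 'address_id': 1}
```

Note that this recursion does not guard against cycles, so a bidirectional relationship (for example one declared with `back_populates`) would recurse without end unless you track visited objects.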

Return the contents of this :class:`.KeyedTuple` as a dictionary

In [46]: result = aggregate_events[0]

In [47]: type(result)
Out[47]: sqlalchemy.util._collections.result

In [48]: def to_dict(query_result=None):
    ...:     cover_dict = {key: getattr(query_result, key) for key in query_result.keys()}
    ...:     return cover_dict
    ...: 
    ...:     

In [49]: to_dict(result)
Out[49]: 
{'calculate_avg': None,
 'calculate_max': None,
 'calculate_min': None,
 'calculate_sum': None,
 'dataPointIntID': 6,
 'data_avg': 10.0,
 'data_max': 10.0,
 'data_min': 10.0,
 'data_sum': 60.0,
 'deviceID': u'asas',
 'productID': u'U7qUDa',
 'tenantID': u'CvdQcYzUM'}
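The line quoted at the top of this answer reads like the docstring of `_asdict()`, which does the same conversion without a hand-written helper. A minimal sketch, assuming SQLAlchemy 1.4 or newer (where column-level queries return `Row` objects that also provide `_asdict()`) and a hypothetical `Reading` model with invented data:

```python
from sqlalchemy import Column, Float, Integer, String, create_engine, func
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Reading(Base):
    __tablename__ = "readings"
    id = Column(Integer, primary_key=True)
    deviceID = Column(String)
    value = Column(Float)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add_all([Reading(deviceID="asas", value=10.0) for _ in range(6)])
session.commit()

# Querying columns and labeled aggregates returns tuple-like rows, not entities
result = (
    session.query(
        Reading.deviceID,
        func.sum(Reading.value).label("data_sum"),
        func.avg(Reading.value).label("data_avg"),
    )
    .group_by(Reading.deviceID)
    .first()
)

as_dict = result._asdict()
print(as_dict)  # {'deviceID': 'asas', 'data_sum': 60.0, 'data_avg': 10.0}
```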

After querying the database with the following SQLAlchemy code:

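import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


SQLALCHEMY_DATABASE_URL = 'sqlite:///./examples/sql_app.db'
engine = create_engine(SQLALCHEMY_DATABASE_URL, echo=True)
# TABLE stands for a Table object defined elsewhere
query = sqlalchemy.select(TABLE)
# engine.execute() is available through SQLAlchemy 1.4; it was removed in 2.0
results = engine.execute(query).fetchall()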

you can use the following one-liner:

query_dict = [record._mapping for record in results]
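On SQLAlchemy 2.0, where statements are executed on a connection rather than on the engine, `Result.mappings()` gives the same dictionary-style rows directly. A minimal sketch, assuming SQLAlchemy 1.4 or newer and an invented in-memory `items` table:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()
items = Table(
    "items",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(items.insert(), [{"name": "a"}, {"name": "b"}])

with engine.connect() as conn:
    result = conn.execute(select(items).order_by(items.c.id))
    # mappings() yields read-only RowMapping objects; dict() makes plain copies
    query_dicts = [dict(m) for m in result.mappings()]

print(query_dicts)  # [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
```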