是否有一种简单的方法来遍历列名和值对?

我的SQLAlchemy版本是0.5.6

下面是我尝试使用dict(row)的示例代码:

import sqlalchemy
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

print "sqlalchemy version:",sqlalchemy.__version__ 

engine = create_engine('sqlite:///:memory:', echo=False)
metadata = MetaData()
users_table = Table('users', metadata,
     Column('id', Integer, primary_key=True),
     Column('name', String),
)
metadata.create_all(engine) 

class User(declarative_base()):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    
    def __init__(self, name):
        self.name = name

Session = sessionmaker(bind=engine)
session = Session()

user1 = User("anurag")
session.add(user1)
session.commit()

# uncommenting next line throws exception 'TypeError: 'User' object is not iterable'
#print dict(user1)
# this one also throws 'TypeError: 'User' object is not iterable'
for u in session.query(User).all():
    print dict(u)

在我的系统输出上运行这段代码:

Traceback (most recent call last):
  File "untitled-1.py", line 37, in <module>
    print dict(u)
TypeError: 'User' object is not iterable

当前回答

正如@balki提到的:

如果您正在查询特定的字段,可以使用_asdict()方法,因为它作为KeyedTuple返回。

In [1]: foo = db.session.query(Topic.name).first()
In [2]: foo._asdict()
Out[2]: {'name': u'blah'}

然而,如果您没有指定列,则可以使用其他建议的方法之一——例如@charlax提供的方法。注意,此方法仅对2.7+有效。

In [1]: foo = db.session.query(Topic).first()
In [2]: {x.name: getattr(foo, x.name) for x in foo.__table__.columns}
Out[2]: {'name': u'blah'}

其他回答

老问题,但由于这是谷歌中“sqlalchemy row to dict”的第一个结果,它值得一个更好的答案。

SqlAlchemy返回的RowProxy对象具有items()方法: http://docs.sqlalchemy.org/en/latest/core/connections.html#sqlalchemy.engine.RowProxy.items

它只是返回一个(key, value)元组列表。因此可以使用以下方法将行转换为dict:

在Python中<= 2.6:

rows = conn.execute(query)
list_of_dicts = [dict((key, value) for key, value in row.items()) for row in rows]

在Python中>= 2.7:

rows = conn.execute(query)
list_of_dicts = [{key: value for (key, value) in row.items()} for row in rows]

如果你的模型表列不需要mysql列。

例如:

class People:
    id: int = Column(name='id', type_=Integer, primary_key=True)
    createdTime: datetime = Column(name='create_time', type_=TIMESTAMP,
                               nullable=False,
                               server_default=text("CURRENT_TIMESTAMP"),
                               default=func.now())
    modifiedTime: datetime = Column(name='modify_time', type_=TIMESTAMP,
                                server_default=text("CURRENT_TIMESTAMP"),
                                default=func.now())

需要使用:

 from sqlalchemy.orm import class_mapper 
 def asDict(self):
        return {x.key: getattr(self, x.key, None) for x in
            class_mapper(Application).iterate_properties}

如果你使用这种方式,你可以得到modify_time和create_time都是None

{'id': 1, 'create_time': None, 'modify_time': None}


    def to_dict(self):
        return {c.name: getattr(self, c.name, None)
         for c in self.__table__.columns}

因为类属性名称不等于列存储在mysql

@zzzeek在评论中写道:

注意,这是现代版本的正确答案 SQLAlchemy,假设“row”是核心行对象,而不是orm映射对象 实例。

for row in resultproxy:
    row_as_dict = row._mapping  # SQLAlchemy 1.4 and greater
    # row_as_dict = dict(row)  # SQLAlchemy 1.3 and earlier

行背景。_mapping, SQLAlchemy 1.4新增:https://docs.sqlalchemy.org/en/stable/core/connections.html#sqlalchemy.engine.Row._mapping

如OP所述,调用dict初始化器会引发一个异常,消息为“User”对象不可迭代。所以真正的问题是如何使一个SQLAlchemy模型可迭代?

We'll have to implement the special methods __iter__ and __next__, but if we inherit directly from the declarative_base model, we would still run into the undesirable "_sa_instance_state" key. What's worse, is we would have to loop through __dict__.keys() for every call to __next__ because the keys() method returns a View -- an iterable that is not indexed. This would increase the time complexity by a factor of N, where N is the number of keys in __dict__. Generating the dict would cost O(N^2). We can do better.

我们可以实现自己的基类,它实现所需的特殊方法,并存储可以通过索引访问的列名列表,从而降低生成O(N)字典的时间复杂性。这有一个额外的好处,我们可以定义一次逻辑,并在任何时候从基类继承,我们希望我们的模型类是可迭代的。

class IterableBase(declarative_base()):
    __abstract__ = True

    def _init_keys(self):
        self._keys = [c.name for c in self.__table__.columns]
        self._dict = {c.name: getattr(self, c.name) for c in self.__table__.columns}

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._init_keys()

    def __setattr__(self, name, value):
        super().__setattr__(name, value)
        if name not in ('_dict', '_keys', '_n') and '_dict' in self.__dict__:
            self._dict[name] = value

    def __iter__(self):
        self._n = 0
        return self

    def __next__(self):
        if self._n >= len(self._keys):
            raise StopIteration
        self._n += 1
        key = self._keys[self._n-1]
        return (key, self._dict[key])

现在User类可以直接从IterableBase类继承。

class User(IterableBase):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)

您可以确认,以User实例作为参数调用dict函数将返回所需的字典,没有"_sa_instance_state"。你可能已经注意到在IterableBase类中声明的__setattr__方法。这确保在初始化后属性发生变化或设置时更新_dict。

def main():
    user1 = User('Bob')
    print(dict(user1))
    # outputs {'id': None, 'name': 'Bob'}
    user1.id = 42
    print(dict(user1))
    # outputs {'id': 42, 'name': 'Bob'}

if __name__ == '__main__':
    main()

我是一个新晋的Python程序员,遇到了使用join表获取JSON的问题。使用这里的答案中的信息,我构建了一个函数,将合理的结果返回到JSON,其中包括表名,避免使用别名或字段冲突。

简单地传递会话查询的结果:

test = Session()。查询(VMInfo、客户). join(客户).order_by (VMInfo.vm_name) .limit (50) .offset (10)

json = sqlAl2json(test)

def sqlAl2json(self, result):
    arr = []
    for rs in result.all():
        proc = []
        try:
            iterator = iter(rs)
        except TypeError:
            proc.append(rs)
        else:
            for t in rs:
                proc.append(t)

        dict = {}
        for p in proc:
            tname = type(p).__name__
            for d in dir(p):
                if d.startswith('_') | d.startswith('metadata'):
                    pass
                else:
                    key = '%s_%s' %(tname, d)
                    dict[key] = getattr(p, d)
        arr.append(dict)
    return json.dumps(arr)