是否有一种简单的方法来遍历列名和值对?

我的SQLAlchemy版本是0.5.6

下面是我尝试使用dict(row)的示例代码:

import sqlalchemy
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

print "sqlalchemy version:",sqlalchemy.__version__ 

engine = create_engine('sqlite:///:memory:', echo=False)
metadata = MetaData()
users_table = Table('users', metadata,
     Column('id', Integer, primary_key=True),
     Column('name', String),
)
metadata.create_all(engine) 

class User(declarative_base()):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    
    def __init__(self, name):
        self.name = name

Session = sessionmaker(bind=engine)
session = Session()

user1 = User("anurag")
session.add(user1)
session.commit()

# uncommenting next line throws exception 'TypeError: 'User' object is not iterable'
#print dict(user1)
# this one also throws 'TypeError: 'User' object is not iterable'
for u in session.query(User).all():
    print dict(u)

在我的系统输出上运行这段代码:

Traceback (most recent call last):
  File "untitled-1.py", line 37, in <module>
    print dict(u)
TypeError: 'User' object is not iterable

@zzzeek在评论中写道:

注意,这是现代版本的正确答案 SQLAlchemy,假设“row”是核心行对象,而不是orm映射对象 实例。

for row in resultproxy:
    row_as_dict = row._mapping  # SQLAlchemy 1.4 and greater
    # row_as_dict = dict(row)  # SQLAlchemy 1.3 and earlier

行背景。_mapping, SQLAlchemy 1.4新增:https://docs.sqlalchemy.org/en/stable/core/connections.html#sqlalchemy.engine.Row._mapping


我不能得到一个好的答案,所以我用这个:

def row2dict(row):
    d = {}
    for column in row.__table__.columns:
        d[column.name] = str(getattr(row, column.name))

    return d

编辑:如果上面的函数太长,不适合某些口味,这里是一个一行(python 2.7+)

row2dict = lambda r: {c.name: str(getattr(r, c.name)) for c in r.__table__.columns}

正在迭代的表达式求值为模型对象列表,而不是行。下面是正确的用法:

for u in session.query(User).all():
    print u.id, u.name

你真的需要把它们转换成字典吗?当然,有很多方法,但是你不需要SQLAlchemy的ORM部分:

result = session.execute(User.__table__.select())
for row in result:
    print dict(row)

更新:看一下sqlalchemy.orm.attributes模块。它有一组处理对象状态的函数,这可能对您很有用,特别是instance_dict()。


class User(object):
    def to_dict(self):
        return dict([(k, getattr(self, k)) for k in self.__dict__.keys() if not k.startswith("_")])

这应该有用。


from sqlalchemy.orm import class_mapper

def asdict(obj):
    return dict((col.name, getattr(obj, col.name))
                for col in class_mapper(obj.__class__).mapped_table.c)

我对马可·马里亚尼(Marco Mariani)的回答有一个变体,以装饰者的身份表达。主要的区别是它将处理实体列表,以及安全地忽略一些其他类型的返回值(这在使用mock编写测试时非常有用):

@decorator
def to_dict(f, *args, **kwargs):
  result = f(*args, **kwargs)
  if is_iterable(result) and not is_dict(result):
    return map(asdict, result)

  return asdict(result)

def asdict(obj):
  return dict((col.name, getattr(obj, col.name))
              for col in class_mapper(obj.__class__).mapped_table.c)

def is_dict(obj):
  return isinstance(obj, dict)

def is_iterable(obj):
  return True if getattr(obj, '__iter__', False) else False

Elixir是这样做的。这个解决方案的价值在于,它允许递归地包括关系的字典表示。

def to_dict(self, deep={}, exclude=[]):
    """Generate a JSON-style nested dict/list structure from an object."""
    col_prop_names = [p.key for p in self.mapper.iterate_properties \
                                  if isinstance(p, ColumnProperty)]
    data = dict([(name, getattr(self, name))
                 for name in col_prop_names if name not in exclude])
    for rname, rdeep in deep.iteritems():
        dbdata = getattr(self, rname)
        #FIXME: use attribute names (ie coltoprop) instead of column names
        fks = self.mapper.get_property(rname).remote_side
        exclude = [c.name for c in fks]
        if dbdata is None:
            data[rname] = None
        elif isinstance(dbdata, list):
            data[rname] = [o.to_dict(rdeep, exclude) for o in dbdata]
        else:
            data[rname] = dbdata.to_dict(rdeep, exclude)
    return data

你可以访问SQLAlchemy对象的内部__dict__,如下所示:

for u in session.query(User).all():
    print u.__dict__

我找到这篇文章是因为我正在寻找一种将SQLAlchemy行转换为dict的方法。我正在使用SqlSoup…但答案是我自己想出来的,所以,如果它能帮助到别人,我的意见是:

a = db.execute('select * from acquisizioni_motes')
b = a.fetchall()
c = b[0]

# and now, finally...
dict(zip(c.keys(), c.values()))

这里有一个超级简单的方法

row2dict = lambda r: dict(r.items())

行有一个_asdict()函数,它给出一个字典

In [8]: r1 = db.session.query(Topic.name).first()

In [9]: r1
Out[9]: (u'blah')

In [10]: r1.name
Out[10]: u'blah'

In [11]: r1._asdict()
Out[11]: {'name': u'blah'}

正如@balki提到的:

如果您正在查询特定的字段,可以使用_asdict()方法,因为它作为KeyedTuple返回。

In [1]: foo = db.session.query(Topic.name).first()
In [2]: foo._asdict()
Out[2]: {'name': u'blah'}

然而,如果您没有指定列,则可以使用其他建议的方法之一——例如@charlax提供的方法。注意,此方法仅对2.7+有效。

In [1]: foo = db.session.query(Topic).first()
In [2]: {x.name: getattr(foo, x.name) for x in foo.__table__.columns}
Out[2]: {'name': u'blah'}

在@balki回答之后,从SQLAlchemy 0.8开始,您可以使用_asdict(),可用于KeyedTuple对象。这为最初的问题提供了一个非常直接的答案。只是,在你的例子中改变最后两行(for循环):

for u in session.query(User).all():
   print u._asdict()

这是因为在上面的代码中u是类型类KeyedTuple的对象,因为.all()返回KeyedTuple的列表。因此,它有_asdict()方法,该方法很好地将u作为字典返回。

WRT @STB: AFAIK的答案,任何。all()返回的是一个KeypedTuple的列表。因此,无论是否指定列,只要处理的是应用于Query对象的.all()的结果,上述方法都是有效的。


假设下列函数将被添加到User类中,下面将返回所有列的所有键值对:

def columns_to_dict(self):
    dict_ = {}
    for key in self.__mapper__.c.keys():
        dict_[key] = getattr(self, key)
    return dict_

与其他答案不同的是,只有对象的那些属性被返回,这些属性是对象类级别的列属性。因此,不包括_sa_instance_state或SQLalchemy或您添加到对象中的任何其他属性。参考

编辑:忘记说,这也适用于继承的列。

hybrid_property延伸

如果你还想包含hybrid_property属性,下面的方法可以工作:

from sqlalchemy import inspect
from sqlalchemy.ext.hybrid import hybrid_property

def publics_to_dict(self) -> {}:
    dict_ = {}
    for key in self.__mapper__.c.keys():
        if not key.startswith('_'):
            dict_[key] = getattr(self, key)

    for key, prop in inspect(self.__class__).all_orm_descriptors.items():
        if isinstance(prop, hybrid_property):
            dict_[key] = getattr(self, key)
    return dict_

我假设您在这里用_开头标记Columns,以表明您想隐藏它们,或者是因为您通过hybrid_property访问属性,或者您只是不想显示它们。参考

Tipp all_orm_descriptors还返回hybrid_method和AssociationProxy,如果你也想包括它们的话。

其他答案备注

每个基于__dict__属性的答案(如1,2)只是返回对象的所有属性。这可以是你想要的更多的属性。如我所说,这包括_sa_instance_state或您在该对象上定义的任何其他属性。

基于dict()函数的每个答案(如1,2)只适用于session.execute()返回的SQLalchemy行对象,而不适用于您定义要使用的类,如问题中的User类。

基于row.__table__的求解答案。列肯定不行。row.__table__。columns包含SQL数据库的列名。这些只能等于python对象的属性名。如果不是,你会得到一个AttributeError。 对于基于class_mapper(obj.__class__).mapped_table.c的答案(如1,2)也是一样的。


在大多数情况下,列名适合它们。但是你可能会像下面这样写代码:

class UserModel(BaseModel):
    user_id = Column("user_id", INT, primary_key=True)
    email = Column("user_email", STRING)

column.name“user_email”而字段名是“email”,column.name不能像以前那样工作。

sqlalchemy_base_model.py

我把答案写在这里


我是一个新晋的Python程序员,遇到了使用join表获取JSON的问题。使用这里的答案中的信息,我构建了一个函数,将合理的结果返回到JSON,其中包括表名,避免使用别名或字段冲突。

简单地传递会话查询的结果:

test = Session()。查询(VMInfo、客户). join(客户).order_by (VMInfo.vm_name) .limit (50) .offset (10)

json = sqlAl2json(test)

def sqlAl2json(self, result):
    arr = []
    for rs in result.all():
        proc = []
        try:
            iterator = iter(rs)
        except TypeError:
            proc.append(rs)
        else:
            for t in rs:
                proc.append(t)

        dict = {}
        for p in proc:
            tname = type(p).__name__
            for d in dir(p):
                if d.startswith('_') | d.startswith('metadata'):
                    pass
                else:
                    key = '%s_%s' %(tname, d)
                    dict[key] = getattr(p, d)
        arr.append(dict)
    return json.dumps(arr)

我在这方面没有太多经验,但下面的方法似乎对我来说很有用:

dict(row)

这似乎太简单了(与这里的其他答案相比)。我错过了什么?


在SQLAlchemy v0.8及更新版本中,使用检查系统。

from sqlalchemy import inspect

def object_as_dict(obj):
    return {c.key: getattr(obj, c.key)
            for c in inspect(obj).mapper.column_attrs}

user = session.query(User).first()

d = object_as_dict(user)

注意.key是属性名,可以与列名不同,例如:

class_ = Column('class', Text)

此方法也适用于column_property。


一个适用于继承类的解决方案:

from itertools import chain
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()


class Mixin(object):
    def as_dict(self):
        tables = [base.__table__ for base in self.__class__.__bases__ if base not in [Base, Mixin]]
        tables.append(self.__table__)
        return {c.name: getattr(self, c.name) for c in chain.from_iterable([x.columns for x in tables])}

我对使用(太多?)字典的看法:

def serialize(_query):
#d = dictionary written to per row
#D = dictionary d is written to each time, then reset
#Master = dictionary of dictionaries; the id Key (int, unique from database) from D is used as the Key for the dictionary D entry in Master
Master = {}
D = {}
x = 0
for u in _query:
    d = u.__dict__
    D = {}
    for n in d.keys():
        if n != '_sa_instance_state':
            D[n] = d[n]
    x = d['id']
    Master[x] = D
return Master

使用flask(包括jsonify)和flask_sqlalchemy将输出打印为JSON。

使用jsonify(serialize())调用该函数。

与我迄今为止尝试过的所有SQLAlchemy查询一起工作(运行SQLite3)


老问题,但由于这是谷歌中“sqlalchemy row to dict”的第一个结果,它值得一个更好的答案。

SqlAlchemy返回的RowProxy对象具有items()方法: http://docs.sqlalchemy.org/en/latest/core/connections.html#sqlalchemy.engine.RowProxy.items

它只是返回一个(key, value)元组列表。因此可以使用以下方法将行转换为dict:

在Python中<= 2.6:

rows = conn.execute(query)
list_of_dicts = [dict((key, value) for key, value in row.items()) for row in rows]

在Python中>= 2.7:

rows = conn.execute(query)
list_of_dicts = [{key: value for (key, value) in row.items()} for row in rows]

参考Alex Brasetvik的答案,你可以用一行代码来解决这个问题

row_as_dict = [dict(row) for row in resultproxy]

在Alex Brasetvik的回答的评论部分,SQLAlchemy的创建者zzzeek表示这是解决这个问题的“正确方法”。


有了这段代码,您还可以添加到您的查询“过滤器”或“连接”,这工作!

query = session.query(User)
def query_to_dict(query):
        def _create_dict(r):
            return {c.get('name'): getattr(r, c.get('name')) for c in query.column_descriptions}

    return [_create_dict(r) for r in query]

两种方式:

1.

for row in session.execute(session.query(User).statement):
    print(dict(row))

2.

selected_columns = User.__table__.columns
rows = session.query(User).with_entities(*selected_columns).all()
for row in rows :
    print(row._asdict())

您可以像这样将sqlalchemy对象转换为字典,并将其作为json/dictionary返回。

辅助功能:

import json
from collections import OrderedDict


def asdict(self):
    result = OrderedDict()
    for key in self.__mapper__.c.keys():
        if getattr(self, key) is not None:
            result[key] = str(getattr(self, key))
        else:
            result[key] = getattr(self, key)
    return result


def to_array(all_vendors):
    v = [ ven.asdict() for ven in all_vendors ]
    return json.dumps(v) 

驱动程序功能:

def all_products():
    all_products = Products.query.all()
    return to_array(all_products)

如果你的模型表列不需要mysql列。

例如:

class People:
    id: int = Column(name='id', type_=Integer, primary_key=True)
    createdTime: datetime = Column(name='create_time', type_=TIMESTAMP,
                               nullable=False,
                               server_default=text("CURRENT_TIMESTAMP"),
                               default=func.now())
    modifiedTime: datetime = Column(name='modify_time', type_=TIMESTAMP,
                                server_default=text("CURRENT_TIMESTAMP"),
                                default=func.now())

需要使用:

 from sqlalchemy.orm import class_mapper 
 def asDict(self):
        return {x.key: getattr(self, x.key, None) for x in
            class_mapper(Application).iterate_properties}

如果你使用这种方式,你可以得到modify_time和create_time都是None

{'id': 1, 'create_time': None, 'modify_time': None}


    def to_dict(self):
        return {c.name: getattr(self, c.name, None)
         for c in self.__table__.columns}

因为类属性名称不等于列存储在mysql


一个非常简单的解决方案:row._asdict()。

sqlalchemy引擎。Row _asdict () (v1。4)。 sqlalchemy KeyedTuple util。_asdict (3) (v1。)

> data = session.query(Table).all()
> [row._asdict() for row in data]

返回this:class:的内容。KeyedTuple作为字典

In [46]: result = aggregate_events[0]

In [47]: type(result)
Out[47]: sqlalchemy.util._collections.result

In [48]: def to_dict(query_result=None):
    ...:     cover_dict = {key: getattr(query_result, key) for key in query_result.keys()}
    ...:     return cover_dict
    ...: 
    ...:     

In [49]: to_dict(result)
Out[49]: 
{'calculate_avg': None,
 'calculate_max': None,
 'calculate_min': None,
 'calculate_sum': None,
 'dataPointIntID': 6,
 'data_avg': 10.0,
 'data_max': 10.0,
 'data_min': 10.0,
 'data_sum': 60.0,
 'deviceID': u'asas',
 'productID': u'U7qUDa',
 'tenantID': u'CvdQcYzUM'}

为了大家和我自己,以下是我如何使用它:

def run_sql(conn_String):
  output_connection = engine.create_engine(conn_string, poolclass=NullPool).connect()
  rows = output_connection.execute('select * from db1.t1').fetchall()  
  return [dict(row) for row in rows]

你可以试着这样做。

for u in session.query(User).all():
    print(u._asdict())

它使用查询对象中的内置方法返回查询对象的字典对象。

引用:https://docs.sqlalchemy.org/en/latest/orm/query.html


def to_dict(row):
    return {column.name: getattr(row, row.__mapper__.get_property_by_column(column).key) for column in row.__table__.columns}


for u in session.query(User).all():
    print(to_dict(u))

这个函数可能会有帮助。 当属性名与列名不同时,我找不到更好的解决方案来解决问题。


为了完成@Anurag Uniyal的回答,这里有一个递归地遵循关系的方法:

from sqlalchemy.inspection import inspect

def to_dict(obj, with_relationships=True):
    d = {}
    for column in obj.__table__.columns:
        if with_relationships and len(column.foreign_keys) > 0:
             # Skip foreign keys
            continue
        d[column.name] = getattr(obj, column.name)

    if with_relationships:
        for relationship in inspect(type(obj)).relationships:
            val = getattr(obj, relationship.key)
            d[relationship.key] = to_dict(val) if val else None
    return d

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    first_name = Column(TEXT)
    address_id = Column(Integer, ForeignKey('addresses.id')
    address = relationship('Address')

class Address(Base):
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    city = Column(TEXT)


user = User(first_name='Nathan', address=Address(city='Lyon'))
# Add and commit user to session to create ids

to_dict(user)
# {'id': 1, 'first_name': 'Nathan', 'address': {'city': 'Lyon'}}
to_dict(user, with_relationship=False)
# {'id': 1, 'first_name': 'Nathan', 'address_id': 1}

你在你的项目中到处都需要它,我很欣赏@anurag的回答,它很好。直到这一点上,我正在使用它,但它会混乱你所有的代码,也不会与实体改变工作。

不如试试这个, 继承SQLAlchemy中的基查询类

from flask_sqlalchemy import SQLAlchemy, BaseQuery


class Query(BaseQuery):
    def as_dict(self):
        context = self._compile_context()
        context.statement.use_labels = False
        columns = [column.name for column in context.statement.columns]

        return list(map(lambda row: dict(zip(columns, row)), self.all()))


db = SQLAlchemy(query_class=Query)

在那之后,无论你在哪里定义你的对象“as_dict”方法都会在那里。


Python 3.6.8 +

内置str()方法自动转换datetime。Datetime对象到iso-8806-1。

print(json.dumps([dict(row.items()) for row in rows], default=str, indent="  "))

注意:默认的func只会应用于有错误的值,所以int和float值不会被转换…除非出现错误:)。


在python 3.8+中,我们可以使用数据类和它附带的asdict方法来实现这一点:

from dataclasses import dataclass, asdict

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, String, Integer, create_engine

Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)


@dataclass
class User(Base):
    __tablename__ = 'users'

    id: int = Column(Integer, primary_key=True)
    name: str = Column(String)
    email = Column(String)

    def __init__(self, name):
        self.name = name
        self.email = 'hello@example.com'


Base.metadata.create_all(engine)

SessionMaker = sessionmaker(bind=engine)
session = SessionMaker()

user1 = User("anurag")
session.add(user1)
session.commit()

query_result = session.query(User).one()  # type: User
print(f'{query_result.id=:}, {query_result.name=:}, {query_result.email=:}')
# query_result.id=1, query_result.name=anurag, query_result.email=hello@example.com

query_result_dict = asdict(query_result)
print(query_result_dict)
# {'id': 1, 'name': 'anurag'}

关键是使用@dataclass装饰器,并用它的类型(name: str = column (String)行的:str部分)注释每一列。

还要注意,由于电子邮件没有注释,因此它不包括在query_result_dict中。


我们可以在dict中得到一个对象列表:

def queryset_to_dict(query_result):
   query_columns = query_result[0].keys()
   res = [list(ele) for ele in query_result]
   dict_list = [dict(zip(query_columns, l)) for l in res]
   return dict_list

query_result = db.session.query(LanguageMaster).all()
dictvalue=queryset_to_dict(query_result)

我只是花了几分钟来处理这个问题。 标记为正确的答案不尊重字段的类型。 解决方案来自于dictalchemy,添加了一些有趣的功能。 https://pythonhosted.org/dictalchemy/ 我刚刚测试过,工作正常。

Base = declarative_base(cls=DictableModel)

session.query(User).asdict()
{'id': 1, 'username': 'Gerald'}

session.query(User).asdict(exclude=['id'])
{'username': 'Gerald'}

使用字典推导式

for u in session.query(User).all():
    print ({column.name: str(getattr(row, column.name)) for column in row.__table__.columns})

from copy import copy

def to_record(row):
    record = copy(row.__dict__)
    del record["_sa_instance_state"]
    return record

如果不使用复制,可能会遇到错误。


Anurag Uniyal版本的改进版本,考虑了类型:

def sa_vars(row):
    return {
        column.name: column.type.python_type(getattr(row, column.name))
        for column in row.__table__.columns
    }

使用sqlalchemy 1.4

session.execute(select(User.id, User.username)).mappings().all()
>> [{'id': 1, 'username': 'Bob'}, {'id': 2, 'username': 'Alice'}]

如OP所述,调用dict初始化器会引发一个异常,消息为“User”对象不可迭代。所以真正的问题是如何使一个SQLAlchemy模型可迭代?

We'll have to implement the special methods __iter__ and __next__, but if we inherit directly from the declarative_base model, we would still run into the undesirable "_sa_instance_state" key. What's worse, is we would have to loop through __dict__.keys() for every call to __next__ because the keys() method returns a View -- an iterable that is not indexed. This would increase the time complexity by a factor of N, where N is the number of keys in __dict__. Generating the dict would cost O(N^2). We can do better.

我们可以实现自己的基类,它实现所需的特殊方法,并存储可以通过索引访问的列名列表,从而降低生成O(N)字典的时间复杂性。这有一个额外的好处,我们可以定义一次逻辑,并在任何时候从基类继承,我们希望我们的模型类是可迭代的。

class IterableBase(declarative_base()):
    __abstract__ = True

    def _init_keys(self):
        self._keys = [c.name for c in self.__table__.columns]
        self._dict = {c.name: getattr(self, c.name) for c in self.__table__.columns}

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._init_keys()

    def __setattr__(self, name, value):
        super().__setattr__(name, value)
        if name not in ('_dict', '_keys', '_n') and '_dict' in self.__dict__:
            self._dict[name] = value

    def __iter__(self):
        self._n = 0
        return self

    def __next__(self):
        if self._n >= len(self._keys):
            raise StopIteration
        self._n += 1
        key = self._keys[self._n-1]
        return (key, self._dict[key])

现在User类可以直接从IterableBase类继承。

class User(IterableBase):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)

您可以确认,以User实例作为参数调用dict函数将返回所需的字典,没有"_sa_instance_state"。你可能已经注意到在IterableBase类中声明的__setattr__方法。这确保在初始化后属性发生变化或设置时更新_dict。

def main():
    user1 = User('Bob')
    print(dict(user1))
    # outputs {'id': None, 'name': 'Bob'}
    user1.id = 42
    print(dict(user1))
    # outputs {'id': 42, 'name': 'Bob'}

if __name__ == '__main__':
    main()

使用以下SQLAlchemy代码查询数据库后:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


SQLALCHEMY_DATABASE_URL = 'sqlite:///./examples/sql_app.db'
engine = create_engine(SQLALCHEMY_DATABASE_URL, echo=True)
query = sqlalchemy.select(TABLE)
result = engine.execute(query).fetchall()

你可以使用下面的一行代码:

query_dict = [record._mapping for record in results]

Sqlalchemy-utils有get_columns来帮助实现这一点。

你可以这样写:

{column: getattr(row, column) for column in get_columns(row)}