是否有一种简单的方法来遍历列名和值对?
我的SQLAlchemy版本是0.5.6
下面是我尝试使用dict(row)的示例代码:
import sqlalchemy
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
print "sqlalchemy version:",sqlalchemy.__version__
engine = create_engine('sqlite:///:memory:', echo=False)
metadata = MetaData()
users_table = Table('users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
)
metadata.create_all(engine)
class User(declarative_base()):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
def __init__(self, name):
self.name = name
Session = sessionmaker(bind=engine)
session = Session()
user1 = User("anurag")
session.add(user1)
session.commit()
# uncommenting next line throws exception 'TypeError: 'User' object is not iterable'
#print dict(user1)
# this one also throws 'TypeError: 'User' object is not iterable'
for u in session.query(User).all():
print dict(u)
在我的系统输出上运行这段代码:
Traceback (most recent call last):
File "untitled-1.py", line 37, in <module>
print dict(u)
TypeError: 'User' object is not iterable
正在迭代的表达式求值为模型对象列表,而不是行。下面是正确的用法:
for u in session.query(User).all():
print u.id, u.name
你真的需要把它们转换成字典吗?当然,有很多方法,但是你不需要SQLAlchemy的ORM部分:
result = session.execute(User.__table__.select())
for row in result:
print dict(row)
更新:看一下sqlalchemy.orm.attributes模块。它有一组处理对象状态的函数,这可能对您很有用,特别是instance_dict()。
我对马可·马里亚尼(Marco Mariani)的回答有一个变体,以装饰者的身份表达。主要的区别是它将处理实体列表,以及安全地忽略一些其他类型的返回值(这在使用mock编写测试时非常有用):
@decorator
def to_dict(f, *args, **kwargs):
result = f(*args, **kwargs)
if is_iterable(result) and not is_dict(result):
return map(asdict, result)
return asdict(result)
def asdict(obj):
return dict((col.name, getattr(obj, col.name))
for col in class_mapper(obj.__class__).mapped_table.c)
def is_dict(obj):
return isinstance(obj, dict)
def is_iterable(obj):
return True if getattr(obj, '__iter__', False) else False
Elixir是这样做的。这个解决方案的价值在于,它允许递归地包括关系的字典表示。
def to_dict(self, deep={}, exclude=[]):
"""Generate a JSON-style nested dict/list structure from an object."""
col_prop_names = [p.key for p in self.mapper.iterate_properties \
if isinstance(p, ColumnProperty)]
data = dict([(name, getattr(self, name))
for name in col_prop_names if name not in exclude])
for rname, rdeep in deep.iteritems():
dbdata = getattr(self, rname)
#FIXME: use attribute names (ie coltoprop) instead of column names
fks = self.mapper.get_property(rname).remote_side
exclude = [c.name for c in fks]
if dbdata is None:
data[rname] = None
elif isinstance(dbdata, list):
data[rname] = [o.to_dict(rdeep, exclude) for o in dbdata]
else:
data[rname] = dbdata.to_dict(rdeep, exclude)
return data
正如@balki提到的:
如果您正在查询特定的字段,可以使用_asdict()方法,因为它作为KeyedTuple返回。
In [1]: foo = db.session.query(Topic.name).first()
In [2]: foo._asdict()
Out[2]: {'name': u'blah'}
然而,如果您没有指定列,则可以使用其他建议的方法之一——例如@charlax提供的方法。注意,此方法仅对2.7+有效。
In [1]: foo = db.session.query(Topic).first()
In [2]: {x.name: getattr(foo, x.name) for x in foo.__table__.columns}
Out[2]: {'name': u'blah'}
假设下列函数将被添加到User类中,下面将返回所有列的所有键值对:
def columns_to_dict(self):
dict_ = {}
for key in self.__mapper__.c.keys():
dict_[key] = getattr(self, key)
return dict_
与其他答案不同的是,只有对象的那些属性被返回,这些属性是对象类级别的列属性。因此,不包括_sa_instance_state或SQLalchemy或您添加到对象中的任何其他属性。参考
编辑:忘记说,这也适用于继承的列。
hybrid_property延伸
如果你还想包含hybrid_property属性,下面的方法可以工作:
from sqlalchemy import inspect
from sqlalchemy.ext.hybrid import hybrid_property
def publics_to_dict(self) -> {}:
dict_ = {}
for key in self.__mapper__.c.keys():
if not key.startswith('_'):
dict_[key] = getattr(self, key)
for key, prop in inspect(self.__class__).all_orm_descriptors.items():
if isinstance(prop, hybrid_property):
dict_[key] = getattr(self, key)
return dict_
我假设您在这里用_开头标记Columns,以表明您想隐藏它们,或者是因为您通过hybrid_property访问属性,或者您只是不想显示它们。参考
Tipp all_orm_descriptors还返回hybrid_method和AssociationProxy,如果你也想包括它们的话。
其他答案备注
每个基于__dict__属性的答案(如1,2)只是返回对象的所有属性。这可以是你想要的更多的属性。如我所说,这包括_sa_instance_state或您在该对象上定义的任何其他属性。
基于dict()函数的每个答案(如1,2)只适用于session.execute()返回的SQLalchemy行对象,而不适用于您定义要使用的类,如问题中的User类。
基于row.__table__的求解答案。列肯定不行。row.__table__。columns包含SQL数据库的列名。这些只能等于python对象的属性名。如果不是,你会得到一个AttributeError。
对于基于class_mapper(obj.__class__).mapped_table.c的答案(如1,2)也是一样的。
我是一个新晋的Python程序员,遇到了使用join表获取JSON的问题。使用这里的答案中的信息,我构建了一个函数,将合理的结果返回到JSON,其中包括表名,避免使用别名或字段冲突。
简单地传递会话查询的结果:
test = Session()。查询(VMInfo、客户). join(客户).order_by (VMInfo.vm_name) .limit (50) .offset (10)
json = sqlAl2json(test)
def sqlAl2json(self, result):
arr = []
for rs in result.all():
proc = []
try:
iterator = iter(rs)
except TypeError:
proc.append(rs)
else:
for t in rs:
proc.append(t)
dict = {}
for p in proc:
tname = type(p).__name__
for d in dir(p):
if d.startswith('_') | d.startswith('metadata'):
pass
else:
key = '%s_%s' %(tname, d)
dict[key] = getattr(p, d)
arr.append(dict)
return json.dumps(arr)
我对使用(太多?)字典的看法:
def serialize(_query):
#d = dictionary written to per row
#D = dictionary d is written to each time, then reset
#Master = dictionary of dictionaries; the id Key (int, unique from database) from D is used as the Key for the dictionary D entry in Master
Master = {}
D = {}
x = 0
for u in _query:
d = u.__dict__
D = {}
for n in d.keys():
if n != '_sa_instance_state':
D[n] = d[n]
x = d['id']
Master[x] = D
return Master
使用flask(包括jsonify)和flask_sqlalchemy将输出打印为JSON。
使用jsonify(serialize())调用该函数。
与我迄今为止尝试过的所有SQLAlchemy查询一起工作(运行SQLite3)
老问题,但由于这是谷歌中“sqlalchemy row to dict”的第一个结果,它值得一个更好的答案。
SqlAlchemy返回的RowProxy对象具有items()方法:
http://docs.sqlalchemy.org/en/latest/core/connections.html#sqlalchemy.engine.RowProxy.items
它只是返回一个(key, value)元组列表。因此可以使用以下方法将行转换为dict:
在Python中<= 2.6:
rows = conn.execute(query)
list_of_dicts = [dict((key, value) for key, value in row.items()) for row in rows]
在Python中>= 2.7:
rows = conn.execute(query)
list_of_dicts = [{key: value for (key, value) in row.items()} for row in rows]
您可以像这样将sqlalchemy对象转换为字典,并将其作为json/dictionary返回。
辅助功能:
import json
from collections import OrderedDict
def asdict(self):
result = OrderedDict()
for key in self.__mapper__.c.keys():
if getattr(self, key) is not None:
result[key] = str(getattr(self, key))
else:
result[key] = getattr(self, key)
return result
def to_array(all_vendors):
v = [ ven.asdict() for ven in all_vendors ]
return json.dumps(v)
驱动程序功能:
def all_products():
all_products = Products.query.all()
return to_array(all_products)
如果你的模型表列不需要mysql列。
例如:
class People:
id: int = Column(name='id', type_=Integer, primary_key=True)
createdTime: datetime = Column(name='create_time', type_=TIMESTAMP,
nullable=False,
server_default=text("CURRENT_TIMESTAMP"),
default=func.now())
modifiedTime: datetime = Column(name='modify_time', type_=TIMESTAMP,
server_default=text("CURRENT_TIMESTAMP"),
default=func.now())
需要使用:
from sqlalchemy.orm import class_mapper
def asDict(self):
return {x.key: getattr(self, x.key, None) for x in
class_mapper(Application).iterate_properties}
如果你使用这种方式,你可以得到modify_time和create_time都是None
{'id': 1, 'create_time': None, 'modify_time': None}
def to_dict(self):
return {c.name: getattr(self, c.name, None)
for c in self.__table__.columns}
因为类属性名称不等于列存储在mysql
返回this:class:的内容。KeyedTuple作为字典
In [46]: result = aggregate_events[0]
In [47]: type(result)
Out[47]: sqlalchemy.util._collections.result
In [48]: def to_dict(query_result=None):
...: cover_dict = {key: getattr(query_result, key) for key in query_result.keys()}
...: return cover_dict
...:
...:
In [49]: to_dict(result)
Out[49]:
{'calculate_avg': None,
'calculate_max': None,
'calculate_min': None,
'calculate_sum': None,
'dataPointIntID': 6,
'data_avg': 10.0,
'data_max': 10.0,
'data_min': 10.0,
'data_sum': 60.0,
'deviceID': u'asas',
'productID': u'U7qUDa',
'tenantID': u'CvdQcYzUM'}
为了完成@Anurag Uniyal的回答,这里有一个递归地遵循关系的方法:
from sqlalchemy.inspection import inspect
def to_dict(obj, with_relationships=True):
d = {}
for column in obj.__table__.columns:
if with_relationships and len(column.foreign_keys) > 0:
# Skip foreign keys
continue
d[column.name] = getattr(obj, column.name)
if with_relationships:
for relationship in inspect(type(obj)).relationships:
val = getattr(obj, relationship.key)
d[relationship.key] = to_dict(val) if val else None
return d
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
first_name = Column(TEXT)
address_id = Column(Integer, ForeignKey('addresses.id')
address = relationship('Address')
class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
city = Column(TEXT)
user = User(first_name='Nathan', address=Address(city='Lyon'))
# Add and commit user to session to create ids
to_dict(user)
# {'id': 1, 'first_name': 'Nathan', 'address': {'city': 'Lyon'}}
to_dict(user, with_relationship=False)
# {'id': 1, 'first_name': 'Nathan', 'address_id': 1}
你在你的项目中到处都需要它,我很欣赏@anurag的回答,它很好。直到这一点上,我正在使用它,但它会混乱你所有的代码,也不会与实体改变工作。
不如试试这个,
继承SQLAlchemy中的基查询类
from flask_sqlalchemy import SQLAlchemy, BaseQuery
class Query(BaseQuery):
def as_dict(self):
context = self._compile_context()
context.statement.use_labels = False
columns = [column.name for column in context.statement.columns]
return list(map(lambda row: dict(zip(columns, row)), self.all()))
db = SQLAlchemy(query_class=Query)
在那之后,无论你在哪里定义你的对象“as_dict”方法都会在那里。
在python 3.8+中,我们可以使用数据类和它附带的asdict方法来实现这一点:
from dataclasses import dataclass, asdict
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, String, Integer, create_engine
Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)
@dataclass
class User(Base):
__tablename__ = 'users'
id: int = Column(Integer, primary_key=True)
name: str = Column(String)
email = Column(String)
def __init__(self, name):
self.name = name
self.email = 'hello@example.com'
Base.metadata.create_all(engine)
SessionMaker = sessionmaker(bind=engine)
session = SessionMaker()
user1 = User("anurag")
session.add(user1)
session.commit()
query_result = session.query(User).one() # type: User
print(f'{query_result.id=:}, {query_result.name=:}, {query_result.email=:}')
# query_result.id=1, query_result.name=anurag, query_result.email=hello@example.com
query_result_dict = asdict(query_result)
print(query_result_dict)
# {'id': 1, 'name': 'anurag'}
关键是使用@dataclass装饰器,并用它的类型(name: str = column (String)行的:str部分)注释每一列。
还要注意,由于电子邮件没有注释,因此它不包括在query_result_dict中。
如OP所述,调用dict初始化器会引发一个异常,消息为“User”对象不可迭代。所以真正的问题是如何使一个SQLAlchemy模型可迭代?
We'll have to implement the special methods __iter__ and __next__, but if we inherit directly from the declarative_base model, we would still run into the undesirable "_sa_instance_state" key. What's worse, is we would have to loop through __dict__.keys() for every call to __next__ because the keys() method returns a View -- an iterable that is not indexed. This would increase the time complexity by a factor of N, where N is the number of keys in __dict__. Generating the dict would cost O(N^2). We can do better.
我们可以实现自己的基类,它实现所需的特殊方法,并存储可以通过索引访问的列名列表,从而降低生成O(N)字典的时间复杂性。这有一个额外的好处,我们可以定义一次逻辑,并在任何时候从基类继承,我们希望我们的模型类是可迭代的。
class IterableBase(declarative_base()):
__abstract__ = True
def _init_keys(self):
self._keys = [c.name for c in self.__table__.columns]
self._dict = {c.name: getattr(self, c.name) for c in self.__table__.columns}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._init_keys()
def __setattr__(self, name, value):
super().__setattr__(name, value)
if name not in ('_dict', '_keys', '_n') and '_dict' in self.__dict__:
self._dict[name] = value
def __iter__(self):
self._n = 0
return self
def __next__(self):
if self._n >= len(self._keys):
raise StopIteration
self._n += 1
key = self._keys[self._n-1]
return (key, self._dict[key])
现在User类可以直接从IterableBase类继承。
class User(IterableBase):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
您可以确认,以User实例作为参数调用dict函数将返回所需的字典,没有"_sa_instance_state"。你可能已经注意到在IterableBase类中声明的__setattr__方法。这确保在初始化后属性发生变化或设置时更新_dict。
def main():
user1 = User('Bob')
print(dict(user1))
# outputs {'id': None, 'name': 'Bob'}
user1.id = 42
print(dict(user1))
# outputs {'id': 42, 'name': 'Bob'}
if __name__ == '__main__':
main()