是否有一种简单的方法来遍历列名和值对?

我的SQLAlchemy版本是0.5.6

下面是我尝试使用dict(row)的示例代码:

import sqlalchemy
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

print "sqlalchemy version:",sqlalchemy.__version__ 

engine = create_engine('sqlite:///:memory:', echo=False)
metadata = MetaData()
users_table = Table('users', metadata,
     Column('id', Integer, primary_key=True),
     Column('name', String),
)
metadata.create_all(engine) 

class User(declarative_base()):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    
    def __init__(self, name):
        self.name = name

Session = sessionmaker(bind=engine)
session = Session()

user1 = User("anurag")
session.add(user1)
session.commit()

# uncommenting next line throws exception 'TypeError: 'User' object is not iterable'
#print dict(user1)
# this one also throws 'TypeError: 'User' object is not iterable'
for u in session.query(User).all():
    print dict(u)

在我的系统输出上运行这段代码:

Traceback (most recent call last):
  File "untitled-1.py", line 37, in <module>
    print dict(u)
TypeError: 'User' object is not iterable

当前回答

正如@balki提到的:

如果您正在查询特定的字段,可以使用_asdict()方法,因为它作为KeyedTuple返回。

In [1]: foo = db.session.query(Topic.name).first()
In [2]: foo._asdict()
Out[2]: {'name': u'blah'}

然而,如果您没有指定列,则可以使用其他建议的方法之一——例如@charlax提供的方法。注意,此方法仅对2.7+有效。

In [1]: foo = db.session.query(Topic).first()
In [2]: {x.name: getattr(foo, x.name) for x in foo.__table__.columns}
Out[2]: {'name': u'blah'}

其他回答

Sqlalchemy-utils有get_columns来帮助实现这一点。

你可以这样写:

{column: getattr(row, column) for column in get_columns(row)}

我是一个新晋的Python程序员,遇到了使用join表获取JSON的问题。使用这里的答案中的信息,我构建了一个函数,将合理的结果返回到JSON,其中包括表名,避免使用别名或字段冲突。

简单地传递会话查询的结果:

test = Session()。查询(VMInfo、客户). join(客户).order_by (VMInfo.vm_name) .limit (50) .offset (10)

json = sqlAl2json(test)

def sqlAl2json(self, result):
    arr = []
    for rs in result.all():
        proc = []
        try:
            iterator = iter(rs)
        except TypeError:
            proc.append(rs)
        else:
            for t in rs:
                proc.append(t)

        dict = {}
        for p in proc:
            tname = type(p).__name__
            for d in dir(p):
                if d.startswith('_') | d.startswith('metadata'):
                    pass
                else:
                    key = '%s_%s' %(tname, d)
                    dict[key] = getattr(p, d)
        arr.append(dict)
    return json.dumps(arr)

这里有一个超级简单的方法

row2dict = lambda r: dict(r.items())

返回this:class:的内容。KeyedTuple作为字典

In [46]: result = aggregate_events[0]

In [47]: type(result)
Out[47]: sqlalchemy.util._collections.result

In [48]: def to_dict(query_result=None):
    ...:     cover_dict = {key: getattr(query_result, key) for key in query_result.keys()}
    ...:     return cover_dict
    ...: 
    ...:     

In [49]: to_dict(result)
Out[49]: 
{'calculate_avg': None,
 'calculate_max': None,
 'calculate_min': None,
 'calculate_sum': None,
 'dataPointIntID': 6,
 'data_avg': 10.0,
 'data_max': 10.0,
 'data_min': 10.0,
 'data_sum': 60.0,
 'deviceID': u'asas',
 'productID': u'U7qUDa',
 'tenantID': u'CvdQcYzUM'}

两种方式:

1.

for row in session.execute(session.query(User).statement):
    print(dict(row))

2.

selected_columns = User.__table__.columns
rows = session.query(User).with_entities(*selected_columns).all()
for row in rows :
    print(row._asdict())