是否有一种简单的方法来遍历列名和值对?

我的SQLAlchemy版本是0.5.6

下面是我尝试使用dict(row)的示例代码:

import sqlalchemy
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

print "sqlalchemy version:",sqlalchemy.__version__ 

engine = create_engine('sqlite:///:memory:', echo=False)
metadata = MetaData()
users_table = Table('users', metadata,
     Column('id', Integer, primary_key=True),
     Column('name', String),
)
metadata.create_all(engine) 

class User(declarative_base()):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    
    def __init__(self, name):
        self.name = name

Session = sessionmaker(bind=engine)
session = Session()

user1 = User("anurag")
session.add(user1)
session.commit()

# uncommenting next line throws exception 'TypeError: 'User' object is not iterable'
#print dict(user1)
# this one also throws 'TypeError: 'User' object is not iterable'
for u in session.query(User).all():
    print dict(u)

在我的系统输出上运行这段代码:

Traceback (most recent call last):
  File "untitled-1.py", line 37, in <module>
    print dict(u)
TypeError: 'User' object is not iterable

当前回答

Python 3.6.8 +

内置str()方法自动转换datetime。Datetime对象到iso-8806-1。

print(json.dumps([dict(row.items()) for row in rows], default=str, indent="  "))

注意:默认的func只会应用于有错误的值,所以int和float值不会被转换…除非出现错误:)。

其他回答

我找到这篇文章是因为我正在寻找一种将SQLAlchemy行转换为dict的方法。我正在使用SqlSoup…但答案是我自己想出来的,所以,如果它能帮助到别人,我的意见是:

a = db.execute('select * from acquisizioni_motes')
b = a.fetchall()
c = b[0]

# and now, finally...
dict(zip(c.keys(), c.values()))
from copy import copy

def to_record(row):
    record = copy(row.__dict__)
    del record["_sa_instance_state"]
    return record

如果不使用复制,可能会遇到错误。

为了大家和我自己,以下是我如何使用它:

def run_sql(conn_String):
  output_connection = engine.create_engine(conn_string, poolclass=NullPool).connect()
  rows = output_connection.execute('select * from db1.t1').fetchall()  
  return [dict(row) for row in rows]

在python 3.8+中,我们可以使用数据类和它附带的asdict方法来实现这一点:

from dataclasses import dataclass, asdict

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, String, Integer, create_engine

Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)


@dataclass
class User(Base):
    __tablename__ = 'users'

    id: int = Column(Integer, primary_key=True)
    name: str = Column(String)
    email = Column(String)

    def __init__(self, name):
        self.name = name
        self.email = 'hello@example.com'


Base.metadata.create_all(engine)

SessionMaker = sessionmaker(bind=engine)
session = SessionMaker()

user1 = User("anurag")
session.add(user1)
session.commit()

query_result = session.query(User).one()  # type: User
print(f'{query_result.id=:}, {query_result.name=:}, {query_result.email=:}')
# query_result.id=1, query_result.name=anurag, query_result.email=hello@example.com

query_result_dict = asdict(query_result)
print(query_result_dict)
# {'id': 1, 'name': 'anurag'}

关键是使用@dataclass装饰器,并用它的类型(name: str = column (String)行的:str部分)注释每一列。

还要注意,由于电子邮件没有注释,因此它不包括在query_result_dict中。

我是一个新晋的Python程序员,遇到了使用join表获取JSON的问题。使用这里的答案中的信息,我构建了一个函数,将合理的结果返回到JSON,其中包括表名,避免使用别名或字段冲突。

简单地传递会话查询的结果:

test = Session()。查询(VMInfo、客户). join(客户).order_by (VMInfo.vm_name) .limit (50) .offset (10)

json = sqlAl2json(test)

def sqlAl2json(self, result):
    arr = []
    for rs in result.all():
        proc = []
        try:
            iterator = iter(rs)
        except TypeError:
            proc.append(rs)
        else:
            for t in rs:
                proc.append(t)

        dict = {}
        for p in proc:
            tname = type(p).__name__
            for d in dir(p):
                if d.startswith('_') | d.startswith('metadata'):
                    pass
                else:
                    key = '%s_%s' %(tname, d)
                    dict[key] = getattr(p, d)
        arr.append(dict)
    return json.dumps(arr)