是否有一种简单的方法来遍历列名和值对?

我的SQLAlchemy版本是0.5.6

下面是我尝试使用dict(row)的示例代码:

import sqlalchemy
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

print "sqlalchemy version:",sqlalchemy.__version__ 

engine = create_engine('sqlite:///:memory:', echo=False)
metadata = MetaData()
users_table = Table('users', metadata,
     Column('id', Integer, primary_key=True),
     Column('name', String),
)
metadata.create_all(engine) 

class User(declarative_base()):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    
    def __init__(self, name):
        self.name = name

Session = sessionmaker(bind=engine)
session = Session()

user1 = User("anurag")
session.add(user1)
session.commit()

# uncommenting next line throws exception 'TypeError: 'User' object is not iterable'
#print dict(user1)
# this one also throws 'TypeError: 'User' object is not iterable'
for u in session.query(User).all():
    print dict(u)

在我的系统输出上运行这段代码:

Traceback (most recent call last):
  File "untitled-1.py", line 37, in <module>
    print dict(u)
TypeError: 'User' object is not iterable

当前回答

为了完成@Anurag Uniyal的回答,这里有一个递归地遵循关系的方法:

from sqlalchemy.inspection import inspect

def to_dict(obj, with_relationships=True):
    d = {}
    for column in obj.__table__.columns:
        if with_relationships and len(column.foreign_keys) > 0:
             # Skip foreign keys
            continue
        d[column.name] = getattr(obj, column.name)

    if with_relationships:
        for relationship in inspect(type(obj)).relationships:
            val = getattr(obj, relationship.key)
            d[relationship.key] = to_dict(val) if val else None
    return d

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    first_name = Column(TEXT)
    address_id = Column(Integer, ForeignKey('addresses.id')
    address = relationship('Address')

class Address(Base):
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    city = Column(TEXT)


user = User(first_name='Nathan', address=Address(city='Lyon'))
# Add and commit user to session to create ids

to_dict(user)
# {'id': 1, 'first_name': 'Nathan', 'address': {'city': 'Lyon'}}
to_dict(user, with_relationship=False)
# {'id': 1, 'first_name': 'Nathan', 'address_id': 1}

其他回答

为了完成@Anurag Uniyal的回答,这里有一个递归地遵循关系的方法:

from sqlalchemy.inspection import inspect

def to_dict(obj, with_relationships=True):
    d = {}
    for column in obj.__table__.columns:
        if with_relationships and len(column.foreign_keys) > 0:
             # Skip foreign keys
            continue
        d[column.name] = getattr(obj, column.name)

    if with_relationships:
        for relationship in inspect(type(obj)).relationships:
            val = getattr(obj, relationship.key)
            d[relationship.key] = to_dict(val) if val else None
    return d

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    first_name = Column(TEXT)
    address_id = Column(Integer, ForeignKey('addresses.id')
    address = relationship('Address')

class Address(Base):
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    city = Column(TEXT)


user = User(first_name='Nathan', address=Address(city='Lyon'))
# Add and commit user to session to create ids

to_dict(user)
# {'id': 1, 'first_name': 'Nathan', 'address': {'city': 'Lyon'}}
to_dict(user, with_relationship=False)
# {'id': 1, 'first_name': 'Nathan', 'address_id': 1}

在大多数情况下,列名适合它们。但是你可能会像下面这样写代码:

class UserModel(BaseModel):
    user_id = Column("user_id", INT, primary_key=True)
    email = Column("user_email", STRING)

column.name“user_email”而字段名是“email”,column.name不能像以前那样工作。

sqlalchemy_base_model.py

我把答案写在这里

参考Alex Brasetvik的答案,你可以用一行代码来解决这个问题

row_as_dict = [dict(row) for row in resultproxy]

在Alex Brasetvik的回答的评论部分,SQLAlchemy的创建者zzzeek表示这是解决这个问题的“正确方法”。

在python 3.8+中,我们可以使用数据类和它附带的asdict方法来实现这一点:

from dataclasses import dataclass, asdict

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, String, Integer, create_engine

Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)


@dataclass
class User(Base):
    __tablename__ = 'users'

    id: int = Column(Integer, primary_key=True)
    name: str = Column(String)
    email = Column(String)

    def __init__(self, name):
        self.name = name
        self.email = 'hello@example.com'


Base.metadata.create_all(engine)

SessionMaker = sessionmaker(bind=engine)
session = SessionMaker()

user1 = User("anurag")
session.add(user1)
session.commit()

query_result = session.query(User).one()  # type: User
print(f'{query_result.id=:}, {query_result.name=:}, {query_result.email=:}')
# query_result.id=1, query_result.name=anurag, query_result.email=hello@example.com

query_result_dict = asdict(query_result)
print(query_result_dict)
# {'id': 1, 'name': 'anurag'}

关键是使用@dataclass装饰器,并用它的类型(name: str = column (String)行的:str部分)注释每一列。

还要注意,由于电子邮件没有注释,因此它不包括在query_result_dict中。

from copy import copy

def to_record(row):
    record = copy(row.__dict__)
    del record["_sa_instance_state"]
    return record

如果不使用复制,可能会遇到错误。