Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。

如何序列化SQLAlchemy查询结果为JSON格式?

我试过jsonpickle。编码,但它编码查询对象本身。 我尝试了json.dumps(items),但它返回

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。

我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)


当前回答

Python 3.7+将于2023年发布

您可以将数据类装饰器添加到您的模型中,并定义一个自定义JSON序列化器,然后是JSON。转储将工作(通过向cls提供自定义编码器)。在下面的例子中,db_row是DB类的一个实例:

json.dumps(db_row, cls=models.CustomJSONEncoder)
{"id": 25, "name": "A component", "author": "Bob", "modified": "2023-02-08T11:49:15.675837"}

可以很容易地修改定制JSON序列化器,使其与任何原生JSON不可序列化的类型兼容。

models.py

from datetime import datetime
import dataclasses
import json
from sqlalchemy import Column, Integer, String, DateTime
from database import Base


@dataclasses.dataclass # <<-- add this decorator 
class DB(Base):
    """Model used for SQLite database entries."""

    __tablename__ = "components"

    id: int = Column(Integer, primary_key=True, index=True)
    name: str = Column(String)
    author: str = Column(String)
    modified: datetime = Column(DateTime(timezone=True), default=datetime.utcnow)


class CustomJSONEncoder(json.JSONEncoder): # <<-- Add this custom encoder 
    """Custom JSON encoder for the DB class."""

    def default(self, o):
        if dataclasses.is_dataclass(o): # this serializes anything dataclass can handle  
            return dataclasses.asdict(o)
        if isinstance(o, datetime): # this adds support for datetime
            return o.isoformat()
        return super().default(o)

为了进一步扩展它,使它适用于你在数据库中可能使用的任何不可序列化的类型,在自定义编码器类中添加另一条if语句,返回一些可序列化的东西(例如str)。

其他回答

虽然使用一些原始sql和未定义的对象,使用cursor.description似乎得到了我正在寻找的东西:

with connection.cursor() as cur:
    print(query)
    cur.execute(query)
    for item in cur.fetchall():
        row = {column.name: item[i] for i, column in enumerate(cur.description)}
        print(row)

更详细的解释。 在你的模型中,添加:

def as_dict(self):
       return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

str()是针对python3的,所以如果使用python2则使用unicode()。它应该有助于反序列化日期。如果不处理这些,你可以删除它。

现在可以像这样查询数据库

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

需要First()来避免奇怪的错误。As_dict()现在将反序列化结果。反序列化之后,就可以将其转换为json了

jsonify(some_result)
class SqlToDict:
    def __init__(self, data) -> None:
        self.data = data

    def to_timestamp(self, date):
        if isinstance(date, datetime):
            return int(datetime.timestamp(date))
        else:
            return date

    def to_dict(self) -> List:
        arr = []
        for i in self.data:
            keys = [*i.keys()]
            values = [*i]
            values = [self.to_timestamp(d) for d in values]
            arr.append(dict(zip(keys, values)))
        return arr

例如:

SqlToDict(data).to_dict()

这并不是那么简单。我写了一些代码来做这件事。我还在开发中,它使用了MochiKit框架。它基本上使用代理和注册的JSON转换器在Python和Javascript之间转换复合对象。

数据库对象的浏览器端是db.js 它需要proxy.js中的基本Python代理源代码。

在Python方面,有基本代理模块。 最后是webserver.py中的SqlAlchemy对象编码器。 它还依赖于models.py文件中的元数据提取器。

如果你正在使用Flask并且只想快速查询:

def get_cats():
    sql = text("select * from cat")
    sql_params = {}
    result = db.session.execute(sql, sql_params)
    row_list = result.fetchall()
    data = [dict(r) for r in row_list]

    response = jsonify({
        'data': [{
            'categorias': data
        }]
    })
    
    return response