Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。

如何序列化SQLAlchemy查询结果为JSON格式?

我试过jsonpickle。编码,但它编码查询对象本身。 我尝试了json.dumps(items),但它返回

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。

我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)


当前回答

虽然最初的问题可以追溯到很久以前,但这里的答案数量(以及我自己的经验)表明,这是一个不平凡的问题,有许多不同的方法,不同的复杂性和不同的权衡。

这就是为什么我构建了SQLAthanor库,它扩展了SQLAlchemy的声明性ORM,支持可配置的序列化/反序列化,您可能想看看。

该库支持:

Python 2.7, 3.4, 3.5, and 3.6. SQLAlchemy versions 0.9 and higher serialization/de-serialization to/from JSON, CSV, YAML, and Python dict serialization/de-serialization of columns/attributes, relationships, hybrid properties, and association proxies enabling and disabling of serialization for particular formats and columns/relationships/attributes (e.g. you want to support an inbound password value, but never include an outbound one) pre-serialization and post-deserialization value processing (for validation or type coercion) a pretty straightforward syntax that is both Pythonic and seamlessly consistent with SQLAlchemy's own approach

你可以在这里查看(我希望!)全面的文档:https://sqlathanor.readthedocs.io/en/latest

希望这能有所帮助!

其他回答

也许你可以使用这样的类

from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy import Table


class Custom:
    """Some custom logic here!"""

    __table__: Table  # def for mypy

    @declared_attr
    def __tablename__(cls):  # pylint: disable=no-self-argument
        return cls.__name__  # pylint: disable= no-member

    def to_dict(self) -> Dict[str, Any]:
        """Serializes only column data."""
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}

Base = declarative_base(cls=Custom)

class MyOwnTable(Base):
    #COLUMNS!

所有对象都有to_dict方法

如果你正在使用Flask并且只想快速查询:

def get_cats():
    sql = text("select * from cat")
    sql_params = {}
    result = db.session.execute(sql, sql_params)
    row_list = result.fetchall()
    data = [dict(r) for r in row_list]

    response = jsonify({
        'data': [{
            'categorias': data
        }]
    })
    
    return response

Python 3.7+和Flask 1.1+可以使用内置的数据类包

from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)
db = SQLAlchemy(app)


@dataclass
class User(db.Model):
  id: int
  email: str

  id = db.Column(db.Integer, primary_key=True, auto_increment=True)
  email = db.Column(db.String(200), unique=True)


@app.route('/users/')
def users():
  users = User.query.all()
  return jsonify(users)  


if __name__ == "__main__":
  users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
  db.create_all()
  db.session.add_all(users)
  db.session.commit()
  app.run()

/users/路由现在将返回一个用户列表。

[
  {"email": "user1@gmail.com", "id": 1},
  {"email": "user2@gmail.com", "id": 2}
]

自动序列化相关模型

@dataclass
class Account(db.Model):
  id: int
  users: User

  id = db.Column(db.Integer)
  users = db.relationship(User)  # User model would need a db.ForeignKey field

jsonify(account)的响应是这样的。

{  
   "id":1,
   "users":[  
      {  
         "email":"user1@gmail.com",
         "id":1
      },
      {  
         "email":"user2@gmail.com",
         "id":2
      }
   ]
}

覆盖默认的JSON编码器

from flask.json import JSONEncoder


class CustomJSONEncoder(JSONEncoder):
  "Add support for serializing timedeltas"

  def default(o):
    if type(o) == datetime.timedelta:
      return str(o)
    if type(o) == datetime.datetime:
      return o.isoformat()
    return super().default(o)

app.json_encoder = CustomJSONEncoder      

Flask-JsonTools包为您的模型提供了JsonSerializableBase基类的实现。

用法:

from sqlalchemy.ext.declarative import declarative_base
from flask.ext.jsontools import JsonSerializableBase

Base = declarative_base(cls=(JsonSerializableBase,))

class User(Base):
    #...

现在User模型可以神奇地序列化了。

如果你的框架不是Flask,你可以抓取代码

虽然使用一些原始sql和未定义的对象,使用cursor.description似乎得到了我正在寻找的东西:

with connection.cursor() as cur:
    print(query)
    cur.execute(query)
    for item in cur.fetchall():
        row = {column.name: item[i] for i, column in enumerate(cur.description)}
        print(row)