Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。

如何序列化SQLAlchemy查询结果为JSON格式?

我试过jsonpickle。编码,但它编码查询对象本身。 我尝试了json.dumps(items),但它返回

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。

我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)


当前回答

def alc2json(row):
    return dict([(col, str(getattr(row,col))) for col in row.__table__.columns.keys()])

我想和她玩会儿代码高尔夫。

供参考:我使用automap_base,因为我们有一个根据业务需求单独设计的模式。我今天才开始使用SQLAlchemy,但是文档指出automap_base是declarative_base的扩展,这似乎是SQLAlchemy ORM中的典型范例,所以我相信这应该可以工作。

根据Tjorriemorrie的解决方案,它并没有跟随外键,而是简单地将列与值匹配,并通过str()-ing列值来处理Python类型。我们的值包括Python datetime。时间和小数。十进位类类型的结果,所以它完成了工作。

希望对路人有所帮助!

其他回答

AlchemyEncoder是很棒的,但有时会失败的十进制值。这是一个改进的编码器,解决十进制问题-

class AlchemyEncoder(json.JSONEncoder):
# To serialize SQLalchemy objects 
def default(self, obj):
    if isinstance(obj.__class__, DeclarativeMeta):
        model_fields = {}
        for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
            data = obj.__getattribute__(field)
            print data
            try:
                json.dumps(data)  # this will fail on non-encodable values, like other classes
                model_fields[field] = data
            except TypeError:
                model_fields[field] = None
        return model_fields
    if isinstance(obj, Decimal):
        return float(obj)
    return json.JSONEncoder.default(self, obj)

出于安全考虑,您不应该返回模型的所有字段。我更喜欢有选择性地选择他们。

Flask的json编码现在支持UUID, datetime和relationships(并为flask_sqlalchemy db添加了query和query_class。模型类)。编码器我更新如下:

app / json_encoder.py

    from sqlalchemy.ext.declarative import DeclarativeMeta
    from flask import json


    class AlchemyEncoder(json.JSONEncoder):
        def default(self, o):
            if isinstance(o.__class__, DeclarativeMeta):
                data = {}
                fields = o.__json__() if hasattr(o, '__json__') else dir(o)
                for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
                    value = o.__getattribute__(field)
                    try:
                        json.dumps(value)
                        data[field] = value
                    except TypeError:
                        data[field] = None
                return data
            return json.JSONEncoder.default(self, o)

app / __init__ . py

# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder

有了这个,我可以选择添加一个__json__属性,返回我希望编码的字段列表:

app / models.py

class Queue(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
    song = db.relationship('Song', lazy='joined')
    type = db.Column(db.String(20), server_default=u'audio/mpeg')
    src = db.Column(db.String(255), nullable=False)
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

    def __init__(self, song):
        self.song = song
        self.src = song.full_path

    def __json__(self):
        return ['song', 'src', 'type', 'created_at']

我添加@jsonapi到我的视图,返回结果列表,然后我的输出如下:

[

{

    "created_at": "Thu, 23 Jul 2015 11:36:53 GMT",
    "song": 

        {
            "full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
            "id": 2,
            "path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3"
        },
    "src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
    "type": "audio/mpeg"
}

]

这是一个JSONEncoder版本,它保留了模型列的顺序,只保留递归定义的列和关系字段。它还格式化了大多数不可序列化的JSON类型:

import json
from datetime import datetime
from decimal import Decimal

import arrow
from sqlalchemy.ext.declarative import DeclarativeMeta

class SQLAlchemyJSONEncoder(json.JSONEncoder):
    """
    SQLAlchemy ORM JSON Encoder
    If you have a "backref" relationship defined in your SQLAlchemy model,
    this encoder raises a ValueError to stop an infinite loop.
    """

    def default(self, obj):
        if isinstance(obj, datetime):
            return arrow.get(obj).isoformat()
        elif isinstance(obj, Decimal):
            return float(obj)
        elif isinstance(obj, set):
            return sorted(obj)
        elif isinstance(obj.__class__, DeclarativeMeta):
            for attribute, relationship in obj.__mapper__.relationships.items():
                if isinstance(relationship.__getattribute__("backref"), tuple):
                    raise ValueError(
                        f'{obj.__class__} object has a "backref" relationship '
                        "that would cause an infinite loop!"
                    )
            dictionary = {}
            column_names = [column.name for column in obj.__table__.columns]
            for key in column_names:
                value = obj.__getattribute__(key)
                if isinstance(value, datetime):
                    value = arrow.get(value).isoformat()
                elif isinstance(value, Decimal):
                    value = float(value)
                elif isinstance(value, set):
                    value = sorted(value)
                dictionary[key] = value
            for key in [
                attribute
                for attribute in dir(obj)
                if not attribute.startswith("_")
                and attribute != "metadata"
                and attribute not in column_names
            ]:
                value = obj.__getattribute__(key)
                dictionary[key] = value
            return dictionary

        return super().default(obj)

Python 3.7+和Flask 1.1+可以使用内置的数据类包

from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)
db = SQLAlchemy(app)


@dataclass
class User(db.Model):
  id: int
  email: str

  id = db.Column(db.Integer, primary_key=True, auto_increment=True)
  email = db.Column(db.String(200), unique=True)


@app.route('/users/')
def users():
  users = User.query.all()
  return jsonify(users)  


if __name__ == "__main__":
  users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
  db.create_all()
  db.session.add_all(users)
  db.session.commit()
  app.run()

/users/路由现在将返回一个用户列表。

[
  {"email": "user1@gmail.com", "id": 1},
  {"email": "user2@gmail.com", "id": 2}
]

自动序列化相关模型

@dataclass
class Account(db.Model):
  id: int
  users: User

  id = db.Column(db.Integer)
  users = db.relationship(User)  # User model would need a db.ForeignKey field

jsonify(account)的响应是这样的。

{  
   "id":1,
   "users":[  
      {  
         "email":"user1@gmail.com",
         "id":1
      },
      {  
         "email":"user2@gmail.com",
         "id":2
      }
   ]
}

覆盖默认的JSON编码器

from flask.json import JSONEncoder


class CustomJSONEncoder(JSONEncoder):
  "Add support for serializing timedeltas"

  def default(o):
    if type(o) == datetime.timedelta:
      return str(o)
    if type(o) == datetime.datetime:
      return o.isoformat()
    return super().default(o)

app.json_encoder = CustomJSONEncoder      

如果你正在使用Flask并且只想快速查询:

def get_cats():
    sql = text("select * from cat")
    sql_params = {}
    result = db.session.execute(sql, sql_params)
    row_list = result.fetchall()
    data = [dict(r) for r in row_list]

    response = jsonify({
        'data': [{
            'categorias': data
        }]
    })
    
    return response