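Django has good automatic serialization of ORM models returned from the DB to JSON format.

How do I serialize a SQLAlchemy query result to JSON format?

I tried jsonpickle.encode, but it encodes the query object itself. I tried json.dumps(items), but it returns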

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

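Is it really that hard to serialize SQLAlchemy ORM objects to JSON/XML? Isn't there a default serializer for it? Serializing ORM query results is a very common task nowadays.

All I need is to return a JSON or XML data representation of the SQLAlchemy query result.

The query result of SQLAlchemy objects in JSON/XML format is needed for use in a JavaScript datagrid (JQGrid http://www.trirand.com/blog/)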


Current answer

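Here is a JSONEncoder version that preserves the model's column order and keeps only the recursively defined column and relationship fields. It also formats most types that JSON cannot serialize: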

import json
from datetime import datetime
from decimal import Decimal

import arrow
from sqlalchemy.ext.declarative import DeclarativeMeta

class SQLAlchemyJSONEncoder(json.JSONEncoder):
    """
    SQLAlchemy ORM JSON Encoder
    If you have a "backref" relationship defined in your SQLAlchemy model,
    this encoder raises a ValueError to stop an infinite loop.
    """

    def default(self, obj):
        if isinstance(obj, datetime):
            return arrow.get(obj).isoformat()
        elif isinstance(obj, Decimal):
            return float(obj)
        elif isinstance(obj, set):
            return sorted(obj)
        elif isinstance(obj.__class__, DeclarativeMeta):
            for attribute, relationship in obj.__mapper__.relationships.items():
                if isinstance(relationship.__getattribute__("backref"), tuple):
                    raise ValueError(
                        f'{obj.__class__} object has a "backref" relationship '
                        "that would cause an infinite loop!"
                    )
            dictionary = {}
            column_names = [column.name for column in obj.__table__.columns]
            for key in column_names:
                value = obj.__getattribute__(key)
                if isinstance(value, datetime):
                    value = arrow.get(value).isoformat()
                elif isinstance(value, Decimal):
                    value = float(value)
                elif isinstance(value, set):
                    value = sorted(value)
                dictionary[key] = value
            for key in [
                attribute
                for attribute in dir(obj)
                if not attribute.startswith("_")
                and attribute != "metadata"
                and attribute not in column_names
            ]:
                value = obj.__getattribute__(key)
                dictionary[key] = value
            return dictionary

        return super().default(obj)
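
An encoder like this is used by passing it to json.dumps through the cls argument. The following is a minimal, standard-library-only sketch of that mechanism. PlainEncoder and the sample record are hypothetical stand-ins, not part of the answer above, and datetime.isoformat() is used instead of arrow so that nothing has to be installed.

```python
import json
from datetime import datetime
from decimal import Decimal


class PlainEncoder(json.JSONEncoder):
    """Hypothetical stand-in for the encoder above (no SQLAlchemy, no arrow)."""

    def default(self, obj):
        # default() is only called for values json cannot encode natively.
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return float(obj)
        if isinstance(obj, set):
            return sorted(obj)
        return super().default(obj)


# Made-up sample data shaped like a row's column values.
record = {
    "id": 3,
    "created_at": datetime(2015, 7, 23, 11, 36, 53),
    "price": Decimal("9.5"),
    "tags": {"b", "a"},
}

encoded = json.dumps(record, cls=PlainEncoder, sort_keys=True)
print(encoded)
```

With the real encoder, the call would presumably look the same: json.dumps(items, cls=SQLAlchemyJSONEncoder).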

Other answers

A more detailed explanation. In your model, add:

def as_dict(self):
    return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

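The str() is for Python 3, so use unicode() if you are on Python 2. It helps serialize dates, which JSON cannot represent directly. You can remove it if you are not dealing with those.

Now you can query the database like this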
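
One caveat with calling str() on every column: integers become strings and None becomes the string "None". A possible variation, sketched below with only the standard library, converts just date and time values and leaves everything else alone. to_jsonable and the sample row are hypothetical names used for illustration; inside as_dict() the same helper could wrap getattr(self, c.name).

```python
import json
from datetime import date, datetime


def to_jsonable(value):
    """Convert date/datetime to ISO 8601 strings; pass other values through."""
    # datetime is a subclass of date, so one check covers both.
    if isinstance(value, date):
        return value.isoformat()
    return value


# Made-up column values standing in for a model instance.
row = {"id": 7, "name": None, "joined": datetime(2020, 1, 2, 3, 4, 5)}

as_dict = {key: to_jsonable(value) for key, value in row.items()}
print(json.dumps(as_dict))
```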

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

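first() is needed to avoid odd errors: it returns a single model instance, whereas the query object itself has no as_dict() method. as_dict() then converts the result into a plain dict, which is ready to be turned into JSON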

jsonify(some_result)

For security reasons, you should never return all of the model's fields. I prefer to choose them selectively.
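
One way to be selective is to keep an explicit allow-list of field names and build the dict only from those. The sketch below uses a plain Python class as a stand-in for a model. FakeUser, PUBLIC_FIELDS and public_dict are hypothetical names, not part of Flask or SQLAlchemy.

```python
class FakeUser:
    """Plain stand-in for an ORM model (hypothetical, for illustration)."""

    def __init__(self, id, username, password_hash):
        self.id = id
        self.username = username
        self.password_hash = password_hash


# Only these attributes are ever exposed to clients.
PUBLIC_FIELDS = ("id", "username")


def public_dict(obj, fields=PUBLIC_FIELDS):
    """Return a dict holding only the allow-listed attributes of obj."""
    return {name: getattr(obj, name) for name in fields}


user = FakeUser(1, "alice", "not-a-real-hash")
result = public_dict(user)
print(result)
```

An allow-list fails safe: a sensitive column added to the model later stays hidden until someone deliberately lists it.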

Flask's JSON encoding now supports UUID, datetime and relationships (and adds query and query_class for the flask_sqlalchemy db.Model class). I have updated the encoder as follows:

app/json_encoder.py

    from sqlalchemy.ext.declarative import DeclarativeMeta
    from flask import json


    class AlchemyEncoder(json.JSONEncoder):
        def default(self, o):
            if isinstance(o.__class__, DeclarativeMeta):
                data = {}
                fields = o.__json__() if hasattr(o, '__json__') else dir(o)
                for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
                    value = o.__getattribute__(field)
                    try:
                        json.dumps(value)
                        data[field] = value
                    except TypeError:
                        data[field] = None
                return data
            return json.JSONEncoder.default(self, o)

app/__init__.py

# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder

With this, I can optionally add a __json__ method that returns the list of fields I want encoded:

app/models.py

class Queue(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
    song = db.relationship('Song', lazy='joined')
    type = db.Column(db.String(20), server_default=u'audio/mpeg')
    src = db.Column(db.String(255), nullable=False)
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

    def __init__(self, song):
        self.song = song
        self.src = song.full_path

    def __json__(self):
        return ['song', 'src', 'type', 'created_at']

I add @jsonapi to my view and return the result list; my output then looks like this:

[
    {
        "created_at": "Thu, 23 Jul 2015 11:36:53 GMT",
        "song": {
            "full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
            "id": 2,
            "path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3"
        },
        "src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
        "type": "audio/mpeg"
    }
]

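Custom serialization and deserialization.

"from_json" (class method) builds a Model object from JSON data.

"deserialize" can only be called on an instance; it merges all the data from the JSON into the Model instance.

"serialize" - recursive serialization

The __write_only__ attribute is needed to define write-only properties ("password_hash", for example).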

from sqlalchemy.orm.attributes import QueryableAttribute


class Serializable(object):
    __exclude__ = ('id',)
    __include__ = ()
    __write_only__ = ()

    @classmethod
    def from_json(cls, json, selfObj=None):
        if selfObj is None:
            self = cls()
        else:
            self = selfObj
        exclude = (cls.__exclude__ or ()) + Serializable.__exclude__
        include = cls.__include__ or ()
        if json:
            for prop, value in json.items():
                # ignore all non user data, e.g. only
                if (prop not in exclude or prop in include) and isinstance(
                        getattr(cls, prop, None), QueryableAttribute):
                    setattr(self, prop, value)
        return self

    def deserialize(self, json):
        if not json:
            return None
        return self.__class__.from_json(json, selfObj=self)

    @classmethod
    def serialize_list(cls, object_list=()):
        output = []
        for li in object_list:
            if isinstance(li, Serializable):
                output.append(li.serialize())
            else:
                output.append(li)
        return output

    def serialize(self, **kwargs):

        # init write only props
        if len(getattr(self.__class__, '__write_only__', ())) == 0:
            self.__class__.__write_only__ = ()
        dictionary = {}
        expand = kwargs.get('expand', ()) or ()
        prop = 'props'
        if expand:
            # expand all the fields
            for key in expand:
                getattr(self, key)
        iterable = self.__dict__.items()
        is_custom_property_set = False
        # include only properties passed as parameter
        if (prop in kwargs) and (kwargs.get(prop, None) is not None):
            is_custom_property_set = True
            iterable = kwargs.get(prop, None)
        # loop through all accessible properties
        for key in iterable:
            accessor = key
            if isinstance(key, tuple):
                accessor = key[0]
            if not (accessor in self.__class__.__write_only__) and not accessor.startswith('_'):
                # force select from db to be able get relationships
                if is_custom_property_set:
                    getattr(self, accessor, None)
                if isinstance(self.__dict__.get(accessor), list):
                    dictionary[accessor] = self.__class__.serialize_list(object_list=self.__dict__.get(accessor))
                # check if those properties are read only
                elif isinstance(self.__dict__.get(accessor), Serializable):
                    dictionary[accessor] = self.__dict__.get(accessor).serialize()
                else:
                    dictionary[accessor] = self.__dict__.get(accessor)
        return dictionary
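
As a rough illustration of how the write-only filter in serialize() behaves, here is a stripped-down sketch without SQLAlchemy. MiniSerializable and Account are hypothetical. The sketch only mirrors the rules that names listed in __write_only__ and names starting with an underscore are left out, and that nested serializable objects and lists are serialized recursively.

```python
class MiniSerializable:
    """Simplified, SQLAlchemy-free sketch of the serialize() rules above."""

    __write_only__ = ()

    def serialize(self):
        result = {}
        for name, value in self.__dict__.items():
            # Skip private attributes and write-only ones such as password hashes.
            if name.startswith("_") or name in self.__write_only__:
                continue
            if isinstance(value, MiniSerializable):
                result[name] = value.serialize()
            elif isinstance(value, list):
                result[name] = [
                    item.serialize() if isinstance(item, MiniSerializable) else item
                    for item in value
                ]
            else:
                result[name] = value
        return result


class Account(MiniSerializable):
    __write_only__ = ("password_hash",)

    def __init__(self, name, password_hash, friends=None):
        self.name = name
        self.password_hash = password_hash
        self.friends = friends or []
        self._cache = {}


bob = Account("bob", "hash-b")
alice = Account("alice", "hash-a", friends=[bob])
serialized = alice.serialize()
print(serialized)
```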

Install simplejson with pip install simplejson and create a class

from collections import OrderedDict
from datetime import datetime


class Serialise(object):

    def _asdict(self):
        """
        Serialization logic for converting entities using flask's jsonify

        :return: An ordered dictionary
        :rtype: :class:`collections.OrderedDict`
        """

        result = OrderedDict()
        # Get the columns
        for key in self.__mapper__.c.keys():
            if isinstance(getattr(self, key), datetime):
                result["x"] = getattr(self, key).timestamp() * 1000
                result["timestamp"] = result["x"]
            else:
                result[key] = getattr(self, key)

        return result

Then have every ORM class inherit from this class, so that the _asdict function is available on each of them. After that you can use jsonify anywhere.
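
The timestamp() * 1000 conversion in _asdict yields milliseconds since the Unix epoch, the unit JavaScript's Date uses. Note that the loop above writes every datetime column to the same "x" and "timestamp" keys, so with several datetime columns only the last one would survive. Below is a standalone sketch of the conversion that keeps one key per column. to_epoch_ms and the sample values are hypothetical, and a timezone-aware datetime is assumed so that the result does not depend on the machine's local time zone.

```python
from collections import OrderedDict
from datetime import datetime, timezone


def to_epoch_ms(value):
    """Return milliseconds since the Unix epoch for an aware datetime."""
    return value.timestamp() * 1000


# Made-up column values standing in for self.__mapper__.c.keys() lookups.
columns = OrderedDict(
    [
        ("id", 1),
        ("created_at", datetime(2015, 7, 23, 11, 36, 53, tzinfo=timezone.utc)),
    ]
)

result = OrderedDict()
for key, value in columns.items():
    result[key] = to_epoch_ms(value) if isinstance(value, datetime) else value

print(result)
```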
