Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。

如何序列化SQLAlchemy查询结果为JSON格式?

我试过jsonpickle。编码,但它编码查询对象本身。 我尝试了json.dumps(items),但它返回

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。

我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)


当前回答

自定义序列化和反序列化。

"from_json"(类方法)基于json数据构建一个Model对象。

“反序列化”只能在实例上调用,并将json中的所有数据合并到Model实例中。

"serialize" -递归序列化

需要__write_only__属性来定义只写属性(例如“password_hash”)。

class Serializable(object):
    __exclude__ = ('id',)
    __include__ = ()
    __write_only__ = ()

    @classmethod
    def from_json(cls, json, selfObj=None):
        if selfObj is None:
            self = cls()
        else:
            self = selfObj
        exclude = (cls.__exclude__ or ()) + Serializable.__exclude__
        include = cls.__include__ or ()
        if json:
            for prop, value in json.iteritems():
                # ignore all non user data, e.g. only
                if (not (prop in exclude) | (prop in include)) and isinstance(
                        getattr(cls, prop, None), QueryableAttribute):
                    setattr(self, prop, value)
        return self

    def deserialize(self, json):
        if not json:
            return None
        return self.__class__.from_json(json, selfObj=self)

    @classmethod
    def serialize_list(cls, object_list=[]):
        output = []
        for li in object_list:
            if isinstance(li, Serializable):
                output.append(li.serialize())
            else:
                output.append(li)
        return output

    def serialize(self, **kwargs):

        # init write only props
        if len(getattr(self.__class__, '__write_only__', ())) == 0:
            self.__class__.__write_only__ = ()
        dictionary = {}
        expand = kwargs.get('expand', ()) or ()
        prop = 'props'
        if expand:
            # expand all the fields
            for key in expand:
                getattr(self, key)
        iterable = self.__dict__.items()
        is_custom_property_set = False
        # include only properties passed as parameter
        if (prop in kwargs) and (kwargs.get(prop, None) is not None):
            is_custom_property_set = True
            iterable = kwargs.get(prop, None)
        # loop trough all accessible properties
        for key in iterable:
            accessor = key
            if isinstance(key, tuple):
                accessor = key[0]
            if not (accessor in self.__class__.__write_only__) and not accessor.startswith('_'):
                # force select from db to be able get relationships
                if is_custom_property_set:
                    getattr(self, accessor, None)
                if isinstance(self.__dict__.get(accessor), list):
                    dictionary[accessor] = self.__class__.serialize_list(object_list=self.__dict__.get(accessor))
                # check if those properties are read only
                elif isinstance(self.__dict__.get(accessor), Serializable):
                    dictionary[accessor] = self.__dict__.get(accessor).serialize()
                else:
                    dictionary[accessor] = self.__dict__.get(accessor)
        return dictionary

其他回答

在Flask下,它工作并处理datatime字段,转换类型字段 “时间”:datetime。Datetime(2018, 3, 22, 15, 40)成 “时间”:“2018-03-22 15:40:00”:

obj = {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

# This to get the JSON body
return json.dumps(obj)

# Or this to get a response object
return jsonify(obj)

这并不是那么简单。我写了一些代码来做这件事。我还在开发中,它使用了MochiKit框架。它基本上使用代理和注册的JSON转换器在Python和Javascript之间转换复合对象。

数据库对象的浏览器端是db.js 它需要proxy.js中的基本Python代理源代码。

在Python方面,有基本代理模块。 最后是webserver.py中的SqlAlchemy对象编码器。 它还依赖于models.py文件中的元数据提取器。

你可以像这样使用SqlAlchemy的自省:

mysql = SQLAlchemy()
from sqlalchemy import inspect

class Contacts(mysql.Model):  
    __tablename__ = 'CONTACTS'
    id = mysql.Column(mysql.Integer, primary_key=True)
    first_name = mysql.Column(mysql.String(128), nullable=False)
    last_name = mysql.Column(mysql.String(128), nullable=False)
    phone = mysql.Column(mysql.String(128), nullable=False)
    email = mysql.Column(mysql.String(128), nullable=False)
    street = mysql.Column(mysql.String(128), nullable=False)
    zip_code = mysql.Column(mysql.String(128), nullable=False)
    city = mysql.Column(mysql.String(128), nullable=False)
    def toDict(self):
        return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }

@app.route('/contacts',methods=['GET'])
def getContacts():
    contacts = Contacts.query.all()
    contactsArr = []
    for contact in contacts:
        contactsArr.append(contact.toDict()) 
    return jsonify(contactsArr)

@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
    contact = Contacts.query.get(id)
    return jsonify(contact.toDict())

从下面的答案中得到启发: 将sqlalchemy行对象转换为python dict

扁平化实现

你可以使用这样的代码:

from sqlalchemy.ext.declarative import DeclarativeMeta

class AlchemyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # an SQLAlchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                try:
                    json.dumps(data) # this will fail on non-encodable values, like other classes
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)

然后转换为JSON使用:

c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)

它将忽略不可编码的字段(将它们设置为“None”)。

它不会自动展开关系(因为这可能导致自引用,并永远循环)。

递归的非循环实现

然而,如果你宁愿永远循环,你可以使用:

from sqlalchemy.ext.declarative import DeclarativeMeta

def new_alchemy_encoder():
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

                # an SQLAlchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    fields[field] = obj.__getattribute__(field)
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

然后对对象进行编码,使用:

print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)

这将编码所有的子代、子代、子代……基本上可以编码你的整个数据库。当它到达之前编码过的东西时,它会将其编码为“None”。

递归的、可能是循环的、有选择的实现

另一种选择,可能更好,是能够指定你想要展开的字段:

def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if revisit_self:
                    if obj in _visited_objs:
                        return None
                    _visited_objs.append(obj)

                # go through each field in this SQLalchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    val = obj.__getattribute__(field)

                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field] = None
                            continue

                    fields[field] = val
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

你现在可以调用它:

print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)

例如,仅展开名为“parents”的SQLAlchemy字段。

更详细的解释。 在你的模型中,添加:

def as_dict(self):
       return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

str()是针对python3的,所以如果使用python2则使用unicode()。它应该有助于反序列化日期。如果不处理这些,你可以删除它。

现在可以像这样查询数据库

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

需要First()来避免奇怪的错误。As_dict()现在将反序列化结果。反序列化之后,就可以将其转换为json了

jsonify(some_result)