Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。

如何序列化SQLAlchemy查询结果为JSON格式?

我试过jsonpickle。编码,但它编码查询对象本身。 我尝试了json.dumps(items),但它返回

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。

我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)


当前回答

经过一番尝试,我想出了自己的解决方案

def to_dict(self):
    keys = self.__mapper__.attrs.keys()
    attrs = vars(self)
    return { k : attrs[k]  for k in keys}

其他回答

下面是一个解决方案,它允许您选择希望在输出中包含的关系。 注意:这是一个完整的重写,将dict/str作为一个参数,而不是一个列表。修复了一些东西..

def deep_dict(self, relations={}):
    """Output a dict of an SA object recursing as deep as you want.

    Takes one argument, relations which is a dictionary of relations we'd
    like to pull out. The relations dict items can be a single relation
    name or deeper relation names connected by sub dicts

    Example:
        Say we have a Person object with a family relationship
            person.deep_dict(relations={'family':None})
        Say the family object has homes as a relation then we can do
            person.deep_dict(relations={'family':{'homes':None}})
            OR
            person.deep_dict(relations={'family':'homes'})
        Say homes has a relation like rooms you can do
            person.deep_dict(relations={'family':{'homes':'rooms'}})
            and so on...
    """
    mydict =  dict((c, str(a)) for c, a in
                    self.__dict__.items() if c != '_sa_instance_state')
    if not relations:
        # just return ourselves
        return mydict

    # otherwise we need to go deeper
    if not isinstance(relations, dict) and not isinstance(relations, str):
        raise Exception("relations should be a dict, it is of type {}".format(type(relations)))

    # got here so check and handle if we were passed a dict
    if isinstance(relations, dict):
        # we were passed deeper info
        for left, right in relations.items():
            myrel = getattr(self, left)
            if isinstance(myrel, list):
                mydict[left] = [rel.deep_dict(relations=right) for rel in myrel]
            else:
                mydict[left] = myrel.deep_dict(relations=right)
    # if we get here check and handle if we were passed a string
    elif isinstance(relations, str):
        # passed a single item
        myrel = getattr(self, relations)
        left = relations
        if isinstance(myrel, list):
            mydict[left] = [rel.deep_dict(relations=None)
                                 for rel in myrel]
        else:
            mydict[left] = myrel.deep_dict(relations=None)

    return mydict

举个关于person/family/homes/rooms的例子…把它转换成json,你只需要

json.dumps(person.deep_dict(relations={'family':{'homes':'rooms'}}))

你可以像这样使用SqlAlchemy的自省:

mysql = SQLAlchemy()
from sqlalchemy import inspect

class Contacts(mysql.Model):  
    __tablename__ = 'CONTACTS'
    id = mysql.Column(mysql.Integer, primary_key=True)
    first_name = mysql.Column(mysql.String(128), nullable=False)
    last_name = mysql.Column(mysql.String(128), nullable=False)
    phone = mysql.Column(mysql.String(128), nullable=False)
    email = mysql.Column(mysql.String(128), nullable=False)
    street = mysql.Column(mysql.String(128), nullable=False)
    zip_code = mysql.Column(mysql.String(128), nullable=False)
    city = mysql.Column(mysql.String(128), nullable=False)
    def toDict(self):
        return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }

@app.route('/contacts',methods=['GET'])
def getContacts():
    contacts = Contacts.query.all()
    contactsArr = []
    for contact in contacts:
        contactsArr.append(contact.toDict()) 
    return jsonify(contactsArr)

@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
    contact = Contacts.query.get(id)
    return jsonify(contact.toDict())

从下面的答案中得到启发: 将sqlalchemy行对象转换为python dict

出于安全考虑,您不应该返回模型的所有字段。我更喜欢有选择性地选择他们。

Flask的json编码现在支持UUID, datetime和relationships(并为flask_sqlalchemy db添加了query和query_class。模型类)。编码器我更新如下:

app / json_encoder.py

    from sqlalchemy.ext.declarative import DeclarativeMeta
    from flask import json


    class AlchemyEncoder(json.JSONEncoder):
        def default(self, o):
            if isinstance(o.__class__, DeclarativeMeta):
                data = {}
                fields = o.__json__() if hasattr(o, '__json__') else dir(o)
                for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
                    value = o.__getattribute__(field)
                    try:
                        json.dumps(value)
                        data[field] = value
                    except TypeError:
                        data[field] = None
                return data
            return json.JSONEncoder.default(self, o)

app / __init__ . py

# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder

有了这个,我可以选择添加一个__json__属性,返回我希望编码的字段列表:

app / models.py

class Queue(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
    song = db.relationship('Song', lazy='joined')
    type = db.Column(db.String(20), server_default=u'audio/mpeg')
    src = db.Column(db.String(255), nullable=False)
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

    def __init__(self, song):
        self.song = song
        self.src = song.full_path

    def __json__(self):
        return ['song', 'src', 'type', 'created_at']

我添加@jsonapi到我的视图,返回结果列表,然后我的输出如下:

[

{

    "created_at": "Thu, 23 Jul 2015 11:36:53 GMT",
    "song": 

        {
            "full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
            "id": 2,
            "path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3"
        },
    "src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
    "type": "audio/mpeg"
}

]

向任何模型添加一个_dict方法的动态方法

from sqlalchemy.inspection import inspect

def implement_as_dict(model):
    if not hasattr(model,"as_dict"):
        column_names=[]
        imodel = inspect(model)
        for c in imodel.columns:
            column_names.append(c.key)

        #define model.as_dict()
        def as_dict(self):
            d = {}
            for c in column_names:
                d[c] = getattr(self,c)
            return d

        setattr(model,"as_dict",as_dict)

#model definition
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
# adding as_dict definition to model
implement_as_dict(User)

然后你可以使用

user = session.query(User).filter_by(name='rick').first() 

user.as_dict()
#sample output 
{"id":1,"name":"rick"}

这并不是那么简单。我写了一些代码来做这件事。我还在开发中,它使用了MochiKit框架。它基本上使用代理和注册的JSON转换器在Python和Javascript之间转换复合对象。

数据库对象的浏览器端是db.js 它需要proxy.js中的基本Python代理源代码。

在Python方面,有基本代理模块。 最后是webserver.py中的SqlAlchemy对象编码器。 它还依赖于models.py文件中的元数据提取器。