Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。

如何序列化SQLAlchemy查询结果为JSON格式?

我试过jsonpickle。编码,但它编码查询对象本身。 我尝试了json.dumps(items),但它返回

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。

我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)


当前回答

我建议用棉花糖。它允许您创建序列化器来表示支持关系和嵌套对象的模型实例。

以下是他们文档中的一个删节的例子。以ORM模型为例,作者:

class Author(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    first = db.Column(db.String(80))
    last = db.Column(db.String(80))

该类的棉花糖模式是这样构造的:

class AuthorSchema(Schema):
    id = fields.Int(dump_only=True)
    first = fields.Str()
    last = fields.Str()
    formatted_name = fields.Method("format_name", dump_only=True)

    def format_name(self, author):
        return "{}, {}".format(author.last, author.first)

...并像这样使用:

author_schema = AuthorSchema()
author_schema.dump(Author.query.first())

...会产生这样的输出:

{
        "first": "Tim",
        "formatted_name": "Peters, Tim",
        "id": 1,
        "last": "Peters"
}

看看他们完整的Flask-SQLAlchemy示例。

一个名为marshmlow - SQLAlchemy的库专门集成了SQLAlchemy和marshmallow。在这个库中,上面描述的Author模型的模式如下所示:

class AuthorSchema(ModelSchema):
    class Meta:
        model = Author

该集成允许从SQLAlchemy Column类型推断字段类型。

marshmallow-sqlalchemy这里。

其他回答

你可以把你的对象输出为一个字典:

class User:
   def as_dict(self):
       return {c.name: getattr(self, c.name) for c in self.__table__.columns}

然后使用User.as_dict()序列化对象。

如将sqlalchemy行对象转换为python dict中所述

更详细的解释。 在你的模型中,添加:

def as_dict(self):
       return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

str()是针对python3的,所以如果使用python2则使用unicode()。它应该有助于反序列化日期。如果不处理这些,你可以删除它。

现在可以像这样查询数据库

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

需要First()来避免奇怪的错误。As_dict()现在将反序列化结果。反序列化之后,就可以将其转换为json了

jsonify(some_result)

当使用sqlalchemy连接到db I时,这是一个高度可配置的简单解决方案。使用熊猫。

import pandas as pd
import sqlalchemy

#sqlalchemy engine configuration
engine = sqlalchemy.create_engine....

def my_function():
  #read in from sql directly into a pandas dataframe
  #check the pandas documentation for additional config options
  sql_DF = pd.read_sql_table("table_name", con=engine)

  # "orient" is optional here but allows you to specify the json formatting you require
  sql_json = sql_DF.to_json(orient="index")

  return sql_json

下面是一个解决方案,它允许您选择希望在输出中包含的关系。 注意:这是一个完整的重写,将dict/str作为一个参数,而不是一个列表。修复了一些东西..

def deep_dict(self, relations={}):
    """Output a dict of an SA object recursing as deep as you want.

    Takes one argument, relations which is a dictionary of relations we'd
    like to pull out. The relations dict items can be a single relation
    name or deeper relation names connected by sub dicts

    Example:
        Say we have a Person object with a family relationship
            person.deep_dict(relations={'family':None})
        Say the family object has homes as a relation then we can do
            person.deep_dict(relations={'family':{'homes':None}})
            OR
            person.deep_dict(relations={'family':'homes'})
        Say homes has a relation like rooms you can do
            person.deep_dict(relations={'family':{'homes':'rooms'}})
            and so on...
    """
    mydict =  dict((c, str(a)) for c, a in
                    self.__dict__.items() if c != '_sa_instance_state')
    if not relations:
        # just return ourselves
        return mydict

    # otherwise we need to go deeper
    if not isinstance(relations, dict) and not isinstance(relations, str):
        raise Exception("relations should be a dict, it is of type {}".format(type(relations)))

    # got here so check and handle if we were passed a dict
    if isinstance(relations, dict):
        # we were passed deeper info
        for left, right in relations.items():
            myrel = getattr(self, left)
            if isinstance(myrel, list):
                mydict[left] = [rel.deep_dict(relations=right) for rel in myrel]
            else:
                mydict[left] = myrel.deep_dict(relations=right)
    # if we get here check and handle if we were passed a string
    elif isinstance(relations, str):
        # passed a single item
        myrel = getattr(self, relations)
        left = relations
        if isinstance(myrel, list):
            mydict[left] = [rel.deep_dict(relations=None)
                                 for rel in myrel]
        else:
            mydict[left] = myrel.deep_dict(relations=None)

    return mydict

举个关于person/family/homes/rooms的例子…把它转换成json,你只需要

json.dumps(person.deep_dict(relations={'family':{'homes':'rooms'}}))

我知道这是一个相当老的帖子。我采取了@SashaB给出的解决方案,并根据我的需要进行了修改。

我添加了以下内容:

字段忽略列表:序列化时要忽略的字段列表 字段替换列表:包含在序列化时要被值替换的字段名的字典。 删除方法和BaseQuery被序列化

我的代码如下:

def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = {}):
   """
   Serialize SQLAlchemy result into JSon
   :param revisit_self: True / False
   :param fields_to_expand: Fields which are to be expanded for including their children and all
   :param fields_to_ignore: Fields to be ignored while encoding
   :param fields_to_replace: Field keys to be replaced by values assigned in dictionary
   :return: Json serialized SQLAlchemy object
   """
   _visited_objs = []
   class AlchemyEncoder(json.JSONEncoder):
      def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # don't re-visit self
            if revisit_self:
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

            # go through each field in this SQLalchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
                val = obj.__getattribute__(field)
                # is this field method defination, or an SQLalchemy object
                if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
                    field_name = fields_to_replace[field] if field in fields_to_replace else field
                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or \
                            (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field_name] = None
                            continue

                    fields[field_name] = val
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)
   return AlchemyEncoder

希望它能帮助到一些人!