Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。

如何序列化SQLAlchemy查询结果为JSON格式?

我试过jsonpickle。编码,但它编码查询对象本身。 我尝试了json.dumps(items),但它返回

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。

我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)


当前回答

虽然这是一篇老文章,也许我没有回答上面的问题,但我想谈谈我的连载,至少它对我有用。

我使用FastAPI,SqlAlchemy和MySQL,但我不使用orm模型;

# from sqlalchemy import create_engine
# from sqlalchemy.orm import sessionmaker
# engine = create_engine(config.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

序列化代码



import decimal
import datetime


def alchemy_encoder(obj):
    """JSON encoder function for SQLAlchemy special classes."""
    if isinstance(obj, datetime.date):
        return obj.strftime("%Y-%m-%d %H:%M:%S")
    elif isinstance(obj, decimal.Decimal):
        return float(obj)

import json
from sqlalchemy import text

# db is SessionLocal() object 

app_sql = 'SELECT * FROM app_info ORDER BY app_id LIMIT :page,:page_size'

# The next two are the parameters passed in
page = 1
page_size = 10

# execute sql and return a <class 'sqlalchemy.engine.result.ResultProxy'> object
app_list = db.execute(text(app_sql), {'page': page, 'page_size': page_size})

# serialize
res = json.loads(json.dumps([dict(r) for r in app_list], default=alchemy_encoder))

如果不行,请忽略我的回答。我在这里提到它

https://codeandlife.com/2014/12/07/sqlalchemy-results-to-json-the-easy-way/

其他回答

我知道这是一个相当老的帖子。我采取了@SashaB给出的解决方案,并根据我的需要进行了修改。

我添加了以下内容:

字段忽略列表:序列化时要忽略的字段列表 字段替换列表:包含在序列化时要被值替换的字段名的字典。 删除方法和BaseQuery被序列化

我的代码如下:

def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = {}):
   """
   Serialize SQLAlchemy result into JSon
   :param revisit_self: True / False
   :param fields_to_expand: Fields which are to be expanded for including their children and all
   :param fields_to_ignore: Fields to be ignored while encoding
   :param fields_to_replace: Field keys to be replaced by values assigned in dictionary
   :return: Json serialized SQLAlchemy object
   """
   _visited_objs = []
   class AlchemyEncoder(json.JSONEncoder):
      def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # don't re-visit self
            if revisit_self:
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

            # go through each field in this SQLalchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
                val = obj.__getattribute__(field)
                # is this field method defination, or an SQLalchemy object
                if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
                    field_name = fields_to_replace[field] if field in fields_to_replace else field
                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or \
                            (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field_name] = None
                            continue

                    fields[field_name] = val
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)
   return AlchemyEncoder

希望它能帮助到一些人!

Python 3.7+和Flask 1.1+可以使用内置的数据类包

from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)
db = SQLAlchemy(app)


@dataclass
class User(db.Model):
  id: int
  email: str

  id = db.Column(db.Integer, primary_key=True, auto_increment=True)
  email = db.Column(db.String(200), unique=True)


@app.route('/users/')
def users():
  users = User.query.all()
  return jsonify(users)  


if __name__ == "__main__":
  users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
  db.create_all()
  db.session.add_all(users)
  db.session.commit()
  app.run()

/users/路由现在将返回一个用户列表。

[
  {"email": "user1@gmail.com", "id": 1},
  {"email": "user2@gmail.com", "id": 2}
]

自动序列化相关模型

@dataclass
class Account(db.Model):
  id: int
  users: User

  id = db.Column(db.Integer)
  users = db.relationship(User)  # User model would need a db.ForeignKey field

jsonify(account)的响应是这样的。

{  
   "id":1,
   "users":[  
      {  
         "email":"user1@gmail.com",
         "id":1
      },
      {  
         "email":"user2@gmail.com",
         "id":2
      }
   ]
}

覆盖默认的JSON编码器

from flask.json import JSONEncoder


class CustomJSONEncoder(JSONEncoder):
  "Add support for serializing timedeltas"

  def default(o):
    if type(o) == datetime.timedelta:
      return str(o)
    if type(o) == datetime.datetime:
      return o.isoformat()
    return super().default(o)

app.json_encoder = CustomJSONEncoder      

(Sasha B的回答非常棒)

这特别地将datetime对象转换为字符串,在原始答案中将转换为None:

# Standard library imports
from datetime import datetime
import json

# 3rd party imports
from sqlalchemy.ext.declarative import DeclarativeMeta

class JsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            dict = {}

            # Remove invalid fields and just get the column attributes
            columns = [x for x in dir(obj) if not x.startswith("_") and x != "metadata"]

            for column in columns:
                value = obj.__getattribute__(column)

                try:
                    json.dumps(value)
                    dict[column] = value
                except TypeError:
                    if isinstance(value, datetime):
                        dict[column] = value.__str__()
                    else:
                        dict[column] = None
            return dict

        return json.JSONEncoder.default(self, obj)

扁平化实现

你可以使用这样的代码:

from sqlalchemy.ext.declarative import DeclarativeMeta

class AlchemyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # an SQLAlchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                try:
                    json.dumps(data) # this will fail on non-encodable values, like other classes
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)

然后转换为JSON使用:

c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)

它将忽略不可编码的字段(将它们设置为“None”)。

它不会自动展开关系(因为这可能导致自引用,并永远循环)。

递归的非循环实现

然而,如果你宁愿永远循环,你可以使用:

from sqlalchemy.ext.declarative import DeclarativeMeta

def new_alchemy_encoder():
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

                # an SQLAlchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    fields[field] = obj.__getattribute__(field)
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

然后对对象进行编码,使用:

print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)

这将编码所有的子代、子代、子代……基本上可以编码你的整个数据库。当它到达之前编码过的东西时,它会将其编码为“None”。

递归的、可能是循环的、有选择的实现

另一种选择,可能更好,是能够指定你想要展开的字段:

def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if revisit_self:
                    if obj in _visited_objs:
                        return None
                    _visited_objs.append(obj)

                # go through each field in this SQLalchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    val = obj.__getattribute__(field)

                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field] = None
                            continue

                    fields[field] = val
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

你现在可以调用它:

print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)

例如,仅展开名为“parents”的SQLAlchemy字段。

安装simplejson by PIP安装simplejson并创建一个类

class Serialise(object):

    def _asdict(self):
        """
        Serialization logic for converting entities using flask's jsonify

        :return: An ordered dictionary
        :rtype: :class:`collections.OrderedDict`
        """

        result = OrderedDict()
        # Get the columns
        for key in self.__mapper__.c.keys():
            if isinstance(getattr(self, key), datetime):
                result["x"] = getattr(self, key).timestamp() * 1000
                result["timestamp"] = result["x"]
            else:
                result[key] = getattr(self, key)

        return result

并将这个类继承到每个orm类,这样这个_asdict函数就会注册到每个orm类,然后。 并在任何地方使用jsonify