Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。
如何序列化SQLAlchemy查询结果为JSON格式?
我试过jsonpickle。编码,但它编码查询对象本身。
我尝试了json.dumps(items),但它返回
TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable
将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。
我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。
需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)
Python 3.7+将于2023年发布
您可以将数据类装饰器添加到您的模型中,并定义一个自定义JSON序列化器,然后是JSON。转储将工作(通过向cls提供自定义编码器)。在下面的例子中,db_row是DB类的一个实例:
json.dumps(db_row, cls=models.CustomJSONEncoder)
{"id": 25, "name": "A component", "author": "Bob", "modified": "2023-02-08T11:49:15.675837"}
可以很容易地修改定制JSON序列化器,使其与任何原生JSON不可序列化的类型兼容。
models.py
from datetime import datetime
import dataclasses
import json
from sqlalchemy import Column, Integer, String, DateTime
from database import Base
@dataclasses.dataclass # <<-- add this decorator
class DB(Base):
"""Model used for SQLite database entries."""
__tablename__ = "components"
id: int = Column(Integer, primary_key=True, index=True)
name: str = Column(String)
author: str = Column(String)
modified: datetime = Column(DateTime(timezone=True), default=datetime.utcnow)
class CustomJSONEncoder(json.JSONEncoder): # <<-- Add this custom encoder
"""Custom JSON encoder for the DB class."""
def default(self, o):
if dataclasses.is_dataclass(o): # this serializes anything dataclass can handle
return dataclasses.asdict(o)
if isinstance(o, datetime): # this adds support for datetime
return o.isoformat()
return super().default(o)
为了进一步扩展它,使它适用于你在数据库中可能使用的任何不可序列化的类型,在自定义编码器类中添加另一条if语句,返回一些可序列化的东西(例如str)。
Python 3.7+和Flask 1.1+可以使用内置的数据类包
from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
db = SQLAlchemy(app)
@dataclass
class User(db.Model):
id: int
email: str
id = db.Column(db.Integer, primary_key=True, auto_increment=True)
email = db.Column(db.String(200), unique=True)
@app.route('/users/')
def users():
users = User.query.all()
return jsonify(users)
if __name__ == "__main__":
users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
db.create_all()
db.session.add_all(users)
db.session.commit()
app.run()
/users/路由现在将返回一个用户列表。
[
{"email": "user1@gmail.com", "id": 1},
{"email": "user2@gmail.com", "id": 2}
]
自动序列化相关模型
@dataclass
class Account(db.Model):
id: int
users: User
id = db.Column(db.Integer)
users = db.relationship(User) # User model would need a db.ForeignKey field
jsonify(account)的响应是这样的。
{
"id":1,
"users":[
{
"email":"user1@gmail.com",
"id":1
},
{
"email":"user2@gmail.com",
"id":2
}
]
}
覆盖默认的JSON编码器
from flask.json import JSONEncoder
class CustomJSONEncoder(JSONEncoder):
"Add support for serializing timedeltas"
def default(o):
if type(o) == datetime.timedelta:
return str(o)
if type(o) == datetime.datetime:
return o.isoformat()
return super().default(o)
app.json_encoder = CustomJSONEncoder
你可以像这样使用SqlAlchemy的自省:
mysql = SQLAlchemy()
from sqlalchemy import inspect
class Contacts(mysql.Model):
__tablename__ = 'CONTACTS'
id = mysql.Column(mysql.Integer, primary_key=True)
first_name = mysql.Column(mysql.String(128), nullable=False)
last_name = mysql.Column(mysql.String(128), nullable=False)
phone = mysql.Column(mysql.String(128), nullable=False)
email = mysql.Column(mysql.String(128), nullable=False)
street = mysql.Column(mysql.String(128), nullable=False)
zip_code = mysql.Column(mysql.String(128), nullable=False)
city = mysql.Column(mysql.String(128), nullable=False)
def toDict(self):
return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }
@app.route('/contacts',methods=['GET'])
def getContacts():
contacts = Contacts.query.all()
contactsArr = []
for contact in contacts:
contactsArr.append(contact.toDict())
return jsonify(contactsArr)
@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
contact = Contacts.query.get(id)
return jsonify(contact.toDict())
从下面的答案中得到启发:
将sqlalchemy行对象转换为python dict
我知道这是一个相当老的帖子。我采取了@SashaB给出的解决方案,并根据我的需要进行了修改。
我添加了以下内容:
字段忽略列表:序列化时要忽略的字段列表
字段替换列表:包含在序列化时要被值替换的字段名的字典。
删除方法和BaseQuery被序列化
我的代码如下:
def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = {}):
"""
Serialize SQLAlchemy result into JSon
:param revisit_self: True / False
:param fields_to_expand: Fields which are to be expanded for including their children and all
:param fields_to_ignore: Fields to be ignored while encoding
:param fields_to_replace: Field keys to be replaced by values assigned in dictionary
:return: Json serialized SQLAlchemy object
"""
_visited_objs = []
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# don't re-visit self
if revisit_self:
if obj in _visited_objs:
return None
_visited_objs.append(obj)
# go through each field in this SQLalchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
val = obj.__getattribute__(field)
# is this field method defination, or an SQLalchemy object
if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
field_name = fields_to_replace[field] if field in fields_to_replace else field
# is this field another SQLalchemy object, or a list of SQLalchemy objects?
if isinstance(val.__class__, DeclarativeMeta) or \
(isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
# unless we're expanding this field, stop here
if field not in fields_to_expand:
# not expanding this field: set it to None and continue
fields[field_name] = None
continue
fields[field_name] = val
# a json-encodable dict
return fields
return json.JSONEncoder.default(self, obj)
return AlchemyEncoder
希望它能帮助到一些人!
向任何模型添加一个_dict方法的动态方法
from sqlalchemy.inspection import inspect
def implement_as_dict(model):
if not hasattr(model,"as_dict"):
column_names=[]
imodel = inspect(model)
for c in imodel.columns:
column_names.append(c.key)
#define model.as_dict()
def as_dict(self):
d = {}
for c in column_names:
d[c] = getattr(self,c)
return d
setattr(model,"as_dict",as_dict)
#model definition
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
# adding as_dict definition to model
implement_as_dict(User)
然后你可以使用
user = session.query(User).filter_by(name='rick').first()
user.as_dict()
#sample output
{"id":1,"name":"rick"}