Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。
如何序列化SQLAlchemy查询结果为JSON格式?
我试过jsonpickle。编码,但它编码查询对象本身。
我尝试了json.dumps(items),但它返回
TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable
将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。
我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。
需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)
AlchemyEncoder是很棒的,但有时会失败的十进制值。这是一个改进的编码器,解决十进制问题-
class AlchemyEncoder(json.JSONEncoder):
# To serialize SQLalchemy objects
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
model_fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
data = obj.__getattribute__(field)
print data
try:
json.dumps(data) # this will fail on non-encodable values, like other classes
model_fields[field] = data
except TypeError:
model_fields[field] = None
return model_fields
if isinstance(obj, Decimal):
return float(obj)
return json.JSONEncoder.default(self, obj)
这是一个JSONEncoder版本,它保留了模型列的顺序,只保留递归定义的列和关系字段。它还格式化了大多数不可序列化的JSON类型:
import json
from datetime import datetime
from decimal import Decimal
import arrow
from sqlalchemy.ext.declarative import DeclarativeMeta
class SQLAlchemyJSONEncoder(json.JSONEncoder):
"""
SQLAlchemy ORM JSON Encoder
If you have a "backref" relationship defined in your SQLAlchemy model,
this encoder raises a ValueError to stop an infinite loop.
"""
def default(self, obj):
if isinstance(obj, datetime):
return arrow.get(obj).isoformat()
elif isinstance(obj, Decimal):
return float(obj)
elif isinstance(obj, set):
return sorted(obj)
elif isinstance(obj.__class__, DeclarativeMeta):
for attribute, relationship in obj.__mapper__.relationships.items():
if isinstance(relationship.__getattribute__("backref"), tuple):
raise ValueError(
f'{obj.__class__} object has a "backref" relationship '
"that would cause an infinite loop!"
)
dictionary = {}
column_names = [column.name for column in obj.__table__.columns]
for key in column_names:
value = obj.__getattribute__(key)
if isinstance(value, datetime):
value = arrow.get(value).isoformat()
elif isinstance(value, Decimal):
value = float(value)
elif isinstance(value, set):
value = sorted(value)
dictionary[key] = value
for key in [
attribute
for attribute in dir(obj)
if not attribute.startswith("_")
and attribute != "metadata"
and attribute not in column_names
]:
value = obj.__getattribute__(key)
dictionary[key] = value
return dictionary
return super().default(obj)
Python 3.7+和Flask 1.1+可以使用内置的数据类包
from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
db = SQLAlchemy(app)
@dataclass
class User(db.Model):
id: int
email: str
id = db.Column(db.Integer, primary_key=True, auto_increment=True)
email = db.Column(db.String(200), unique=True)
@app.route('/users/')
def users():
users = User.query.all()
return jsonify(users)
if __name__ == "__main__":
users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
db.create_all()
db.session.add_all(users)
db.session.commit()
app.run()
/users/路由现在将返回一个用户列表。
[
{"email": "user1@gmail.com", "id": 1},
{"email": "user2@gmail.com", "id": 2}
]
自动序列化相关模型
@dataclass
class Account(db.Model):
id: int
users: User
id = db.Column(db.Integer)
users = db.relationship(User) # User model would need a db.ForeignKey field
jsonify(account)的响应是这样的。
{
"id":1,
"users":[
{
"email":"user1@gmail.com",
"id":1
},
{
"email":"user2@gmail.com",
"id":2
}
]
}
覆盖默认的JSON编码器
from flask.json import JSONEncoder
class CustomJSONEncoder(JSONEncoder):
"Add support for serializing timedeltas"
def default(o):
if type(o) == datetime.timedelta:
return str(o)
if type(o) == datetime.datetime:
return o.isoformat()
return super().default(o)
app.json_encoder = CustomJSONEncoder
向任何模型添加一个_dict方法的动态方法
from sqlalchemy.inspection import inspect
def implement_as_dict(model):
if not hasattr(model,"as_dict"):
column_names=[]
imodel = inspect(model)
for c in imodel.columns:
column_names.append(c.key)
#define model.as_dict()
def as_dict(self):
d = {}
for c in column_names:
d[c] = getattr(self,c)
return d
setattr(model,"as_dict",as_dict)
#model definition
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
# adding as_dict definition to model
implement_as_dict(User)
然后你可以使用
user = session.query(User).filter_by(name='rick').first()
user.as_dict()
#sample output
{"id":1,"name":"rick"}
我对使用(太多?)字典的看法:
def serialize(_query):
#d = dictionary written to per row
#D = dictionary d is written to each time, then reset
#Master = dictionary of dictionaries; the id Key (int, unique from database)
from D is used as the Key for the dictionary D entry in Master
Master = {}
D = {}
x = 0
for u in _query:
d = u.__dict__
D = {}
for n in d.keys():
if n != '_sa_instance_state':
D[n] = d[n]
x = d['id']
Master[x] = D
return Master
使用flask(包括jsonify)和flask_sqlalchemy将输出打印为JSON。
使用jsonify(serialize())调用该函数。
与我迄今为止尝试过的所有SQLAlchemy查询一起工作(运行SQLite3)