Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。

如何序列化SQLAlchemy查询结果为JSON格式?

我试过jsonpickle。编码,但它编码查询对象本身。 我尝试了json.dumps(items),但它返回

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。

我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)


当前回答

更详细的解释。 在你的模型中,添加:

def as_dict(self):
       return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

str()是针对python3的,所以如果使用python2则使用unicode()。它应该有助于反序列化日期。如果不处理这些,你可以删除它。

现在可以像这样查询数据库

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

需要First()来避免奇怪的错误。As_dict()现在将反序列化结果。反序列化之后,就可以将其转换为json了

jsonify(some_result)

其他回答

扁平化实现

你可以使用这样的代码:

from sqlalchemy.ext.declarative import DeclarativeMeta

class AlchemyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # an SQLAlchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                try:
                    json.dumps(data) # this will fail on non-encodable values, like other classes
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)

然后转换为JSON使用:

c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)

它将忽略不可编码的字段(将它们设置为“None”)。

它不会自动展开关系(因为这可能导致自引用,并永远循环)。

递归的非循环实现

然而,如果你宁愿永远循环,你可以使用:

from sqlalchemy.ext.declarative import DeclarativeMeta

def new_alchemy_encoder():
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

                # an SQLAlchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    fields[field] = obj.__getattribute__(field)
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

然后对对象进行编码,使用:

print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)

这将编码所有的子代、子代、子代……基本上可以编码你的整个数据库。当它到达之前编码过的东西时,它会将其编码为“None”。

递归的、可能是循环的、有选择的实现

另一种选择,可能更好,是能够指定你想要展开的字段:

def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if revisit_self:
                    if obj in _visited_objs:
                        return None
                    _visited_objs.append(obj)

                # go through each field in this SQLalchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    val = obj.__getattribute__(field)

                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field] = None
                            continue

                    fields[field] = val
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

你现在可以调用它:

print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)

例如,仅展开名为“parents”的SQLAlchemy字段。

你可以把你的对象输出为一个字典:

class User:
   def as_dict(self):
       return {c.name: getattr(self, c.name) for c in self.__table__.columns}

然后使用User.as_dict()序列化对象。

如将sqlalchemy行对象转换为python dict中所述

虽然这是一篇老文章,也许我没有回答上面的问题,但我想谈谈我的连载,至少它对我有用。

我使用FastAPI,SqlAlchemy和MySQL,但我不使用orm模型;

# from sqlalchemy import create_engine
# from sqlalchemy.orm import sessionmaker
# engine = create_engine(config.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

序列化代码



import decimal
import datetime


def alchemy_encoder(obj):
    """JSON encoder function for SQLAlchemy special classes."""
    if isinstance(obj, datetime.date):
        return obj.strftime("%Y-%m-%d %H:%M:%S")
    elif isinstance(obj, decimal.Decimal):
        return float(obj)

import json
from sqlalchemy import text

# db is SessionLocal() object 

app_sql = 'SELECT * FROM app_info ORDER BY app_id LIMIT :page,:page_size'

# The next two are the parameters passed in
page = 1
page_size = 10

# execute sql and return a <class 'sqlalchemy.engine.result.ResultProxy'> object
app_list = db.execute(text(app_sql), {'page': page, 'page_size': page_size})

# serialize
res = json.loads(json.dumps([dict(r) for r in app_list], default=alchemy_encoder))

如果不行,请忽略我的回答。我在这里提到它

https://codeandlife.com/2014/12/07/sqlalchemy-results-to-json-the-easy-way/

向任何模型添加一个_dict方法的动态方法

from sqlalchemy.inspection import inspect

def implement_as_dict(model):
    if not hasattr(model,"as_dict"):
        column_names=[]
        imodel = inspect(model)
        for c in imodel.columns:
            column_names.append(c.key)

        #define model.as_dict()
        def as_dict(self):
            d = {}
            for c in column_names:
                d[c] = getattr(self,c)
            return d

        setattr(model,"as_dict",as_dict)

#model definition
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
# adding as_dict definition to model
implement_as_dict(User)

然后你可以使用

user = session.query(User).filter_by(name='rick').first() 

user.as_dict()
#sample output 
{"id":1,"name":"rick"}

这是一个JSONEncoder版本,它保留了模型列的顺序,只保留递归定义的列和关系字段。它还格式化了大多数不可序列化的JSON类型:

import json
from datetime import datetime
from decimal import Decimal

import arrow
from sqlalchemy.ext.declarative import DeclarativeMeta

class SQLAlchemyJSONEncoder(json.JSONEncoder):
    """
    SQLAlchemy ORM JSON Encoder
    If you have a "backref" relationship defined in your SQLAlchemy model,
    this encoder raises a ValueError to stop an infinite loop.
    """

    def default(self, obj):
        if isinstance(obj, datetime):
            return arrow.get(obj).isoformat()
        elif isinstance(obj, Decimal):
            return float(obj)
        elif isinstance(obj, set):
            return sorted(obj)
        elif isinstance(obj.__class__, DeclarativeMeta):
            for attribute, relationship in obj.__mapper__.relationships.items():
                if isinstance(relationship.__getattribute__("backref"), tuple):
                    raise ValueError(
                        f'{obj.__class__} object has a "backref" relationship '
                        "that would cause an infinite loop!"
                    )
            dictionary = {}
            column_names = [column.name for column in obj.__table__.columns]
            for key in column_names:
                value = obj.__getattribute__(key)
                if isinstance(value, datetime):
                    value = arrow.get(value).isoformat()
                elif isinstance(value, Decimal):
                    value = float(value)
                elif isinstance(value, set):
                    value = sorted(value)
                dictionary[key] = value
            for key in [
                attribute
                for attribute in dir(obj)
                if not attribute.startswith("_")
                and attribute != "metadata"
                and attribute not in column_names
            ]:
                value = obj.__getattribute__(key)
                dictionary[key] = value
            return dictionary

        return super().default(obj)