Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。

如何序列化SQLAlchemy查询结果为JSON格式?

我试过jsonpickle。编码,但它编码查询对象本身。 我尝试了json.dumps(items),但它返回

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。

我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)


当前回答

我对使用(太多?)字典的看法:

def serialize(_query):
    #d = dictionary written to per row
    #D = dictionary d is written to each time, then reset
    #Master = dictionary of dictionaries; the id Key (int, unique from database) 
    from D is used as the Key for the dictionary D entry in Master
    Master = {}
    D = {}
    x = 0
    for u in _query:
        d = u.__dict__
        D = {}
        for n in d.keys():
           if n != '_sa_instance_state':
                    D[n] = d[n]
        x = d['id']
        Master[x] = D
    return Master

使用flask(包括jsonify)和flask_sqlalchemy将输出打印为JSON。

使用jsonify(serialize())调用该函数。

与我迄今为止尝试过的所有SQLAlchemy查询一起工作(运行SQLite3)

其他回答

def alc2json(row):
    return dict([(col, str(getattr(row,col))) for col in row.__table__.columns.keys()])

我想和她玩会儿代码高尔夫。

供参考:我使用automap_base,因为我们有一个根据业务需求单独设计的模式。我今天才开始使用SQLAlchemy,但是文档指出automap_base是declarative_base的扩展,这似乎是SQLAlchemy ORM中的典型范例,所以我相信这应该可以工作。

根据Tjorriemorrie的解决方案,它并没有跟随外键,而是简单地将列与值匹配,并通过str()-ing列值来处理Python类型。我们的值包括Python datetime。时间和小数。十进位类类型的结果,所以它完成了工作。

希望对路人有所帮助!

下面是一个解决方案,它允许您选择希望在输出中包含的关系。 注意:这是一个完整的重写,将dict/str作为一个参数,而不是一个列表。修复了一些东西..

def deep_dict(self, relations={}):
    """Output a dict of an SA object recursing as deep as you want.

    Takes one argument, relations which is a dictionary of relations we'd
    like to pull out. The relations dict items can be a single relation
    name or deeper relation names connected by sub dicts

    Example:
        Say we have a Person object with a family relationship
            person.deep_dict(relations={'family':None})
        Say the family object has homes as a relation then we can do
            person.deep_dict(relations={'family':{'homes':None}})
            OR
            person.deep_dict(relations={'family':'homes'})
        Say homes has a relation like rooms you can do
            person.deep_dict(relations={'family':{'homes':'rooms'}})
            and so on...
    """
    mydict =  dict((c, str(a)) for c, a in
                    self.__dict__.items() if c != '_sa_instance_state')
    if not relations:
        # just return ourselves
        return mydict

    # otherwise we need to go deeper
    if not isinstance(relations, dict) and not isinstance(relations, str):
        raise Exception("relations should be a dict, it is of type {}".format(type(relations)))

    # got here so check and handle if we were passed a dict
    if isinstance(relations, dict):
        # we were passed deeper info
        for left, right in relations.items():
            myrel = getattr(self, left)
            if isinstance(myrel, list):
                mydict[left] = [rel.deep_dict(relations=right) for rel in myrel]
            else:
                mydict[left] = myrel.deep_dict(relations=right)
    # if we get here check and handle if we were passed a string
    elif isinstance(relations, str):
        # passed a single item
        myrel = getattr(self, relations)
        left = relations
        if isinstance(myrel, list):
            mydict[left] = [rel.deep_dict(relations=None)
                                 for rel in myrel]
        else:
            mydict[left] = myrel.deep_dict(relations=None)

    return mydict

举个关于person/family/homes/rooms的例子…把它转换成json,你只需要

json.dumps(person.deep_dict(relations={'family':{'homes':'rooms'}}))
class SqlToDict:
    def __init__(self, data) -> None:
        self.data = data

    def to_timestamp(self, date):
        if isinstance(date, datetime):
            return int(datetime.timestamp(date))
        else:
            return date

    def to_dict(self) -> List:
        arr = []
        for i in self.data:
            keys = [*i.keys()]
            values = [*i]
            values = [self.to_timestamp(d) for d in values]
            arr.append(dict(zip(keys, values)))
        return arr

例如:

SqlToDict(data).to_dict()

扁平化实现

你可以使用这样的代码:

from sqlalchemy.ext.declarative import DeclarativeMeta

class AlchemyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # an SQLAlchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                try:
                    json.dumps(data) # this will fail on non-encodable values, like other classes
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)

然后转换为JSON使用:

c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)

它将忽略不可编码的字段(将它们设置为“None”)。

它不会自动展开关系(因为这可能导致自引用,并永远循环)。

递归的非循环实现

然而,如果你宁愿永远循环,你可以使用:

from sqlalchemy.ext.declarative import DeclarativeMeta

def new_alchemy_encoder():
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

                # an SQLAlchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    fields[field] = obj.__getattribute__(field)
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

然后对对象进行编码,使用:

print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)

这将编码所有的子代、子代、子代……基本上可以编码你的整个数据库。当它到达之前编码过的东西时,它会将其编码为“None”。

递归的、可能是循环的、有选择的实现

另一种选择,可能更好,是能够指定你想要展开的字段:

def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if revisit_self:
                    if obj in _visited_objs:
                        return None
                    _visited_objs.append(obj)

                # go through each field in this SQLalchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    val = obj.__getattribute__(field)

                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field] = None
                            continue

                    fields[field] = val
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

你现在可以调用它:

print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)

例如,仅展开名为“parents”的SQLAlchemy字段。

下面的代码将sqlalchemy结果序列化为json。

import json
from collections import OrderedDict


def asdict(self):
    result = OrderedDict()
    for key in self.__mapper__.c.keys():
        if getattr(self, key) is not None:
            result[key] = str(getattr(self, key))
        else:
            result[key] = getattr(self, key)
    return result


def to_array(all_vendors):
    v = [ ven.asdict() for ven in all_vendors ]
    return json.dumps(v) 

叫有趣,

def all_products():
    all_products = Products.query.all()
    return to_array(all_products)