我想从数据库中获得一个对象,如果它已经存在(基于提供的参数)或创建它,如果它不存在。

Django的get_or_create(或source)可以做到这一点。在SQLAlchemy中是否有等价的快捷方式?

我现在明确地像这样写出来:

def get_or_create_instrument(session, serial_number):
    instrument = session.query(Instrument).filter_by(serial_number=serial_number).first()
    if instrument:
        return instrument
    else:
        instrument = Instrument(serial_number)
        session.add(instrument)
        return instrument

当前回答

根据所采用的隔离级别,上述解决方案都不起作用。 我发现的最好的解决方案是一个RAW SQL在以下形式:

INSERT INTO table(f1, f2, unique_f3) 
SELECT 'v1', 'v2', 'v3' 
WHERE NOT EXISTS (SELECT 1 FROM table WHERE f3 = 'v3')

无论隔离级别和并行度如何,这都是事务安全的。

注意:为了提高效率,明智的做法是为唯一的列使用INDEX。

其他回答

我一直在研究这个问题,并最终得到了一个相当强大的解决方案:

def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), False
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        created = getattr(model, create_method, model)(**kwargs)
        try:
            session.add(created)
            session.flush()
            return created, True
        except IntegrityError:
            session.rollback()
            return session.query(model).filter_by(**kwargs).one(), False

我只是写了一篇关于所有细节的相当广泛的博客文章,但有一些关于我为什么使用它的想法。

它解包到一个元组,该元组告诉您对象是否存在。这在您的工作流中通常是有用的。 该函数提供了使用@classmethod修饰的创建者函数(以及特定于它们的属性)的能力。 当有多个进程连接到数据存储时,该解决方案可以防止Race Conditions。

编辑:我已经将session.commit()更改为session.flush(),如本文所述。注意,这些决策是特定于所使用的数据存储的(在本例中是Postgres)。

编辑2:我在函数中使用{}作为默认值进行更新,因为这是典型的Python陷阱。谢谢你的评论,奈杰尔!如果你对这个问题感到好奇,看看这个StackOverflow的问题和这篇博客文章。

有一个Python包包含@erik的解决方案以及一个版本的update_or_create()。https://github.com/enricobarzetti/sqlalchemy_get_or_create

埃里克精彩回答的修改版

def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), True
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        try:
            with session.begin_nested():
                created = getattr(model, create_method, model)(**kwargs)
                session.add(created)
            return created, False
        except IntegrityError:
            return session.query(model).filter_by(**kwargs).one(), True

Use a nested transaction to only roll back the addition of the new item instead of rolling back everything (See this answer to use nested transactions with SQLite) Move create_method. If the created object has relations and it is assigned members through those relations, it is automatically added to the session. E.g. create a book, which has user_id and user as corresponding relationship, then doing book.user=<user object> inside of create_method will add book to the session. This means that create_method must be inside with to benefit from an eventual rollback. Note that begin_nested automatically triggers a flush.

注意,如果使用MySQL,事务隔离级别必须设置为READ COMMITTED,而不是REPEATABLE READ。Django的get_or_create(和这里)使用了相同的策略,请参阅Django文档。

我稍微简化了一下@凯文。避免将整个函数包装在if/else语句中的解决方案。这样就只有一次返回,我觉得更干净:

def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()

    if not instance:
        instance = model(**kwargs)
        session.add(instance)

    return instance

这个SQLALchemy食谱很好地完成了这项工作。

首先要做的是定义一个函数,给它一个Session来使用,并将一个字典与Session()关联起来,Session()用来跟踪当前唯一的键。

def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw):
    cache = getattr(session, '_unique_cache', None)
    if cache is None:
        session._unique_cache = cache = {}

    key = (cls, hashfunc(*arg, **kw))
    if key in cache:
        return cache[key]
    else:
        with session.no_autoflush:
            q = session.query(cls)
            q = queryfunc(q, *arg, **kw)
            obj = q.first()
            if not obj:
                obj = constructor(*arg, **kw)
                session.add(obj)
        cache[key] = obj
        return obj

使用这个函数的一个例子是在mixin中:

class UniqueMixin(object):
    @classmethod
    def unique_hash(cls, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def unique_filter(cls, query, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def as_unique(cls, session, *arg, **kw):
        return _unique(
                    session,
                    cls,
                    cls.unique_hash,
                    cls.unique_filter,
                    cls,
                    arg, kw
            )

最后创建唯一的get_or_create模型:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

engine = create_engine('sqlite://', echo=True)

Session = sessionmaker(bind=engine)

class Widget(UniqueMixin, Base):
    __tablename__ = 'widget'

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)

    @classmethod
    def unique_hash(cls, name):
        return name

    @classmethod
    def unique_filter(cls, query, name):
        return query.filter(Widget.name == name)

Base.metadata.create_all(engine)

session = Session()

w1, w2, w3 = Widget.as_unique(session, name='w1'), \
                Widget.as_unique(session, name='w2'), \
                Widget.as_unique(session, name='w3')
w1b = Widget.as_unique(session, name='w1')

assert w1 is w1b
assert w2 is not w3
assert w2 is not w1

session.commit()

这个食谱更深入地阐述了这个想法,并提供了不同的方法,但我使用这个方法非常成功。