我想从数据库中获得一个对象,如果它已经存在(基于提供的参数)或创建它,如果它不存在。

Django的get_or_create(或source)可以做到这一点。在SQLAlchemy中是否有等价的快捷方式?

我现在明确地像这样写出来:

def get_or_create_instrument(session, serial_number):
    instrument = session.query(Instrument).filter_by(serial_number=serial_number).first()
    if instrument:
        return instrument
    else:
        instrument = Instrument(serial_number)
        session.add(instrument)
        return instrument

当前回答

我稍微简化了一下@凯文。避免将整个函数包装在if/else语句中的解决方案。这样就只有一次返回,我觉得更干净:

def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()

    if not instance:
        instance = model(**kwargs)
        session.add(instance)

    return instance

其他回答

基本上就是这么做的,没有捷径可走。

当然,你可以把它概括为:

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).one_or_none()
    if instance:
        return instance, False
    else:
        params = {k: v for k, v in kwargs.items() if not isinstance(v, ClauseElement)}
        params.update(defaults or {})
        instance = model(**params)
        try:
            session.add(instance)
            session.commit()
        except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
            session.rollback()
            instance = session.query(model).filter_by(**kwargs).one()
            return instance, False
        else:
            return instance, True

2020年更新(Python 3.9+ ONLY)

下面是一个简洁的版本,使用Python 3.9的新字典联合运算符(|=)

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).one_or_none()
    if instance:
        return instance, False
    else:
        kwargs |= defaults or {}
        instance = model(**kwargs)
        try:
            session.add(instance)
            session.commit()
        except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
            session.rollback()
            instance = session.query(model).filter_by(**kwargs).one()
            return instance, False
        else:
            return instance, True

注意:

类似于Django版本,这将捕获重复的关键约束和类似的错误。如果你的get或create不能保证返回一个结果,它仍然会导致竞争条件。

为了缓解这个问题,你需要在session.commit()之后添加另一个one_or_none()样式的获取。这仍然不能100%保证不出现竞争条件,除非您还使用with_for_update()或可序列化事务模式。

我一直在研究这个问题,并最终得到了一个相当强大的解决方案:

def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), False
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        created = getattr(model, create_method, model)(**kwargs)
        try:
            session.add(created)
            session.flush()
            return created, True
        except IntegrityError:
            session.rollback()
            return session.query(model).filter_by(**kwargs).one(), False

我只是写了一篇关于所有细节的相当广泛的博客文章,但有一些关于我为什么使用它的想法。

它解包到一个元组,该元组告诉您对象是否存在。这在您的工作流中通常是有用的。 该函数提供了使用@classmethod修饰的创建者函数(以及特定于它们的属性)的能力。 当有多个进程连接到数据存储时,该解决方案可以防止Race Conditions。

编辑:我已经将session.commit()更改为session.flush(),如本文所述。注意,这些决策是特定于所使用的数据存储的(在本例中是Postgres)。

编辑2:我在函数中使用{}作为默认值进行更新,因为这是典型的Python陷阱。谢谢你的评论,奈杰尔!如果你对这个问题感到好奇,看看这个StackOverflow的问题和这篇博客文章。

根据所采用的隔离级别,上述解决方案都不起作用。 我发现的最好的解决方案是一个RAW SQL在以下形式:

INSERT INTO table(f1, f2, unique_f3) 
SELECT 'v1', 'v2', 'v3' 
WHERE NOT EXISTS (SELECT 1 FROM table WHERE f3 = 'v3')

无论隔离级别和并行度如何,这都是事务安全的。

注意:为了提高效率,明智的做法是为唯一的列使用INDEX。

有一个Python包包含@erik的解决方案以及一个版本的update_or_create()。https://github.com/enricobarzetti/sqlalchemy_get_or_create

埃里克精彩回答的修改版

def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), True
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        try:
            with session.begin_nested():
                created = getattr(model, create_method, model)(**kwargs)
                session.add(created)
            return created, False
        except IntegrityError:
            return session.query(model).filter_by(**kwargs).one(), True

Use a nested transaction to only roll back the addition of the new item instead of rolling back everything (See this answer to use nested transactions with SQLite) Move create_method. If the created object has relations and it is assigned members through those relations, it is automatically added to the session. E.g. create a book, which has user_id and user as corresponding relationship, then doing book.user=<user object> inside of create_method will add book to the session. This means that create_method must be inside with to benefit from an eventual rollback. Note that begin_nested automatically triggers a flush.

注意,如果使用MySQL,事务隔离级别必须设置为READ COMMITTED,而不是REPEATABLE READ。Django的get_or_create(和这里)使用了相同的策略,请参阅Django文档。