一个经常被问到的问题是如何做一个upsert,也就是MySQL所说的INSERT…ON DUPLICATE UPDATE和标准支持作为MERGE操作的一部分。

考虑到PostgreSQL不直接支持它(在pg 9.5之前),你如何做到这一点?考虑以下几点:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

现在想象一下,你想要“upsert”元组(2,'Joe'), (3, 'Alan'),那么新的表内容将是:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

这就是人们在讨论upsert时谈论的内容。至关重要的是,任何方法在同一个表上存在多个事务时都必须是安全的——可以使用显式锁定,也可以防止产生竞争条件。

这个话题在PostgreSQL的Insert, on duplicate update ?,但这是关于MySQL语法的替代方案,随着时间的推移,它增加了相当多不相关的细节。我正在研究明确的答案。

这些技术对于“如果不存在就插入,否则什么都不做”也很有用。“插入…重复键忽略”。


当前回答

因为这个问题已经结束了,所以我在这里发布了如何使用SQLAlchemy来解决这个问题。通过递归,它重新尝试批量插入或更新以解决竞态条件和验证错误。

首先是进口

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

现在有几个辅助函数

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

最后是upsert函数

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

下面是你如何使用它

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

与bulk_save_objects相比,它的优点是可以处理插入上的关系、错误检查等(与批量操作不同)。

其他回答

我正在尝试为PostgreSQL 9.5之前版本的单次插入问题提供另一种解决方案。这个想法很简单,首先尝试执行插入,如果记录已经存在,则更新它:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

请注意,只有在不删除表中的行时才可以应用此解决方案。

我不知道这个解决方案的效率如何,但在我看来它是合理的。

WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

在Postgresql 9.3上测试

这里有一些插入…关于冲突……(pg 9.5+):

Insert, on conflict - do nothing. insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict do nothing;` Insert, on conflict - do update, specify conflict target via column. insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict(id) do update set name = 'new_name', size = 3; Insert, on conflict - do update, specify conflict target via constraint name. insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict on constraint dummy_pkey do update set name = 'new_name', size = 4;

9.5及更新版本:

PostgreSQL 9.5及更新版本支持INSERT…当冲突(键)时执行UPDATE(和当冲突(键)时不执行任何操作),即upsert。

与ON重复键更新的比较。

快速的解释。

有关使用方法,请参阅手册-特别是语法图中的conflict_action子句和解释性文本。

与下面给出的9.4及更老版本的解决方案不同,此特性适用于多个冲突行的情况,并且不需要排他锁定或重试循环。

添加该特性的提交在这里,围绕其开发的讨论在这里。


如果您使用的是9.5并且不需要向后兼容,您现在可以停止阅读。


9.4及以上版本:

PostgreSQL没有任何内置的UPSERT(或MERGE)功能,并且在并发使用的情况下高效地执行它是非常困难的。

本文详细讨论了这个问题。

一般来说,你有两个选择:

重试循环中的单个插入/更新操作;或 锁定表并进行批量合并

个别行重试循环

如果希望多个连接同时尝试执行插入,那么在重试循环中使用单独的行upserts是合理的选择。

PostgreSQL文档包含了一个有用的过程,可以让你在数据库内部的循环中完成这个过程。与大多数简单的解决方案不同,它防止丢失更新和插入竞赛。它只能在READ COMMITTED模式下工作,并且只有当它是您在事务中所做的唯一一件事时才安全。如果触发器或次要惟一键导致惟一违反,则该函数将无法正常工作。

这种策略效率很低。只要可行,您应该将工作排队,并按照下面描述的方式进行批量upsert。

Many attempted solutions to this problem fail to consider rollbacks, so they result in incomplete updates. Two transactions race with each other; one of them successfully INSERTs; the other gets a duplicate key error and does an UPDATE instead. The UPDATE blocks waiting for the INSERT to rollback or commit. When it rolls back, the UPDATE condition re-check matches zero rows, so even though the UPDATE commits it hasn't actually done the upsert you expected. You have to check the result row counts and re-try where necessary.

一些尝试的解决方案也没有考虑SELECT竞争。如果你尝试一些显而易见的简单方法:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

当两种模式同时运行时,就有几种失效模式。一个是已经讨论过的更新重新检查问题。另一种情况是同时进行UPDATE,匹配零行并继续。然后它们都执行EXISTS测试,该测试发生在INSERT之前。都得到0行,所以都执行INSERT。其中一个失败,出现重复键错误。

这就是为什么你需要一个re-try循环。您可能认为可以使用聪明的SQL来防止重复的键错误或丢失的更新,但这是不可能的。您需要检查行数或处理重复键错误(取决于所选择的方法),然后重试。

请不要自己动手解决这个问题。就像消息排队一样,这可能是错误的。

散装上塞带锁

有时您希望执行批量upsert,其中您有一个新数据集,希望将其合并到旧的现有数据集中。这比单独的行upserts要有效得多,应该在实际情况下优先使用。

在这种情况下,您通常遵循以下流程:

创建临时表 复制或批量插入新数据到临时表中 在EXCLUSIVE模式下锁定目标表。这允许其他事务进行SELECT,但不对表进行任何更改。 做一个更新…使用临时表中的值对现有记录进行FROM; 对目标表中不存在的行执行INSERT操作; COMMIT,释放锁。

例如,对于问题中给出的例子,使用多值INSERT填充临时表:

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

相关阅读

UPSERT wiki页面 UPSERTisms在Postgres 插入,重复更新PostgreSQL? http://petereisentraut.blogspot.com/2010/05/merge-syntax.html 使用事务Upsert 函数中的SELECT或INSERT是否容易出现竞态条件? PostgreSQL wiki上的SQL MERGE 这是目前在Postgresql中实现UPSERT最常用的方法

“合并”呢?

sql标准MERGE的并发语义定义很差,不适合在不先锁定表的情况下进行上传。

对于数据合并,这是一个非常有用的OLAP语句,但对于并发安全的upsert,它实际上不是一个有用的解决方案。对于使用其他dbms使用MERGE进行upserts,有很多建议,但这实际上是错误的。

其他星展:

插入……在MySQL中重复键更新 从MS SQL Server合并(但请参阅上面关于合并问题) 从Oracle合并(但请参阅上面关于合并问题)

SQLAlchemy upsert for Postgres >=9.5

由于上面的大文章涵盖了Postgres版本的许多不同的SQL方法(不仅仅是问题中的非9.5),我想补充一下如果您使用的是Postgres 9.5,如何在SQLAlchemy中做到这一点。除了实现自己的upsert,还可以使用SQLAlchemy的函数(在SQLAlchemy 1.1中添加)。就我个人而言,如果可能的话,我会推荐使用这些工具。不仅因为方便,还因为它可以让PostgreSQL处理任何可能发生的竞争条件。

我昨天给出的另一个答案(https://stackoverflow.com/a/44395983/2156909)

SQLAlchemy现在通过on_conflict_do_update()和on_conflict_do_nothing()两个方法支持ON冲突:

从文档中复制:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert