一个经常被问到的问题是如何做一个upsert,也就是MySQL所说的INSERT…ON DUPLICATE UPDATE和标准支持作为MERGE操作的一部分。

考虑到PostgreSQL不直接支持它(在pg 9.5之前),你如何做到这一点?考虑以下几点:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

现在想象一下,你想要“upsert”元组(2,'Joe'), (3, 'Alan'),那么新的表内容将是:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

这就是人们在讨论upsert时谈论的内容。至关重要的是,任何方法在同一个表上存在多个事务时都必须是安全的——可以使用显式锁定,也可以防止产生竞争条件。

这个话题在PostgreSQL的Insert, on duplicate update ?,但这是关于MySQL语法的替代方案,随着时间的推移,它增加了相当多不相关的细节。我正在研究明确的答案。

这些技术对于“如果不存在就插入,否则什么都不做”也很有用。“插入…重复键忽略”。


当前回答

SQLAlchemy upsert for Postgres >=9.5

由于上面的大文章涵盖了Postgres版本的许多不同的SQL方法(不仅仅是问题中的非9.5),我想补充一下如果您使用的是Postgres 9.5,如何在SQLAlchemy中做到这一点。除了实现自己的upsert,还可以使用SQLAlchemy的函数(在SQLAlchemy 1.1中添加)。就我个人而言,如果可能的话,我会推荐使用这些工具。不仅因为方便,还因为它可以让PostgreSQL处理任何可能发生的竞争条件。

我昨天给出的另一个答案(https://stackoverflow.com/a/44395983/2156909)

SQLAlchemy现在通过on_conflict_do_update()和on_conflict_do_nothing()两个方法支持ON冲突:

从文档中复制:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

其他回答

在PostgreSQL v. 15合并

从PostgreSQL v. 15开始,就可以使用MERGE命令了。它实际上是这个新版本的第一个主要改进。

它使用WHEN MATCHED / WHEN NOT MATCHED条件,以便在存在具有相同条件的现有行时选择行为。

它甚至比标准的UPSERT更好,因为新功能可以批量控制INSERT、UPDATE或DELETE行。

MERGE INTO customer_account ca
USING recent_transactions t
ON t.customer_id = ca.customer_id
WHEN MATCHED THEN
  UPDATE SET balance = balance + transaction_value
WHEN NOT MATCHED THEN
  INSERT (customer_id, balance)
  VALUES (t.customer_id, t.transaction_value)

这里有一些插入…关于冲突……(pg 9.5+):

Insert, on conflict - do nothing. insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict do nothing;` Insert, on conflict - do update, specify conflict target via column. insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict(id) do update set name = 'new_name', size = 3; Insert, on conflict - do update, specify conflict target via constraint name. insert into dummy(id, name, size) values(1, 'new_name', 3) on conflict on constraint dummy_pkey do update set name = 'new_name', size = 4;

因为这个问题已经结束了,所以我在这里发布了如何使用SQLAlchemy来解决这个问题。通过递归,它重新尝试批量插入或更新以解决竞态条件和验证错误。

首先是进口

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

现在有几个辅助函数

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

最后是upsert函数

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

下面是你如何使用它

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

与bulk_save_objects相比,它的优点是可以处理插入上的关系、错误检查等(与批量操作不同)。

WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

在Postgresql 9.3上测试

我正在尝试为PostgreSQL 9.5之前版本的单次插入问题提供另一种解决方案。这个想法很简单,首先尝试执行插入,如果记录已经存在,则更新它:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

请注意,只有在不删除表中的行时才可以应用此解决方案。

我不知道这个解决方案的效率如何,但在我看来它是合理的。