一个经常被问到的问题是如何做一个upsert,也就是MySQL所说的INSERT…ON DUPLICATE UPDATE和标准支持作为MERGE操作的一部分。
考虑到PostgreSQL不直接支持它(在pg 9.5之前),你如何做到这一点?考虑以下几点:
CREATE TABLE testtable (
id integer PRIMARY KEY,
somedata text NOT NULL
);
INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');
现在想象一下,你想要“upsert”元组(2,'Joe'), (3, 'Alan'),那么新的表内容将是:
(1, 'fred'),
(2, 'Joe'), -- Changed value of existing tuple
(3, 'Alan') -- Added new tuple
这就是人们在讨论upsert时谈论的内容。至关重要的是,任何方法在同一个表上存在多个事务时都必须是安全的——可以使用显式锁定,也可以防止产生竞争条件。
这个话题在PostgreSQL的Insert, on duplicate update ?,但这是关于MySQL语法的替代方案,随着时间的推移,它增加了相当多不相关的细节。我正在研究明确的答案。
这些技术对于“如果不存在就插入,否则什么都不做”也很有用。“插入…重复键忽略”。
因为这个问题已经结束了,所以我在这里发布了如何使用SQLAlchemy来解决这个问题。通过递归,它重新尝试批量插入或更新以解决竞态条件和验证错误。
首先是进口
import itertools as it
from functools import partial
from operator import itemgetter
from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts
现在有几个辅助函数
def chunk(content, chunksize=None):
"""Groups data into chunks each with (at most) `chunksize` items.
https://stackoverflow.com/a/22919323/408556
"""
if chunksize:
i = iter(content)
generator = (list(it.islice(i, chunksize)) for _ in it.count())
else:
generator = iter([content])
return it.takewhile(bool, generator)
def gen_resources(records):
"""Yields a dictionary if the record's id already exists, a row object
otherwise.
"""
ids = {item[0] for item in session.query(Posts.id)}
for record in records:
is_row = hasattr(record, 'to_dict')
if is_row and record.id in ids:
# It's a row but the id already exists, so we need to convert it
# to a dict that updates the existing record. Since it is duplicate,
# also yield True
yield record.to_dict(), True
elif is_row:
# It's a row and the id doesn't exist, so no conversion needed.
# Since it's not a duplicate, also yield False
yield record, False
elif record['id'] in ids:
# It's a dict and the id already exists, so no conversion needed.
# Since it is duplicate, also yield True
yield record, True
else:
# It's a dict and the id doesn't exist, so we need to convert it.
# Since it's not a duplicate, also yield False
yield Posts(**record), False
最后是upsert函数
def upsert(data, chunksize=None):
for records in chunk(data, chunksize):
resources = gen_resources(records)
sorted_resources = sorted(resources, key=itemgetter(1))
for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
items = [g[0] for g in group]
if dupe:
_upsert = partial(session.bulk_update_mappings, Posts)
else:
_upsert = session.add_all
try:
_upsert(items)
session.commit()
except IntegrityError:
# A record was added or deleted after we checked, so retry
#
# modify accordingly by adding additional exceptions, e.g.,
# except (IntegrityError, ValidationError, ValueError)
db.session.rollback()
upsert(items)
except Exception as e:
# Some other error occurred so reduce chunksize to isolate the
# offending row(s)
db.session.rollback()
num_items = len(items)
if num_items > 1:
upsert(items, num_items // 2)
else:
print('Error adding record {}'.format(items[0]))
下面是你如何使用它
>>> data = [
... {'id': 1, 'text': 'updated post1'},
... {'id': 5, 'text': 'updated post5'},
... {'id': 1000, 'text': 'new post1000'}]
...
>>> upsert(data)
与bulk_save_objects相比,它的优点是可以处理插入上的关系、错误检查等(与批量操作不同)。
这里有一些插入…关于冲突……(pg 9.5+):
Insert, on conflict - do nothing.
insert into dummy(id, name, size) values(1, 'new_name', 3)
on conflict do nothing;`
Insert, on conflict - do update, specify conflict target via column.
insert into dummy(id, name, size) values(1, 'new_name', 3)
on conflict(id)
do update set name = 'new_name', size = 3;
Insert, on conflict - do update, specify conflict target via constraint name.
insert into dummy(id, name, size) values(1, 'new_name', 3)
on conflict on constraint dummy_pkey
do update set name = 'new_name', size = 4;
SQLAlchemy upsert for Postgres >=9.5
由于上面的大文章涵盖了Postgres版本的许多不同的SQL方法(不仅仅是问题中的非9.5),我想补充一下如果您使用的是Postgres 9.5,如何在SQLAlchemy中做到这一点。除了实现自己的upsert,还可以使用SQLAlchemy的函数(在SQLAlchemy 1.1中添加)。就我个人而言,如果可能的话,我会推荐使用这些工具。不仅因为方便,还因为它可以让PostgreSQL处理任何可能发生的竞争条件。
我昨天给出的另一个答案(https://stackoverflow.com/a/44395983/2156909)
SQLAlchemy现在通过on_conflict_do_update()和on_conflict_do_nothing()两个方法支持ON冲突:
从文档中复制:
from sqlalchemy.dialects.postgresql import insert
stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
index_elements=[my_table.c.user_email],
index_where=my_table.c.user_email.like('%@gmail.com'),
set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert
因为这个问题已经结束了,所以我在这里发布了如何使用SQLAlchemy来解决这个问题。通过递归,它重新尝试批量插入或更新以解决竞态条件和验证错误。
首先是进口
import itertools as it
from functools import partial
from operator import itemgetter
from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts
现在有几个辅助函数
def chunk(content, chunksize=None):
"""Groups data into chunks each with (at most) `chunksize` items.
https://stackoverflow.com/a/22919323/408556
"""
if chunksize:
i = iter(content)
generator = (list(it.islice(i, chunksize)) for _ in it.count())
else:
generator = iter([content])
return it.takewhile(bool, generator)
def gen_resources(records):
"""Yields a dictionary if the record's id already exists, a row object
otherwise.
"""
ids = {item[0] for item in session.query(Posts.id)}
for record in records:
is_row = hasattr(record, 'to_dict')
if is_row and record.id in ids:
# It's a row but the id already exists, so we need to convert it
# to a dict that updates the existing record. Since it is duplicate,
# also yield True
yield record.to_dict(), True
elif is_row:
# It's a row and the id doesn't exist, so no conversion needed.
# Since it's not a duplicate, also yield False
yield record, False
elif record['id'] in ids:
# It's a dict and the id already exists, so no conversion needed.
# Since it is duplicate, also yield True
yield record, True
else:
# It's a dict and the id doesn't exist, so we need to convert it.
# Since it's not a duplicate, also yield False
yield Posts(**record), False
最后是upsert函数
def upsert(data, chunksize=None):
for records in chunk(data, chunksize):
resources = gen_resources(records)
sorted_resources = sorted(resources, key=itemgetter(1))
for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
items = [g[0] for g in group]
if dupe:
_upsert = partial(session.bulk_update_mappings, Posts)
else:
_upsert = session.add_all
try:
_upsert(items)
session.commit()
except IntegrityError:
# A record was added or deleted after we checked, so retry
#
# modify accordingly by adding additional exceptions, e.g.,
# except (IntegrityError, ValidationError, ValueError)
db.session.rollback()
upsert(items)
except Exception as e:
# Some other error occurred so reduce chunksize to isolate the
# offending row(s)
db.session.rollback()
num_items = len(items)
if num_items > 1:
upsert(items, num_items // 2)
else:
print('Error adding record {}'.format(items[0]))
下面是你如何使用它
>>> data = [
... {'id': 1, 'text': 'updated post1'},
... {'id': 5, 'text': 'updated post5'},
... {'id': 1000, 'text': 'new post1000'}]
...
>>> upsert(data)
与bulk_save_objects相比,它的优点是可以处理插入上的关系、错误检查等(与批量操作不同)。