我需要用一个查询插入多行(行数不是常量),所以我需要像这样执行查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

我唯一知道的办法就是

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

但我想要更简单的方法。


当前回答

Execute_batch在这个问题发布后已经添加到psycopg2。

它比execute_values快。

其他回答

另一种有效的方法是将rows作为1参数传递给insert, 也就是数组的json对象。

例如,你传递的论点:

[ {id: 18, score: 1}, { id: 19, score: 5} ]

它是一个数组,其中可以包含任意数量的对象。 然后你的SQL看起来像这样:

INSERT INTO links (parent_id, child_id, score) 
SELECT 123, (r->>'id')::int, (r->>'score')::int 
FROM unnest($1::json[]) as r 

注意:你的postgress必须足够新,才能支持json

游标。copy_from是迄今为止我发现的用于批量插入的最快解决方案。下面是我做的一个要点,包含一个名为IteratorFile的类,它允许迭代器产生的字符串像文件一样读取。我们可以使用生成器表达式将每个输入记录转换为字符串。所以解是

args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("{}\t{}".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))

对于这种微不足道的参数大小,它不会产生太大的速度差异,但当处理数千行以上时,我看到了很大的加速。它也比构建一个巨大的查询字符串更节省内存。迭代器一次只能在内存中保存一条输入记录,在某些时候,在Python进程或Postgres中构建查询字符串会耗尽内存。

安全漏洞

截至2022-11-16,@Clodoaldo Neto (Psycopg 2.6), @Joseph Sheedy, @J。J, @Bart Jonk, @kevo Njoki, @TKoutny和@Nihal Sharma包含SQL注入漏洞,不应使用。

目前为止最快的建议(copy_from)也不应该使用,因为它很难正确地转义数据。当尝试插入',",\n, \, \t或\n这样的字符时,这很容易看出。

psycopg2的作者也建议不要使用copy_from:

Copy_from()和copy_to()实际上只是古老且不完整的方法

最快的方法

最快的方法是游标。copy_expert,它可以直接从CSV文件插入数据。

with open("mydata.csv") as f:
    cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

copy_expert也是动态生成CSV文件时最快的方法。作为参考,请参阅下面的CSVFile类,该类注意限制内存使用。

import io, csv

class CSVFile(io.TextIOBase):
    # Create a CSV file from rows. Can only be read once.
    def __init__(self, rows, size=8192):
        self.row_iter = iter(rows)
        self.buf = io.StringIO()
        self.available = 0
        self.size = size

    def read(self, n):
        # Buffer new CSV rows until enough data is available
        buf = self.buf
        writer = csv.writer(buf)
        while self.available < n:
            try:
                row_length = writer.writerow(next(self.row_iter))
                self.available += row_length
                self.size = max(self.size, row_length)
            except StopIteration:
                break

        # Read requested amount of data from buffer
        write_pos = buf.tell()
        read_pos = write_pos - self.available
        buf.seek(read_pos)
        data = buf.read(n)
        self.available -= len(data)

        # Shrink buffer if it grew very large
        if read_pos > 2 * self.size:
            remaining = buf.read()
            buf.seek(0)
            buf.write(remaining)
            buf.truncate()
        else:
            buf.seek(write_pos)

        return data

这个类可以这样使用:

rows = [(1, "a", "b"), (2, "c", "d")]
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", CSVFile(rows))

如果所有数据都适合内存,您也可以直接生成整个CSV数据,而不需要使用CSVFile类,但是如果您不知道将来要插入多少数据,则可能不应该这样做。

f = io.StringIO()
writer = csv.writer(f)
for row in rows:
    writer.writerow(row)
f.seek(0)
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

基准测试结果

914毫秒——多次调用cursor.execute 846毫秒——cursor.executemany 362毫秒- psycopg2.extras.execute_batch 346毫秒——execute_batch with page_size=1000 265毫秒——execute_batch带有预处理语句 161毫秒- psycopg2.extras.execute_values 127毫秒——游标。使用字符串连接的值执行 39毫秒- copy_expert一次生成整个CSV文件 32毫秒- copy_expert with CSVFile

光标。由@ joseph提供的复制解决方案。Sheedy (https://stackoverflow.com/users/958118/joseph-sheedy)以上(https://stackoverflow.com/a/30721460/11100064)确实是闪电般快。

然而,他给出的例子不适用于具有任意数量字段的记录,我花了一些时间才弄清楚如何正确使用它。

IteratorFile需要用制表符分隔的字段实例化,就像这样(r是一个字典列表,其中每个字典都是一条记录):

    f = IteratorFile("{0}\t{1}\t{2}\t{3}\t{4}".format(r["id"],
        r["type"],
        r["item"],
        r["month"],
        r["revenue"]) for r in records)

为了泛化任意数量的字段,我们将首先创建一个具有正确数量的制表符和字段占位符的行字符串:"{}\t{}\t{}....\t{}",然后使用.format()为记录中的r填充字段值:*list(r.values())):

        line = "\t".join(["{}"] * len(records[0]))

        f = IteratorFile(line.format(*list(r.values())) for r in records)

在主旨这里完成功能。

psycopg2 2.9.3

data = "(1, 2), (3, 4), (5, 6)"
query = "INSERT INTO t (a, b) VALUES {0}".format(data)
cursor.execute(query)

or

data = [(1, 2), (3, 4), (5, 6)]
data = ",".join(map(str, data))
query = "INSERT INTO t (a, b) VALUES {0}".format(data)
cursor.execute(query)