我需要用一个查询插入多行(行数不是常量),所以我需要像这样执行查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

我唯一知道的办法就是

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

但我想要更简单的方法。


当前回答

与psycopg2 2.7更新:

经典的executemany()比@ant32的实现(称为“折叠”)慢大约60倍,详见https://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com

这个实现在2.7版被添加到psycopg2中,称为execute_values():

from psycopg2.extras import execute_values
execute_values(cur,
    "INSERT INTO test (id, v1, v2) VALUES %s",
    [(1, 2, 3), (4, 5, 6), (7, 8, 9)])

之前的回答:

要插入多行,与execute()一起使用多行VALUES语法比使用psycopg2 executemany()快10倍左右。实际上,executemany()只是运行许多单独的INSERT语句。

@ant32的代码在Python 2中完美地工作。但在Python 3中,cursor.mogrify()返回字节,cursor.execute()接受字节或字符串,','.join()期望str实例。

所以在Python 3中,你可能需要修改@ant32的代码,添加.decode('utf-8'):

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)

或者只使用bytes(带b"或b""):

args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes) 

其他回答

与psycopg2 2.7更新:

经典的executemany()比@ant32的实现(称为“折叠”)慢大约60倍,详见https://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com

这个实现在2.7版被添加到psycopg2中,称为execute_values():

from psycopg2.extras import execute_values
execute_values(cur,
    "INSERT INTO test (id, v1, v2) VALUES %s",
    [(1, 2, 3), (4, 5, 6), (7, 8, 9)])

之前的回答:

要插入多行,与execute()一起使用多行VALUES语法比使用psycopg2 executemany()快10倍左右。实际上,executemany()只是运行许多单独的INSERT语句。

@ant32的代码在Python 2中完美地工作。但在Python 3中,cursor.mogrify()返回字节,cursor.execute()接受字节或字符串,','.join()期望str实例。

所以在Python 3中,你可能需要修改@ant32的代码,添加.decode('utf-8'):

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)

或者只使用bytes(带b"或b""):

args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes) 

使用aiopg -下面的代码段工作得非常好

    # items = [10, 11, 12, 13]
    # group = 1
    tup = [(gid, pid) for pid in items]
    args_str = ",".join([str(s) for s in tup])
    # insert into group values (1, 10), (1, 11), (1, 12), (1, 13)
    yield from cur.execute("INSERT INTO group VALUES " + args_str)

安全漏洞

截至2022-11-16,@Clodoaldo Neto (Psycopg 2.6), @Joseph Sheedy, @J。J, @Bart Jonk, @kevo Njoki, @TKoutny和@Nihal Sharma包含SQL注入漏洞,不应使用。

目前为止最快的建议(copy_from)也不应该使用,因为它很难正确地转义数据。当尝试插入',",\n, \, \t或\n这样的字符时,这很容易看出。

psycopg2的作者也建议不要使用copy_from:

Copy_from()和copy_to()实际上只是古老且不完整的方法

最快的方法

最快的方法是游标。copy_expert,它可以直接从CSV文件插入数据。

with open("mydata.csv") as f:
    cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

copy_expert也是动态生成CSV文件时最快的方法。作为参考,请参阅下面的CSVFile类,该类注意限制内存使用。

import io, csv

class CSVFile(io.TextIOBase):
    # Create a CSV file from rows. Can only be read once.
    def __init__(self, rows, size=8192):
        self.row_iter = iter(rows)
        self.buf = io.StringIO()
        self.available = 0
        self.size = size

    def read(self, n):
        # Buffer new CSV rows until enough data is available
        buf = self.buf
        writer = csv.writer(buf)
        while self.available < n:
            try:
                row_length = writer.writerow(next(self.row_iter))
                self.available += row_length
                self.size = max(self.size, row_length)
            except StopIteration:
                break

        # Read requested amount of data from buffer
        write_pos = buf.tell()
        read_pos = write_pos - self.available
        buf.seek(read_pos)
        data = buf.read(n)
        self.available -= len(data)

        # Shrink buffer if it grew very large
        if read_pos > 2 * self.size:
            remaining = buf.read()
            buf.seek(0)
            buf.write(remaining)
            buf.truncate()
        else:
            buf.seek(write_pos)

        return data

这个类可以这样使用:

rows = [(1, "a", "b"), (2, "c", "d")]
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", CSVFile(rows))

如果所有数据都适合内存,您也可以直接生成整个CSV数据,而不需要使用CSVFile类,但是如果您不知道将来要插入多少数据,则可能不应该这样做。

f = io.StringIO()
writer = csv.writer(f)
for row in rows:
    writer.writerow(row)
f.seek(0)
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

基准测试结果

914毫秒——多次调用cursor.execute 846毫秒——cursor.executemany 362毫秒- psycopg2.extras.execute_batch 346毫秒——execute_batch with page_size=1000 265毫秒——execute_batch带有预处理语句 161毫秒- psycopg2.extras.execute_values 127毫秒——游标。使用字符串连接的值执行 39毫秒- copy_expert一次生成整个CSV文件 32毫秒- copy_expert with CSVFile

Execute_batch在这个问题发布后已经添加到psycopg2。

它比execute_values快。

psycopg2 2.9.3

data = "(1, 2), (3, 4), (5, 6)"
query = "INSERT INTO t (a, b) VALUES {0}".format(data)
cursor.execute(query)

or

data = [(1, 2), (3, 4), (5, 6)]
data = ",".join(map(str, data))
query = "INSERT INTO t (a, b) VALUES {0}".format(data)
cursor.execute(query)