我需要用一个查询插入多行(行数不是常量),所以我需要像这样执行查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

我唯一知道的办法就是

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

但我想要更简单的方法。


当前回答

如果您正在使用SQLAlchemy,则不需要手工制作字符串,因为SQLAlchemy支持为单个INSERT语句生成多行VALUES子句:

rows = []
for i, name in enumerate(rawdata):
    row = {
        'id': i,
        'name': name,
        'valid': True,
    }
    rows.append(row)
if len(rows) > 0:  # INSERT fails if no rows
    insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
    session.execute(insert_query)

其他回答

从@ant32

def myInsertManyTuples(connection, table, tuple_of_tuples):
    cursor = connection.cursor()
    try:
        insert_len = len(tuple_of_tuples[0])
        insert_template = "("
        for i in range(insert_len):
            insert_template += "%s,"
        insert_template = insert_template[:-1] + ")"

        args_str = ",".join(
            cursor.mogrify(insert_template, x).decode("utf-8")
            for x in tuple_of_tuples
        )
        cursor.execute("INSERT INTO " + table + " VALUES " + args_str)
        connection.commit()

    except psycopg2.Error as e:
        print(f"psycopg2.Error in myInsertMany = {e}")
        connection.rollback()

安全漏洞

截至2022-11-16,@Clodoaldo Neto (Psycopg 2.6), @Joseph Sheedy, @J。J, @Bart Jonk, @kevo Njoki, @TKoutny和@Nihal Sharma包含SQL注入漏洞,不应使用。

目前为止最快的建议(copy_from)也不应该使用,因为它很难正确地转义数据。当尝试插入',",\n, \, \t或\n这样的字符时,这很容易看出。

psycopg2的作者也建议不要使用copy_from:

Copy_from()和copy_to()实际上只是古老且不完整的方法

最快的方法

最快的方法是游标。copy_expert,它可以直接从CSV文件插入数据。

with open("mydata.csv") as f:
    cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

copy_expert也是动态生成CSV文件时最快的方法。作为参考,请参阅下面的CSVFile类,该类注意限制内存使用。

import io, csv

class CSVFile(io.TextIOBase):
    # Create a CSV file from rows. Can only be read once.
    def __init__(self, rows, size=8192):
        self.row_iter = iter(rows)
        self.buf = io.StringIO()
        self.available = 0
        self.size = size

    def read(self, n):
        # Buffer new CSV rows until enough data is available
        buf = self.buf
        writer = csv.writer(buf)
        while self.available < n:
            try:
                row_length = writer.writerow(next(self.row_iter))
                self.available += row_length
                self.size = max(self.size, row_length)
            except StopIteration:
                break

        # Read requested amount of data from buffer
        write_pos = buf.tell()
        read_pos = write_pos - self.available
        buf.seek(read_pos)
        data = buf.read(n)
        self.available -= len(data)

        # Shrink buffer if it grew very large
        if read_pos > 2 * self.size:
            remaining = buf.read()
            buf.seek(0)
            buf.write(remaining)
            buf.truncate()
        else:
            buf.seek(write_pos)

        return data

这个类可以这样使用:

rows = [(1, "a", "b"), (2, "c", "d")]
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", CSVFile(rows))

如果所有数据都适合内存,您也可以直接生成整个CSV数据,而不需要使用CSVFile类,但是如果您不知道将来要插入多少数据,则可能不应该这样做。

f = io.StringIO()
writer = csv.writer(f)
for row in rows:
    writer.writerow(row)
f.seek(0)
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

基准测试结果

914毫秒——多次调用cursor.execute 846毫秒——cursor.executemany 362毫秒- psycopg2.extras.execute_batch 346毫秒——execute_batch with page_size=1000 265毫秒——execute_batch带有预处理语句 161毫秒- psycopg2.extras.execute_values 127毫秒——游标。使用字符串连接的值执行 39毫秒- copy_expert一次生成整个CSV文件 32毫秒- copy_expert with CSVFile

如果您正在使用SQLAlchemy,则不需要手工制作字符串,因为SQLAlchemy支持为单个INSERT语句生成多行VALUES子句:

rows = []
for i, name in enumerate(rawdata):
    row = {
        'id': i,
        'name': name,
        'valid': True,
    }
    rows.append(row)
if len(rows) > 0:  # INSERT fails if no rows
    insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
    session.execute(insert_query)

所有这些技术在Postgres术语中都被称为“扩展插入”,截至2016年11月24日,它仍然比psychopg2的executemany()和这个线程中列出的所有其他方法快得多(在得到这个答案之前我尝试过)。

下面是一些不使用cur.mogrify的代码,很好,很简单:

valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
        # row == [1, 'a', 'yolo', ... ]
        sqlrows += row
        if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0:
                # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
                insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert)
                cur.execute(insertSQL, sqlrows)
                con.commit()
                sqlrows = []
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows))
cur.execute(insertSQL, sqlrows)
con.commit()

但需要注意的是,如果可以使用copy_from(),则应该使用copy_from;)

我构建了一个程序,可以向位于另一个城市的服务器插入多行代码。

我发现使用这种方法比任何执行方法都快10倍。在我的例子中,tup是一个包含大约2000行的元组。使用这种方法大约需要10秒:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 

使用此方法时2分钟:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)