我需要通过编程的方式将数千万条记录插入Postgres数据库。目前,我在一个查询中执行了数千条插入语句。

有没有更好的方法来做到这一点,一些我不知道的批量插入语句?


PostgreSQL有一个关于如何最好地初始填充数据库的指南,他们建议使用COPY命令批量加载行。该指南还提供了其他一些关于如何加快处理速度的好技巧,比如在加载数据之前删除索引和外键(然后再将它们添加回来)。


加快速度的一种方法是在一个事务中显式地执行多个插入或复制(比如1000个)。Postgres的默认行为是在每条语句之后提交,因此通过批处理提交,可以避免一些开销。正如Daniel回答中的指南所说,您可能必须禁用自动提交才能工作。还要注意底部的注释,该注释建议将wal_buffers的大小增加到16mb也可能有所帮助。


这主要取决于数据库中的(其他)活动。这样的操作会有效地冻结其他会话的整个数据库。另一个需要考虑的问题是数据模型和约束、触发器等的存在。

我的第一种方法总是:创建一个(临时)表,其结构与目标表类似(创建表tmp AS select * from target where 1=0),并从将文件读入临时表开始。 然后我检查哪些是可以检查的:重复项,目标中已经存在的键,等等。

然后执行do insert到target select * from tmp或类似的操作。

如果失败了,或者花费了太长时间,我将中止它并考虑其他方法(暂时删除索引/约束等)


你可以使用COPY表TO…使用二进制,它“比文本和CSV格式略快”。只有当您有数百万行要插入,并且您对二进制数据感到满意时才这样做。

下面是一个使用psycopg2和二进制输入的Python食谱示例。


我用本地libpq方法实现了非常快速的Postgresq数据加载器。 试试我的套餐https://www.nuget.org/packages/NpgsqlBulkCopy/


使用COPY还有一种替代方法,即Postgres支持的多行值语法。从文档中可以看到:

INSERT INTO films (code, title, did, date_prod, kind) VALUES
    ('B6717', 'Tampopo', 110, '1985-02-10', 'Comedy'),
    ('HG120', 'The Dinner Game', 140, DEFAULT, 'Comedy');

上面的代码插入了两行,但是您可以任意扩展它,直到达到预处理语句令牌的最大数量(可能是999美元,但我不能100%确定)。有时不能使用COPY,对于这些情况,这是一个有价值的替代品。


带有数组的UNNEST函数可以与多行VALUES语法一起使用。我认为这个方法比使用COPY慢,但它对我在psycopg和python (python列表传递给游标)的工作中很有用。execute变成pg ARRAY

INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
VALUES (
    UNNEST(ARRAY[1, 2, 3]), 
    UNNEST(ARRAY[100, 200, 300]), 
    UNNEST(ARRAY['a', 'b', 'c'])
);

没有值使用subselect额外的存在性检查:

INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
SELECT * FROM (
    SELECT UNNEST(ARRAY[1, 2, 3]), 
           UNNEST(ARRAY[100, 200, 300]), 
           UNNEST(ARRAY['a', 'b', 'c'])
) AS temptable
WHERE NOT EXISTS (
    SELECT 1 FROM tablename tt
    WHERE tt.fieldname1=temptable.fieldname1
);

同样的语法用于批量更新:

UPDATE tablename
SET fieldname1=temptable.data
FROM (
    SELECT UNNEST(ARRAY[1,2]) AS id,
           UNNEST(ARRAY['a', 'b']) AS data
) AS temptable
WHERE tablename.id=temptable.id;

我刚刚遇到了这个问题,建议将csvsql(发行版)批量导入到Postgres。要执行批量插入,只需创建b,然后使用csvsql,它连接到数据库,并为整个csv文件夹创建单独的表。

$ createdb test 
$ csvsql --db postgresql:///test --insert examples/*.csv

((这是一个WIKI,你可以编辑和增强答案!))

外部文件是最好的和典型的批量数据

术语“批量数据”与“大量数据”有关,因此使用原始原始数据是很自然的,不需要将其转换为SQL。用于“批量插入”的典型原始数据文件是CSV和JSON格式。

带有一些转换的批量插入

在ETL应用程序和摄取过程中,我们需要在插入数据之前更改数据。临时表会消耗(大量)磁盘空间,而且这不是更快的方法。PostgreSQL外部数据包装器(FDW)是最好的选择。

CSV的例子。假设SQL和CSV文件中的表名(x, y, z)

fieldname1,fieldname2,fieldname3
etc,etc,etc
... million lines ...

你可以使用经典的SQL COPY加载(作为原始数据)到tmp_tablename,他们插入过滤数据到tablename…但是,为了避免磁盘消耗,最好是直接摄取

INSERT INTO tablename (x, y, z)
  SELECT f1(fieldname1), f2(fieldname2), f3(fieldname3) -- the transforms 
  FROM tmp_tablename_fdw
  -- WHERE condictions
;

你需要为FDW准备数据库,而不是静态tmp_tablename_fdw,你可以使用一个函数来生成它:

CREATE EXTENSION file_fdw;
CREATE SERVER import FOREIGN DATA WRAPPER file_fdw;
CREATE FOREIGN TABLE tmp_tablename_fdw(
  ...
) SERVER import OPTIONS ( filename '/tmp/pg_io/file.csv', format 'csv');

JSON的例子。一个包含两个文件的集合,myRawData1。和Ranger_Policies2。Json可以通过以下方式被摄取:

INSERT INTO tablename (fname, metadata, content)
 SELECT fname, meta, j  -- do any data transformation here
 FROM jsonb_read_files('myRawData%.json')
 -- WHERE any_condiction_here
;

函数jsonb_read_files()读取由掩码定义的文件夹中的所有文件:

CREATE or replace FUNCTION jsonb_read_files(
  p_flike text, p_fpath text DEFAULT '/tmp/pg_io/'
) RETURNS TABLE (fid int, fname text, fmeta jsonb, j jsonb) AS $f$
  WITH t AS (
     SELECT (row_number() OVER ())::int id, 
           f AS fname,
           p_fpath ||'/'|| f AS f
     FROM pg_ls_dir(p_fpath) t(f)
     WHERE f LIKE p_flike
  ) SELECT id, fname,
         to_jsonb( pg_stat_file(f) ) || jsonb_build_object('fpath', p_fpath),
         pg_read_file(f)::jsonb
    FROM t
$f$  LANGUAGE SQL IMMUTABLE;

缺少gzip流

“文件摄取”最常见的方法(主要在大数据中)是保存原始文件的gzip格式,并使用流算法传输,任何可以在unix管道中快速运行且不消耗磁盘的方法:

 gunzip remote_or_local_file.csv.gz | convert_to_sql | psql 

因此ideal (future)是.csv.gz格式的服务器选项。

@CharlieClark评论后注意:目前(2022年)无事可做,最好的替代方案似乎是pgloader STDIN:

  gunzip -c file.csv.gz | pgloader --type csv ... - pgsql:///target?foo

May be I'm late already. But, there is a Java library called pgbulkinsert by Bytefish. Me and my team were able to bulk insert 1 Million records in 15 seconds. Of course, there were some other operations that we performed like, reading 1M+ records from a file sitting on Minio, do couple of processing on the top of 1M+ records, filter down records if duplicates, and then finally insert 1M records into the Postgres Database. And all these processes were completed within 15 seconds. I don't remember exactly how much time it took to do the DB operation, but I think it was around less then 5 seconds. Find more details from https://www.bytefish.de/blog/pgbulkinsert_bulkprocessor.html


正如其他人所注意到的,在将数据导入Postgres时,会因为Postgres为您设计的检查而减慢速度。此外,您经常需要以某种方式操作数据,以使其适合使用。任何可以在Postgres进程之外完成的操作都意味着您可以使用COPY协议进行导入。

For my use I regularly import data from the httparchive.org project using pgloader. As the source files are created by MySQL you need to be able to handle some MySQL oddities such as the use of \N for an empty value and along with encoding problems. The files are also so large that, at least on my machine, using FDW runs out of memory. pgloader makes it easy to create a pipeline that lets you select the fields you want, cast to the relevant data types and any additional work before it goes into your main database so that index updates, etc. are minimal.


下面的查询可以创建带有generate_series列的测试表,该列有10000行。*我通常创建这样的测试表来测试查询性能,你可以检查generate_series():

CREATE TABLE test AS SELECT generate_series(1, 10000);
postgres=# SELECT count(*) FROM test;
 count
-------
 10000
(1 row)
postgres=# SELECT * FROM test;
 generate_series
-----------------
               1
               2
               3
               4
               5
               6
-- More --

并且,如果你已经有了测试表,运行下面的查询来插入10000行:

INSERT INTO test (generate_series) SELECT generate_series(1, 10000);