我需要通过编程的方式将数千万条记录插入Postgres数据库。目前,我在一个查询中执行了数千条插入语句。

有没有更好的方法来做到这一点,一些我不知道的批量插入语句?


当前回答

加快速度的一种方法是在一个事务中显式地执行多个插入或复制(比如1000个)。Postgres的默认行为是在每条语句之后提交,因此通过批处理提交,可以避免一些开销。正如Daniel回答中的指南所说,您可能必须禁用自动提交才能工作。还要注意底部的注释,该注释建议将wal_buffers的大小增加到16mb也可能有所帮助。

其他回答

使用COPY还有一种替代方法,即Postgres支持的多行值语法。从文档中可以看到:

INSERT INTO films (code, title, did, date_prod, kind) VALUES
    ('B6717', 'Tampopo', 110, '1985-02-10', 'Comedy'),
    ('HG120', 'The Dinner Game', 140, DEFAULT, 'Comedy');

上面的代码插入了两行,但是您可以任意扩展它,直到达到预处理语句令牌的最大数量(可能是999美元,但我不能100%确定)。有时不能使用COPY,对于这些情况,这是一个有价值的替代品。

带有数组的UNNEST函数可以与多行VALUES语法一起使用。我认为这个方法比使用COPY慢,但它对我在psycopg和python (python列表传递给游标)的工作中很有用。execute变成pg ARRAY

INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
VALUES (
    UNNEST(ARRAY[1, 2, 3]), 
    UNNEST(ARRAY[100, 200, 300]), 
    UNNEST(ARRAY['a', 'b', 'c'])
);

没有值使用subselect额外的存在性检查:

INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
SELECT * FROM (
    SELECT UNNEST(ARRAY[1, 2, 3]), 
           UNNEST(ARRAY[100, 200, 300]), 
           UNNEST(ARRAY['a', 'b', 'c'])
) AS temptable
WHERE NOT EXISTS (
    SELECT 1 FROM tablename tt
    WHERE tt.fieldname1=temptable.fieldname1
);

同样的语法用于批量更新:

UPDATE tablename
SET fieldname1=temptable.data
FROM (
    SELECT UNNEST(ARRAY[1,2]) AS id,
           UNNEST(ARRAY['a', 'b']) AS data
) AS temptable
WHERE tablename.id=temptable.id;

你可以使用COPY表TO…使用二进制,它“比文本和CSV格式略快”。只有当您有数百万行要插入,并且您对二进制数据感到满意时才这样做。

下面是一个使用psycopg2和二进制输入的Python食谱示例。

这主要取决于数据库中的(其他)活动。这样的操作会有效地冻结其他会话的整个数据库。另一个需要考虑的问题是数据模型和约束、触发器等的存在。

我的第一种方法总是:创建一个(临时)表,其结构与目标表类似(创建表tmp AS select * from target where 1=0),并从将文件读入临时表开始。 然后我检查哪些是可以检查的:重复项,目标中已经存在的键,等等。

然后执行do insert到target select * from tmp或类似的操作。

如果失败了,或者花费了太长时间,我将中止它并考虑其他方法(暂时删除索引/约束等)

((这是一个WIKI,你可以编辑和增强答案!))

外部文件是最好的和典型的批量数据

术语“批量数据”与“大量数据”有关,因此使用原始原始数据是很自然的,不需要将其转换为SQL。用于“批量插入”的典型原始数据文件是CSV和JSON格式。

带有一些转换的批量插入

在ETL应用程序和摄取过程中,我们需要在插入数据之前更改数据。临时表会消耗(大量)磁盘空间,而且这不是更快的方法。PostgreSQL外部数据包装器(FDW)是最好的选择。

CSV的例子。假设SQL和CSV文件中的表名(x, y, z)

fieldname1,fieldname2,fieldname3
etc,etc,etc
... million lines ...

你可以使用经典的SQL COPY加载(作为原始数据)到tmp_tablename,他们插入过滤数据到tablename…但是,为了避免磁盘消耗,最好是直接摄取

INSERT INTO tablename (x, y, z)
  SELECT f1(fieldname1), f2(fieldname2), f3(fieldname3) -- the transforms 
  FROM tmp_tablename_fdw
  -- WHERE condictions
;

你需要为FDW准备数据库,而不是静态tmp_tablename_fdw,你可以使用一个函数来生成它:

CREATE EXTENSION file_fdw;
CREATE SERVER import FOREIGN DATA WRAPPER file_fdw;
CREATE FOREIGN TABLE tmp_tablename_fdw(
  ...
) SERVER import OPTIONS ( filename '/tmp/pg_io/file.csv', format 'csv');

JSON的例子。一个包含两个文件的集合,myRawData1。和Ranger_Policies2。Json可以通过以下方式被摄取:

INSERT INTO tablename (fname, metadata, content)
 SELECT fname, meta, j  -- do any data transformation here
 FROM jsonb_read_files('myRawData%.json')
 -- WHERE any_condiction_here
;

函数jsonb_read_files()读取由掩码定义的文件夹中的所有文件:

CREATE or replace FUNCTION jsonb_read_files(
  p_flike text, p_fpath text DEFAULT '/tmp/pg_io/'
) RETURNS TABLE (fid int, fname text, fmeta jsonb, j jsonb) AS $f$
  WITH t AS (
     SELECT (row_number() OVER ())::int id, 
           f AS fname,
           p_fpath ||'/'|| f AS f
     FROM pg_ls_dir(p_fpath) t(f)
     WHERE f LIKE p_flike
  ) SELECT id, fname,
         to_jsonb( pg_stat_file(f) ) || jsonb_build_object('fpath', p_fpath),
         pg_read_file(f)::jsonb
    FROM t
$f$  LANGUAGE SQL IMMUTABLE;

缺少gzip流

“文件摄取”最常见的方法(主要在大数据中)是保存原始文件的gzip格式,并使用流算法传输,任何可以在unix管道中快速运行且不消耗磁盘的方法:

 gunzip remote_or_local_file.csv.gz | convert_to_sql | psql 

因此ideal (future)是.csv.gz格式的服务器选项。

@CharlieClark评论后注意:目前(2022年)无事可做,最好的替代方案似乎是pgloader STDIN:

  gunzip -c file.csv.gz | pgloader --type csv ... - pgsql:///target?foo