我需要通过编程的方式将数千万条记录插入Postgres数据库。目前,我在一个查询中执行了数千条插入语句。

有没有更好的方法来做到这一点,一些我不知道的批量插入语句?


当前回答

下面的查询可以创建带有generate_series列的测试表,该列有10000行。*我通常创建这样的测试表来测试查询性能,你可以检查generate_series():

CREATE TABLE test AS SELECT generate_series(1, 10000);
postgres=# SELECT count(*) FROM test;
 count
-------
 10000
(1 row)
postgres=# SELECT * FROM test;
 generate_series
-----------------
               1
               2
               3
               4
               5
               6
-- More --

并且,如果你已经有了测试表,运行下面的查询来插入10000行:

INSERT INTO test (generate_series) SELECT generate_series(1, 10000);

其他回答

PostgreSQL有一个关于如何最好地初始填充数据库的指南,他们建议使用COPY命令批量加载行。该指南还提供了其他一些关于如何加快处理速度的好技巧,比如在加载数据之前删除索引和外键(然后再将它们添加回来)。

((这是一个WIKI,你可以编辑和增强答案!))

外部文件是最好的和典型的批量数据

术语“批量数据”与“大量数据”有关,因此使用原始原始数据是很自然的,不需要将其转换为SQL。用于“批量插入”的典型原始数据文件是CSV和JSON格式。

带有一些转换的批量插入

在ETL应用程序和摄取过程中,我们需要在插入数据之前更改数据。临时表会消耗(大量)磁盘空间,而且这不是更快的方法。PostgreSQL外部数据包装器(FDW)是最好的选择。

CSV的例子。假设SQL和CSV文件中的表名(x, y, z)

fieldname1,fieldname2,fieldname3
etc,etc,etc
... million lines ...

你可以使用经典的SQL COPY加载(作为原始数据)到tmp_tablename,他们插入过滤数据到tablename…但是,为了避免磁盘消耗,最好是直接摄取

INSERT INTO tablename (x, y, z)
  SELECT f1(fieldname1), f2(fieldname2), f3(fieldname3) -- the transforms 
  FROM tmp_tablename_fdw
  -- WHERE condictions
;

你需要为FDW准备数据库,而不是静态tmp_tablename_fdw,你可以使用一个函数来生成它:

CREATE EXTENSION file_fdw;
CREATE SERVER import FOREIGN DATA WRAPPER file_fdw;
CREATE FOREIGN TABLE tmp_tablename_fdw(
  ...
) SERVER import OPTIONS ( filename '/tmp/pg_io/file.csv', format 'csv');

JSON的例子。一个包含两个文件的集合,myRawData1。和Ranger_Policies2。Json可以通过以下方式被摄取:

INSERT INTO tablename (fname, metadata, content)
 SELECT fname, meta, j  -- do any data transformation here
 FROM jsonb_read_files('myRawData%.json')
 -- WHERE any_condiction_here
;

函数jsonb_read_files()读取由掩码定义的文件夹中的所有文件:

CREATE or replace FUNCTION jsonb_read_files(
  p_flike text, p_fpath text DEFAULT '/tmp/pg_io/'
) RETURNS TABLE (fid int, fname text, fmeta jsonb, j jsonb) AS $f$
  WITH t AS (
     SELECT (row_number() OVER ())::int id, 
           f AS fname,
           p_fpath ||'/'|| f AS f
     FROM pg_ls_dir(p_fpath) t(f)
     WHERE f LIKE p_flike
  ) SELECT id, fname,
         to_jsonb( pg_stat_file(f) ) || jsonb_build_object('fpath', p_fpath),
         pg_read_file(f)::jsonb
    FROM t
$f$  LANGUAGE SQL IMMUTABLE;

缺少gzip流

“文件摄取”最常见的方法(主要在大数据中)是保存原始文件的gzip格式,并使用流算法传输,任何可以在unix管道中快速运行且不消耗磁盘的方法:

 gunzip remote_or_local_file.csv.gz | convert_to_sql | psql 

因此ideal (future)是.csv.gz格式的服务器选项。

@CharlieClark评论后注意:目前(2022年)无事可做,最好的替代方案似乎是pgloader STDIN:

  gunzip -c file.csv.gz | pgloader --type csv ... - pgsql:///target?foo

带有数组的UNNEST函数可以与多行VALUES语法一起使用。我认为这个方法比使用COPY慢,但它对我在psycopg和python (python列表传递给游标)的工作中很有用。execute变成pg ARRAY

INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
VALUES (
    UNNEST(ARRAY[1, 2, 3]), 
    UNNEST(ARRAY[100, 200, 300]), 
    UNNEST(ARRAY['a', 'b', 'c'])
);

没有值使用subselect额外的存在性检查:

INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
SELECT * FROM (
    SELECT UNNEST(ARRAY[1, 2, 3]), 
           UNNEST(ARRAY[100, 200, 300]), 
           UNNEST(ARRAY['a', 'b', 'c'])
) AS temptable
WHERE NOT EXISTS (
    SELECT 1 FROM tablename tt
    WHERE tt.fieldname1=temptable.fieldname1
);

同样的语法用于批量更新:

UPDATE tablename
SET fieldname1=temptable.data
FROM (
    SELECT UNNEST(ARRAY[1,2]) AS id,
           UNNEST(ARRAY['a', 'b']) AS data
) AS temptable
WHERE tablename.id=temptable.id;

下面的查询可以创建带有generate_series列的测试表,该列有10000行。*我通常创建这样的测试表来测试查询性能,你可以检查generate_series():

CREATE TABLE test AS SELECT generate_series(1, 10000);
postgres=# SELECT count(*) FROM test;
 count
-------
 10000
(1 row)
postgres=# SELECT * FROM test;
 generate_series
-----------------
               1
               2
               3
               4
               5
               6
-- More --

并且,如果你已经有了测试表,运行下面的查询来插入10000行:

INSERT INTO test (generate_series) SELECT generate_series(1, 10000);

加快速度的一种方法是在一个事务中显式地执行多个插入或复制(比如1000个)。Postgres的默认行为是在每条语句之后提交,因此通过批处理提交,可以避免一些开销。正如Daniel回答中的指南所说,您可能必须禁用自动提交才能工作。还要注意底部的注释,该注释建议将wal_buffers的大小增加到16mb也可能有所帮助。