我需要通过编程的方式将数千万条记录插入Postgres数据库。目前,我在一个查询中执行了数千条插入语句。
有没有更好的方法来做到这一点,一些我不知道的批量插入语句?
我需要通过编程的方式将数千万条记录插入Postgres数据库。目前,我在一个查询中执行了数千条插入语句。
有没有更好的方法来做到这一点,一些我不知道的批量插入语句?
当前回答
我刚刚遇到了这个问题,建议将csvsql(发行版)批量导入到Postgres。要执行批量插入,只需创建b,然后使用csvsql,它连接到数据库,并为整个csv文件夹创建单独的表。
$ createdb test
$ csvsql --db postgresql:///test --insert examples/*.csv
其他回答
你可以使用COPY表TO…使用二进制,它“比文本和CSV格式略快”。只有当您有数百万行要插入,并且您对二进制数据感到满意时才这样做。
下面是一个使用psycopg2和二进制输入的Python食谱示例。
((这是一个WIKI,你可以编辑和增强答案!))
外部文件是最好的和典型的批量数据
术语“批量数据”与“大量数据”有关,因此使用原始原始数据是很自然的,不需要将其转换为SQL。用于“批量插入”的典型原始数据文件是CSV和JSON格式。
带有一些转换的批量插入
在ETL应用程序和摄取过程中,我们需要在插入数据之前更改数据。临时表会消耗(大量)磁盘空间,而且这不是更快的方法。PostgreSQL外部数据包装器(FDW)是最好的选择。
CSV的例子。假设SQL和CSV文件中的表名(x, y, z)
fieldname1,fieldname2,fieldname3
etc,etc,etc
... million lines ...
你可以使用经典的SQL COPY加载(作为原始数据)到tmp_tablename,他们插入过滤数据到tablename…但是,为了避免磁盘消耗,最好是直接摄取
INSERT INTO tablename (x, y, z)
SELECT f1(fieldname1), f2(fieldname2), f3(fieldname3) -- the transforms
FROM tmp_tablename_fdw
-- WHERE condictions
;
你需要为FDW准备数据库,而不是静态tmp_tablename_fdw,你可以使用一个函数来生成它:
CREATE EXTENSION file_fdw;
CREATE SERVER import FOREIGN DATA WRAPPER file_fdw;
CREATE FOREIGN TABLE tmp_tablename_fdw(
...
) SERVER import OPTIONS ( filename '/tmp/pg_io/file.csv', format 'csv');
JSON的例子。一个包含两个文件的集合,myRawData1。和Ranger_Policies2。Json可以通过以下方式被摄取:
INSERT INTO tablename (fname, metadata, content)
SELECT fname, meta, j -- do any data transformation here
FROM jsonb_read_files('myRawData%.json')
-- WHERE any_condiction_here
;
函数jsonb_read_files()读取由掩码定义的文件夹中的所有文件:
CREATE or replace FUNCTION jsonb_read_files(
p_flike text, p_fpath text DEFAULT '/tmp/pg_io/'
) RETURNS TABLE (fid int, fname text, fmeta jsonb, j jsonb) AS $f$
WITH t AS (
SELECT (row_number() OVER ())::int id,
f AS fname,
p_fpath ||'/'|| f AS f
FROM pg_ls_dir(p_fpath) t(f)
WHERE f LIKE p_flike
) SELECT id, fname,
to_jsonb( pg_stat_file(f) ) || jsonb_build_object('fpath', p_fpath),
pg_read_file(f)::jsonb
FROM t
$f$ LANGUAGE SQL IMMUTABLE;
缺少gzip流
“文件摄取”最常见的方法(主要在大数据中)是保存原始文件的gzip格式,并使用流算法传输,任何可以在unix管道中快速运行且不消耗磁盘的方法:
gunzip remote_or_local_file.csv.gz | convert_to_sql | psql
因此ideal (future)是.csv.gz格式的服务器选项。
@CharlieClark评论后注意:目前(2022年)无事可做,最好的替代方案似乎是pgloader STDIN:
gunzip -c file.csv.gz | pgloader --type csv ... - pgsql:///target?foo
带有数组的UNNEST函数可以与多行VALUES语法一起使用。我认为这个方法比使用COPY慢,但它对我在psycopg和python (python列表传递给游标)的工作中很有用。execute变成pg ARRAY
INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
VALUES (
UNNEST(ARRAY[1, 2, 3]),
UNNEST(ARRAY[100, 200, 300]),
UNNEST(ARRAY['a', 'b', 'c'])
);
没有值使用subselect额外的存在性检查:
INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
SELECT * FROM (
SELECT UNNEST(ARRAY[1, 2, 3]),
UNNEST(ARRAY[100, 200, 300]),
UNNEST(ARRAY['a', 'b', 'c'])
) AS temptable
WHERE NOT EXISTS (
SELECT 1 FROM tablename tt
WHERE tt.fieldname1=temptable.fieldname1
);
同样的语法用于批量更新:
UPDATE tablename
SET fieldname1=temptable.data
FROM (
SELECT UNNEST(ARRAY[1,2]) AS id,
UNNEST(ARRAY['a', 'b']) AS data
) AS temptable
WHERE tablename.id=temptable.id;
这主要取决于数据库中的(其他)活动。这样的操作会有效地冻结其他会话的整个数据库。另一个需要考虑的问题是数据模型和约束、触发器等的存在。
我的第一种方法总是:创建一个(临时)表,其结构与目标表类似(创建表tmp AS select * from target where 1=0),并从将文件读入临时表开始。 然后我检查哪些是可以检查的:重复项,目标中已经存在的键,等等。
然后执行do insert到target select * from tmp或类似的操作。
如果失败了,或者花费了太长时间,我将中止它并考虑其他方法(暂时删除索引/约束等)
下面的查询可以创建带有generate_series列的测试表,该列有10000行。*我通常创建这样的测试表来测试查询性能,你可以检查generate_series():
CREATE TABLE test AS SELECT generate_series(1, 10000);
postgres=# SELECT count(*) FROM test;
count
-------
10000
(1 row)
postgres=# SELECT * FROM test;
generate_series
-----------------
1
2
3
4
5
6
-- More --
并且,如果你已经有了测试表,运行下面的查询来插入10000行:
INSERT INTO test (generate_series) SELECT generate_series(1, 10000);