PostgreSQL Parallel Query
1. Configuration Parameters for Parallel Query
max_worker_processes: the maximum number of background worker processes the whole database cluster may start (default 8). Setting it to 0 disables parallelism. If there is a standby, this parameter on the standby must be greater than or equal to the value on the primary. Changing it requires a database restart to take effect.
max_parallel_workers: the maximum number of workers available to parallel queries; it is capped by max_worker_processes.
max_parallel_workers_per_gather: the maximum number of background workers that may cooperate on a single query (a single Gather node); 1-4 is a common recommendation. These workers come out of the max_worker_processes pool. In OLTP workloads, high values can cause serious contention, because every worker consumes its own work_mem and other resources.
min_parallel_relation_size: the minimum table size at which a parallel scan is considered, one of several conditions for enabling parallelism. Tables smaller than this normally do not get a parallel plan, although not every table below the threshold is strictly excluded.
parallel_setup_cost: the planner's estimate of the cost of launching worker processes (setting up shared memory and so on), i.e. the extra start-up overhead. The smaller the value, the more likely the planner is to choose a parallel plan.
parallel_tuple_cost: the cost of passing a finished tuple from a worker up to the parent node, i.e. the per-tuple cost of exchanging query results between processes. The smaller the value, the more likely a parallel plan.
force_parallel_mode: mainly for testing; on/true forces the use of parallel query.
parallel_workers: per-table degree of parallelism; it can be set at CREATE TABLE time or changed later.
Relationship between the values:
max_worker_processes >= max_parallel_workers >= max_parallel_workers_per_gather
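For reference, these parameters can all be set in postgresql.conf. The fragment below is only an illustrative sketch, not tuning advice; the values shown are the usual defaults (note that max_parallel_workers only exists from PostgreSQL 10 on, while min_parallel_relation_size is the 9.6 name of the setting):

```ini
max_worker_processes = 8              # cluster-wide worker pool; restart required
max_parallel_workers = 8              # PostgreSQL 10+: cap on parallel-query workers
max_parallel_workers_per_gather = 2   # workers one Gather node (one query) may use
min_parallel_relation_size = 8MB      # smaller tables normally get no parallel scan
parallel_setup_cost = 1000            # planner cost of launching workers
parallel_tuple_cost = 0.1             # planner cost per tuple shipped worker -> leader
force_parallel_mode = off             # on: force parallel plans (testing only)
```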
Parallel query features
Three parallel scenarios are currently supported:
parallel sequential scan
parallel join
parallel aggregation
Parallel query is gated by a cost-based decision; with small data volumes, for example, the planner still defaults to ordinary serial execution. PostgreSQL provides a number of configuration parameters related to parallel query; to experiment with parallelism, the two usually adjusted are max_parallel_workers_per_gather and parallel_tuple_cost.
2. How Parallel Query Is Implemented
PostgreSQL implements parallelism with multiple processes. Internally each such process is called a worker, and workers can be created and destroyed dynamically. Parsing and plan generation are not parallelized; it is in the executor module that multiple workers concurrently execute the partitioned sub-tasks. While a plan is being executed in parallel, the always-present backend process itself also acts as a worker on the parallel sub-tasks; we can call it the leader process. In addition, n more worker processes, as dictated by the configuration parameters, are launched to execute the remaining sub-plans.
PostgreSQL reuses its shared-memory machinery here: shared memory is set up for each worker at initialization time, and each worker uses it to fetch its piece of the plan and to buffer intermediate results. There is no complex communication between the workers themselves; the leader process performs the simple coordination needed to start the workers and drive the plan.
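The leader-plus-workers flow can be illustrated with a toy Python sketch (an analogy only; PostgreSQL uses background worker processes and dynamic shared memory, not Python): the leader splits the scan into slices, every worker runs the same sub-plan on its own slice, and the leader gathers the partial result sets.

```python
from multiprocessing import Pool

def scan_chunk(chunk):
    """Each 'worker' filters its own slice of the table (the sub-plan),
    e.g. the equivalent of WHERE age = 6."""
    return [row for row in chunk if row % 100 == 6]

def gather(table, n_workers):
    # Leader: split the "table" into one slice per worker.
    size = (len(table) + n_workers - 1) // n_workers
    chunks = [table[i:i + size] for i in range(0, len(table), size)]
    with Pool(n_workers) as pool:
        parts = pool.map(scan_chunk, chunks)   # workers run concurrently
    # Leader: merge (gather) the partial result sets.
    return [row for part in parts for row in part]

if __name__ == "__main__":
    table = list(range(1_000_000))
    print(len(gather(table, 4)))  # -> 10000, same rows as a serial scan
```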
The framework of PostgreSQL parallel query
Taking a Hash Join as the example, the execution flow of a parallel query at the executor level is as follows:
set max_parallel_workers_per_gather TO 4;
EXPLAIN ANALYZE
SELECT * FROM pets LEFT JOIN people ON pets.owner_id = people.id WHERE pets.species = 'cat' AND people.age < 18;
Gather (cost=199677.39..389387.73 rows=890713 width=16) (actual time=1761.945..4242.497 rows=875330 loops=1)
  Workers Planned: 4
  Workers Launched: 4
  -> Hash Join (cost=198677.39..299316.43 rows=222678 width=16) (actual time=1928.009..3678.576 rows=175066 loops=5)
       Hash Cond: (pets.owner_id = people.id)
       -> Parallel Seq Scan on pets (cost=0.00..75498.15 rows=1241422 width=8) (actual time=0.113..397.605 rows=999805 loops=5)
            Filter: (species = 'cat'::bpchar)
            Rows Removed by Filter: 1000195
       -> Hash (cost=169248.60..169248.60 rows=1793743 width=8) (actual time=1921.058..1921.058 rows=1750479 loops=5)
            Buckets: 131072 Batches: 32 Memory Usage: 3164kB
            -> Seq Scan on people (cost=0.00..169248.60 rows=1793743 width=8) (actual time=0.154..1599.292 rows=1750479 loops=5)
                 Filter: (age < 18)
                 Rows Removed by Filter: 8249521
Planning time: 0.260 ms
Execution time: 4283.538 ms
The workers cooperate to complete the query as follows:
* First, every worker does the same kind of task. Since this is a Hash Join, each worker uses the smaller table as the build side and constructs a hash table from it. Every worker maintains its own copy of this hash table, while the large table is divided evenly and each slice is joined against the hash table.
* At the lowest level, the parallelism is a parallel disk scan: each worker fetches from disk the blocks it is responsible for scanning.
* The rows produced by each worker's Hash Join are a subset of the full result. For aggregate functions such as count(), the computation can be done separately on each subset and the results merged afterwards while still guaranteeing a correct answer.
* After the data is gathered, one final aggregation step is performed.
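The four steps above can be sketched in a few lines of Python (purely illustrative, not PostgreSQL's implementation): each "worker" builds its own hash table over the small side, probes it with its slice of the big table, and the partial results are merged at the end. count() is used for the final step because, as noted above, partial counts over disjoint subsets can simply be summed.

```python
from concurrent.futures import ProcessPoolExecutor

def build_hash(small_table):
    """Step 1: every worker builds its own hash table over the small table."""
    h = {}
    for key, payload in small_table:
        h.setdefault(key, []).append(payload)
    return h

def probe_chunk(small_table, big_chunk):
    """Steps 1-2: hash the small side, then probe it with this worker's
    slice of the big table."""
    h = build_hash(small_table)
    return [(k, v, payload) for k, v in big_chunk for payload in h.get(k, [])]

def parallel_hash_join_count(small_table, big_table, n_workers=4):
    # Split the big (probe) side into one slice per worker.
    size = (len(big_table) + n_workers - 1) // n_workers
    chunks = [big_table[i:i + size] for i in range(0, len(big_table), size)]
    with ProcessPoolExecutor(n_workers) as ex:
        parts = ex.map(probe_chunk, [small_table] * len(chunks), chunks)
        # Steps 3-4: merge the per-worker subsets with a final aggregation.
        return sum(len(p) for p in parts)

if __name__ == "__main__":
    owners = [(1, "alice"), (2, "bob")]                      # small build side
    pets = [(1, "cat"), (2, "cat"), (3, "dog"), (1, "cat")]  # big probe side
    print(parallel_hash_join_count(owners, pets))  # -> 3 joined rows
```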
3. Parallel Query Examples
First create a people table with only an id (primary key) and an age column:
CREATE TABLE people (id int PRIMARY KEY NOT NULL, age int NOT NULL);
Insert some data; ten million rows should be enough to show the benefit of parallelism. Each person's age is a random number between 0 and 100.
INSERT INTO people SELECT id, (random()*100)::integer AS age FROM generate_series(1,10000000) AS id;
Now fetch everyone whose age is 6; this should return roughly one percent of the rows.
EXPLAIN ANALYZE SELECT * FROM people WHERE age = 6;
Seq Scan on people (cost=0.00..169248.60 rows=93834 width=8) (actual time=0.021..941.836 rows=99804 loops=1)
  Filter: (age = 6)
  Rows Removed by Filter: 9900196
Planning time: 0.069 ms
Execution time: 945.263 ms
Now enable parallelism:
SET max_parallel_workers_per_gather = 2;
EXPLAIN ANALYZE SELECT * FROM people WHERE age = 6;
Gather (cost=1000.00..106714.98 rows=93834 width=8) (actual time=0.888..402.393 rows=99804 loops=1)
  Workers Planned: 2
  Workers Launched: 2
  -> Parallel Seq Scan on people (cost=0.00..96331.58 rows=39098 width=8) (actual time=0.085..356.994 rows=33268 loops=3)
       Filter: (age = 6)
       Rows Removed by Filter: 3300065
Planning time: 0.066 ms
Execution time: 423.679 ms
With parallel query, the execution time of the same statement drops from 945.263 ms to 423.679 ms, less than half the original. Parallel query does bring extra overhead: collecting the workers' data and "gathering" it into an aggregated result costs something, and every additional worker adds more, so more parallelism does not always improve query performance. To really evaluate the gains you should experiment on a database server, which has more CPU cores.
Create a pets table for the join examples (each pet has an owner_id and a species):
CREATE TABLE pets (owner_id int NOT NULL, species character(3) NOT NULL);
Not every query uses parallelism. For example, try fetching the rows with age below 50 (this returns about half the table):
postgres=# EXPLAIN ANALYZE SELECT * FROM people WHERE age < 50;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
Seq Scan on people (cost=0.00..169247.71 rows=4955739 width=8) (actual time=0.079..1957.076 rows=4949330 loops=1)
  Filter: (age < 50)
  Rows Removed by Filter: 5050670
Planning time: 0.097 ms
Execution time: 2233.848 ms
(5 rows)
The query above returns most of the rows in the table, and no parallel plan was used. Why? When a query returns only a small fraction of the table, the cost of launching the workers, running them (evaluating the filter) and merging the result sets is lower than the cost of serial execution. When most of the table is returned, the overhead of parallelism can exceed the benefit it brings.
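This trade-off can be sanity-checked against the planner numbers that appear in the plans around this example (a simplified model; the real cost formulas have more components). With the default parallel_tuple_cost of 0.1, shipping the ~4.95 million estimated result tuples from the workers to the leader makes the parallel plan look far more expensive than the serial scan, which is why the planner stays serial until that cost is zeroed out:

```python
# Simplified reconstruction of the planner's choice for "WHERE age < 50"
# (illustrative only; the real cost model has more terms).
serial_cost = 169247.71      # total cost of the serial Seq Scan plan
gather_base_cost = 97331.21  # Gather total cost when parallel_tuple_cost = 0
est_rows = 4955739           # planner's estimated result rows

parallel_tuple_cost = 0.1    # default per-tuple worker -> leader transfer cost
parallel_cost = gather_base_cost + parallel_tuple_cost * est_rows

print(round(parallel_cost, 2))      # -> 592905.11
print(parallel_cost > serial_cost)  # -> True: the serial plan wins by default
```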
To force a parallel plan, you can set the parallel tuple-transfer cost to 0, as follows:
postgres=# SET parallel_tuple_cost TO 0;
SET
postgres=# EXPLAIN ANALYZE SELECT * FROM people WHERE age < 50;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------
Gather (cost=1000.00..97331.21 rows=4955739 width=8) (actual time=0.424..3147.678 rows=4949330 loops=1)
  Workers Planned: 2
  Workers Launched: 2
  -> Parallel Seq Scan on people (cost=0.00..96331.21 rows=2064891 width=8) (actual time=0.082..1325.310 rows=1649777 loops=3)
       Filter: (age < 50)
       Rows Removed by Filter: 1683557
Planning time: 0.104 ms
Execution time: 3454.690 ms
(8 rows)
As the result shows, after forcing parallelism the execution time of the statement rises from 2233.848 ms to 3454.690 ms, confirming that the overhead of parallel execution is real.
Query time by number of parallel workers (rows are CPUs/cores of the test machine):

CPUs/cores \ workers | 0 | 2 | 4 |
---|---|---|---|
1/4 | 933.731 ms | 472.055 ms | 402.746 ms |
2/16 | 940.457 ms | 496.299 ms | 278.398 ms |

Query time by fraction of the 10-million-row table returned, with parallelism on and off:

Parallelism \ returned rows | 10% | 20% | 30% | 40% | 50% | 10M |
---|---|---|---|---|---|---|
On | 488.486 ms | 651.306 ms | 852.281 ms | 1187.996 ms | 1427.499 ms | 3191.143 ms |
Off | 939.293 ms | 1016.761 ms | 1056.501 ms | 1125.273 ms | 1192.515 ms | 1006.990 ms |
With ten million rows in the table, the planner chooses a parallel plan when the query returns fewer than about one million rows; once the query returns more than one million rows, it no longer picks a parallel plan.
Set the degree of parallelism: SET max_parallel_workers_per_gather = 4;
Force parallelism: SET parallel_tuple_cost TO 0;
Testing parallel computation of aggregate functions
Before testing, first reset the environment:
postgres=# SET parallel_tuple_cost TO DEFAULT;
postgres=# SET max_parallel_workers_per_gather TO 0;
With parallelism disabled, the following statement computes everyone's average age:
postgres=# EXPLAIN ANALYZE SELECT avg(age) FROM people;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------
Aggregate (cost=169247.72..169247.73 rows=1 width=32) (actual time=2751.862..2751.862 rows=1 loops=1)
  -> Seq Scan on people (cost=0.00..144247.77 rows=9999977 width=4) (actual time=0.054..1250.670 rows=10000000 loops=1)
Planning time: 0.054 ms
Execution time: 2751.905 ms
(4 rows)
With parallelism enabled, compute the average age again:
postgres=# SET max_parallel_workers_per_gather TO 2;
SET
postgres=# EXPLAIN ANALYZE SELECT avg(age) FROM people;
QUERY PLAN
Finalize Aggregate (cost=97331.43..97331.44 rows=1 width=32) (actual time=1616.346..1616.346 rows=1 loops=1)
  -> Gather (cost=97331.21..97331.42 rows=2 width=32) (actual time=1616.143..1616.316 rows=3 loops=1)
       Workers Planned: 2
       Workers Launched: 2
       -> Partial Aggregate (cost=96331.21..96331.22 rows=1 width=32) (actual time=1610.785..1610.785 rows=1 loops=3)
            -> Parallel Seq Scan on people (cost=0.00..85914.57 rows=4166657 width=4) (actual time=0.067..957.355 rows=3333333 loops=3)
Planning time: 0.248 ms
Execution time: 1619.181 ms
(8 rows)
The two runs show that parallel computation cut the query time from 2751.905 ms to 1619.181 ms.
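The Partial Aggregate / Finalize Aggregate split in the plan above can be mimicked in a few lines of Python (a sketch, not PostgreSQL's implementation): averaging per-worker averages would be wrong when the slices differ in size, so each worker instead returns a (sum, count) transition state and the division happens once, in the finalize step.

```python
def partial_avg(chunk):
    """Partial Aggregate: each worker emits a transition state, not an average."""
    return (sum(chunk), len(chunk))

def finalize_avg(states):
    """Finalize Aggregate: merge the states, then divide once."""
    total = sum(s for s, _ in states)
    count = sum(c for _, c in states)
    return total / count

ages = [10, 20, 30, 40, 50, 66]
slices = [ages[:1], ages[1:4], ages[4:]]  # three workers, uneven slices
states = [partial_avg(s) for s in slices]
print(finalize_avg(states) == sum(ages) / len(ages))  # -> True
```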
-------------------------------------------------------------------------------------------------------------------------------------
Set up the test data: fill the pets table with ten million rows and index owner_id.
CREATE INDEX pets_owner_id ON pets (owner_id);
INSERT INTO pets SELECT (random()*10000000)::integer AS owner_id, ('{cat,dog}'::text[])[ceil(random()*2)] AS species FROM generate_series(1,10000000);
postgres=# SET max_parallel_workers_per_gather TO 0;
postgres=# EXPLAIN ANALYZE SELECT * FROM pets JOIN people ON pets.owner_id = people.id WHERE pets.species = 'cat' AND people.age = 18;
QUERY PLAN
Hash Join (cost=171025.88..310311.99 rows=407 width=28) (actual time=1627.973..5963.378 rows=49943 loops=1)
  Hash Cond: (pets.owner_id = people.id)
  -> Seq Scan on pets (cost=0.00..138275.00 rows=37611 width=20) (actual time=0.050..2784.238 rows=4997112 loops=1)
       Filter: (species = 'cat'::bpchar)
       Rows Removed by Filter: 5002888
  -> Hash (cost=169247.71..169247.71 rows=108333 width=8) (actual time=1626.987..1626.987 rows=100094 loops=1)
       Buckets: 131072 Batches: 2 Memory Usage: 2974kB
       -> Seq Scan on people (cost=0.00..169247.71 rows=108333 width=8) (actual time=0.045..1596.765 rows=100094 loops=1)
            Filter: (age = 18)
            Rows Removed by Filter: 9899906
Planning time: 0.466 ms
Execution time: 5967.223 ms
(12 rows)
The query above takes almost 5967.223 ms; now enable parallel computation:
postgres=# SET max_parallel_workers_per_gather TO 2;
postgres=# EXPLAIN ANALYZE SELECT * FROM pets JOIN people ON pets.owner_id = people.id WHERE pets.species = 'cat' AND people.age = 18;
QUERY PLAN
Gather (cost=1000.43..244061.39 rows=53871 width=16) (actual time=0.304..1295.285 rows=49943 loops=1)
  Workers Planned: 2
  Workers Launched: 2
  -> Nested Loop (cost=0.43..237674.29 rows=22446 width=16) (actual time=0.347..1274.578 rows=16648 loops=3)
       -> Parallel Seq Scan on people (cost=0.00..96331.21 rows=45139 width=8) (actual time=0.147..882.415 rows=33365 loops=3)
            Filter: (age = 18)
            Rows Removed by Filter: 3299969
       -> Index Scan using pets_owner_id on pets (cost=0.43..3.12 rows=1 width=8) (actual time=0.010..0.011 rows=0 loops=100094)
            Index Cond: (owner_id = people.id)
            Filter: (species = 'cat'::bpchar)
            Rows Removed by Filter: 1
Planning time: 0.274 ms
Execution time: 1306.590 ms
(13 rows)
As the plans show, the query's execution time drops from 5967.223 ms to 1306.590 ms with parallelism enabled.