Exploring PostgreSQL Parallel Queries


1. Configuration Parameters for Parallel Query

max_worker_processes: determines how many background worker processes the whole database cluster is allowed to start; the default is 8, and setting it to 0 disables parallelism. If there is a standby, the standby's value must be greater than or equal to the primary's. Changing this parameter requires a database restart to take effect.

max_parallel_workers: the maximum number of workers the system may use for parallel queries; it is limited by max_worker_processes.

max_parallel_workers_per_gather: the maximum number of background workers that can serve a single Gather node (i.e. a single query); recommended values are 1-4. These workers are taken from the max_worker_processes pool. In OLTP workloads, because every worker consumes its own work_mem and other resources, large values can cause serious contention.

min_parallel_relation_size: the minimum size a table must have for a parallel scan to be considered; it is one of the conditions for choosing a parallel plan, and tables smaller than this threshold are normally not scanned in parallel. Note that being smaller than the threshold does not absolutely rule out a parallel plan.

parallel_setup_cost: the planner's estimated cost of launching worker processes (setting up shared memory and so on), i.e. the fixed extra cost of parallelism. The smaller the value, the more likely the planner is to choose a parallel plan.

parallel_tuple_cost: the estimated cost of a worker handing a finished tuple to the node above it, i.e. the cost of exchanging query results between processes (the cost of transferring one tuple from a background worker to the leader). The smaller the value, the more likely the planner is to choose a parallel plan.

force_parallel_mode: mainly intended for testing; on/true forces the use of a parallel plan.

parallel_workers: a table-level setting for the table's degree of parallelism; it can be set when the table is created or changed afterwards, as shown below.
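
A minimal sketch of the table-level setting (the table name people_demo is only a placeholder for illustration):

CREATE TABLE people_demo (id int PRIMARY KEY, age int) WITH (parallel_workers = 4);  -- set at creation time
ALTER TABLE people_demo SET (parallel_workers = 4);    -- or change it later
ALTER TABLE people_demo RESET (parallel_workers);      -- remove the override and let the planner decide again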

The parameter values should satisfy:

max_worker_processes >= max_parallel_workers >= max_parallel_workers_per_gather
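
A minimal sketch of how these might look in postgresql.conf, respecting the ordering above (the numbers are illustrative, not tuning advice):

# postgresql.conf -- illustrative values only
max_worker_processes = 16             # total background worker slots; changing this requires a restart
max_parallel_workers = 8              # workers available to parallel queries, at most max_worker_processes
max_parallel_workers_per_gather = 4   # workers per Gather node, at most max_parallel_workers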

Overview of parallel query capability
Three main parallel scenarios are currently supported:

parallel sequential scan
parallel join
parallel aggregation

Whether to use a parallel plan is a cost-based decision; for small data volumes the planner still chooses ordinary serial execution by default. PostgreSQL exposes the configuration parameters described above to influence this decision. To experiment with parallelism, we usually adjust two of them, max_parallel_workers_per_gather and parallel_tuple_cost, for example:
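
A minimal session-level sketch (values chosen only for experimentation):

SET max_parallel_workers_per_gather = 4;  -- allow up to 4 workers per Gather node
SET parallel_tuple_cost = 0;              -- make worker-to-leader tuple transfer look free, favoring parallel plans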

2. How Parallel Query Is Implemented

PostgreSQL implements parallelism with multiple processes. Internally each of these processes is called a worker, and workers can be created and destroyed dynamically. Parsing a SQL statement and generating the query plan are not parallelized; it is the executor module that runs the partitioned sub-tasks with multiple workers concurrently. Even while a plan is executed in parallel, the always-present backend process also acts as a worker on the parallel sub-tasks; we can call it the leader (main) process. In addition, n further worker processes, as determined by the configuration parameters, are launched to execute the remaining sub-plans.

PostgreSQL reuses its shared-memory machinery: when a worker is initialized, shared memory is allocated for it, through which each worker obtains its part of the plan and buffers intermediate results. The workers do not communicate with each other through any complex mechanism; instead, the leader process uses simple messages to start the workers and drive plan execution.
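
While a parallel query is running, the extra worker processes can be observed from another session (assuming PostgreSQL 10 or later, where pg_stat_activity exposes a backend_type column):

SELECT pid, backend_type, state, query
FROM pg_stat_activity
WHERE backend_type = 'parallel worker';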

Figure 1: The framework of PostgreSQL parallel query

Taking a Hash Join as an example, the executor-level execution flow of a parallel query looks like this:

SET max_parallel_workers_per_gather TO 4;
EXPLAIN ANALYZE
SELECT * FROM pets LEFT JOIN people ON pets.owner_id = people.id WHERE pets.species = 'cat' AND people.age < 18;

Gather (cost=199677.39..389387.73 rows=890713 width=16) (actual time=1761.945..4242.497 rows=875330 loops=1)
  Workers Planned: 4
  Workers Launched: 4
  -> Hash Join (cost=198677.39..299316.43 rows=222678 width=16) (actual time=1928.009..3678.576 rows=175066 loops=5)
       Hash Cond: (pets.owner_id = people.id)
       -> Parallel Seq Scan on pets (cost=0.00..75498.15 rows=1241422 width=8) (actual time=0.113..397.605 rows=999805 loops=5)
            Filter: (species = 'cat'::bpchar)
            Rows Removed by Filter: 1000195
       -> Hash (cost=169248.60..169248.60 rows=1793743 width=8) (actual time=1921.058..1921.058 rows=1750479 loops=5)
            Buckets: 131072 Batches: 32 Memory Usage: 3164kB
            -> Seq Scan on people (cost=0.00..169248.60 rows=1793743 width=8) (actual time=0.154..1599.292 rows=1750479 loops=5)
                 Filter: (age < 18)
                 Rows Removed by Filter: 8249521
Planning time: 0.260 ms
Execution time: 4283.538 ms

Figure 2: Execution flow of a parallel query

The workers cooperate to complete the execution task as follows:

* First, every worker performs the same kind of task. Because this is a Hash Join, each worker uses the smaller table as the build (driving) side and builds a hash table from it; every worker maintains its own copy of this hash table, while the large table is divided among the workers and each slice is joined against the hash table.

* The lowest level of parallelism is the parallel scan on disk: each worker fetches from disk the blocks it is responsible for scanning.

* The rows produced by each worker's Hash Join are a subset of the full result. For an aggregate such as count(), each subset can be aggregated separately and the partial results merged afterwards without affecting correctness, as illustrated in the sketch after this list.

* Finally, the partial results are gathered and a final aggregation step is performed over them.
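
The count() case can be illustrated with a purely conceptual SQL sketch (splitting on id % 2 is only an illustration, since the executor actually hands out disk blocks; the people table is the one created in section 3):

-- each "worker" counts a disjoint slice of the table; the leader adds the partial counts
SELECT (SELECT count(*) FROM people WHERE id % 2 = 0)
     + (SELECT count(*) FROM people WHERE id % 2 = 1) AS total_people;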

3. Parallel Query Examples

First create a people table with just an id column (the primary key) and an age column:

CREATE TABLE people (id int PRIMARY KEY NOT NULL, age int NOT NULL);

Insert some data; ten million rows should be enough to show the benefit of parallelism. Each person's age is a random integer between 0 and 100.

INSERT INTO people SELECT id, (random()*100)::integer AS age FROM generate_series(1,10000000) AS id;

Now try to fetch everyone whose age is 6; this should return roughly one percent of the rows.

EXPLAIN ANALYZE SELECT * FROM people WHERE age = 6;

Seq Scan on people (cost=0.00..169248.60 rows=93834 width=8) (actual time=0.021..941.836 rows=99804 loops=1)
  Filter: (age = 6)
  Rows Removed by Filter: 9900196
Planning time: 0.069 ms
Execution time: 945.263 ms

Now enable parallelism:

SET max_parallel_workers_per_gather = 2;

EXPLAIN ANALYZE SELECT * FROM people WHERE age = 6;

Gather (cost=1000.00..106714.98 rows=93834 width=8) (actual time=0.888..402.393 rows=99804 loops=1)
  Workers Planned: 2
  Workers Launched: 2
  -> Parallel Seq Scan on people (cost=0.00..96331.58 rows=39098 width=8) (actual time=0.085..356.994 rows=33268 loops=3)
       Filter: (age = 6)
       Rows Removed by Filter: 3300065
Planning time: 0.066 ms
Execution time: 423.679 ms

With parallel query enabled, the execution time of the same statement drops from 945.263 ms to 423.679 ms, less than half of the original. Enabling parallelism does add overhead: the rows produced by the workers have to be collected and the "gathered" data aggregated, and this overhead grows with every additional worker, so more parallelism does not always improve query performance. To evaluate parallel performance properly you should experiment on a database server with more CPU cores.


Not every query will use parallelism. For example, try fetching everyone younger than 50 (this returns about half of the table):

postgres=# EXPLAIN ANALYZE SELECT * FROM people WHERE age < 50;

QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
Seq Scan on people (cost=0.00..169247.71 rows=4955739 width=8) (actual time=0.079..1957.076 rows=4949330 loops=1)
  Filter: (age < 50)
  Rows Removed by Filter: 5050670
Planning time: 0.097 ms
Execution time: 2233.848 ms
(5 rows)

The query above returns most of the rows in the table, yet no parallel plan is used. Why? When a query returns only a small fraction of the table, the cost of starting the worker processes, running them (evaluating the filter), and merging the result sets is lower than the cost of serial execution. When most of the table is returned, the overhead of parallelism, in particular shipping all those tuples from the workers back to the leader, can outweigh its benefit.
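
The two cost parameters involved can be inspected directly; the values in the comments are the stock PostgreSQL defaults:

SHOW parallel_setup_cost;  -- 1000 by default: fixed cost of launching workers
SHOW parallel_tuple_cost;  -- 0.1 by default: cost of sending one tuple from a worker to the leader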

To force a parallel plan anyway, you can set the parallel tuple-transfer cost to 0, as shown below:

postgres=# SET parallel_tuple_cost TO 0;
SET
postgres=# EXPLAIN ANALYZE SELECT * FROM people WHERE age < 50;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------
Gather (cost=1000.00..97331.21 rows=4955739 width=8) (actual time=0.424..3147.678 rows=4949330 loops=1)
  Workers Planned: 2
  Workers Launched: 2
  -> Parallel Seq Scan on people (cost=0.00..96331.21 rows=2064891 width=8) (actual time=0.082..1325.310 rows=1649777 loops=3)
       Filter: (age < 50)
       Rows Removed by Filter: 1683557
Planning time: 0.104 ms
Execution time: 3454.690 ms
(8 rows)

As the result shows, after forcing parallelism the execution time increases from 2233.848 ms to 3454.690 ms, which confirms that the overhead of parallel execution is real.

Execution time of the age = 6 query for different numbers of parallel workers (rows are the two test machines, labelled CPU/cores):

| CPU/cores \ workers | 0 | 2 | 4 |
| --- | --- | --- | --- |
| 1/4 | 933.731 ms | 472.055 ms | 402.746 ms |
| 2/16 | 940.457 ms | 496.299 ms | 278.398 ms |

Execution time by fraction of rows returned, with parallelism enabled vs. disabled:

| Rows returned | 10% | 20% | 30% | 40% | 50% | all 10 million |
| --- | --- | --- | --- | --- | --- | --- |
| parallel enabled | 488.486 ms | 651.306 ms | 852.281 ms | 1187.996 ms | 1427.499 ms | 3191.143 ms |
| parallel disabled | 939.293 ms | 1016.761 ms | 1056.501 ms | 1125.273 ms | 1192.515 ms | 1006.990 ms |

With 10 million rows in the table, the planner chooses a parallel plan when the query returns fewer than about 1 million rows; once the query returns more than about 1 million rows, it no longer chooses a parallel plan.

Set the degree of parallelism: SET max_parallel_workers_per_gather = 4;

Force parallelism: SET parallel_tuple_cost TO 0;

Testing parallel computation of aggregate functions

Before this test, first reset the settings from the previous experiments:

postgres=# SET parallel_tuple_cost TO DEFAULT;
postgres=# SET max_parallel_workers_per_gather TO 0;

With parallelism disabled, the following statement computes the average age of everyone in the table:

postgres=# EXPLAIN ANALYZE SELECT avg(age) FROM people;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------
Aggregate (cost=169247.72..169247.73 rows=1 width=32) (actual time=2751.862..2751.862 rows=1 loops=1)
  -> Seq Scan on people (cost=0.00..144247.77 rows=9999977 width=4) (actual time=0.054..1250.670 rows=10000000 loops=1)
Planning time: 0.054 ms
Execution time: 2751.905 ms
(4 rows)

After enabling parallelism, compute the average age again:

postgres=# SET max_parallel_workers_per_gather TO 2;
SET
postgres=# EXPLAIN ANALYZE SELECT avg(age) FROM people;
QUERY PLAN
Finalize Aggregate (cost=97331.43..97331.44 rows=1 width=32) (actual time=1616.346..1616.346 rows=1 loops=1)
  -> Gather (cost=97331.21..97331.42 rows=2 width=32) (actual time=1616.143..1616.316 rows=3 loops=1)
       Workers Planned: 2
       Workers Launched: 2
       -> Partial Aggregate (cost=96331.21..96331.22 rows=1 width=32) (actual time=1610.785..1610.785 rows=1 loops=3)
            -> Parallel Seq Scan on people (cost=0.00..85914.57 rows=4166657 width=4) (actual time=0.067..957.355 rows=3333333 loops=3)
Planning time: 0.248 ms
Execution time: 1619.181 ms
(8 rows)

Comparing the two runs, parallel execution reduces the query time from 2751.905 ms to 1619.181 ms. In the parallel plan, each worker computes a Partial Aggregate over its share of the rows, and the leader combines those partial results in a Finalize Aggregate.
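
A purely conceptual SQL sketch of what this Partial/Finalize split computes for avg(); splitting on id % 2 is only an illustration, since the real executor distributes disk blocks and uses internal partial-aggregate states:

-- each "worker" computes a partial (sum, count); the leader combines them into the final average
SELECT sum(s)::numeric / sum(c) AS avg_age
FROM (
    SELECT sum(age) AS s, count(*) AS c FROM people WHERE id % 2 = 0
    UNION ALL
    SELECT sum(age) AS s, count(*) AS c FROM people WHERE id % 2 = 1
) AS partials;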

-------------------------------------------------------------------------------------------------------------------------------------

Now set up a second test environment: a pets table with 10 million rows and an index on owner_id.

CREATE TABLE pets (owner_id int NOT NULL, species character(3) NOT NULL);

CREATE INDEX pets_owner_id ON pets (owner_id);

INSERT INTO pets SELECT (random()*10000000)::integer AS owner_id, ('{cat,dog}'::text[])[ceil(random()*2)] AS species FROM generate_series(1,10000000);
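
Optionally, refresh the planner statistics after the bulk load so cost estimates reflect the new data (a suggested extra step, not part of the original test script; autovacuum would eventually do this anyway):

ANALYZE pets;    -- update statistics for the newly loaded table
ANALYZE people;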

postgres=# SET max_parallel_workers_per_gather TO 0;
postgres=# EXPLAIN ANALYZE SELECT * FROM pets JOIN people ON pets.owner_id = people.id WHERE pets.species = 'cat' AND people.age = 18;
QUERY PLAN
Hash Join (cost=171025.88..310311.99 rows=407 width=28) (actual time=1627.973..5963.378 rows=49943 loops=1)
  Hash Cond: (pets.owner_id = people.id)
  -> Seq Scan on pets (cost=0.00..138275.00 rows=37611 width=20) (actual time=0.050..2784.238 rows=4997112 loops=1)
       Filter: (species = 'cat'::bpchar)
       Rows Removed by Filter: 5002888
  -> Hash (cost=169247.71..169247.71 rows=108333 width=8) (actual time=1626.987..1626.987 rows=100094 loops=1)
       Buckets: 131072 Batches: 2 Memory Usage: 2974kB
       -> Seq Scan on people (cost=0.00..169247.71 rows=108333 width=8) (actual time=0.045..1596.765 rows=100094 loops=1)
            Filter: (age = 18)
            Rows Removed by Filter: 9899906
Planning time: 0.466 ms
Execution time: 5967.223 ms
(12 rows)

The query above takes 5967.223 ms, almost six seconds. Now enable parallelism:

postgres=# SET max_parallel_workers_per_gather TO 2;
postgres=# EXPLAIN ANALYZE SELECT * FROM pets JOIN people ON pets.owner_id = people.id WHERE pets.species = 'cat' AND people.age = 18;
QUERY PLAN
Gather (cost=1000.43..244061.39 rows=53871 width=16) (actual time=0.304..1295.285 rows=49943 loops=1)
  Workers Planned: 2
  Workers Launched: 2
  -> Nested Loop (cost=0.43..237674.29 rows=22446 width=16) (actual time=0.347..1274.578 rows=16648 loops=3)
       -> Parallel Seq Scan on people (cost=0.00..96331.21 rows=45139 width=8) (actual time=0.147..882.415 rows=33365 loops=3)
            Filter: (age = 18)
            Rows Removed by Filter: 3299969
       -> Index Scan using pets_owner_id on pets (cost=0.43..3.12 rows=1 width=8) (actual time=0.010..0.011 rows=0 loops=100094)
            Index Cond: (owner_id = people.id)
            Filter: (species = 'cat'::bpchar)
            Rows Removed by Filter: 1
Planning time: 0.274 ms
Execution time: 1306.590 ms
(13 rows)


As shown above, the execution time of the join query drops from 5967.223 ms to 1306.590 ms; note that besides adding workers, the planner also switched from a Hash Join to an index-driven Nested Loop, which contributes to the improvement.


Reposted from blog.csdn.net/qq_38613380/article/details/103991002