ClickHouse实战–使用分布式表
分布式表非实体表,本质上来说是一个视图。由于CK的节点是对等的(没有主从的概念),当一个分布式表接收到请求时,请求会落到一个其中一个节点上,接收请求的节点,会把SQL进行拆分,并把请求发送到其他各个节点上,然后再对数据进行聚合,并把聚合好的结果返回给客户端。
查看集群信息
由于在创建分布式表时会用到集群名,所以,需要查看一个集群的信息。可以通过一个sql查看集群信息。
select * from system.clusters;
查看系统设置
select * from system.settings;
分布式表的创建
创建分布式表时,一般是先存在本地表。分布式表是基于本地表创建来进行创建。创建的基本语法如下:
CREATE TABLE [IF NOT EXISTS] [db.]table_name
[ON CLUSTER cluster] AS [db2.]name2
ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]
要注意的是:默认情况下分布式表的数据插入是异步的,若要实现同步的分布式表数据插入,需要设置参数。
分布式引擎参数解释
cluster
- 服务为配置中的集群名database
- 远程数据库名table
- 远程数据表名sharding_key
- (可选) 分片keypolicy_name
- (可选) 规则名,它会被用作存储临时文件以便异步发送数据
在创建分布式表时,还有几个重要的setting参数需要注意:
- insert_distributed_sync:默认情况下,向分布式表中插入数据时,ClickHouse 服务器以异步方式向集群节点发送数据。当insert_distributed_sync=1时,数据是同步处理的,只有在所有shard上都保存了所有数据后INSERT操作才成功(如果internal_replication为true,每个shard至少有一个replica)。
- fsync_after_insert:在异步插入到分布式后对文件数据进行 fsync。保证操作系统将整个插入的数据刷新到启动器节点磁盘上的文件中。
分布式表实战
查看分布式集群信息
可以先通过sql语句来查看一下系统的集群配置。
:) select * from system.clusters;
─cluster────────────────────┬─host_name───┬─host_address─┬─port─┐
│ perftest_2shards_1replicas │ x.x.x.1xx │ x.x.x.xxx │ 9000 │
│ perftest_2shards_1replicas │ x.x.x.2xx │ x.x.x.xxx │ 9000 │
└────────────────────────────┴─────────────┴──────────────┴──────┘
通过以上输出可知,我们的集群名是:perftest_2shards_1replicas。
创建集群的本地表
CREATE TABLE if not exists test_db.city_local on cluster perftest_2shards_1replicas
(
`id` Int64,
`city_code` Int32,
`city_name` String,
`total_cnt` Int64,
`event_time` DateTime
)
Engine=MergeTree()
PARTITION BY toYYYYMMDD(event_time)
ORDER BY id;
创建分布式表
分布式表实际上是一个视图,因此它需要与分片(服务器节点)具有相同的表结构定义。创建视图后,将在每个分片(服务器节点)上查询数据,并将结果聚合到最初调用查询的节点上。
在其中一个节点上执行以下创建分布式表的语句:
CREATE TABLE test_db.city_all on cluster perftest_2shards_1replicas
AS test_db.city_local
ENGINE = Distributed(perftest_2shards_1replicas, test_db, city_local, rand());
注意:在创建分布式表时需要添加上on cluster xxx。另外,创建分布式表的语法,可以参考官方文档。
Distributed中参数中,第1个参数:集群名;第2个参数:数据库名;第3个参数:本地表名;第4个参数:分布策略。
向分布式表中插入数据
insert into test_db.city_all (id, city_code, city_name, total_cnt, event_time) values (1, 4000, 'guangzhou', 420000, '2022-02-21 00:00:00');
insert into test_db.city_all (id, city_code, city_name, total_cnt, event_time) values (2, 5000, 'shenzhan', 55000, '2022-02-22 00:00:00');
insert into test_db.city_all (id, city_code, city_name, total_cnt, event_time) values (3, 6000, 'huizhou', 65000, '2022-02-23 00:00:00');
insert into test_db.city_all (id, city_code, city_name, total_cnt, event_time) values (4, 7000, 'huizhou', 75000, '2022-02-24 00:00:00');
insert into test_db.city_all (id, city_code, city_name, total_cnt, event_time) values (5, 8000, 'huizhou', 75001, '2022-02-25 00:00:00');
查看一下分布式表的数据条数:
:) select count() from city_all;
SELECT count() FROM city_all
Query id: eff9e667-61d7-4302-93dc-9d7379d234db
┌─count()─┐
│ 5 │
└─────────┘
连接到其中一个节点,查看本地表的数据条数:
:) select count() from city_local;
SELECT count() FROM city_local
Query id: 24347cdb-4cfe-4d6b-8492-90d40c8e0e2c
┌─count()─┐
│ 3 │
└─────────┘
更新分布式表的数据
想要更新分布式表的数据时,可以通过更新本地表的数据来完成。不过在更新本地表的数据时,必须添加on cluster xxx。否则只会在其中一个节点中进行操作。另外,不能直接更新分布式表。
更新本地表的语句如下:
ALTER TABLE city_local ON CLUSTER perftest_2shards_1replicas UPDATE total_cnt = 5555 WHERE city_name = 'shenzhan';
通过以上sql语句,可以正确的完成数据的更新。更新完成后,可以通过以下sql语句来查看更新后的数据。
select * from city_all order by id;
我们尝试来通过分布式表来更新数据,会报错。
ALTER TABLE city_all ON CLUSTER perftest_2shards_1replicas UPDATE total_cnt = 4444 WHERE city_name = 'shenzhan';
会报以下错误:
┌─host────────┬─port─┬─status─┬─error───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─num_hosts_remaining─┬─num_hosts_active─┐
│ xxx.xxx.1xx.xx│ 9000 │ 60 │ Code: 60, e.displayText() = DB::Exception: Table test_db.city_all doesn't exist (version 21.8.10.19 (official build)) │ 1 │ 0 │
│ xxx.xxx.2xx.xx │ 9000 │ 48 │ Code: 48, e.displayText() = DB::Exception: Table engine Distributed doesn't support mutations (version 21.8.10.19 (official build)) │ 0 │ 0 │
└─────────────┴──────┴────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────┴──────────────────┘
↘ Progress: 0.00 rows, 0.00 B (0.00 rows/s., 0.00 B/s.) 0%
所以,可知,不能通过分布式表来更新数据。
另外,要注意,CK在更新数据时,哪怕更新一条数据,也会重写该条数据所在分区的所有数据,所以,CK的更新操作是一个很重的操作,不能频繁进行。特别是当数据量非常大的时候,进行数据的更新更是要谨慎。
修改分布式表的结构
一般来说,分布式表的表结构必须要和本地保持一致的。在修改表结构时,最好不要直接通过alter语句进行表结构的修改,而是新建一张表,然后把老表的数据导入到新表中。
比如我们有一张表t1_local,分布式表d1_all,需要修改表结构。一般情况下,修改分布式表的表结构的步骤是:
(1)按新的表结构创建一个新的集群本地表(必须添加on cluster xxx),比如为:t2_local
(2)按新的本地表创建一个分布式表(添加on cluster xxx),比如为:d2_all
(3)把老的数据表中的数据导入新的分布式表中:insert into d2_all select f1,f2,f3,fn from d1_all。注意:这一步也可以导入到本地表t2_local中,导入分布式表是为了让数据按新的分片策略进行分片。
(4)验证一下数据是否导入完成。
(5)修改老的本地表名为t1_local_bk,修改新的本地表名为:t1_local。必须添加on cluster xxx。此时,以前的分布式表d1_all没有修改,但我们修改了本地表的名称,所以在对d1_all分布式表进行操作时,会对最新的本地表上进行。
另外,要注意,为了避免数据丢失,在进行表结构更新操作时,最好停止数据写入的操作。
小结
本文介绍了分布式表的基本操作。包括分布式表的创建,如何向分布式表插入数据,如何更新分布式表的表结构等操作。
参考资料
- 分布式引擎的说明
- https://medium.com/@merticariug/distributed-clickhouse-configuration-d412c211687c