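I. In PipelineDB the default name is pipeline rather than postgres: both the client command and the default database are called pipeline. (In the session below, the connecting user is the OS account, steven.)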
    pipeline=# \c
    You are now connected to database "pipeline" as user "steven".
To enter the database, run: pipeline pipeline
    [steven@steven1 ~]$ pipeline pipeline
    pipeline (9.5.3)
    Type "help" for help.

    pipeline=#
Create a stream. A stream is implemented as a foreign table (through an FDW) and does not actually store any data.
    pipeline=# create stream stream_test(x integer, y integer, z text);
    CREATE FOREIGN TABLE
Inspect the stream's structure
    pipeline=# \d stream_test;
                 Foreign table "public.stream_test"
          Column       |           Type           | Modifiers | FDW Options
    -------------------+--------------------------+-----------+-------------
     x                 | integer                  |           |
     y                 | integer                  |           |
     z                 | text                     |           |
     arrival_timestamp | timestamp with time zone |           |
    Server: pipelinedb
Create continuous views
    pipeline=# create continuous view v_sum as select sum (x + y) from stream_test;
    CREATE VIEW
    pipeline=# create continuous view v_group as select count(*) as coun,x,y,z from stream_test group by x,y,z;
    CREATE VIEW
    pipeline=# create continuous view v_single as select x,z from stream_test;
    CREATE VIEW
A stream can only be read by continuous queries. Selecting from it directly raises an error saying that streams can only be read by continuous views.
Inspect the continuous views' structure
    pipeline=# \d v_group
         View "public.v_group"
     Column |  Type   | Modifiers
    --------+---------+-----------
     coun   | bigint  |
     x      | integer |
     y      | integer |
     z      | text    |

    pipeline=# \d v_single
        View "public.v_single"
     Column |  Type   | Modifiers
    --------+---------+-----------
     x      | integer |
     z      | text    |
Creating a continuous view also creates several supporting relations alongside it.
    pipeline=# \d
                      List of relations
     Schema |       Name       |     Type      | Owner
    --------+------------------+---------------+--------
     public | v                | view          | steven
     public | v_group          | view          | steven
     public | v_group_mrel     | table         | steven
     public | v_group_osrel    | foreign table | steven
     public | v_group_seq      | sequence      | steven
     public | v_mrel           | table         | steven
     public | v_osrel          | foreign table | steven
     public | v_seq            | sequence      | steven
     public | v_single         | view          | steven
     public | v_single_mrel    | table         | steven
     public | v_single_osrel   | foreign table | steven
     public | v_single_seq     | sequence      | steven
     public | v_sum            | view          | steven
     public | v_sum_mrel       | table         | steven
     public | v_sum_osrel      | foreign table | steven
     public | v_sum_seq        | sequence      | steven
    (34 rows)
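v_group: resembles an ordinary database view and stores nothing itself. You can think of it as a materialized view, specifically a very high-throughput, real-time one.
*_mrel: where the data is actually stored. It behaves just like a physical table in PostgreSQL, and the continuous view above is a shell over it. The stored contents may, however, be in HLL (HyperLogLog) format.
*_seq: the sequence that supplies the primary key for the physical table. Looking at a view's _mrel table shows a $pk column by default.
*_osrel: an internal relation representing the view's output stream.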
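The naming pattern in the relation listing can be summarized with a small Python sketch. It is only an illustration of the suffixes seen above (`_mrel`, `_osrel`, `_seq`); the function `backing_relations` is hypothetical and not part of PipelineDB.

```python
def backing_relations(view_name):
    """Map each relation created for one continuous view to its type.

    The suffixes are read off the relation listing above; this is a
    naming illustration only, not a PipelineDB API.
    """
    return {
        view_name: "view",
        f"{view_name}_mrel": "table",            # holds the materialized rows
        f"{view_name}_osrel": "foreign table",   # the view's output stream
        f"{view_name}_seq": "sequence",          # feeds the $pk column
    }


for name, kind in backing_relations("v_group").items():
    print(f"{name:<15} {kind}")
```

Calling it with "v_sum" or "v_single" yields the other rows of the listing in the same way.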
Insert data into the stream
    pipeline=# insert into stream_test (x,y,z) values(1,2,'a'),(3,4,'b'),(5,6,'c'),(7,8,'d'),(1,2,'a');
    INSERT 0 5
Query
    pipeline=# select * from v_sum;
     sum
    -----
      39
    (1 row)

    pipeline=# select * from v_group;
     coun | x | y | z
    ------+---+---+---
        1 | 7 | 8 | d
        1 | 5 | 6 | c
        2 | 1 | 2 | a
        1 | 3 | 4 | b
    (4 rows)

    pipeline=# select * from v_group_mrel;
     coun | x | y | z | $pk
    ------+---+---+---+-----
        1 | 7 | 8 | d |   1
        1 | 5 | 6 | c |   2
        2 | 1 | 2 | a |   3
        1 | 3 | 4 | b |   4
    (4 rows)
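In the ordinary case, as here, a view's _mrel table differs from the continuous view only by the extra $pk column. With some aggregates the data may instead be stored in HLL format.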
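To make the results above easier to follow, here is a minimal Python sketch of the idea behind a continuous view: each arriving row updates a small aggregate state and the row itself is then discarded. The classes `SumView` and `GroupCountView` are hypothetical stand-ins for v_sum and v_group; this models the behavior only and is not how PipelineDB is implemented.

```python
from collections import Counter


class SumView:
    """Toy model of v_sum: keeps only the running total of x + y."""

    def __init__(self):
        self.total = 0

    def on_row(self, x, y, z):
        self.total += x + y


class GroupCountView:
    """Toy model of v_group: one counter per distinct (x, y, z)."""

    def __init__(self):
        self.counts = Counter()

    def on_row(self, x, y, z):
        self.counts[(x, y, z)] += 1


def insert(views, rows):
    """Feed each row to every view; the rows themselves are not kept."""
    for row in rows:
        for view in views:
            view.on_row(*row)


v_sum, v_group = SumView(), GroupCountView()
insert([v_sum, v_group],
       [(1, 2, 'a'), (3, 4, 'b'), (5, 6, 'c'), (7, 8, 'd'), (1, 2, 'a')])

print(v_sum.total)                 # 39
print(v_group.counts[(1, 2, 'a')])  # 2
```

The same five rows as in the insert statement above give a sum of 39 and four groups, with (1, 2, 'a') counted twice.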
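Sliding windows
Next, sliding windows. Windows matter a great deal in stream processing, for example summaries over the last 5 minutes, the last hour, or the last day.
1. Create a stream with a single column named time, of type timestamp:
    pipeline=# create stream sliding (time timestamp);
2. Create a sliding-window continuous view
    pipeline=# create continuous view cv_sliding with(sw='1 minute') as select time from sliding;
    CREATE VIEW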
3. Insert one row holding the current time
    pipeline=# insert into sliding(time) values(now());
    INSERT 0 1
4. Query
    pipeline=# select * from cv_sliding;
                time
    ----------------------------
     2018-05-18 08:46:58.771057
    (1 row)
5. A little later, insert two more rows and query again
    pipeline=# insert into sliding(time) values(now());
    INSERT 0 1
    pipeline=# insert into sliding(time) values(now());
    INSERT 0 1
    pipeline=# select * from cv_sliding;
                time
    ----------------------------
     2018-05-18 08:46:58.771057
     2018-05-18 08:47:22.253052
     2018-05-18 08:47:29.265144
    (3 rows)
All three rows are visible.
6. Query again a little later and one row is gone; later still, all of them have disappeared
    pipeline=# select * from cv_sliding;
                time
    ----------------------------
     2018-05-18 08:47:22.253052
     2018-05-18 08:47:29.265144
    (2 rows)

    pipeline=# select * from cv_sliding;
     time
    ------
    (0 rows)
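The disappearing rows can be reproduced with a small Python sketch of a one-minute window. It assumes a row is visible only while its arrival time is within the window of the query time; the class `SlidingWindowView` and the three query times are hypothetical (the session above does not record when each select ran), while the three arrival times are the ones shown in the output.

```python
from datetime import datetime, timedelta


class SlidingWindowView:
    """Toy model of cv_sliding: rows drop out once older than the window."""

    def __init__(self, window):
        self.window = window
        self.rows = []  # arrival timestamps

    def insert(self, arrival):
        self.rows.append(arrival)

    def select(self, now):
        """Return the rows that arrived within `window` before `now`."""
        cutoff = now - self.window
        return [t for t in self.rows if t > cutoff]


cv_sliding = SlidingWindowView(timedelta(minutes=1))
for arrival in (datetime(2018, 5, 18, 8, 46, 58, 771057),
                datetime(2018, 5, 18, 8, 47, 22, 253052),
                datetime(2018, 5, 18, 8, 47, 29, 265144)):
    cv_sliding.insert(arrival)

# Assumed query times, chosen to mirror steps 5 and 6.
print(len(cv_sliding.select(datetime(2018, 5, 18, 8, 47, 35))))  # 3
print(len(cv_sliding.select(datetime(2018, 5, 18, 8, 48, 10))))  # 2
print(len(cv_sliding.select(datetime(2018, 5, 18, 8, 49, 0))))   # 0
```

The first row falls out as soon as the query time passes 08:47:58.771057, one minute after it arrived, which matches the two-row result in step 6.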
TTL (time to live)
    pipeline=# create continuous view v_ttl with (ttl = '10 minute',ttl_column= 'minute') as select minute(arrival_timestamp), count(*) from sliding group by minute;
    CREATE VIEW
    pipeline=# insert into sliding values(now());
    INSERT 0 1
    pipeline=# insert into sliding values(now());
    INSERT 0 1
    pipeline=# insert into sliding values(now());
    INSERT 0 1
    pipeline=# insert into sliding values(now());
    INSERT 0 1
    pipeline=# select * from v_ttl;
             minute         | count
    ------------------------+-------
     2018-05-18 09:04:00+00 |     4

    pipeline=# insert into sliding values(now());
    INSERT 0 1
    pipeline=# select * from v_ttl;
             minute         | count
    ------------------------+-------
     2018-05-18 09:04:00+00 |     4
     2018-05-18 09:06:00+00 |     1
    (2 rows)
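As a rough illustration of what the ttl and ttl_column options express, the Python sketch below buckets arrivals by minute and removes buckets whose minute value is older than the TTL. The class `TtlView`, the `reap` method, and the exact arrival and cleanup times are assumptions made for the example. In PipelineDB itself, expired rows are removed in the background, so the timing of removal should be treated as approximate.

```python
from datetime import datetime, timedelta


def minute(ts):
    """Truncate a timestamp to the start of its minute."""
    return ts.replace(second=0, microsecond=0)


class TtlView:
    """Toy model of v_ttl: per-minute counts that expire after `ttl`."""

    def __init__(self, ttl):
        self.ttl = ttl
        self.counts = {}  # minute bucket -> count

    def insert(self, arrival):
        bucket = minute(arrival)
        self.counts[bucket] = self.counts.get(bucket, 0) + 1

    def reap(self, now):
        """Delete buckets whose ttl column (the minute) is older than ttl."""
        cutoff = now - self.ttl
        for bucket in [b for b in self.counts if b < cutoff]:
            del self.counts[bucket]


v_ttl = TtlView(timedelta(minutes=10))
# Assumed arrival times: four rows within 09:04, one within 09:06.
for second in (5, 12, 20, 31):
    v_ttl.insert(datetime(2018, 5, 18, 9, 4, second))
v_ttl.insert(datetime(2018, 5, 18, 9, 6, 40))
print(v_ttl.counts)

# Ten minutes after 09:04 has passed, that bucket is eligible for removal.
v_ttl.reap(datetime(2018, 5, 18, 9, 15, 0))
print(v_ttl.counts)
```

Before the cleanup the view holds the same two buckets as the query output above (4 and 1); after it only the 09:06 bucket remains.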
Transforms
1. Create the streams and their corresponding continuous views
    pipeline=# create stream str1(x bigint,y text,z timestamp);
    CREATE FOREIGN TABLE
    pipeline=# create stream str2(x bigint,y text,z timestamp);
    CREATE FOREIGN TABLE
    pipeline=# create continuous view cv_1 as select x,y,z from str1;
    CREATE VIEW
    pipeline=# create continuous view cv_2 as select x,y,z from str2;
    CREATE VIEW
2. Create a transform
    pipeline=# create continuous transform tran_1 as select x,y,z from str1 then execute procedure pipeline_stream_insert('str2');
    CREATE VIEW
    pipeline=# insert into str1(x,y,z) values(1,'hi,i from str1',now());
    INSERT 0 1
    pipeline=# select * from cv_1;
     x |       y        |             z
    ---+----------------+---------------------------
     1 | hi,i from str1 | 2018-05-18 09:21:01.11329
    (1 row)

    pipeline=# select * from cv_2;
     x |       y        |             z
    ---+----------------+---------------------------
     1 | hi,i from str1 | 2018-05-18 09:21:01.11329
    (1 row)
The pipeline_stream_insert function used when creating the transform is provided by PipelineDB itself. We can also define a function of our own instead.
    pipeline=# create table t(x bigint,y text,z timestamp);
    CREATE TABLE
    pipeline=# CREATE OR REPLACE FUNCTION insert_into_t()
    pipeline-# RETURNS trigger AS
    pipeline-# $$
    pipeline$# BEGIN
    pipeline$# INSERT INTO t (x, y,z) VALUES (NEW.x, NEW.y,NEW.z);
    pipeline$# RETURN NEW;
    pipeline$# END;
    pipeline$# $$
    pipeline-# LANGUAGE plpgsql;
    CREATE FUNCTION
    pipeline=# CREATE CONTINUOUS TRANSFORM tran_t AS
    pipeline-# SELECT x,y,z FROM str1
    pipeline-# THEN EXECUTE PROCEDURE insert_into_t();
    CREATE CONTINUOUS TRANSFORM
    pipeline=# insert into str1(x,y,z) values(10,'I want insert table t',now());
    INSERT 0 1
    pipeline=# select * from t;
     x  |           y           |             z
    ----+-----------------------+---------------------------
     10 | I want insert table t | 2017-05-15 14:01:48.17516
    (1 row)
Here we wrote our own trigger function, which the transform calls to insert each row into table t.
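The flow of rows through the two transforms can be pictured with a short Python sketch. It treats a stream as something that stores nothing and simply hands each inserted row to its readers. The `Stream` class is hypothetical, the z column is left out for brevity, and the lists stand in for cv_1, cv_2 and table t. It models the data flow only, not PipelineDB's internals.

```python
class Stream:
    """Toy model of a stream: stores nothing, passes rows to its readers."""

    def __init__(self, name):
        self.name = name
        self.readers = []

    def subscribe(self, reader):
        self.readers.append(reader)

    def insert(self, row):
        for reader in self.readers:
            reader(row)


str1, str2 = Stream("str1"), Stream("str2")
cv_1, cv_2, t = [], [], []     # stand-ins for the two views and table t

str1.subscribe(cv_1.append)    # cv_1 reads str1
str2.subscribe(cv_2.append)    # cv_2 reads str2
str1.subscribe(str2.insert)    # tran_1: forward str1 rows into str2

str1.insert((1, "hi,i from str1"))

str1.subscribe(t.append)       # tran_t: custom function writing to t
str1.insert((10, "I want insert table t"))

print(cv_1)
print(cv_2)
print(t)
```

Because tran_t is attached only after the first insert, t receives just the second row, which is consistent with the single row returned by the query on t above.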