Debezium:数据实时采集从Postgresql到Kafka

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u012551524/article/details/82798066

目的:构建基于hbase的实时数仓

解决的问题:RDBMS到Hbase的数据实时采集

方法:Postgresql    ----->     Debezium     ----->     Kafka     ------>     Sparkstreaming    ------>     Phoenix

本文:本文主要是从Postgresql到Kafka,不包括后续消费动作

官网参考地址:https://debezium.io/docs/connectors/postgresql/


一、组件安装配置 (本次安装采用的服务器系统为:ubuntu14.04,本次安装不包括kafka安装,如果服务器已安装kafka直接使用即可,如果没有安装需要安装kafka)

1、Postgresql安装,配置 (Postgresql这里安装版本为9.5)

sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt/ `lsb_release -cs`-pgdg main" >> /etc/apt/sources.list.d/pgdg.list'

wget -q https://www.postgresql.org/media/keys/ACCC4CF8.asc -O - | sudo apt-key add -

sudo apt-get update

sudo apt-get install postgresql-9.5

sudo apt-get install postgresql-server-dev-9.5

sudo su - postgres

配置用户密码

psql

postgres-# \conninfo

postgres=# \password postgres

配置Postgresql:

  • 修改配置文件pg_hba.conf,添加如下内容

    local   replication     postgres                          trust

    host    replication     postgres  127.0.0.1/32            trust

    host    replication     postgres  ::1/128                 trust

  • 修改配置文件postgresql.conf,添加如下内容

    # MODULES

    shared_preload_libraries = 'decoderbufs,wal2json'

    # REPLICATION

    wal_level = logical

    max_wal_senders = 8

    wal_keep_segments = 4

    max_replication_slots = 4

  • 重启PG   service postgresql restart

2、安装java环境

安装包准备:jdk-8u144-linux-x64.tar.gz

3、安装依赖包

add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update

apt-get install libprotobuf-c-dev

apt-get install libproj-dev liblwgeom-dev

4、安装decoderbufs 和 wal2json  (PostgreSQL逻辑解码器输出插件,用于将数据作为协议缓冲区传送,根据自己Postgresql版本,9.6之前装wal2json,之后装decoderbufs,装一个即可)

  • decoderbufs安装  git clone http://www.github.com/xstevens/decoderbufs        make && make install
  • wal2json安装 git clone https://github.com/eulerto/wal2json       make && make install

5、安装配置debezium

二、测试验证

1、现在PG里边postgres下建一张测试表,我们同步这张表的变化到kafka

create table test1 (

ID int primary key,

AGE int

);     //表这里要有主键,没有主键的要另外处理,这里测试带主键的表

2、启动kafka连接到PG

./connect-standalone.sh /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-standalone.properties   /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/postgres.properties

启动后控制台正常输出如下:

3、这时候我们可以看到kafka已经为监控表建立了一个topic

./kafka-topics.sh --list --zookeeper localhost:2181

topic形式:database.server.name:schema:tablename

4、启动一个kafka consumer,验证一下PG端修改表test1的内容,是否kafka端能接收到

  • 启动consumer监听topic  "fullfillment.public.test1"                                                                                                            bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic fullfillment.public.test1 --from-beginning
  • 我们在PG 端插入一条数据                                                
  • kafka-consumer端输出如下

    图中红框部分可以看到我们插入了一条数据:

    "payload": {

    "before": null,

    "after": {

    "id": 1,

    "age": 20

    },

    "source": {

    "version": "0.8.3.Final",

    "name": "fullfillment",

    "db": "postgres",

    "ts_usec": 1537507018856209000,

    "txId": 689,

    "lsn": 24790219,

    "schema": "public",

    "table": "test1",

    "snapshot": false,

    "last_snapshot_record": null

    },

    "op": "c",

    "ts_ms": 1537507018869

    }

  • 结下来更新PG这条数据                                                            

  • kafka-consumer端输出如下信息

    "payload": {

    "before": {

    "id": 1,

    "age": null

    },

    "after": {

    "id": 1,

    "age": 30

    },

    "source": {

    "version": "0.8.3.Final",

    "name": "fullfillment",

    "db": "postgres",

    "ts_usec": 1537507328357302000,

    "txId": 690,

    "lsn": 24790883,

    "schema": "public",

    "table": "test1",

    "snapshot": false,

    "last_snapshot_record": null

    },

    "op": "u",

    "ts_ms": 1537507328373

    }

  • 测试PG端删除这条记录                                                                  

  • kafka-consumer端输出如下

    "payload": {

    "before": {

    "id": 1,

    "age": null

    },

    "after": null,

    "source": {

    "version": "0.8.3.Final",

    "name": "fullfillment",

    "db": "postgres",

    "ts_usec": 1537510949561039000,

    "txId": 691,

    "lsn": 24793855,

    "schema": "public",

    "table": "test1",

    "snapshot": false,

    "last_snapshot_record": null

    },

    "op": "d",

    "ts_ms": 1537510949587

    }

测试通过!

猜你喜欢

转载自blog.csdn.net/u012551524/article/details/82798066