Not working end to end yet; saving these notes for now.
Prerequisites: VMware Workstation, Linux (CentOS), Xshell, Xftp, ZooKeeper, and Kafka are already installed.
I. Install the Confluent connector
Download: https://www.confluent.io/hub/ (the connector is confluentinc/kafka-connect-cdc-mssql:1.0.0-preview)
Installation steps:
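(1) Extract the archive into the $KAFKA_HOME/connector folder. On my machine $KAFKA_HOME is /usr/local/kafka:
Create the folder: mkdir connector
Side note: the Xftp upload failed with a permission error. Go back to the parent directory and run chmod 777 connector to make the connector folder writable.
Extract: tar -xzvf confluent-5.1.0-2.11.tar.gz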
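The folder creation and extraction can also be scripted so that the archive is unpacked straight into the target folder. This is only a minimal sketch: the function name `install_connector` is hypothetical, and the archive location in the usage comment is an assumption about where Xftp placed the file.

```shell
#!/bin/sh
# install_connector ARCHIVE DEST
# Creates DEST if needed and unpacks the .tar.gz ARCHIVE into it.
install_connector() {
  archive="$1"
  dest="$2"
  if [ ! -f "$archive" ]; then
    echo "archive not found: $archive" >&2
    return 1
  fi
  mkdir -p "$dest" || return 1
  # -C switches to DEST before extracting, so no cd is needed.
  tar -xzf "$archive" -C "$dest"
}

# Example (paths are assumptions):
# install_connector /tmp/confluent-5.1.0-2.11.tar.gz "${KAFKA_HOME:-/usr/local/kafka}/connector"
```

Using `mkdir -p` makes the step safe to repeat, and a folder created by the same user that runs the extraction avoids the need for chmod 777.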
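(2) Configure the connector
File location: $KAFKA_HOME/config/connect-distributed.properties (for standalone mode the file is connect-standalone.properties)
Edit: vi connect-distributed.properties or vi connect-standalone.properties. Press a to enter insert mode; when finished, press Esc and type :wq to save and exit.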
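These notes do not list which settings to change, so the following is only a sketch of the entries a distributed worker typically needs. The broker address `hserver1:9092`, the `group.id` value, and the plugin directory are assumptions; the topic names and REST port 6097 are taken from the later steps. `plugin.path` is only understood by Kafka 0.11.0 and later; older workers load connector jars from the CLASSPATH instead. The helper name `write_connect_config` is hypothetical.

```shell
#!/bin/sh
# write_connect_config FILE
# Writes a minimal distributed-worker configuration to FILE.
write_connect_config() {
  out="$1"
  cat > "$out" <<'EOF'
# Kafka brokers the worker connects to (assumed host and port)
bootstrap.servers=hserver1:9092
# Workers with the same group.id form one Connect cluster
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Internal topics, created in step (3)
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
# REST port used by the curl calls in step (6)
rest.port=6097
# Kafka 0.11.0+ only: directory that holds the extracted connector
plugin.path=/usr/local/kafka/connector
EOF
}

# Example (writes to a scratch file so the real config can be compared first):
# write_connect_config /tmp/connect-distributed.properties
```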
(3) Create the topics
Start ZooKeeper and Kafka first; otherwise the topics cannot be created.
cd /usr/local/kafka_2.10-0.10.2.1/
cd bin
./kafka-topics.sh --zookeeper hserver1:2181/kafka --create --topic connect-offsets --replication-factor 2 --partitions 12
./kafka-topics.sh --zookeeper hserver1:2181/kafka --create --topic connect-configs --replication-factor 2 --partitions 1
./kafka-topics.sh --zookeeper hserver1:2181/kafka --create --topic connect-status --replication-factor 2 --partitions 6
Side note: if ZooKeeper runs in standalone mode (a single-node setup), create the topics with:
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic connect-offsets --replication-factor 1 --partitions 12
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic connect-configs --replication-factor 1 --partitions 1
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic connect-status --replication-factor 1 --partitions 6
In connect-standalone.properties, also change the replication factor to 1.
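The three create commands differ only in topic name and partition count, so they can be wrapped in one loop. This is a sketch rather than part of the original setup: the function name `create_connect_topics` and the `KAFKA_TOPICS` override are hypothetical, and the added `--config cleanup.policy=compact` follows the Kafka Connect recommendation that its internal topics be compacted.

```shell
#!/bin/sh
# Path to the topic tool; assumes the current directory is Kafka's bin folder.
KAFKA_TOPICS="${KAFKA_TOPICS:-./kafka-topics.sh}"

# create_connect_topics ZOOKEEPER REPLICATION_FACTOR
# Creates the three internal Kafka Connect topics; stops at the first failure.
create_connect_topics() {
  zk="$1"
  rf="$2"
  # Each entry is topic-name:partition-count, matching the commands above.
  for spec in connect-offsets:12 connect-configs:1 connect-status:6; do
    topic="${spec%%:*}"
    partitions="${spec##*:}"
    "$KAFKA_TOPICS" --zookeeper "$zk" --create --topic "$topic" \
      --replication-factor "$rf" --partitions "$partitions" \
      --config cleanup.policy=compact || return 1
  done
}

# Examples:
# create_connect_topics hserver1:2181/kafka 2   # cluster
# create_connect_topics 127.0.0.1:2181 1        # single node
```

Afterwards, `./kafka-topics.sh --zookeeper 127.0.0.1:2181 --list` should show all three topics.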
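(4) Run the connector
Run these from the bin directory. The -daemon variant starts the worker in the background.
Distributed mode:
./connect-distributed.sh ../config/connect-distributed.properties
./connect-distributed.sh -daemon ../config/connect-distributed.properties
or standalone mode (in these Kafka versions the standalone script normally also expects one or more connector properties files after the worker config):
./connect-standalone.sh ../config/connect-standalone.properties
./connect-standalone.sh -daemon ../config/connect-standalone.properties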
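When the worker is started with -daemon, it takes a few seconds before its REST interface answers. A small polling loop like the sketch below can confirm it is up before moving on. The function name `wait_for_connect` and the `CURL` override are hypothetical; port 6097 is the one used in the later curl calls and assumes the worker's REST port was configured to match (the Kafka Connect default is 8083).

```shell
#!/bin/sh
CURL="${CURL:-curl}"

# wait_for_connect URL [TRIES] [DELAY_SECONDS]
# Polls URL until it answers successfully; returns 1 if it never does.
wait_for_connect() {
  url="$1"
  tries="${2:-30}"
  delay="${3:-2}"
  i=0
  while [ "$i" -lt "$tries" ]; do
    # -f makes curl return a non-zero status on HTTP errors.
    if "$CURL" -sf "$url" >/dev/null 2>&1; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# Example: wait, then list the connector plugins the worker has loaded.
# wait_for_connect http://127.0.0.1:6097/ && curl -s http://127.0.0.1:6097/connector-plugins
```

If the SQL Server connector class does not appear in the plugin list, the worker has not picked up the extracted files, which is worth checking before registering anything.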
(5) Prepare SQL Server: enable Change Tracking on the database and on each table to be captured.
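Enabling Change Tracking takes two T-SQL statements, one for the database and one per table. As a sketch, the helper below prints them for the database `flink` and the table `dbo.test_table` named in the connector configuration. The helper name `change_tracking_sql`, the two-day retention, and the sqlcmd invocation in the comment are assumptions. The table needs a primary key before Change Tracking can be enabled on it.

```shell
#!/bin/sh
# change_tracking_sql DATABASE TABLE
# Prints a T-SQL script that enables Change Tracking on DATABASE and TABLE.
change_tracking_sql() {
  db="$1"
  table="$2"
  cat <<EOF
ALTER DATABASE [$db] SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
GO
USE [$db];
GO
ALTER TABLE $table ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON);
GO
EOF
}

# Example (sqlcmd prompts for the password when -P is omitted):
# change_tracking_sql flink dbo.test_table > enable_ct.sql
# sqlcmd -S 127.0.0.1,1433 -U sa -i enable_ct.sql
```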
(6) Register the connector through the Kafka Connect REST API, then list the registered connectors to confirm it was created:
curl -s -X POST -H "Content-Type: application/json" --data '{
  "name": "connector-mssql-online",
  "config": {
    "connector.class": "io.confluent.connect.cdc.mssql.MsSqlSourceConnector",
    "tasks.max": 1,
    "server.name": "127.0.0.1",
    "server.port": "1433",
    "username": "sa",
    "password": "13338684803",
    "initial.database": "flink",
    "database.server.name": "localhost",
    "change.tracking.tables": "dbo.test_table"
  }
}' http://127.0.0.1:6097/connectors
curl -s 127.0.0.1:6097/connectors
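A POST that returns successfully only means the configuration was stored; whether the connector and its task actually run is reported by the status endpoint. The helpers below are a sketch around two standard Kafka Connect REST calls. The function names and the `CURL` and `CONNECT_URL` variables are hypothetical, and the base URL assumes the REST port 6097 used above.

```shell
#!/bin/sh
CURL="${CURL:-curl}"
CONNECT_URL="${CONNECT_URL:-http://127.0.0.1:6097}"

# connector_status NAME
# Prints the connector state and its task states as JSON.
connector_status() {
  "$CURL" -s "$CONNECT_URL/connectors/$1/status"
}

# delete_connector NAME
# Removes the connector so it can be registered again with a changed config.
delete_connector() {
  "$CURL" -s -X DELETE "$CONNECT_URL/connectors/$1"
}

# Example:
# connector_status connector-mssql-online
```

A task in the FAILED state includes a stack trace in the status output, which is usually the quickest pointer to a wrong class name, unreachable database, or missing Change Tracking setup.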