重要!
Flink CDC 这个名字带着Flink,但是注意!!!我们本地搭建简易demo的时候不需要下载flink环境就可以本地跑起来。
FlinkCDC本质就是一个jar包,引入后写个main方法即可展现它的简单功能。(小白的我把FlinkCDC当成了类似于ik分词的插件,居然先去下载了Flink...)
准备工作
- 您需要会使用maven
- ≤FlinkCDC 1.4.0版本可以用MySQL5.6+,FlinkCDC必须5.7+
Flink CDC虽然官方说都需要是MySQL5.7+,但是Flink CDC1.4版本在MySQL 5.6上面跑目前没啥问题,简单搭建一个demo足够用,生产环境如果搭建用5.6请仔细测试!
开启binlog
您需要修改MySQL配置文件 my.cnf ,请在其中添加或新增以下内容:
[mysqld]
log-bin=mysql-bin
server-id=1
binlog_format=ROW
MAC的用户需要手动在etc目录下新建这个文件,Windows用户可以参考其他博客开启binlog(这一步很简单,所以内容较多)
检查binlog
您需要重启MySQL
之后运行以下语句,查看是否开启成功:
show variables like '%log_bin%';
运行后应该显示 log_bin 是 ON
扫描二维码关注公众号,回复:
16720691 查看本文章
show variables like 'binlog_format';
运行后应该显示 binlog_format 是 ROW
如果运行结果正确,您可以继续下一步操作。如果不正确请先修正。
新建项目并测试
打开IDE,新建空白Maven项目。
这里我的Flink CDC版本1.4.0 和 Flink CDC 2.0 都可以正常使用,大家自己自行选择(老版本朋友把2.0.x那个坐标注释掉,换成下面注释上的)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flinkcdc-test</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>*.dsp.redispositive.Application</mainClass>
<useUniqueVersions>false</useUniqueVersions>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<flink-version>1.13.0</flink-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.2</version>
</dependency>
<!--老版本的用户打开这个注释,把上面2.0注释掉
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.4.0</version>
</dependency>-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
</dependencies>
</project>
我们新建一个main方法进行测试:
注:Flink CDC2.0包名改了,import里面都不带 "alibaba" ,可以粘贴代码让他自动导包。
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("cdc_test") //订阅的库
.username("root")
.password("root")
.deserializer(new StringDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(sourceFunction)
.print()
.setParallelism(1);
env.execute();
}
}
进行测试
我们运行main方法,启动比较慢,请耐心等待!
出现下面的东西不影响demo运行,可不理会:
随便修改下数据库监听的库,看到出现下面的binlog日志信息,就是成功了!