Three ways of reading and writing MySQL from Flink have worked for me so far: using Flink's built-in JDBCInputFormat and JDBCOutputFormat, writing a custom source and sink, and connecting to MySQL via DDL. The DDL approach only works when debugging in IDEA; after packaging and uploading the jar it fails at runtime, so the comparison at the end covers only the first two.
Add the dependencies
```xml
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-jdbc -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.12</artifactId>
    <version>1.10.0</version>
</dependency>
```
Method 1: the built-in JDBCInputFormat and JDBCOutputFormat
```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class ReadWriteMysqlByJDBC {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        // the types must match the queried columns one to one, otherwise the values cannot be read
        TypeInformation[] fieldTypes = new TypeInformation[] {
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO};
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
        // read from MySQL
        DataSet<Row> dataSource = fbEnv.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://xxx/xxx")
                .setUsername("xxx")
                .setPassword("xxx")
                .setQuery("xxx")
                .setRowTypeInfo(rowTypeInfo)
                .finish());
        // write to MySQL
        dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://xxx/xxx")
                .setUsername("xxx")
                .setPassword("xxx")
                .setQuery("xxx")
                .finish());
        fbEnv.execute();
    }
}
```
With the parallelism set to 2, the show plan looks like this:
Method 2: custom source and sink
source
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MysqlSource extends RichSourceFunction<SourceVo> {
    private static final Logger logger = LoggerFactory.getLogger(MysqlSource.class);

    private Connection connection = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // load the JDBC driver (with mysql-connector-java 8.x the class is
        // com.mysql.cj.jdbc.Driver; the legacy name still works but logs a deprecation warning)
        Class.forName("com.mysql.jdbc.Driver");
        // open the connection
        connection = DriverManager.getConnection("jdbc:mysql://xxx/xxx", "xxx", "xxx");
        ps = connection.prepareStatement("xxx");
    }

    @Override
    public void run(SourceContext<SourceVo> ctx) throws Exception {
        try {
            ResultSet resultSet = ps.executeQuery();
            while (resultSet.next()) {
                SourceVo vo = new SourceVo();
                vo.setxxx(resultSet.getString("xxx"));
                ctx.collect(vo);
            }
        } catch (Exception e) {
            logger.error("run exception", e);
        }
    }

    @Override
    public void cancel() {
        try {
            // close the statement before the connection
            if (ps != null) {
                ps.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            logger.error("cancel exception", e);
        }
    }
}
```
sink
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MysqlSink extends RichSinkFunction<SourceVo> {
    private Connection connection;
    private PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // load the JDBC driver
        Class.forName("com.mysql.jdbc.Driver");
        // open the connection
        connection = DriverManager.getConnection("jdbc:mysql://xxx/xxx", "xxx", "xxx");
        preparedStatement = connection.prepareStatement("xxx");
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(SourceVo value, Context context) throws Exception {
        try {
            preparedStatement.setString(1, value.getxxx());
            preparedStatement.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
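The `invoke()` above issues one `executeUpdate()` per record; production sinks usually buffer rows and flush them in batches (via JDBC's `addBatch()`/`executeBatch()`), both for throughput and to honor a flush threshold. Stripped of JDBC, the buffering logic looks roughly like the sketch below; the `BatchBuffer` class, the batch size of 3, and the flush callback are invented for illustration and are not part of any Flink or JDBC API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// A minimal batching buffer: collects records and hands each full batch to a
// flush callback (which in a real sink would call PreparedStatement.executeBatch()).
public class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    public BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    // Called once per record, like invoke(); flushes when the buffer is full.
    public void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Also called from close(), so a partial final batch is not lost.
    public void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        BatchBuffer<String> sink = new BatchBuffer<>(3, batches::add);
        for (String s : new String[]{"a", "b", "c", "d"}) {
            sink.add(s);
        }
        sink.flush(); // like close(): flush the remaining partial batch
        System.out.println(batches); // [[a, b, c], [d]]
    }
}
```

The copy in `flusher.accept(new ArrayList<>(buffer))` matters: handing out the live buffer and then clearing it would empty the batch under the callback's feet.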
main
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<SourceVo> source = fsEnv.addSource(new MysqlSource());
    source.addSink(new MysqlSink());
    fsEnv.execute();
}
```
With the parallelism set to 2, the show plan looks like this:
Method 3: read/write MySQL via DDL
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ReadWriteMysqlByDDL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv, fsSettings);
        String sourceTable = "CREATE TABLE sourceTable (\n" +
                "  FTableName VARCHAR,\n" +
                "  FECName VARCHAR\n" +
                ") WITH (\n" +
                "  'connector.type' = 'jdbc', -- use the jdbc connector\n" +
                "  'connector.url' = 'jdbc:mysql://xxx/xxx', -- jdbc url\n" +
                "  'connector.table' = 'xxx', -- table name\n" +
                "  'connector.username' = 'xxx', -- user name\n" +
                "  'connector.password' = 'xxx', -- password\n" +
                "  'connector.write.flush.max-rows' = '1' -- defaults to 5000 rows; set to 1 for the demo\n" +
                ")";
        tableEnvironment.sqlUpdate(sourceTable);
        String sinkTable = "CREATE TABLE sinkTable (\n" +
                "  FID VARCHAR,\n" +
                "  FRoomName VARCHAR\n" +
                ") WITH (\n" +
                "  'connector.type' = 'jdbc', -- use the jdbc connector\n" +
                "  'connector.url' = 'jdbc:mysql://xxx/xxx', -- jdbc url\n" +
                "  'connector.table' = 'xxx', -- table name\n" +
                "  'connector.username' = 'xxx', -- user name\n" +
                "  'connector.password' = 'xxx', -- password\n" +
                "  'connector.write.flush.max-rows' = '1' -- defaults to 5000 rows; set to 1 for the demo\n" +
                ")";
        tableEnvironment.sqlUpdate(sinkTable);
        String query = "SELECT FTableName as tableName, FECName as ecName FROM sourceTable";
        Table table = tableEnvironment.sqlQuery(query);
        table.filter("tableName === 'xxx'").select("'1', ecName").insertInto("sinkTable");
        streamEnv.execute();
    }
}
```
Strangely, this approach fails during show plan once the jar is packaged and uploaded:
```
2020-05-22 12:09:48,198 WARN  org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2020-05-22 12:09:48,372 ERROR org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
    at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:100)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130)
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
    at org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124)
    at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
    at connector.mysql.ReadWriteMysqlByDDL.main(ReadWriteMysqlByDDL.java:44)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 10 more
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:138)
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:97)
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:86)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:76)
    at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
    at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
    at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
    at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
    at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
    at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
    at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1008)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:968)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3122)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3104)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3376)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1008)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:968)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:943)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:650)
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:126)
    ... 20 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.password=xxx
connector.table=xxx
connector.type=jdbc
connector.url=jdbc:mysql://xxx/xxx
connector.username=xxx
connector.write.flush.max-rows=100
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=FTableName
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=FECName

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
    ... 47 more
```
I don't yet know the exact cause. Notably, the "factories have been considered" list contains only the Kafka and CSV factories, so the JDBC table factory isn't being discovered on the cluster at all, which points at the packaged jar rather than the SQL itself.
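One common cause of a NoMatchingTableFactoryException that appears only after packaging (an assumption here, not verified against this job) is the maven-shade-plugin letting one jar's `META-INF/services` SPI file overwrite the others, which hides connector factories from Flink's `TableFactoryService`. Merging those files with the `ServicesResourceTransformer` is worth trying:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <transformers>
      <!-- merge META-INF/services files instead of letting one jar's copy win,
           so the org.apache.flink.table.factories.TableFactory entries from
           every connector survive in the fat jar -->
      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    </transformers>
  </configuration>
</plugin>
```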
Comparison
- If no parallelism is set, both methods run the source and sink with the default parallelism of 1.
- With a higher parallelism, Method 1's source runs with the configured parallelism and each parallel instance executes the same query, so the data is read repeatedly; Method 2 does not have this problem (a plain RichSourceFunction always runs with parallelism 1).
- Method 1 returns the data as Row, and writes must also be Row, which makes working with POJOs inconvenient.
- Method 1 returns a DataSet, while Method 2 returns a DataStreamSource.
- Method 1 requires a BasicTypeInfo for every field, which gets tedious when there are many columns.
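The duplicate reads in Method 1 can be avoided while keeping the parallelism: flink-jdbc's JDBCInputFormat accepts a ParameterValuesProvider (e.g. NumericBetweenParametersProvider) together with a query containing a `WHERE id BETWEEN ? AND ?` clause, so the rows are partitioned into splits and each parallel instance fetches only its own key ranges. The splitting arithmetic behind such a provider looks roughly like the standalone sketch below (my own illustration, not Flink's actual implementation):

```java
// Sketch of cutting a numeric key range [min, max] into per-split
// (lower, upper) parameter pairs for a "WHERE id BETWEEN ? AND ?" query.
public class RangeSplitter {
    // Returns one {lower, upper} pair per fetch window of size fetchSize.
    public static long[][] split(long min, long max, long fetchSize) {
        // ceiling division: how many windows cover the whole range
        int numSplits = (int) ((max - min + 1 + fetchSize - 1) / fetchSize);
        long[][] params = new long[numSplits][2];
        long lower = min;
        for (int i = 0; i < numSplits; i++) {
            long upper = Math.min(lower + fetchSize - 1, max);
            params[i] = new long[]{lower, upper};
            lower = upper + 1;
        }
        return params;
    }

    public static void main(String[] args) {
        // ids 1..10 in windows of 4 -> [1,4], [5,8], [9,10]
        for (long[] p : split(1, 10, 4)) {
            System.out.println(p[0] + " - " + p[1]);
        }
    }
}
```

With the real API this corresponds roughly to `.setParametersProvider(new NumericBetweenParametersProvider(4, 1, 10))` plus `.setQuery("SELECT ... WHERE id BETWEEN ? AND ?")`; check the constructor signature against the flink-jdbc version actually in use.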