1. Background
We needed some simple statistics derived from our business logs, so the useful fields are filtered out of the logs and loaded into a database where they can be queried conveniently. Because the data is not real-time, the approach is: export the log files, run a Flink batch job to filter and clean them, and insert the results into MySQL; statistics are then produced with ordinary SQL queries. Flink itself could compute the statistics, but the requests are one-off and change frequently, and editing and re-running a job for every new question is cumbersome. Loading the data into a database and querying it with SQL is more flexible.
2. Preparation
- Create a new Maven project
- Add the Flink dependencies and configure the main class for the shade plugin
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.14.0</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<hutool-all.version>5.3.8</hutool-all.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- On a cluster this could be marked <scope>provided</scope>, since it ships with the Flink distribution; compile scope is kept here so the job can also run locally. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool-all.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
</dependency>
<!-- flink-jdbc 1.10.0 is pinned deliberately: it provides the legacy JDBCOutputFormat used below. From Flink 1.11 onward the artifact was renamed to flink-connector-jdbc with a new JdbcOutputFormat API. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_${scala.binary.version}</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<!-- Do not copy the signatures under META-INF, otherwise SecurityExceptions will be thrown. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Main class of the job -->
<mainClass>com.test.DoveBootStrap</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
3. Implementation
- Use Flink's built-in JDBCOutputFormat to write the filtered rows into MySQL
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.types.Row;

public class DoveBootStrap {
    public static void main(String[] args) throws Exception {
        TimeInterval timer = DateUtil.timer();
        JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/sms?user=root&password=123456&serverTimezone=UTC")
                .setQuery("insert into sea_dove1 (id,send_time,phone,msg,business,source) values (?,?,?,?,?,?)")
                // flush a batch of 1000 rows at a time; optional, JDBCOutputFormat also batches by default
                .setBatchInterval(1000)
                .finish();
        // initialize the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // path of the exported log file
        String filePath = "D:\\log\\seadove\\10yue.json";
        // read the file line by line
        DataSource<String> dataSource = env.readTextFile(filePath);
        // parse and filter the raw lines into SeaDove records
        FlatMapOperator<String, SeaDove> mapOperator = dataSource.flatMap(new SeaDoveFlatMapFunction());
        // map each record onto a 6-field Row matching the INSERT statement above
        MapOperator<SeaDove, Row> map = mapOperator.map(new MapFunction<SeaDove, Row>() {
            @Override
            public Row map(SeaDove value) throws Exception {
                Row row = new Row(6);
                row.setField(0, SnowFlakeFactory.getSnowFlakeFromCache().nextId());
                row.setField(1, value.getSend_time());
                row.setField(2, value.getPhone());
                row.setField(3, value.getMsg());
                row.setField(4, value.getBusiness());
                row.setField(5, value.getSource());
                return row;
            }
        });
        // write the rows to MySQL
        map.output(jdbcOutput);
        env.execute();
        System.out.println("JDBCOutputFormat elapsed: " + timer.interval() + " ms");
    }
}
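The SeaDoveFlatMapFunction referenced above is not shown; its job is to parse each JSON log line into a SeaDove record and silently drop malformed or irrelevant lines. The parsing step can be sketched as follows. This is an illustrative stand-in, not the article's actual class: the field names are assumed from the INSERT statement, and a real implementation would use fastjson (e.g. JSON.parseObject inside a try/catch) rather than the naive string extraction below.

```java
// Illustrative sketch of the per-line parsing done inside SeaDoveFlatMapFunction.
// Field names are assumed from the INSERT statement; real code would use fastjson.
public class SeaDoveParser {

    public static class SeaDoveRecord {
        public String sendTime, phone, msg, business, source;
    }

    /** Returns null for unusable lines, so the caller (flatMap) can skip them. */
    public static SeaDoveRecord parseLine(String line) {
        if (line == null || !line.trim().startsWith("{")) {
            return null; // not a JSON object line
        }
        SeaDoveRecord r = new SeaDoveRecord();
        r.sendTime = extract(line, "send_time");
        r.phone = extract(line, "phone");
        r.msg = extract(line, "msg");
        r.business = extract(line, "business");
        r.source = extract(line, "source");
        // lines missing mandatory fields are treated as filtered out
        return (r.phone == null || r.sendTime == null) ? null : r;
    }

    // crude "key":"value" extraction, good enough for flat, well-formed log lines
    private static String extract(String json, String key) {
        String marker = "\"" + key + "\":\"";
        int start = json.indexOf(marker);
        if (start < 0) return null;
        start += marker.length();
        int end = json.indexOf('"', start);
        return end < 0 ? null : json.substring(start, end);
    }
}
```

In the real FlatMapFunction, a non-null result would be passed to the Collector and a null result simply skipped, which is exactly what makes flatMap (rather than map) the right operator for combined parsing and filtering.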
- Out of roughly a million lines in the file, about 270,000 filtered rows were inserted in around 3 minutes.
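The SnowFlakeFactory used for the primary keys above is likewise a project-local helper that is not shown (hutool also ships IdUtil.getSnowflake for the same purpose). Assuming it follows the standard snowflake bit layout, a 41-bit millisecond timestamp, a 10-bit worker id, and a 12-bit per-millisecond sequence, the idea can be sketched like this; the epoch and layout here are assumptions, not the actual implementation:

```java
// Minimal snowflake-style ID generator: 41-bit millisecond timestamp,
// 10-bit worker id, 12-bit per-millisecond sequence. Illustrative sketch;
// the article's SnowFlakeFactory (or hutool's IdUtil.getSnowflake) would
// be used in practice.
public class SimpleSnowflake {
    private static final long EPOCH = 1609459200000L; // 2021-01-01 UTC, assumed custom epoch
    private final long workerId;  // 0..1023
    private long lastMillis = -1L;
    private long sequence = 0L;

    public SimpleSnowflake(long workerId) {
        if (workerId < 0 || workerId > 1023) {
            throw new IllegalArgumentException("workerId must fit in 10 bits");
        }
        this.workerId = workerId;
    }

    public synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now == lastMillis) {
            sequence = (sequence + 1) & 0xFFF; // 12-bit sequence within one millisecond
            if (sequence == 0) {
                // sequence exhausted in this millisecond: spin until the next one
                while (now <= lastMillis) {
                    now = System.currentTimeMillis();
                }
            }
        } else {
            sequence = 0;
        }
        lastMillis = now;
        return ((now - EPOCH) << 22) | (workerId << 12) | sequence;
    }
}
```

The result is a roughly time-ordered 64-bit BIGINT key, which keeps the MySQL clustered index append-friendly during the bulk insert.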