版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u012447842/article/details/89175772
业务场景如下:
概述:采集工厂设备的数据。
flink连接emqtt消息队列, 对数据进行流处理。设备数据的json内没有设备的machID, 需要根据gateMac从mysql获取对应的machID, 并且需要每5分钟从mysql重新获取一次, 定时刷新。
如果flink项目只在初始化的时候获取一次, 那么平台每新建一个设备, flink项目都需要重启一次才能从mysql重新获取全部的machID, 这样效果很差, 因此该方案被否定。
思路:
将flink读取mysql写成一个单独的数据流(source), 每5分钟重新查询一次, 并将查询结果写入map中。
SourceMain.java (flink处理数据的主项目)
package com.flink;
import com.flink.utils.mysql.JdbcReader;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.*;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.Future;
import com.flink.utils.emqtt.Client11;
import org.fusesource.mqtt.client.FutureConnection;
public class SourceMain {
public static Map<String,String> DeviceMap = new Hashtable<String,String>();
public static int Num = 0;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Map<String,String>> deviceStream= env.addSource(new JdbcReader());
deviceStream.map(new MapFunction<Map<String,String>, Object>() {
@Override
public Object map(Map<String, String> value) {
DeviceMap = value;
return null;
}
});
deviceStream.print();
DataStream<String> inputStream= env.addSource(new EmqttSource());
DataStream<String> Data1 = inputStream.rebalance().map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
String[] array = s.split("@@");
String topic = (String) array[1];
String message = (String) array[0];
return DataRulesEngine1(message, topic);
}
});
Data1.print();
// Data1.addSink(new OpnetsdbWriter());
// Data1.addSink(new redisWriter());
env.execute("emqtt subscribe");
}
public static String DataRulesEngine1(String message, String topic){
try {
String[] array = topic.split("/");
String Type = array[3];
if (Type.equals("Data")) {
Gson gson = new Gson();
deviceData d1 = new deviceData();
Map<String, Object> map = new HashMap<String, Object>();
map = gson.fromJson(message, map.getClass());
String dataType = (String) map.get("type");
if (dataType.equals("Data") || dataType.equals("data")) {
ArrayList dataList = (ArrayList) map.get("values");
String str = "";
String machID = DeviceMap.get(array[1]);
for (int i = 0; i < dataList.size(); i++) {
if (machID != null) {
Map<String, String> dataDict = (Map<String, String>) dataList.get(i);
d1.machID = machID;
d1.compID = array[0];
d1.gateMac = array[1];
d1.Type = dataType;
d1.operationValue = dataDict.get("name");
d1.operationData = dataDict.get("data");
d1.gatherTime = dataDict.get("time");
str += d1.machID + ',' + d1.compID + "," + d1.gateMac + "," + d1.operationValue + "," + d1.operationData + "," + d1.gatherTime + ";";
}
}
System.out.println(str);
return str;
} else {
System.out.println("无法解析该类型数据");
}
}
} catch (Throwable t) {
t.printStackTrace();
}
return null;
}
// SourceFunction<String>
public static class EmqttSource implements ParallelSourceFunction<String> {
private static final long serialVersionUID = 1L;
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
Client11 client = new Client11();
FutureConnection connection = client.start();
String msg;
while (isRunning) {
Future<Message> futrueMessage = connection.receive();
Message message = futrueMessage.await();
msg = String.valueOf(message.getPayloadBuffer()).substring(6);
ctx.collect(msg + "@@" + message.getTopic());
}
}
@Override
public void cancel() {
isRunning = false;
}
}
}
ConfigKeys.java (mysql配置文件)
package com.flink.utils.config;
/**
 * Holder of the MySQL settings used to load the device table: JDBC URL, credentials and the
 * query that selects {@code machID} and {@code gateMac}.
 *
 * <p>Instances are obtained through {@link #getMysqlInstance()}; the constructor is private.
 */
public class ConfigKeys {
    /** Query returning one row per device with its machine id and gateway MAC. */
    private static final String DEVICE_QUERY = "select machID, gateMac from dac_machinestatus";

    private final String url;
    private final String user;
    private final String password;

    private ConfigKeys() {
        this.url = "jdbc:mysql://localhost/dac";
        this.user = "root";
        this.password = "123456";
    }

    /**
     * Creates a configuration object for the MySQL source.
     *
     * @return a new instance on every call
     */
    public static ConfigKeys getMysqlInstance() {
        return new ConfigKeys();
    }

    /** @return the JDBC URL of the database */
    public String SOURCE_DRIVER_URL() {
        return url;
    }

    /** @return the database user name */
    public String SOURCE_USER() {
        return user;
    }

    /** @return the database password */
    public String SOURCE_PASSWORD() {
        return password;
    }

    /** @return the SQL statement used to read the device table */
    public String SOURCE_SQL() {
        return DEVICE_QUERY;
    }
}
JdbcReader.java (读取mysql)
package com.flink.utils.mysql;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Hashtable;
import java.util.Map;
import com.flink.utils.config.*;
/**
 * Flink source that periodically reads the device table from MySQL and emits one
 * gateMac -> machID map per refresh.
 */
public class JdbcReader extends RichSourceFunction<Map<String,String>> {
    private static final Logger logger = LoggerFactory.getLogger(JdbcReader.class);

    /** Pause between two reads of the device table: five minutes. */
    private static final long REFRESH_INTERVAL_MS = 5L * 60L * 1000L;

    // Created in open(); never part of the serialized function.
    private transient Connection connection = null;
    private transient PreparedStatement ps = null;
    private volatile boolean isRunning = true;

    /**
     * Opens the database connection and prepares the query, both taken from {@link ConfigKeys}.
     *
     * @param parameters configuration passed in by Flink
     * @throws Exception if the connection cannot be established or the statement prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ConfigKeys mysqlConfig = ConfigKeys.getMysqlInstance();
        connection = DriverManager.getConnection(mysqlConfig.SOURCE_DRIVER_URL(), mysqlConfig.SOURCE_USER(), mysqlConfig.SOURCE_PASSWORD());
        ps = connection.prepareStatement(mysqlConfig.SOURCE_SQL());
    }

    /**
     * Runs the query, emits the result and sleeps, until the source is cancelled.
     *
     * <p>A failed refresh is logged and retried after the next interval, so one failed query
     * does not stop the refreshing for good.
     *
     * @param ctx context used to emit the device maps
     * @throws Exception declared by the interface; refresh errors are handled inside the loop
     */
    @Override
    public void run(SourceContext<Map<String,String>> ctx) throws Exception {
        while (isRunning) {
            try {
                ctx.collect(loadDeviceMap());
            } catch (Exception e) {
                logger.error("runException:", e);
            }
            try {
                Thread.sleep(REFRESH_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Interrupted while waiting: keep the interrupt flag and stop the loop.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Executes the prepared query and collects the rows into a new map.
     *
     * <p>A new map is built for every call, so a map that was already emitted is never
     * modified afterwards. Rows with a NULL or empty {@code gateMac} or {@code machID} are
     * skipped.
     */
    private Map<String, String> loadDeviceMap() throws Exception {
        Map<String, String> devices = new Hashtable<String, String>();
        try (ResultSet resultSet = ps.executeQuery()) {
            while (resultSet.next()) {
                String gateMac = resultSet.getString("gateMac");
                String machID = resultSet.getString("machID");
                boolean usable = gateMac != null && machID != null
                        && !gateMac.isEmpty() && !machID.isEmpty();
                if (usable) {
                    devices.put(gateMac, machID);
                }
            }
        }
        return devices;
    }

    /** Signals the refresh loop to stop; the database resources are released in {@link #close()}. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Closes the statement and then the connection. Errors while closing are logged and do not
     * prevent the remaining cleanup.
     *
     * @throws Exception if the superclass cleanup fails
     */
    @Override
    public void close() throws Exception {
        isRunning = false;
        try {
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            logger.error("runException:", e);
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            logger.error("runException:", e);
        }
        ps = null;
        connection = null;
        super.close();
    }
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.du</groupId>
<artifactId>kafka-flink-hbase</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-flink-hbase</name>
<url>http://maven.apache.org</url>
<!-- Shared version numbers and build settings. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- NOTE(review): scala.version is not referenced by the dependencies declared in this POM. -->
<scala.version>2.11.11</scala.version>
<!-- Scala binary version; used as the suffix of the Flink artifact ids. -->
<scala.compat.version>2.11</scala.compat.version>
<!-- NOTE(review): spark.version, c3p0.version, mysql.version and fastjson.version are not
     referenced by the dependencies declared in this POM (fastjson is declared with its own
     literal version). -->
<spark.version>2.2.0</spark.version>
<c3p0.version>0.9.1.1</c3p0.version>
<mysql.version>5.1.26</mysql.version>
<fastjson.version>1.1.41</fastjson.version>
<!-- Version of the hbase-server and hbase-common dependencies. -->
<hbase.version>1.2.0</hbase.version>
<!-- Version of the Flink dependencies that use ${flink.version}. -->
<flink.version>1.4.2</flink.version>
</properties>
<!-- Repositories the project may download dependencies from.
     All URLs use HTTPS: Maven 3.8.1 and later block plain-HTTP repositories by default. -->
<repositories>
    <repository>
        <id>aliyun</id>
        <url>https://maven.aliyun.com/repository/public</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>https://repository.jboss.org/nexus/content/groups/public</url>
    </repository>
</repositories>
<dependencies>
    <!-- Flink: every artifact below follows ${flink.version} so the versions cannot drift apart. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-twitter_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Declared once only; a second declaration with version 1.7.2 conflicted with this one. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hbase_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-rabbitmq_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- This connector has its own version line, independent of ${flink.version}. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.1.5</version>
    </dependency>

    <!-- Hadoop / HBase -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase.version}</version>
    </dependency>

    <!-- MySQL JDBC driver, required at runtime by JdbcReader (DriverManager.getConnection). -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>

    <!-- JSON -->
    <!-- Gson is imported directly by SourceMain, so it is declared explicitly. -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20180130</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
        <scope>test</scope>
    </dependency>

    <!-- Redis -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.4.2</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.3.0</version>
    </dependency>

    <!-- MQTT clients -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.fusesource.mqtt-client</groupId>
        <artifactId>mqtt-client</artifactId>
        <version>1.12</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/java</testSourceDirectory>
    <plugins>
        <!-- Builds the jar-with-dependencies artifact during the package phase. -->
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- Entry point of the job; an empty value gives a jar without Main-Class. -->
                        <mainClass>com.flink.SourceMain</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Allows starting the job from Maven. -->
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
                <execution>
                    <goals>
                        <goal>exec</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <executable>java</executable>
                <includeProjectDependencies>true</includeProjectDependencies>
                <includePluginDependencies>false</includePluginDependencies>
                <classpathScope>compile</classpathScope>
                <!-- Must name a class with a main method; this project's is com.flink.SourceMain. -->
                <mainClass>com.flink.SourceMain</mainClass>
            </configuration>
        </plugin>
        <!-- Compiles the sources for Java 8. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>