step1:搭建flink环境
step2:搭建kafka环境(配置系统变量$KAFKA_HOME)
step3:搭建zookeeper环境
step4:启动zookeeper:进入zookeeper的bin目录下输入:zkServer.sh start
step5:启动kafka:进入kafka的bin目录下输入:kafka-server-start.sh $KAFKA_HOME/config/server.properties
step6:创建topic,生产者,消费者,查看topic状态
1.创建topic: zk
kafka-topics.sh --create --zookeeper hadoop001:2181 --replication-factor 1 --partitions 1 --topic hello_topic
2.查看所有topic
kafka-topics.sh --list --zookeeper hadoop001:2181
3.发送消息: broker
kafka-console-producer.sh --broker-list hadoop001:9092 --topic hello_topic
4.消费消息: zk
kafka-console-consumer.sh --zookeeper hadoop001:2181 --topic hello_topic --from-beginning
step7:生成wordCount demo代码
1.pom引用(kafka+flink+mysql)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.0.0</version>
<!--<scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.4.2</version>
</dependency>
<!-- 注意:Scala 后缀必须与其他 Flink 依赖一致(_2.11,原文误写为 _2.10 会导致二进制不兼容);
     同时所有 Flink 依赖的版本号应统一(本文混用了 1.0.0 与 1.4.2,建议全部对齐到同一版本) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>1.0.0</version>
</dependency>
2.实现代码
// Word count over a Kafka topic: reads lines from "testFlink", splits into words,
// and prints running (word, count) totals. Requires Kafka + ZK reachable at the addresses below.
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 5 seconds so Kafka offsets are committed for exactly-once recovery.
env.enableCheckpointing(5000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.1.200:9092");// Kafka broker address
properties.setProperty("zookeeper.connect", "192.168.1.200:2181");// ZooKeeper address (required by the 0.9 consumer)
properties.setProperty("group.id", "testFlink");// Kafka consumer group id (NOT the topic; the topic is the first ctor arg below)
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<String>("testFlink", new SimpleStringSchema(),properties);
DataStream<String> stream = env.addSource(myConsumer);
// keyBy(0): key by the word; sum(1): accumulate the count field of each Tuple2.
DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
counts.print();
env.execute("WordCount from Kafka data");
}
/**
 * Splits each input line into lowercase word tokens and emits a (word, 1) pair
 * per token, ready for keyed summation downstream.
 */
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Split on runs of non-word characters; split() may yield empty tokens
        // (e.g. a leading delimiter), which we skip.
        for (String token : value.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
step8:flink+mysql
// Pipeline: Kafka topic -> filter blank lines -> parse "deptno:dname:location" -> MySQL sink.
// Connection settings come from an external Config properties source.
public static void main(String[] args) throws Exception {
Properties pro = new Properties();
pro.put("bootstrap.servers", Config.getString("kafka.hosts"));
pro.put("zookeeper.connect", Config.getString("kafka.zookper"));
pro.put("group.id", Config.getString("kafka.group"));
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.getConfig().disableSysoutLogging(); // suppress sysout progress logging from the runtime
env.getConfig().setRestartStrategy(
RestartStrategies.fixedDelayRestart(4, 10)); // retry up to 4 times, 10 ms apart, on failure
env.enableCheckpointing(5000);
DataStream<String> sourceStream = env
.addSource(new FlinkKafkaConsumer09<String>(Config
.getString("kafka.topic"), new SimpleStringSchema(),
pro));
// Drop blank messages, then parse each line of the form "id:name:location".
// NOTE(review): map() assumes every message has at least 3 ':'-separated fields
// and that the first parses as an int — malformed input will fail the job.
DataStream<Tuple3<Integer, String, String>> sourceStreamTra = sourceStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return StringUtils.isNotBlank(value);
}
}).map(new MapFunction<String, Tuple3<Integer, String, String>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple3<Integer, String, String> map(String value)
throws Exception {
String[] args = value.split(":");
return new Tuple3<Integer, String, String>(Integer
.valueOf(args[0]), args[1],args[2]);
}
});
sourceStreamTra.addSink(new MysqlSink());
env.execute("data to mysql start");
}
step9:插入mysql DB的代码
private static final long serialVersionUID = 1L;
// JDBC handles; opened and closed per invoke() call in this sample.
private Connection connection;
private PreparedStatement preparedStatement;
// MySQL connection settings loaded once from external configuration.
String username = Config.getString("mysql.user");
String password = Config.getString("mysql.password"); // fixed: removed stray duplicate semicolon
String drivername = Config.getString("mysql.driver");
String dburl = Config.getString("mysql.url");
/**
 * Writes one record into the flinkData table via REPLACE INTO (insert-or-overwrite by key).
 *
 * Bug fixed: the original closed the statement/connection only on the happy path,
 * leaking both whenever getConnection/prepareStatement/executeUpdate threw.
 * try-with-resources now guarantees cleanup in all cases.
 *
 * NOTE(review): opening a fresh connection per record is expensive; for real use,
 * open it once in the sink's open() and close it in close().
 */
@Override
public void invoke(Tuple3<Integer, String, String> value) throws Exception {
    Class.forName(drivername);
    String sql = "replace into flinkData(deptno,dname,location) values(?,?,?)";
    try (Connection conn = DriverManager.getConnection(dburl, username, password);
         PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setInt(1, value.f0);
        ps.setString(2, value.f1);
        ps.setString(3, value.f2);
        if (ps.executeUpdate() > 0) {
            System.out.println("value = [" + value + "]");
        }
    }
}
原文:https://blog.csdn.net/long19900613/article/details/80725073
kafka+flink实现wordCount及数据写入mysql
猜你喜欢
转载自blog.csdn.net/u013411339/article/details/88937870
今日推荐
周排行