Flink
flink介绍
什么是flink?
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。
Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并发化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。Flink与Storm类似,属于事件驱动型实时流系统。
Flink流处理特性:
-
支持高吞吐、低延迟、高性能的流处理
-
支持带有事件时间的窗口(Window)操作
-
支持有状态计算的Exactly-once语义
-
支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
-
支持具有Backpressure功能的持续流模型
-
支持基于轻量级分布式快照(Snapshot)实现的容错
-
一个运行时同时支持Batch on Streaming处理和Streaming处理
-
Flink在JVM内部实现了自己的内存管理
-
支持迭代计算
-
支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
flink技术栈
-
DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便的采用Flink提供的各种操作符对分布式数据集进行各种操作,支持Java,Scala和Python。
-
DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便的采用Flink提供的各种操作符对分布式数据流进行各种操作,支持Java和Scala。
-
Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过Flink提供的类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。
-
Flink ML,Flink的机器学习库,提供了机器学习Pipelines API以及很多的机器学习算法实现。
-
Gelly,Flink的图计算库,提供了图计算的相关API以及很多的图计算算法实现。
Flink集群的搭建
# 下载
$ wget https://mirrors.shu.edu.cn/apache/flink/flink-1.7.2/flink-1.7.2-bin-hadoop27-scala_2.11.tgz
# 解压缩
$ tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz
# 配置flink-conf.yaml
$ vim $FLINK_HOME/conf/flink-conf.yaml
jobmanager.rpc.address: node-1
# 配置slaves
$ vim $FLINK_HOME/conf/slaves
node-2
node-3
# 启动flink集群
$ $FLINK_HOME/bin/start-cluster.sh
# 关闭flink集群
$ $FLINK_HOME/bin/stop-cluster.sh
flink java word count
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.junit.Test;
/**
* <p>
*
* @author leone
* @since 2019-02-28
**/
public class FlinkJavaWc {
/**
* 从socket中读取数据并处理
*/
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("node-1", 8082);
DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = s.toLowerCase().split(" ");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<>(token, 1));
}
}
}
}).keyBy(0).timeWindow(Time.seconds(2), Time.seconds(1)).sum(1);
dataStream.print();
env.execute("Java WordCount from SocketTextStream Example");
}
}
flink scala word count
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
* <p>
*
* @author leone
* @since 2019-02-28
**/
object FlinkScalaWordCount {
def main(args: Array[String]): Unit = {
// 必须要导入隐式转换
import org.apache.flink.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("node-1", 9999, '\n')
val result = text.flatMap(line => line.split("\\s"))
.map(w => WordWithCount(w, 1))
.keyBy("word")
.timeWindowAll(Time.seconds(2), Time.seconds(1))
.sum("count")
result.print().setParallelism(1)
env.execute("scala—nc-wc")
}
case class WordWithCount(word: String, count: Long)
}