Flink():
实时性高,吞吐量高,离线+实时
算子较为丰富
Spark Streaming():
有延迟(毫秒级别),吞吐量高,离线+实时
算子丰富,可以做机器学习,图计算(生态圈)
flink的批处理实际上流处理,只是把一个批处理当作是一个大的流,而spark的实时实际上时批处理,是极小的批次,比如说把1秒的一个批次进行一次处理,可以看作是流处理,但是会有延迟
Spark做实时计算,原来是一次性提交一个大的RDD,想要做实时计算,需要把不断的读取数据,形成多个小的RDD,每隔一段时间就会产生一个小的RDD,然后将小的RDD提交到集群
下面是在使用Scala对flink和spark进行编程时的一个对比
flink实时的wordcount:
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @Author: zxl
* @Date: 2019/1/12 10:16
* @Version 1.0
* 通过实时收集 某个端口的数据, 然后进行wordcount
*/
object FlinkDemo01 {
def main(args: Array[String]): Unit = {
// flink 的一个工具类,可以获取参数
val tool = ParameterTool.fromArgs(args)
// 获取hostname,如果没有就取默认值
val hostname = tool.get("hostname","192.168.136.150")
val port = tool.getInt("port",9999)
// env 相当于spark中的StreamContext上下文
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 实时处理 端口的 内容 处理
val data = env.socketTextStream(hostname,port)
// 添加一个隐式转换
import org.apache.flink.api.scala._
val result = data.flatMap(line=>line.split(","))
.map((_,1))
.keyBy(0) // 按第0 个进行 分组
.timeWindow(Time.seconds(6),Time.seconds(2))// 每2秒处理近六秒的数据
.sum(1) // 按 第一个进行 sum
// 打印 设置并行度
result.print().setParallelism(1)
// 执行, 设置job名
env.execute("FlinkDemo01")
}
}
spark的实时的wordcount
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author: zxl
* @Date: 2019/1/12 10:59
* @Version 1.0
*/
object SparkDemo02 {
def main(args: Array[String]): Unit = {
// 设置线程的为2个及以上
val conf = new SparkConf().setAppName("SparkDemo02").setMaster("local[*]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(1))
//Dstream, 获取DStream
val data: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.136.150",9999)
val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
print("--------------------")
result.print()
ssc.start()
ssc.awaitTermination()
}
}
flink的批处理:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
* @Author: zxl
* @Date: 2019/1/12 10:46
* @Version 1.0
*/
object FlinkDemo02 {
def main(args: Array[String]): Unit = {
//获取env
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 处理的文件
val file: DataStream[String] = env.readTextFile("d:/yue/applist.txt")
//进行隐式转换
import org.apache.flink.api.scala._
val result = file.flatMap(line => line.split("\t"))
.map((_, 1))
.keyBy(0)
.sum(1)
result.print().setParallelism(1)
env.execute("FlinkDemo02")
}
}
spark的批处理
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author: zxl
* @Date: 2019/1/12 10:54
* @Version 1.0
*/
object SparkDemo01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkDemo01").setMaster("local")
val sc = new SparkContext(conf) //上下文
val file = sc.textFile("d:/yue/applist.txt")
val result = file.flatMap(_.split("\t"))
.map((_, 1)).reduceByKey(_ + _)
val tuples: Array[(String, Int)] = result.collect()
tuples.foreach(println)
sc.stop()
}
}