大家好,我是Jackson,是一名计算机学院大数据专业大二的学生。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!个人VX:LQ1518123002,欢迎与大家交流。
前言:基础不牢,地动山摇。
1.不设置setParallelism代码
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object SourceTest_fromCollectionTest {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//创建case class类
case class IOT(id: Int, timestamp: Long, temperature: Double)
//读取数据源
val res: DataStream[IOT] = env.fromCollection(List(
IOT(1, 1234678217, 21321.12),
IOT(3, 1234678217, 21321.12),
IOT(7, 1234678217, 21321.12),
IOT(19, 1234678217, 21321.12),
IOT(20, 1234678217, 21321.12),
IOT(30, 1234678217, 21321.12),
IOT(31, 1234678217, 21321.12),
IOT(32, 1234678217, 21321.12),
IOT(33, 1234678217, 21321.12),
IOT(34, 1234678217, 21321.12),
IOT(35, 1234678217, 21321.12)
))
//打印输出
res.print("Jackson_TestIOT")
//执行
env.execute()
}
}
运行结果
Jackson_TestIOT:2> IOT(19,1234678217,21321.12)
Jackson_TestIOT:1> IOT(7,1234678217,21321.12)
Jackson_TestIOT:4> IOT(30,1234678217,21321.12)
Jackson_TestIOT:6> IOT(32,1234678217,21321.12)
Jackson_TestIOT:7> IOT(1,1234678217,21321.12)
Jackson_TestIOT:8> IOT(3,1234678217,21321.12)
Jackson_TestIOT:5> IOT(31,1234678217,21321.12)
Jackson_TestIOT:3> IOT(20,1234678217,21321.12)
Jackson_TestIOT:8> IOT(34,1234678217,21321.12)
Jackson_TestIOT:7> IOT(33,1234678217,21321.12)
Jackson_TestIOT:1> IOT(35,1234678217,21321.12)
Process finished with exit code 0
结论:默认情况不设置并行度,并行度则是由机器的核数决定。
2.设置setParallelism(1)代码
//打印输出
res.print("Jackson_TestIOT").setParallelism(1)
运行结果
Jackson_TestIOT> IOT(1,1234678217,21321.12)
Jackson_TestIOT> IOT(3,1234678217,21321.12)
Jackson_TestIOT> IOT(7,1234678217,21321.12)
Jackson_TestIOT> IOT(19,1234678217,21321.12)
Jackson_TestIOT> IOT(20,1234678217,21321.12)
Jackson_TestIOT> IOT(30,1234678217,21321.12)
Jackson_TestIOT> IOT(31,1234678217,21321.12)
Jackson_TestIOT> IOT(32,1234678217,21321.12)
Jackson_TestIOT> IOT(33,1234678217,21321.12)
Jackson_TestIOT> IOT(34,1234678217,21321.12)
Jackson_TestIOT> IOT(35,1234678217,21321.12)
Process finished with exit code 0
结论:并行度设置1,则所有的都是1,后面则不打印数值。
3.设置setParallelism(n)代码
n>=2,这里测试n取10
//打印输出
res.print("Jackson_TestIOT").setParallelism(10)
运行结果:
Jackson_TestIOT:9> IOT(32,1234678217,21321.12)
Jackson_TestIOT:8> IOT(31,1234678217,21321.12)
Jackson_TestIOT:10> IOT(33,1234678217,21321.12)
Jackson_TestIOT:5> IOT(19,1234678217,21321.12)
Jackson_TestIOT:4> IOT(7,1234678217,21321.12)
Jackson_TestIOT:1> IOT(34,1234678217,21321.12)
Jackson_TestIOT:7> IOT(30,1234678217,21321.12)
Jackson_TestIOT:6> IOT(20,1234678217,21321.12)
Jackson_TestIOT:3> IOT(3,1234678217,21321.12)
Jackson_TestIOT:2> IOT(1,1234678217,21321.12)
Jackson_TestIOT:2> IOT(35,1234678217,21321.12)
Process finished with exit code 0
结论:如果设置的并行度>数据条数,后面输出结果数值则不会重复。
如果设置的并行度<数据条数,后面输出结果数值则会重复。
4.思考
在此给大家留下一个思考题
1.这个技术出现的背景、初衷和要达到什么样的目标或是要解决什么样的问题。
2. 这个技术的优势和劣势分别是什么,或者说,这个技术的 trade-off 是什么
3. 这个技术适用的场景。
4. 技术的组成部分和关键点。
5. 技术的底层原理和关键实现。
6. 已有的实现和它之间的对比。