Flink 流处理为什么需要⽹络流控?
Flink V1.5
版之前⽹络流控介绍
Flink V1.5
版之前的反压策略存在的问题
Credit
的反压策略实现原理
,
Credit
是如何解决
Flink 1.5
之前的问题?
Flink
如何在吞吐量和延迟做权衡?
Flink
反压相关
Metrics
介绍 基于 Flink
的流控机制和反压如何定位
Flink
任务的瓶颈。或者说,如果⼀个平时正常的
Flink 任务突然出现延迟了,怎么来定位问题?到底是 Kafka
读取数据慢,还是中间某个计算环节 ⽐较消耗资源使得变慢,还是由于最后的写⼊外部存储时⽐较慢?
Flink
流处理为什么需要⽹络流控?
分析⼀个简单的
Flink
流任务,下图是⼀个简单的
Flink
流任务执⾏图:任务⾸先从
Kafka
中读取数据、 map 算⼦对数据进⾏转换、
keyBy
按照指定
key
对数据进⾏分区(相同
key
的数据经过
keyBy
后分到 同⼀个 subtask
实例中),
keyBy
后对数据接着进⾏
map
转换,然后使⽤
Sink
将数据输出到外部存 储。
众所周知,在⼤数据处理中,⽆论是批处理还是流处理,单点处理的性能总是有限的,我们的单个
Job ⼀般会运⾏在多个节点上,多个节点共同配合来提升整个系统的处理性能。图中,任务被切分成 4
个可 独⽴执⾏的 subtask
(
A0
、
A1
、
B0
、
B1
),在数据处理过程中,就会存在
shufflfflffle
(数据传输)的过 程。例如,subtask A0
处理完的数据经过
keyBy
后发送到
subtask B0
、
B1
所在节点去处理。
那么问题来了,下图中,上游
Producer
向下游
Consumer
发送数据,在发送端和接受端都有相应的 Send Buffffer 和
Receive Buffffer
,但是上游
Producer
⽣成数据的速率⽐下游
Consumer
消费数据的速 率快。Producer
⽣产数据
2MB/s
,
Consumer
消费数据
1MB/s
,
Receive Buffffer
只有
5MB
,所以过 了5
秒后,接收端的
Receive Buffffer
满了。(可以把下图中的
Producer
当做上⾯案例中的
subtask A0,把下图中的
Consumer
当做上⾯案例中的
subtask B0
) 下游接收区的 Receive Buffffer
有限,如果上游⼀直有源源不断的数据,那么将会⾯临着以下两个情况:
1.
下游消费者会丢弃新到达的数据,因为下游消费者的缓冲区放不下
2.
为了不丢弃数据,所以下游消费者的
Receive Buffffer
持续扩张,最后耗尽消费者的内存,
OOM
,
程序挂掉
常识告诉我们,这两种情况在⽣产环境都是不能接受的,第⼀种会把数据丢弃、第⼆种会把我们的应⽤ 程序挂掉。所以,该问题的解决⽅案不应该是下游 Receive Buffffer
⼀直累积数据,⽽是上游
Producer 发现下游 Consumer
处理⽐较慢之后,应该在
Producer
端做出限流的策略,防⽌在下游
Consumer 端⽆限制的数据堆积。 那上游 Producer
端该如何做限流呢?可以采⽤下图所示静态限流的策略: 静态限速的思想就是,提前已知下游 Consumer
的消费速率,然后通过在上游
Producer
端使⽤类似令 牌桶的思想,限制 Producer
端⽣产数据的速率,从⽽控制上游
Producer
端向下游
Consumer
端发送 数据的速率。但是静态限速会存在问题:
1.
通常⽆法事先预估下游
Consumer
端能承受的最⼤速率
2.
就算通过某种⽅式预估出下游
Consumer
端能承受的最⼤速率,下游应⽤程序也可能会因为⽹络
抖动、
CPU
共享竞争、内存紧张、
IO
阻塞等原因造成下游应⽤程序的吞吐量降低,然后⼜会出现 上⾯所说的下游接收区的 Receive Buffffer
有限,上游⼀直有源源不断的数据发送到下游的问题, 还是会造成下游要么丢数据,要么为了不丢数据 buffffer
不断扩充导致下游
OOM
的问题 综上所述,我们发现了,上游 Producer
端必须有⼀个限流的策略,且静态限流是不可靠的,于是就需 要⼀个动态限流的策略。可以采⽤下图动态反馈所示:下游
Consumer
端会频繁地向上游
Producer
端进⾏动态反馈,告诉
Producer
下游
Consumer
的负载 能⼒,从⽽ Producer
端动态调整向下游
Consumer
发送数据的速率实现
Producer
端的动态限流。当 Consumer 端处理较慢时,
Consumer
将负载反馈到
Producer
端,
Producer
端
会根据反馈适当降低
Producer
⾃身从上游或者
Source
端读数据的速率
来降低向下游
Consumer
发送数据的速率。当 Consumer 处理负载能⼒提升后,⼜及时向
Producer
端反馈,
Producer
会通过提升从上游或
Source 端读数据的速率来提升向下游发送数据的速率。通过这个动态反馈来提升整个系统的吞吐量。 补充⼀点,假如我们的 Job 分为
Task A
、
B
、
C
,
Task A
是
Source Task
、
Task B
处理数 据、Task C
是
Sink Task
。假如
Task C
由于各种原因吞吐量降低,会将负载信息反馈给
Task B
,
Task B 会降低向 Task C
发送数据的速率,此时如果
Task B
如果还是⼀直从
Task A
读取数据,那么按照同样的 道理,数据会把 Task B
的
Send Buffffer
和
Receive Buffffer
撑爆,⼜会出现上⾯描述的问题。所以,当 Task B 的
Send Buffffer
和
Receive Buffffer
被⽤完后,
Task B
会⽤同样的原理将负载信息反馈给
Task A,
Task A
收到
Task B
的负载信息后,会降低 给
Task B
发送数据的速率,以此类推。 上⾯这个流程,就是 Flink
动态限流(反压机制)的简单描述。我们可以看到
Flink
的反压其实是从下游 往上游传播的,⼀直往上传播到 Source Task
后,
Source Task
最终会降低从
Source
端读取数据的速 率。如果下游 Task C
的负载能⼒提升后,会及时反馈给
Task B
,于是
Task B
会提升往
Task C
发送数 据的速率,Task B
⼜将负载提升的信息反馈给
Task A
,
Task A
就会提升从
Source
端读取数据的速率, 从⽽提升整个系统的负载能⼒。 读到这⾥,我们应该知道 Flink 为什么需要⼀个⽹络流控机制了,并且知道
Flink
的⽹络流控机制必须是 ⼀个动态反馈的过程。但是还有以下⼏个问题:
1.
数据具体是怎么从上游
Producer
端发送到下游
Consumer
端的?
2. Flink
的动态限流具体是怎么实现的?下游的负载能⼒和压⼒是如何传递给上游的?
我们带着这两个问题,学习下⾯的
Flink
⽹络流控与反压机制
Flink V1.5
版之前⽹络流控介绍
在
Flink V1.5
版之前,其实
Flink
并没有刻意做上述所说的动态反馈。那么问题来了,没有做上述的动 态反馈机制,Flink
难道不怕数据丢失或者上游和下游的⼀些
Buffffer
把内存撑爆吗?当然不怕了,因为 Flink 已经依赖其他机制来实现了所谓的动态反馈。其实很简单,让我们继续往下看。 对于⼀个 Flink
任务,动态反馈可以抽象成以下两个阶段:
1.
跨
Task
,动态反馈如何从下游
Task
的
Receive Buffffer
反馈给上游
Task
的
Send Buffffer 当下游 Task C
的
Receive Buffffer
满了,如何告诉上游
Task B
应该降低数据发送速率 当下游 Task C
的
Receive Buffffer
空了,如何告诉上游
Task B
应该提升数据发送速率 注:这⾥⼜分了两种情况,Task B
和
Task C
可能在同⼀台节点上运⾏,也有可能不在同⼀个台节 点运⾏ Task B 和
Task C
在同⼀台节点上运⾏指的是:⼀台节点运⾏了⼀个或多个
TaskManager
, 包含了多个 Slot
,
Task B
和
Task C
都运⾏在这台节点上,且
Task B
是
Task C
的上游,给 Task C 发送数据。此时
Task B
给
Task C
发送数据实际上是同⼀个
JVM
内的数据发送,所以
不存在⽹络通信 Task B 和
Task C
不在同⼀台节点上运⾏指的是:
Task B
和
Task C
运⾏在不同的 TaskManager 中,且
Task B
是
Task C
的上游,给
Task C
发送数据。此时
Task B
给
Task C 发送数据是跨节点的,所以会存在⽹络通信
2. Task
内,动态反馈如何从内部的
Send Buffffer
反馈给内部的
Receive Buffffer 当 Task B
的
Send Buffffer
满了,如何告诉
Task B
内部的
Receive Buffffer
下游
Send Buffffer
满 了、下游处理性能不⾏了?因为要让 Task B
的
Receive Buffffer
感受到压⼒,才能把下游的压⼒传 递到 Task A 当 Task B
的
Send Buffffer
空了,如何告诉
Task B
内部的
Receive Buffffer
下游
Send Buffffer
空 了,下游处理性能很强,上游加快处理数据吧
跨
TaskManager
,反压如何向上游传播
先了解⼀下
Flink
的
TaskManager
之间⽹络传输的数据流向:
图中,我们可以看到
TaskManager A
给
TaskManager B
发送数据,
TaskManager A
做为
Producer
, TaskManager B 做为
Consumer
。
Producer
端的
Operator
实例会产⽣数据,最后通过⽹络发送给 Consumer 端的
Operator
实例。
Producer
端
Operator
实例⽣产的数据⾸先缓存到
TaskManager
内 部的 NetWork Buffffer
。
NetWork
依赖
Netty
来做通信,
Producer
端的
Netty
内部有 ChannelOutbound Buffffer,
Consumer
端的
Netty
内部有
ChannelInbound Buffffer
。
Netty
最终还是 要通过 Socket
发送⽹络请求,
Socket
这⼀层也会有
Buffffer
,
Producer
端有
Send Buffffer
,
Consumer 端有 Receive Buffffer
。
总结⼀下,现在有两个 TaskManager A
、
B
,
TaskManager A
中
Producer Operator
处理完的数据由 TaskManager B 中
Consumer Operator
处理。那么
Producer Operator
处理完的数据是怎么到达 Consumer Operator 的?⾸先
Producer Operator
从⾃⼰的上游或者外部数据源读取到数据后,对⼀ 条条的数据进⾏处理,处理完的数据⾸先输出到 Producer Operator
对应的
NetWork Buffffer
中。 Buffffer 写满或者超时后,就会触发将
NetWork Buffffer
中的数据拷⻉到
Producer
端
Netty
的 ChannelOutbound Buffffer,之后⼜把数据拷⻉到
Socket
的
Send Buffffer
中,这⾥有⼀个从⽤户态拷⻉ 到内核态的过程,最后通过 Socket
发送⽹络请求,把
Send Buffffer
中的数据发送到
Consumer
端的 Receive Buffffer。数据到达
Consumer
端后,再依次从
Socket
的
Receive Buffffer
拷⻉到
Netty
的 ChannelInbound Buffffer,再拷⻉到
Consumer Operator
的
NetWork Buffffer
,最后
Consumer Operator 就可以读到数据进⾏处理了。这就是两个
TaskManager
之间的数据传输过程,我们可以看到 发送⽅和接收⽅各有三层的 Buffffer
。 了解了数据传输流程,我们再具体了解⼀下跨 TaskManager
的反压过程,如下图所示,Producer 端⽣ 产数据速率为 2
,
Consumer
消费数据速率为
1
。持续下去,下游消费较慢,
Buffffer
容量⼜是有限的, 那 Flink
反压是怎么做的?
上⾯介绍后,我们知道每个
Operator
计算数据时,输出和输⼊都有对应的
NetWork Buffffer
,这个 NetWork Buffffer 对应到
Flink
就是图中所示的
ResultSubPartition
和
InputChannel
。 ResultSubPartition 和
InputChannel
都是向
LocalBufffferPool
申请
Buffffer
空间,然后 LocalBufffferPool 再向
NetWork BufffferPool
申请内存空间。这⾥,
NetWork BufffferPool
是 TaskManager 内所有
Task
共享的
BufffferPool
,
TaskManager
初始化时就会向堆外内存申请
NetWork BufffferPool。
LocalBufffferPool
是每个
Task
⾃⼰的
BufffferPool
,假如⼀个
TaskManager
内运⾏着
5
个 Task,那么就会有
5
个
LocalBufffferPool
,但
TaskManager
内永远只有⼀个
NetWork BufffferPool
。 Netty 的
Buffffer
也是初始化时直接向堆外内存申请内存空间。虽然可以申请,但是
必须明⽩内存申请肯
定是有限制的,不可能⽆限制的申请
,我们在启动任务时可以指定该任务最多可能申请多⼤的内存空间 ⽤于 NetWork Buffffer
。 我们继续分析我们的场景, Producer
端⽣产数据速率为
2
,
Consumer
端消费数据速率为
1
。数据从 Task A 的
ResultSubPartition
按照上⾯的流程最后传输到
Task B
的
InputChannel
供
Task B
读取并计 算。持续⼀段时间后,由于 Task B
消费⽐较慢,导致
InputChannel
被占满了,所以
InputChannel
向 LocalBufffferPool 申请新的
Buffffer
空间,
LocalBufffferPool
分配给
InputChannel
⼀些
Buffffer
。
再持续⼀段时间后,InputChannel
重复向
LocalBufffferPool
申请
Buffffer
空间,导致
LocalBufffferPool 也满了,所以 LocalBufffferPool
向
NetWork BufffferPool
申请
Buffffer
空间,
NetWork BufffferPool
给 LocalBufffferPool 分配
Buffffer
。
再持续下去,NetWork BufffferPool
满了,或者说
NetWork BufffferPool
不能把⾃⼰的
Buffffer
全分配给 Task B 对应的
LocalBufffferPool
,因为
TaskManager
上⼀般会运⾏了多个
Task
,每个
Task
只能使⽤ NetWork BufffferPool 中的⼀部分。所以,
可以认为
Task B
把⾃⼰可以使⽤的
InputChannel
、
LocalBufffferPool
和
NetWork BufffferPool
都⽤完了
。此时
Netty
还想把数据写⼊到
InputChannel
, 但是发现 InputChannel
满了,所以
Socket
层会把
Netty
的
autoRead disable
,
Netty
不会再从 Socket 中去读消息。可以看到下图中多个 ,表示
Buffffer
已满,数据已经不能往下游写了,发⽣了阻 塞。
由于 Netty
不从
Socket
的
Receive Buffffer
读数据了,所以很快
Socket
的
Receive Buffffer
就会变满, TCP 的
Socket
通信有动态反馈的流控机制,会把容量为
0
的消息反馈给上游发送端,所以上游的
Socket
就不会往下游再发送数据 。
Task A
持续⽣产数据,发送端
Socket
的
Send Buffffer
很快被打满,所以
Task A
端的
Netty
也会停⽌往 Socket 写数据。 接下来,数据会在 Netty
的
Buffffer
中缓存数据,但
Netty
的
Buffffer
是⽆界的。但可以设置
Netty
的⾼
⽔位,即:设置⼀个
Netty
中
Buffffer
的上限。所以每次
ResultSubPartition
向
Netty
中写数据时,都 会检测 Netty
是否已经到达⾼⽔位,如果达到⾼⽔位就不会再往
Netty
中写数据,防⽌
Netty
的
Buffffer ⽆限制的增⻓。
接下来,数据会在
Task A
ResultSubPartition
中累积,
ResultSubPartition
满了后,会向 LocalBufffferPool 申请新的
Buffffer
空间,
LocalBufffferPool
分配给
ResultSubPartition
⼀些
Buffffer
。
持续下去 LocalBufffferPool
也会⽤完,
LocalBufffferPool
再向
NetWork BufffferPool
申请
Buffffer
。
然后
NetWork BufffferPool
也会⽤完,或者说
NetWork BufffferPool
不能把⾃⼰的
Buffffer
全分配给
Task A 对应的
LocalBufffferPool
,因为
TaskManager
上⼀般会运⾏了多个
Task
,每个
Task
只能使⽤ NetWork BufffferPool 中的⼀部分。此时,
Task A
已经申请不到任何的
Buffffer
了,
Task A
的
Record Writer 输出就被
wait
,
Task A
不再⽣产数据。
通过上述的这个流程,来动态反馈,保障各个 Buffffer
都不会因为数据太多导致内存溢出。上⾯描述了整 个阻塞的流程,当下游 Task B
持续消费,
Buffffer
的可⽤容量会增加,所有被阻塞的数据通道会被⼀个 个打开,之后 Task A
⼜可以开始正常的⽣产数据了。 之前介绍,Task
之间的数据传输可能存在上游的
Task A
和下游的
Task B
运⾏在同⼀台节点的情况,整 个流程与上述类似,只不过由于 Task A
和
B
运⾏在同⼀个
JVM
,所以不需要⽹络传输的环节,
Task B 的 InputChannel
会直接从
Task A
的
ResultSubPartition
读取数据。
Task
内部,反压如何向上游传播 假如 Task A
的下游所有
Buffffer
都占满了,那么
Task A
的
Record Writer
会被
block
,
Task A
的 Record Reader、
Operator
、
Record Writer
都属于同⼀个线程,所以
Task A
的
Record Reader
也会 被 block
。
然后可以把这⾥的
Task A
类⽐成上⾯所说的
Task B
,
Task A
上游持续⾼速率发送数据到
Task A
就会导 致可⽤的 InputChannel
、
LocalBufffferPool
和
NetWork BufffferPool
都会被⽤完。然后
Netty
、
Socket 同理将压⼒传输到 Task A
的上游。
假设 Task A
的上游是
Task X
,那么
Task A
将压⼒反馈给
Task X
的过程与
Task B
将压⼒反馈给
Task A 的过程是⼀样的。整个 Flink
的反压是从下游往上游传播的,⼀直传播到
Source Task
,
Source Task
有 压⼒后,会降低从外部组件中读取数据的速率,例如:Source Task
会降低从
Kafka
中读取数据的速 率,来降低整个 Flink Job
中缓存的数据,从⽽降低负载。
Flink V1.5
版之前的反压策略存在的问题
看着挺完美的反压机制,其实是有问题的。如下图所示,我们的任务有
4
个
SubTask
,
SubTask A
是 SubTask B的上游,即
SubTask A
给
SubTask B
发送数据。
Job
运⾏在两个
TaskManager
中, TaskManager 1 运⾏着
SubTask A.1
和
SubTask A.2
,
TaskManager 2
运⾏着
SubTask B.3
和 SubTask B.4。现在假如由于
CPU
共享或者内存紧张或者磁盘
IO
瓶颈造成
SubTask B.4
遇到瓶颈、处理 速率有所下降,但是上游源源不断地⽣产数据,所以导致 SubTask A.2
与
SubTask B.4
产⽣反压。 这⾥需要明确⼀点:不同 Job
之间的每个(远程)⽹络连接将在
Flink
的⽹络堆栈中获得⾃⼰的
TCP
通 道。 但是,如果同⼀ Task
的不同
SubTask
被安排到同⼀个
TaskManager
,则它们与其他 TaskManager 的⽹络连接将
被多路复⽤并共享⼀个
TCP
信道以减少资源使⽤
。例如,图中的
A.1 -> B.3、
A.1 -> B.4
、
A.2 -> B.3
、
A.2 -> B.4
这四条将会多路复⽤共享⼀个
TCP
信道。 现在 SubTask B.3
并没有压⼒,从上⾯跨
TaskManager
的反压流程,我们知道当上图中
SubTask A.2 与 SubTask B.4
产⽣反压时,会把
TaskManager1
端该任务对应
Socket
的
Send Buffffer
和 TaskManager2 端该任务对应
Socket
的
Receive Buffffer
占满,多路复⽤的
TCP
通道已经被占住了,会 导致 SubTask A.1
和
SubTask A.2
要发送给
SubTask B.3
的数据全被阻塞了,从⽽导致本来没有压⼒的 SubTask B.3 现在接收不到数据了。所以,
Flink 1.5
版之前的反压机制会存在当⼀个
Task
出现反压时, 可能导致其他正常的 Task
接收不到数据。
Credit
的反压策略实现原理
Flink 1.5
之后,为了解决上述所描述的问题,引⼊了基于
Credit
的反压机制。如下图所示,反压机制作 ⽤于 Flink
的应⽤层,即在
ResultSubPartition
和
InputChannel
这⼀层引⼊了反压机制。每次上游 SubTask A.2 给下游
SubTask B.4
发送数据时,会把
Buffffer
中的数据和上游
ResultSubPartition
堆积的 数据量 Backlog size
发给下游,下游会接收上游发来的数据,并向上游反馈⽬前下游现在的
Credit
值, Credit 值表示⽬前下游可以接收上游的
Buffffer
量,
1
个
Buffffer
等价于
1
个
Credit
。
例如,上游
SubTask A.2
发送完数据后,还有
5
个
Buffffer
被积压,那么会把发送数据和
Backlog size = 5 ⼀块发送给下游
SubTask B.4
,下游接受到数据后,知道上游积压了
5
个
Buffffer
,于是向
Buffffer Pool 申请 Buffffer
,由于容量有限,下游
InputChannel
⽬前仅有
2
个
Buffffer
空间,所以,
SubTask B.4
会向 上游 SubTask A.2
反馈
Channel Credit = 2
。然后上游下⼀次最多只给下游发送
2
个
Buffffer
的数据,这 样每次上游发送的数据都是下游 InputChannel
的
Buffffer
可以承受的数据量,所以通过这种反馈策略, 保证了不会在公⽤的 Netty
和
TCP
这⼀层数据堆积⽽影响其他
SubTask
通信。 ResultSubPartition 会把
buffffer
和
backlog size
同时发送给下游,下游向上游反馈
credit
。再⽤⼀个案 例来详细地描述⼀下整个过程。
Task A
向
Task B
发送了数据
<8,9>
和
backlog size =3
,下游
InputChannel
接受完
<8,9>
后,发现上 游⽬前积压了 3
条数据,但是⾃⼰的缓冲区不够,于是向
LocalBufffferPool
申请
buffffer
空间,申请成功 后,向上游反馈 credit = 3
,表示下游⽬前可以接受
3
条记录(实际上是以
Buffffer
为单位,⽽不是记录 数,Flink
将真实记录序列化后的⼆进制数据放到
Buffffer
中),然后上游下次最多发送
3
条数据给下 游。持续下去,上游⽣产数据速率⽐下游消费速率快,所以
LocalBufffferPool
和
NetWork BufffferPool
都会 被申请完,下游的 InputChannel
没有可⽤的缓冲区了,所以会向上游反馈
credit = 0
,然后上游就不会 发送数据到 Netty
。所以基于
Credit
的反压策略不会导致
Netty
和
Socket
的数据积压。当然上游也不 会⼀直不发送数据到下游,上游会定期地仅发送 backlog size
给下游,直到下游反馈
credit > 0
时,上 游就会继续发送真正的数据到下游了。
基于 Credit
的反压机制还带来了⼀个优势:由于我们在发送⽅和接收⽅之间缓存较少的数据,可能
会更
早地将反压反馈给上游
,缓冲更多数据只是把数据缓冲在内存中,并没有提⾼处理性能。
Flink
如何在吞吐量和延迟做权衡?
Flink
天然⽀持流式处理,即每来⼀条数据就能处理⼀条,⽽不是像
Spark Streaming
⼀样,完全是微 批处理。但是为了提⾼吞吐量,默认使⽤的 Flink
并不是每来⼀条数据就处理⼀条。那这个到底是怎么 控制的呢? 我们分析了上述的⽹络传输后,知道每个 SubTask
输出的数据并不是直接输出到下游,⽽是在 ResultSubPartition 中有⼀个
Buffffer
⽤来缓存⼀批数据后,再
Flush
到
Netty
发送到下游
SubTask
。那 到底哪些情况会触发 Buffffer Flush
到
Netty
呢?
1. Buffffer
变满时
2. Buffffer timeout
时
3.
特殊事件来临时,
例如:
CheckPoint
的
barrier
来临时 Flink 在数据传输时,会把数据序列化成⼆进制然后写到
Buffffer
中,当
Buffffer
满了,需要
Flush
(默认 为32KiB
,通过
taskmanager.memory.segment-size
设置)。但是当流量低峰或者测试环节,可能
1
分 钟都没有 32 KB
的数据,就会导致
1
分钟内的数据都积攒在
Buffffer
中不会发送到下游
Task
去处理,从 ⽽导致数据出现延迟,这并不是我们想看到的。所以 Flink
有⼀个
Buffffer timeout
的策略,意思是当数 据量⽐较少,Buffffer
⼀直没有变满时,后台的
Output flflusher
线程会强制地将
Buffffer
中的数据
Flush 到下游。Flink
中默认
timeout
时间是
100ms
,即:
Buffffer
中的数据要么变满时
Flush
,要么最多等 100ms 也会
Flush
来保证数据不会出现很⼤的延迟。当然这个可以通过 env.setBufferTimeout(timeoutMillis) 来控制超时时间。timeoutMillis > 0 表示最⻓等待
timeoutMillis
时间,就会
flflush
timeoutMillis = 0 表示每条数据都会触发
flflush
,直接将数据发送到下游,相当于没有
Buffffer
了
(
避免设置为0
,可能导致性能下降
)
timeoutMillis = -1 表示只有等到
buffffer
满了或
CheckPoint
的时候,才会
flflush
。
相当于取消了 timeout 策略 严格来讲,Output flflusher
不提供任何保证
——
它只向
Netty
发送通知,⽽
Netty
线程会按照能⼒与意 愿进⾏处理。这也意味着如果存在反压,则 Output flflusher
是⽆效的。⾔外之意,如果反压很严重,下 游 Buffffer
都满了,当然不能强制⼀直往下游发数据。 ⼀些特殊的消息如果通过 RecordWriter
发送,也会触发⽴即
Flush
缓存的数据。其中最重要的消息包 括 Checkpoint barrier
以及
end-of-partition
事件,这些事件应该尽快被发送,⽽不应该等待
Buffffer 被填满或者 Output flflusher
的下⼀次
Flush
。当然如果出现反压,
CheckPoint barrier
也会等待,不能 发送到下游。 引⼊ Network buffffers
以获得更⾼的资源利⽤率和更⾼的吞吐量,代价是让⼀些记录在
Buffffer
中等待 ⼀段时间。虽然可以通过缓冲区超时给出此等待时间的上限,你可能知道有关这两个维度(延迟和吞 吐量)之间权衡的更多信息:显然,⽆法同时获得这两者。下图是 Flink
官⽹的博客展示的不同的 buffffer timeout 下对应的吞吐量,从
0
毫秒开始(每个记录都
flflush
)到
100
毫秒(默认值),测试在具 有 100
个节点每个节点
8
个
Slot
的群集上运⾏,每个节点运⾏没有业务逻辑的
Task
,因此只⽤于测试 ⽹络协议栈。为了进⾏⽐较,还测试了低延迟改进之前的 Flink 1.4
版本。
如图,使⽤
Flink 1.5+
,即使是⾮常低的
Buffffer timeout
(例如
1ms
,对于低延迟场景)也提供⾼达超 时默认参数(100ms
)
75
% 的最⼤吞吐,但会缓存更少的数据。但是笔者仍然不理解为什么
timeout 设置为0
时,吞吐量竟然能⽐
Flink 1.4
的吞吐量提⾼那么多。
Credit
只是解决了反压的问题,并不能优 化低延迟的吞吐量。杨华⽼师的回答是⽹络协议栈做了其他优化⽽且性能测试是在特定场景下做的。