1. 引言
随着信息时代的不断发展及线上线下交互需求的增加,实时计算能力强的流处理系统越来越被产业办重视。Flink因其具有强大的实时计算能力被广泛应用于实时计算 [1] [2]、高频交易和社交网络等方面。如美团 [3]、阿里云实时计算 [4] 及在线监测 [5] 等场景。
反压(backpressure)能力的好坏是判断实时处理系统数据处理能力强弱的重要依据。当系统瞬时负载高峰使得其接受数据的速度远远高于其处理数据的能力时,就会出现反压情况,如垃圾回收不及时或者停顿可能使得流入系统的数据快速堆积、大促或秒杀活动时出现的流量陡增等都会造成反压。如不对反压及时处理,将会使系统资源耗尽甚至导致系统崩溃。
Flink通过自身数据流来响应反压问题,下游的消费者处理数据减慢会降低上游发送者的发送速率。
2. 相关研究现状
现有大数据实时处理系统处理反压问题方面,Storm [6] [7] 是通过监控Bolt中的接收队列负载情况,如果超过高水位值就会将反压信息写到Zookeeper,Zookeeper上的watch会通知该拓扑的所有Worker都进入反压状态,最后Spout停止发送tuple。J Storm [8] [9] 采用逐级降速的方式来处理反压,使用Topology Master替代Zookeeper来协调拓扑进入反压状态,效果较Storm更为稳定。Spark Streaming [10] [11] 根据批处理时间(Batch Processing Time)和批次间隔(Batch Interval,即Batch Duration)的信息来动态调整系统的摄入速率,从而完成其反压工作。
在Flink优化方面,关沫使用Flink执行一种传统堆序优化后的算法Heap Optimize,增加了Flink的吞吐量 [12]。针对Flink默认的先来先服务的任务调度策略,王丽娟等人 [13] 通过资源感知,将待执行任务分配到最佳节点进行计算,优化了Flink的负载均衡。何贞贞等人 [14] 则根据任务间数据流的大小确定拓扑边的权重,以生成关键路径,大幅缩减了Flink节点间的通信开销。文献 [15] [16] [17] 把Flink从原来的CPU迁移扩展到异构的CPU-GPU集群,在并行计算、内存管理及通信策略方面极大地提高了Flink的计算能力。
目前诸多的研究当中没有Flink反压方面的问题,当Flink面临远端传输问题时,其所依托的Netty所采用的是一种静态的水位机制,这使得Flink在面临颠簸状态数据的远程传输问题时,容易出现反复反压的情况,极大地影响了Flink传输数据的效率,故而本文将针对此问题展开研究。
3. Flink反压原理解析
3.1. Flink反压原理
Flink的反压原理如图1所示,假如Flink的一个Job分为Task A、B、C,其中Task A是Source Task、Task B处理数据、Task C为Sink Task。假如Task C由于各种原因吞吐量降低,会将负载信息反馈给Task B,Task B会降低向Task C发送数据的速率,此时若Task B还保持从Task A读取数据,数据会把Task B的Send Buffer和Receive Buffer撑爆,导致OOM或者丢失数据。所以,当Task B的Send Buffer和Receive Buffer被用完后,Task B会用同样的原理将负载信息反馈给Task A,Task A收到Task B的负载信息后,会降低给Task B发送数据的速率,以此类推。
![](//html.hanspub.org/file/32-1542157x9_hanspub.png)
Figure 1. Flink back pressure schematic diagram
图1. Flink反压原理图
3.2. Flink网络传输的数据流向
Flink反压存在Task内与跨Task两种情况,本文已在图1中标注,本文主要是针对Flink跨Task传输进行反压优化,故下文主要对Flink跨Task传输进行介绍:图2展示了Flink网络传输时的数据流向,可以看到Task Manager A给TaskManager B发送数据,Task Manager A做为Producer,Task Manager B做为Consumer。Producer端的Operator实例会产生数据,最后通过网络发送给Consumer端的Operator实例。Producer端Operator实例生产的数据首先缓存到Task Manager内部的Net Work Buffer。Net Work依赖Netty来做通信,Producer端的Netty内部有Channel Outbound Buffer,Consumer端的Netty内部有Channel Inbound Buffer。Netty最终还是要通过Socket发送网络请求,Socket这一层也会有Buffer,Producer端有Send Buffer,Consumer端有Receive Buffer。
故Flink网络传输时的整个反压过程为:首先Producer Operator从自己的上游或者外部数据源读取到数据后,对一条条的数据进行处理,处理完的数据首先输出到Producer Operator对应的Net Work Buffer中。Buffer写满或者超时后,就会触发将Net Work Buffer中的数据拷贝到Producer端Netty的Channel Outbound Buffer,之后又把数据拷贝到Socket的Send Buffer中,这里有一个从用户态拷贝到内核态的过程,最后通过Socket发送网络请求,把Send Buffer中的数据发送到Consumer端的Receive Buffer。数据到达Consumer端后,再依次从Socket的Receive Buffer拷贝到Netty的Channel Inbound Buffer,再拷贝到Consumer Operator的Net Work Buffer,最后Consumer Operator就可以读到数据进行处理了,这就是两个Task Manager之间的数据传输过程。
4. Flink反压机制的缺点
如图2所示,Flink通过Netty完成其数据的网络传输任务,Netty在向底层的Channel写数据的时候会用到Channel Outbound Buffer,Channel Outbound Buffer本身是无界的,如果水位控制不当的话就会造成占用大量的内存,因此Netty为其配置了一个高水位线和低水位线。为避免上游数据量太大,当上游数据的大小超过高水位线的时候对应channel的isWritable就会变成false,当上游数据的大小低于低水位线的时候,isWritable就会变成true,低水位线主要是其自动恢复运行的一种保障,为便于理解下文所提的水位线均指高水位线。Flink以此来保证不在网络中写入太多数据,进而保证Flink的反压能力。
![](//html.hanspub.org/file/32-1542157x10_hanspub.png)
Figure 2. Data flow chart of Flink network transmission
图2. Flink网络传输数据流向图
分析源码可知,Netty水位机制是一种静态的机制,Netty默认其水位线的高度为定值,这使得Flink系统在面临瞬时流量不稳定的场景(即系统的数据流量值在特别高与特别低的值之间不断跳动时)时,会出现下述两种问题:
1) 水位值较下游可用缓存区数偏低:如图3 (左图)所示,图中以“圆圈”表示数据,以“方框”表示缓存区的大小,下同。假设当上游A点来临的数据量是9 (Flink中以buffer为数据单位,每个buffer大小为32 k,为便于表述,下文块描述),而此时下游B点的可用缓存区是10,H代表代表数据通道(其作用类似于水坝,水位值的大小决定了其单位时间通过的数据量大小),此处设水位值高度为4,则Flink传输本批次的数据需要3个单位时间(上游共9块数据,每个单位时间只能通过4块的数据,需要3个单位的时间来处理这批数据)。而若此时的水位值为9或者10的话,则只需要一个单位时间,Flink便可以处理本批次的数据。
2) 水位值较下游可用缓存区数偏高:如图3 (右图),假设当上游A点来临的数据量是4,而此时下游B点的可用缓存区为2,水位值高度为4。由于数据量不于水位值高度,Flink会误以为可以在一单位时间内接受这批数据,如图中可以看出,只有2块的缓存区,直接接收了4块的数据量,会直接导致内存溢出(OOM)甚至引起系统阻塞。综上,由于不合理的静态水位线的设置,使得Flink传输数据时间延长,或者出现非正常的阻塞,进而影响整个Flink的数据传输情况。
![](//html.hanspub.org/file/32-1542157x11_hanspub.png)
Figure 3. Schematic diagram of data transmission of different Netty water level values
图3. 不同Netty水位值数据传输示意图
综上,由于不合理的静态水位线的设置,使得Flink传输数据时间延长,或者出现非正常的阻塞,进而影响整个Flink的数据传输情况。
5. 反压优化算法
本节将针对Flink反压传输所存在的缺点,提出一种基于动态水位值的Flink调度优化算法,并给出示例进行说明:
5.1. 基于动态水位值的Flink调度优化算法
虽然可以在数据处理前对Netty所默认的两个buffer高度进行参数调整,但这种默认的定值始终是一种静态的机制。这种相对静态的机制使得Flink在面临远程传输问题时,容易出现上文所述的两种问题。本节将针对Flink反压传输所存在的缺点,提出一种基于动态水位值Flink调度优化算法,并给出例子进行说明。Flink-N算法的核心思想是:把Flink中Netty下游可用buffer数Bt实时写入Redis中,根据Redis中前后时刻buffer数(即Bt值)的大小变化,对水位值Wt进行动态调整,算法流程如图4所示,其具体步骤如下:
第一步,设置访问函数,并创建接口,使得Flink启动的同时运行访问函数。其中,访问函数的作用是,每间隔一段时间访问Netty下游缓存区可用buffer (图中B点位置)的数量,并将其记录到Redis中;
第二步,获得下游可用buffer数Bt;
第三步,取0.8倍的B0值的整数部分(向下取整)作为Netty的高水位值,即令W0H = ⌊0.8|B0|⌋;
第四步,将Bt值反馈到Redis中并记录;
第五步,根据Bt值调整水位值Wt,具体方法为:若Bt大于Bt与Bt−1的平均值,则Wt取Bt大于Bt与Bt−1的平均值;反之,当Bt ≤ Bt−1时,则令Wt = Bt。
第六步,重复第二步、第四步与第五步。
本文选择Redis是因为Flink处理时延是ms级别的,而Redis数据读取速度可达110,000次/s,写数据的速度可达81,000次/s,选择Redis相较于其他数据库而言,不会对Flink的时效性产生负增益。
5.2. 示例说明
现对所提出的算法给出实例加以说明:设某连续时间段t1、t2、t3、t4、t5内Netty上游来临的数据量分别8、4、8、4、8,且下游对应时间段内的缓存区数分别为10、4、10、4、10,在表1中以G表示对应时刻系统接收的数据量,以R表示系统实际的数据传输量。则两种不同的机制对应的数据传输结果如下(假设此处静态水位值为6,且每次阻塞系统需要两个时刻的时间才能恢复正常运行。):
如表1所示,对于Flink默认的反压算法,系统t1时刻要传输8单位的数据量,默认的水位值一直是6,故而Flink在t1时刻只能传输6块的数据。在t2时刻系统接收外部来的4块数据,同时要传输t1时刻剩余的2块数据,因此实际传输数据为6块,此时水位值为6,但下游缓存区只有4,故而会出现第4节中的第二种情况而导致系统阻塞。因为发生了阻塞,系统在t3、t4会自行调整恢复到可运行状态,无法进行数据的传输。t5、t6、t7、t8时刻的状态与t1、t2、t3、t4时刻的状态相似。t10时,系统已无外部的数据要接收,但仍有t9时刻的2块数据需要传输,此时水位值为6,下游缓存区数为4,可直接传输,从而完成所有数据的传输。
![](Images/Table_Tmp.jpg)
Table 1. Comparison of data transmission examples under two algorithms
表1. 两种算法下数据传输示例对照
对于Flink-N算法,因为t1时刻下游缓存区数为10,所以我们为水位值赋初始值8,此时要传输的数据为8,故传输后无数据剩余。t2时刻时,下游缓存区数为4,因为10与4的平均值7大于4,所以水位值设为4,上游数据为4,且下游缓存区为4,刚好可以把t2时刻的数据传输完毕。t3时刻时,下游缓存区为10,因为10与4的平均值7小于10,所以水位值设为7,此时上游来临的数据为8,下游缓存区数为10,故t3时刻传输后,系统会剩余1单位数据在t4时刻传输。t4时刻时,下游缓存区数为4,因为10与4的平均值大于4,所以t4时刻的水位值设为4,此时外部数据为4,加之t3时刻的数据剩余,整个t4时刻系统实际要传输的数据为5,此时下游缓存区为4,因此t4时刻后,系统会剩余1块数据在t5时刻传输。t5时刻下游缓存区数为10,因为10与4的平均值7小于10,故水位值设为7,此时外部数据为8,加上t4时刻剩余的1块数据,t5时刻实际要传输的数据为9,则t5时刻后系统剩余2块的数据在t6时刻传输。在t6时刻时,系统下游缓存区为4,因为10与4的平均值7大于4,故水位值设为4,此时系统外部无待接收数据,加上t5时刻剩余的2块数据,则系统t6实际要传输的数据为2,可以全部完成传输。
综上,对于本例所给出的数据颠簸的案例,Flink完成整个传输需要10个时刻,Flink-N只需6个时刻即可完成数据传输。显然,理论上Flink-N较Flink默认的反压算法具有更好的时效性。
6. 实验分析
6.1. 实验环境
实验在三台配置均为Inter(R)Core(TM) i7-8700、2.66 GHz CPU、32GDDR4内存、2T硬盘的机器所组成的集群上进行,采用一主两从结构,其他配置如表2所示:
6.2. 实验及分析
实验选择典型大数据处理应用单词统计Word Count进行对比实验,将Flink默认反压机制与优化后的算法机制Flink-N进行对比。
实验分别从kafka集群中订阅同一个Topic,而kafka不断读取合成的人工数据集并向该Topic注入数据。Flink处理的数据便会随着时间的变化越来越多。为了消除不确定因素对实验结果的影响,本文将实验分别运行10组,取统计的平均值进行分析,分别观察两组实验各个时间点的吞吐量、CPU利用率及传输时延。
1) 吞吐量效果评测
如图5显示的是Flink默认反压机制与Flink-N在吞吐量方面的差别,相比于默认反压机制,Flink-N的反压机制在整体吞吐量上提高了20%左右。
2) CPU利用率效果评测
如图6显示的是Flink默认反压机制与Flink-N在CPU利用率方面的差别,相比于默认的Flink反压机制,Flink-N的反压机制在系统刚开始运行时便可以迅速提高系统的利用率。Redis的加入,使得Flink-N一开始便拥有更高的CPU利用率,相较于Flink本身的反压机制,其最高优化率达到36%,平均优化率为21%。
3) 传输时延效果评测
如图7所示为Flink与Flink-N的传输时延效果对比,随着时间增加,系统的吞吐量不断增大,伴随着系统吞吐量的增加,系统的传输时延不断增大。虽然在系统运行初始阶段,由于数据传输量小,但系统整体组件较多,Flink-N也会出现时延较大的情况,但从整体角度来看,Flink-N在时延方面的优化还是很成功的,其整体优化率达18%,最大优化率达23%。
7. 结论与展望
本文基于Netty水位机制,对Flink反压机制进行了动态水位调优,使Flink在面临远程传输问题时,具有更好的反压能力。经实验评测可知:优化后的Flink-N算法在吞吐量、CPU利用率及时延方面均优于Flink默认的反压机制。但本文研究工作也有不足,可以从以下两方面改进:1) 对Flink本地传输进行反压优化问题;2) 本文在Flink-N算法中接入了Redis,是一种曲线式解决问题的方法。
基金项目
科技部重点研发项目“云计算和大数据”重点专项项目(2018YFB1004402)。