并行计算框架Spark中一种新的RDD分区权重缓存替换算法

2018-10-18 02:18恒,谭良,2
小型微型计算机系统 2018年10期
关键词:代价列表分区

刘 恒,谭 良,2

1(四川师范大学 计算机科学学院,成都 610101)

2(中国科学院 计算技术研究所,北京 100190)

1 引 言

随着数据量与日俱增,人们对数据处理效率的需求也逐步增加.Spark[1]是继Hadoop[2]之后被提出基于内存计算的可扩展的高性能并行计算框架.针对Hadoop的不足[3,4],即大量的网络传输和磁盘I/O使得效率低下,Spark使用内存进行数据计算,同样的算法在Spark中的运行速度比Hadoop快10倍~100倍[5],而且Spark广泛的支持了多种的计算模式,包括批处理、迭代计算、交互式查询和流处理.这使得Spark的应用越来越广泛,使用它的公司包括雅虎、百度和腾讯.但Spark性能方面仍需提高,特别是当大量任务集中处理时,有限的内存资源会限制Spark系统整体性能的发挥.缓存替换是合理利用内存资源、提高任务处理性能的关键技术,所以针对缓存替换算法的研究便成为了热点[6-13].

当前Spark缓存替换算法是LRU[14].该算法的缺点是在进行缓存替换时,Spark不能预测RDD对象将来的使用顺序,没有考虑到RDD计算代价和大小等影响任务执行效率的重要因素,度量方法不够准确.当高重用但最近未使用或重新计算耗费极高代价的RDD的分区被替换出去,应用执行的效率会降低.针对LRU在计算框架中的表现不够理想,便有了一系列改进措施被提出.如使用其他典型的替换算法包括:FIFO、LFU、LRFU、MIN来代替LRU等[9].其中权重RDD缓存替换算法是最重要的改进措施之一.在计算过程中不同的RDD,由于计算代价不同、大小不同,所以对整个计算过程的影响便不同.然而当前权重RDD缓存替换算法存在对权重值的计算结果不准确和考虑因素不全面,对计算代价缺乏量化的问题.因此,研究RDD权重缓存替换具有一定的现实意义.

针对上述问题,本文提出了一种新的RDD分区权重缓存替换算法——WCSRP,WCSRP综合考虑了RDD的计算代价、使用次数、分区的大小和生命周期四大因素,并进行量化计算.相比于之前的研究,弥补了对RDD计算代价缺乏量化的缺陷,增加考虑了应用在Spark中运算时Task的locality level决定的输入RDD位置,这时会涉及到网络传输或磁盘I/O,对应用执行时间存在影响,另外相比较于使用RDD的权重值作为是否替换的度量工具,使用分区的权重值更符合Spark底层缓存机制.这样使得权重值的计算更加准确,提高了内存利用率和作业执行效率.

2 相关工作

目前,国内外对Spark系统的缓存替换算法做出了不少研究,取得了一些的研究成果.下面我们对这些成果进行分析和总结.

对于国外,文献[15]提出了Tachyon,一种基于内存的分布式文件系统,能够充分发挥分布式集群的内存特性,能够为Spark提供存储服务,但在Tachyon的实现中,替换算法还是采用LRU.文献[10]提出了在内存空间不足的情况下,在内存上压缩数据来节省内存空间,然而压缩数据又会带来CPU资源消耗的问题,在对数据集进行复杂处理时CPU资源也是非常宝贵的.文献[11]中将Spark任务中同一个Stage的RDD看作同一组,当缓存已满或没有足够空间时,移除缓存中和待缓存RDD中不属于同一组的RDD,如果都属于同一组,则溢出到磁盘进行存储,该方法在对RDD进行替换时仅考虑了一个Stage,这种粗粒度的缓存机制存在局限性.文献[16]提出了AWRP(Adaptive Weight Ranking Policy)算法为每个缓存对象计算权重,并优先转换权重值最低的缓存对象,但权重值计算方法没有对Spark的RDD缓存进行针对性的优化,并不适用于分布式计算框架.

对于国内,文献[12]提出了使用RDD权重值来进行缓存替换,将RDD的计算代价、使用次数、自身大小和生命周期作为衡量RDD权重的因素.然而作者并没有对计算代价这一因素提出合理的计算方法,因此影响了RDD权重计算的准确性.文献[9]提出了自适应缓存管理策略,同样使用RDD权重值作为缓存替换的度量工具,借鉴了文献[12]中影响权重值的四大因素,提出使用RDD生成时长来代替计算代价,并且忽略了和RDD大小有关的因素.这样虽然使计算简便了许多,然而并没有解决RDD权重计算缺乏准确性这一缺陷,影响因素考虑不全面.作者在做RDD置换的时候没有考虑到Spark系统在进行缓存置换的时候置换的单位是Block,对应的便是RDD的一个分区.文献[13]分析了Spark计算框架的内存管理和缓存机制,提出了基于RDD分区的权重计算,并以该权重值作为缓存置换的标准.但是文中仅仅考虑了RDD分区大小、RDD分区被使用次数和RDD分区存在内存中的时间,并没有考虑影响一个RDD的其他因素,比如计算代价.这样得到的权重值是不准确的,会影响Spark集群的计算效率.

总结起来,当前针对Spark缓存替换的研究工作还不够完善,已有的基于权重的缓存替换算法存在权重值计算不准确,考虑因素不全面,度量方法不够细致,影响了缓存的命中率和作业执行的效率.本文结合现有大数据快速实时处理的需要,对Spark内核中RDD对象的缓存替换进行研究分析,提出了一种新的RDD分区权重缓存替换算法,包括权重计算和缓存替换的改进措施.通过细化分析RDD分区对象的影响因素,并对这些因素进行量化分析,使得 RDD分区权重值的计算更加精确.进而在内存资源不足的情况下,缓存中的RDD分区的替换更加合理.弱化了内存资源对整体性能的影响,让集群效能得到了极大的发挥.

3 RDD分区权重新模型——WCSRP

Spark中最重要的抽象概念就是RDD,RDD通常是通过HDFS(或其他Hadoop支持的文件系统)上的文件,或者驱动器中的Scala集合对象,来创建或转换得到的.在Spark中一个RDD被划分成若干个partition,因此对于RDDi可表示为RDDi={p1,p2,p3,…,pn}.用户可以请求Spark将RDD缓存在内存中,当RDD被缓存在内存中,由于内存的容量是有限的,会出现内存存储资源不够用的情况.

由此,可以得出通过如下公式计算出RDD分区的权重:

w=α0CRDDi+α1FRDDi+α2Sp+α3LTRDDi+α4ILRDDi

(1)

公式(1)中 ,w表示该RDD分区的权重值,CRDDi表示该RDD的计算代价,FRDDi表示该RDD的使用次数,Sp表示该分区的大小,LTRDDi表示该RDD的生命周期,ILRDDi表示计算该RDD输入RDD的位置。A={α0,α1,α2,α3,α4}中的元素是常数,分别是CRDDi、FRDDi、Sp、LTRDDi和ILRDDi的归一化权重,权重值得选取由用户的具体任务Task需求决定。

下面,我们将分别对CRDDi、FRDDi、Sp、LTRDDi和ILRDDi的权重进行量化计算。

3.1 RDD计算代价CRDDi

在Spark中RDD作为计算的参与者,在宏观上,所有的计算过程都是根据RDD进行的,但从微观上来看,算子的操作其实是作用在RDD的不同分区上.当一个RDD经过转化操作派生出新的RDD时,Spark会使用谱系图(lineage graph)来记录这些不同RDD之间的依赖关系,Spark需要用这些信息来按需计算每个RDD,也可以依靠谱系图通过用缓存中的RDD来恢复下游的RDD,而不需要从头计算[17].因此RDD之间的依赖关系便成了整个应用运行的关键.在Spark源码中Dependency类是依赖关系的基类,其中NarrowDependency和ShuffleDependency均继承自该基类.前者表示RDD之间是窄依赖关系,这个的RDD会被划到同一个Stage中,这样就可以以管道的方式迭代执行;后者表示RDD之间是宽依赖关系,依赖上游的RDD不止一个,且多个子分区会依赖于同一个父RDD分区.

由于依赖关系的不同,在计算一个RDD的时候需要的进行计算的次数也是不同的.如图1所示.

图1 窄依赖和宽依赖结构图Fig.1 Narrow dependency and wide dependency

其中,RDD A和RDD B属于窄依赖,故RDD A的分区经过转换操作一产生RDD B中的分区.有图可以看出RDD A和RDD B的分区数均是4,故计算产生RDD B时,需要在4个分区上进行计算,我们可以把这个计算的过程当做RDD B的计算代价,故RDD B的计算代价值为4.也就是说通过窄依赖得到的RDD的计算代价就是该RDD的父RDD分区数.RDD C和RDD D属于宽依赖,RDD C的每个分区都可能被RDD D所使用,RDD D的分区中的数据同样来自RDD C所有的分区,在进行计算的时候对RDD C的每个分区的操作次数等于RDD D的分区数.从图1可以知道RDD C的分区数为3,RDD D的分区数为3,那么总的计算次数便是9,和窄依赖相同我们也可以得到RDD D的计算代价,所以RDD D的计算代价值是9.这样便得到了通过宽依赖得到的RDD的计算代价就是该RDD的分区数与父RDD的分区数的乘积.

由此,我们给出RDD计算代价的计算公式如下:

(2)

其中,parspRDDi表示属于宽依赖的父RDD的分区数,a表示是否存在宽依赖,若不存在宽依赖a=0,若存在a=1,parnpRDDj表示属于窄依赖的父RDD的分区数,b表示是否存在窄依赖,若不存在窄依赖b=0,若存在b=1.

3.2 RDD使用次数FRDDi

由于Spark的懒加载机制,使得Spark可以在Action算子触发SparkContext.runJob之前,程序在运行时并没有引入数据进行计算,此时系统通过分析代码的逻辑,初始化通过Transformation算子产生的RDD(此时这些RDD中并没有实际的数据).又因为每个RDD都有其依赖(除了最顶级RDD的依赖是空列表),所以可以确定各个RDD之间的依赖关系,然后根据RDD之间的依赖关系构建成DAG[18]图,将不同的RDD串联起来.在DAG图中点对应RDD,边对应一个算子.

下面,我们给出一个PageRank的例子,代码如图2所示.

图2 PageRank的Spark实现代码图Fig.2 Implement PageRank in Spark

分析代码可知,在调用saveAsTextFile这个Action算子之前,所有的Transformation算子都是为了构建RDD之间的依赖关系.而这些依赖关系也就构成了DAG图,图3便是PageRank在做两轮迭代时的DAG图.因为DAG图中边对应一个算子,那么也就是说这时该RDD被使用了一次,在图3中我们可以看出links出去有三个箭头,那么可以知道此时links被使用了三次.

所以我们可以通过统计DAG图中某个RDD的出度,便可以得到该RDD的使用次数.具体公式如下:

FRDDi=N

(3)

其中,N表示RDDi的出度.

3.3 RDD分区大小Sp

在Spark中使用BlockManager来缓存RDD数据,程序内部定义了抽象类BlockStore,用于制定所有存储类型的规范.目前BlockStore的具体实现包括MemoryStore、DiskStore和TachyonStore.其中MemoryStore是负责将没有序列化的Java对象或序列化的ByteBuffer存储到内存中.开发者使用 persist()或者 cache()函数来标记一个 RDD 是持久化的,当这个 RDD 在被一个action触发的作业提交计算后,它就会缓存在内存中.我们都知道,RDD 的运算是基于partition,每个task代表一个分区上一个stage内的运算闭包,task被分别调度到多个executor上去运行,运算过程中的RDD若需要存储则会将在该executor上运行的partition缓存下来对应的就是Block,Spark中对存储内容的读取就是根据Block进行的.本文讨论的是在内存中的缓存,故在缓存的时候会调用到MemoryStore中的实现方法putIteratorAsValues或putIteratorAsBytes,这两个方法的区别在于一个是负责尝试将没有序列化的Java对象放入内存,另一个是试着将序列化的ByteBuffer放入内存,这两个均是尝试待缓存的数据大小是否超过当前剩余的空闲内存,如果未超过则进行存储操作,完成缓存动作.

图3 PageRank的DAG图Fig.3 DAG graph of PageRank

因此可以通过监听,获得准备缓存的分区的大小,公式如下:

Sp=sacquireMemory

(4)

其中,sacquireMemory表示该分区申请内存的大小.

3.4 RDD生命周期LTRDDi

RDD的生命周期(LTRDDi)就是该RDD存活的时间段,如果一个RDD不会再被使用我们可以看做该RDD已经死亡,反之一个RDD还可以通过转换操作产生新的RDD那么该RDD还处于存活状态.对于生命周期的计算可以通过DAG图结合应用的执行过程进行分析.Spark在进行任务处理的时候,计算框架内管理线程级别的Task,在进行任务调度的时候,一个Stage中的Task会被分别调度到计算节点.Task在运行的时候可以看做pipeline.一个RDD由多个partition组成,每个partition经过Transformation操作产生新的RDD中对应的partition,依次进行下去直到该Stage结束.这样的过程便是一个Task的运算过程,当Task在一个Executor上进行运算时,它是串行执行的.故某个RDD的生命周期是该RDD的第一个partition产生后一直到最后一个使用该RDD所在的Stage完成计算的这段时间.如图4所示.

图4 Stage中Task的执行过程图Fig.4 Execution of the Task in the Stage

故对于RDDi的生命周期就是RDD生成第一个分区的时间fpTRDDi与最后一次使用该RDD的Stage完成计算时间eTstagei之差.我们给出的公式如下:

LTRDDi=eTstagei-fpTRDDi

(5)

3.5 RDD计算位置ILRDDi

在Spark中任务的处理也要考虑数据的本地性,Spark目前支持PROCESS_LOCAL(本地进程)、NODE_LOCAL(本地节点)、NO_PREE(没有喜好)、PACK_LOCAL(本地机架)、ANY(任何).Spark的设置中有关于本地进程、本地节点和本地机架等的等待时间.当某一个Task在根据自己的locality level执行时,由于启动失败,然后以自己当前的locality level等待第二次被调度,若等待时间超过了该本地化级别的默认等待时间,则该Task会通过降低自己的locality level来尝试被再次启动.我们知道,对于很多Task来说,执行时间往往比网络传输和磁盘I/O的耗时要短得多.本文将不同的Locality Level进行量化依次是:1、2、3、4、5,随着数字的增大代表着Task计算节点与Task的输入数据的节点距离越来越远,这样也就产生了网络传输对Task执行时间的影响,进而也影响着RDD的计算效率.通过在Spark源码中内嵌代码,统计RDDi的各个partition对应的Task运行的Locality Level,然后进行叠加计算出RDDi的输入RDD位置值.由于在Task运行的过程中,是在一个executor上进行的串行式的计算,所以除Task起始的那个RDD其他RDD的输入位置都是本地进程.

由此,我们给出公式如下:

(6)

其中,pLk表示RDDi中的第k个分区的输入节点的Locality Level.

4 基于WCSRP模型的缓存替换算法

由公式(1)-公式(6)可得:

(7)

根据公式(7)我们提出基于WCSRP模型的缓存替换算法.具体操作如算法1所示.

算法1. 基于WCSRP模型的缓存替换算法

输入: RDD分区权重集合 wList

空闲缓存大小 freememory

待缓存RDD分区的大小 size

待缓存RDD分区的权重 weight

输出:缓存成功 true; 缓存失败 false

初始化:

//待替换列表

rptlist<-new List

//待替换列表的总大小

rptlistsize = 0

//替换列表

rplist<-new List

for i=0 to wList.length-1 do

if weight > wList[i].weight then

rptlist.add(wList[i]);

rptlistsize+=wList[i].size;

end if

end for

if rptlist.Length==0 then

return false;

end if

if rptlistsize+freememory

rptlist.clean();

return false;

else

rptlist.orderByWeight(); // 重排序

for i=0 to rptlist.Length-1 do

rplist.add(rptlist[i]);

freememory+=rptlist[i].size;

if freememory>size then

for j=0 to rplist.Length-1 do

//移除替换列表中的Block

delete(rplist[j]);

end for

rplist.clean();

return true;

end if

end for

rptlist.clean();

return false;

end if

算法1的具体实施过程如下:

1.首先,获取需要缓存的RDD分区的大小和该RDD分区的权重值.

2.然后对缓存中的Block进行过滤,将权重值小于待缓存RDD分区权重值的加入待替换列表.

3.若待替换列表为空,则停止替换,不缓存该RDD的分区.若不为空,判断待替换列表的大小和空闲空间的和是否小于申请大小,若小于则停止替换并清空待替换列表,若大于则将列表按权重值从小到大的顺序排列.

4.遍历待替换列表中的Block,依次加入替换列表,直到空闲内存与替换列表中Block总大小的和大于等于待缓存RDD分区的大小,则停止遍历,将替换列表中的Block替换出缓存,将待缓存RDD分区加入缓存,并清空替换列表和待替换列表.

5.若待替换列表中的Block的总大小与空闲内存空间的大小的和小于待缓存RDD分区的大小,则停止替换,不缓存该RDD的分区,清空待替换列表.

5 理论分析与实验评价

5.1 理论比较分析

当前Spark权重缓存替换算法,通常涉及RDD计算代价、RDD使用频率、RDD分区大小和RDD生命周期等关键参数.下面我们将本文提出的WCSRP与文献[9]提出SACM、文献[13]提出的DWRP以及Spark平台现有的置换算法LRU进行比较.如表1所示.

表1 相关缓存置换算法对比Table 1 Comparison of correlation cache replacement algorithms

对于SACM,计算权重时考虑了RDD使用频率和RDD计算代价,提供参数校准.和本文提出的WCSRP相比,考虑的因素不够全面,另外其在进行内存置换时的置换目标是RDD,当出现内存置换时,会影响其他Task的执行.因此,在采用本文WCSRP替换算法,不会出现Task运行时所使用的RDD分区被替换出去需要重新计算的情况,应用的运行时间会比SACM有所降低.当运行长作业时,降低的幅度尤为明显.

对于DWRP,计算权重时考虑的因素有RDD使用频率、RDD分区大小和RDD生命周期.和WCSRP相比较,其在进行权重计算时仅考虑了三个因素,而RDD计算代价和Locality Level这两个因素对应用的执行时间有着很大影响,另外权重计算公式中没有提供参数校准.所以最终计算得到的权重值没有本文提供的公式计算的权重值精确,在进行内存置换时对Spark作业执行效率的提升没有本文WCSRP明显.

最后,对于Spark自带的LRU置换算法,在进行内存置换时忽略了RDD分区的差异性,仅考虑当前内存中的RDD分区的访问顺序,而当高重用但最近未使用或重新计算耗费极高代价的RDD的分区被替换出去,应用执行的耗时增加,整个框架作业运行效率会降低.而本文的WCSRP替换算法综合考虑了影响RDD分区的多个因素,使得在进行替换时不是盲目的仅根据当前的访问顺序.这样可以省去不必要的重新计算所花费的时间,提高应用的运行效率.

5.2 实验评价

为了实验验证WCSRP,我们在Spark平台上实现了WCSRP,WCSRP各因素初始的权重值是A={а0,а1,а2,а3,а4}={0.3,0.2,0.2,0.2,0.1}.为了与本文提出的WCSRP形成对照,我们选取Spark自带的缓存替换算法LRU来作对比.

实验在一台服务器DELL PowerEdge T620(Intel Xeon E5-2620*2/32G/1T)上进行,操作系统使用Debian 8.7,在服务器上虚拟出了4台主机,利用这四台主机搭建了Spark集群.其中一个主机作为Spark的Master节点,其他三个作为worker节点.应用提交后的运行时间可以通过Spark的日志文件得到.实验数据选择由SNAP提供的标准数据集Amazon0601,实验是测试使用不同的缓存替换机制,在不同迭代次数下PageRank算法的执行时间.选择PageRank算法来进行实验的原因是因为PageRank算法是典型的数据密集型算法,会涉及到多次迭代,当使用缓存后会有效的提升计算的效率.

首先测试PageRank算法使用Spark默认的缓存替换算法,记录不同的迭代次数进行实验的结果,每个迭代次数进行5次实验,分别记录下执行的时间,然后得出平均值便是该迭代次数的执行时间.然后使用新的权重缓存替换算法进行同样的实验,对记录的数据求平均值.实验结果如表2所示.

表2 LRU和WCSRP对比试验的统计结果Table 2 Result of Comparative test between LRU and WCSRP 单位:秒(S)

通过上表,便可得到图5.

图5 PageRank使用不同缓存替换算法对比试验Fig.5 PageRank uses different cache replacement algorithms to experiment

实验结果显示,在相同的迭代次数使用不同的缓存替换算法的情况下,PageRank在Spark框架中的执行时间是不同的.尽管WCSRP和LRU均随着迭代次数的增加,执行时间也在增加.但是使用WCSRP算法作为缓存替换算法时应用的执行时间明显比使用LRU算法作为缓存替换算法时有所降低.因为在使用LRU算法时,仅仅只是考虑内存中的Block被访问的时间,长期未被访问的便会被置换出去,它忽略了该Block对整个应用的价值.而在使用WCSRP算法时我们不仅综合考虑RDD的计算代价、使用次数、分区的大小和生命周期四大因素对权重的影响,而且还增加考虑了Task执行时locality level这个因素,准确计算出了缓存中的每个Block的权重,将对该应用执行最有价值的Block继续缓存在内存中,防止在需要的时候进行重复计算,使得有限的内存资源得到了充分的利用,减少了重复计算的出现,也便提高了整个应用执行的效率.所以才出现了实验结果中的执行时间减少的现象.

6 总 结

本文提出了新的权重缓存替换算法,综合考虑了影响RDD缓存的各大因素,改进了之前提出的RDD权重缓存替换策略,用细粒度的RDD分区权重值来作为替换的度量标准,改进了各影响因素的量化方式.通过这种方法,提高了Spark框架应用执行的效率,减少了因内存异常导致任务执行失败的情况出现.理论和实验结果均证明了新的权重缓存替换算法对Spark框架有着明显的优化作用.

猜你喜欢
代价列表分区
贵州省地质灾害易发分区图
上海实施“分区封控”
学习运用列表法
扩列吧
爱的代价
幸灾乐祸的代价
代价
大型数据库分区表研究
大空间建筑防火分区设计的探讨
列表画树状图各有所长