说RDD持久化之前,先来了解一下惰性机制。
RDD在设计时采用了惰性机制的特性,指的是转换RDD的过程先记录而不发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算。举例说明:
假设/mnt/下又一个文件word.txt,内容如下:
Hadoop is good Spark is fast Spark is better
代码:
from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster("local").setAppName("My App") sc = SparkContext(conf=conf) lines = sc.textFile("file:///mnt/word.txt") # 记录,并不执行 lineLengths = lines.map(lambda s:len(s)) # 记录,并不执行 totalLength = lineLengths.reduce(lambda a, b: a + b) # 开始执行!
为了看着更清晰,代码不妨写成:
# 记录,并不执行。 rdd1 = sc.textFile("file:///mnt/word.txt") # 是一个RDD对象。 # 记录,并不执行。 rdd2 = rdd1.map(lambda s:len(s)) # 是一个RDD对象。 # 开始执行! totalLength = rdd2.reduce(lambda a, b: a+b) # 是个数字。 """ 打印验证 """ print(rdd1) # file:///mnt/word.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0 print(rdd2) # PythonRDD[5] at RDD at PythonRDD.scala:53 print(totalLength) # 42 # 调用RDD自带的函数,来取出rdd1和rdd2对象中的值 rdd1.foreach(print) # Spark is better Hadoop is good Spark is fast rdd2.foreach(print) # 14 13 15
上面代码中,
(i)第1行语句中的textFile()是一个转换操作(函数返回一个RDD对象),系统只会记录这次转换,并不会真正读取word.txt文件的数据到内存中;
(ii)第2行语句的map()也是一个转换操作(函数返回一个RDD对象),系统只是记录这次转换,不会真正执行map()方法;
(iii)而第3行语句的reduce()是一个“行动”类型的操作(函数返回一个整型数字),这时系统会生成一个作业,触发真正的计算。也就是说,这时才会加载word.txt的数据到内存,生成RDD。
在Spark中,RDD采用惰性求值的机制。导致每次遇到“行动”操作,都会从头开始执行计算(即每次调用行动操作,都会触发一次从头开始的计算),这对于迭代计算而言,代价是很大的,影响效率(因为迭代计算经常需要多次重复使用同一组数据)。下面是多次计算同一个RDD的例子:
li = ["Hadoop", "Spark", "Hive"] rdd = sc.parallelize(li) # 记录操作。生成一个RDD print(rdd.count()) # 行动操作,触发一次真正的从头到尾的计算。运行结果:3 print(','.join(rdd.collect())) # 行动操作,再触发一次真正的从头到尾的计算。运行结果:'Hadoop', 'Spark', 'Hive' # 注:rrd.collect()是以数组形式返回数据集中的所有元素。结果:['Hadoop', 'Spark', 'Hive']
为了避免这种重复计算的开销,可以使用RDD的持久化(缓存),方法是使用persist()函数将一个RDD标记为持久化。注意:之所以“标记为持久化”,是因为出现persist()语句的地方并不会马上计算生成RDD并把它持久化,而是要对等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化。持久化后的RDD将会被保留在计算节点的内存中,被后面的行动操作重复时候。
persist()使用的时候有两种参数供选择:
上面的例子,增加持久化缓存语句:
from pyspark import SparkConf, SparkContext from pyspark.storagelevel import StorageLevel # 创建SparkConf对象,并给对象赋值 conf = SparkConf().setMaster("local").setAppName("My app") # 创建SparkContext对象,不妨命名为sc sc = SparkContext(conf=conf) """ spark创建的sc,其功能之一是调用自带的parallelize()函数来加载自定义的变量来创建RDD,如下面的 sc.parallelize: (sc还有如加载文件textFile()等其他很多函数和功能) """ li = ["hadoop", "spark", "hive"] rdd = sc.parallelize(li) # 以仅内存方式标记RDD。将名为rdd的这个RDD对象标记持久化缓存 rdd.persist() # 默认MEMORY_ONLY。--仅内存,超出内存则覆盖(LRU原则)方式。 # 等价 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY) # rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK) # --内存+磁盘,超出内存则存硬盘方式。 # 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.persist()才会执行,把这个rdd放到缓存中。 print(rdd.count()) # 第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd print(','.join(rdd.collect())) # 解除标记。释放名为rdd的RDD对象在内存中的缓存空间 rdd.unpersist()
注:持久化RDD会占用内存空间,当不再需要一个RDD时,就可以使用unpersist()函数手动地把持久化的RDD从缓存中移除,释放内存空间。
注意,上面标记为仅内存执行rdd.persist() 或 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY) 后,要想重新标记为内存+磁盘执行 rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK) ,需要先执行rdd.unpersist()释放标记!!!否则报错!
实际开发中,我们使用cache()方法就会自动调用persist(MEMORY_ONLY),我们一般用rdd.cache()或rdd.persist()即可,不用再导包from pyspark.storagelevel import StorageLevel
来传参,通过查看cache()和persist()源码,可以看到这两个方法会自动导入包。
重点!!RDD持久化 实际开发代码,一般写法如下:
from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster("local").setAppName("My app") sc = SparkContext(conf=conf) li = ["hadoop", "spark", "hive"] rdd = sc.parallelize(li) # 以仅内存方式标记RDD。将名为rdd的这个RDD对象标记持久化缓存 rdd.cache() # 会调用persist(MEMORY_ONLY) # 或 rdd.persist() # 默认MEMORY_ONLY。--仅内存,超出内存则覆盖(LRU原则)方式。 # 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.persist()才会执行,把这个rdd放到缓存中。 print(rdd.count()) # 第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd print(','.join(rdd.collect())) # 解除标记。释放名为rdd的RDD对象在内存中的缓存空间 rdd.unpersist()
附:cache()和persist()函数的源码。在Anaconda的site-packages/pyspark/rdd.py文件: