1. 持久化
action操作:
常用action:reduce
, count
, take
, collect
,countByKey
, saveAsTextFile
凡是actoin级别的操作都会触发sc.runjob( )
一个spark应用程序可以有很多个job,hadoop中只有一个
Spark所有的算法 都有persit
。
persit原因: Spark在默认情况下,数据放在内存,适合高速迭代,风险当然也高,所以需要将前面的操作进行persit.
持久化:
在必要步骤需要persist
和cache
persist
:
- 某步骤计算特别耗时,重新算代价大;
- 计算链条特别长的情况;
checkpoint
所在的RDD(之前)也一定要持久化数据;shuffle
之后(网络传输);- shuffle之前(框架默认帮助我们把数据持久化到本地磁盘)
前4步手动的,第5是系统自动的
持久化实际上是指定存储级别。
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
STORAGE_LEVEL:
MEMORY_ONLY
只放在内存,容易内存溢出(OOM)
MEMORY_AND_DISK
优先考虑内存,内存不够放硬盘
cache是persist的一种特殊情况(源码中调用了persist),cache不是action。因为运行后没有执行作业;cache之后一定不能立即有其他算子,如下:
sc.textFile("/library/wordcount/input/Data")
.flatMap(_.split(" "))
.map(word => (word, 1))
.reduceByKey(_+_,1)
.cache.count
val cached = sc.textFile("/library/wordcount/input/Data")
.flatMap(_.split(" "))
.map(word => (word, 1))
.cache //变量保存起来
销毁cache可以强制unpersist
persist是lazy级别的,unpersist是eager级别(立即)
2. 广播
同步(广播过去的内容不可修改)、减少通信、冗余、大变量、join等原因
广播放到executor的内存中。减少网络传输,节省内存,减少OOM的可能。
广播是由
Driver
发给当前Application
分配的所有Executor
内存级别的全局只读变量,Executor
中的线程池中的线程共享该全局变量,极大的减少了网络传输(否则的话每个Task都要传输一次该变量)并极大的节省了内存,当然也隐性的提高了cpu的有效工作
Executor共享数据通过HDFS或Tachyon, Executor是一个进程,资源独立不能共享
广播变量到executor中示例:
val number = 10
val broadcastNumber = sc.broadcast(number) //声明广播变量
val data = sc.parallelize(1 to 10000)
val bn = data.map(_ * broadcastNumber.value)
3. 累加器
累加器,全局级别的
Accumulator:对于Executor只能修改但不可读,只对Driver可读
accumulator通过SparkContext创建,内部有锁,累加器全局唯一,每次操作只增不减
scala> val sum = sc.accumulator(0)
scala> val data= sc.parallelize(1 to 5)
scala> val result = data.foreach(key => sum+=1)
scala> println(sum)
5