一、Spark Dataset简述
Dataset是从Spark 1.6开始引入的一个新的抽象,当时还是处于alpha版本;在Spark 2.0,它已经变成了稳定版了。下面是Dataset的官方定义:
Dataset是特定域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作。每个Dataset都有一个称为DataFrame的非类型化的视图,这个视图是行的数据集。
RDD也是可以并行化的操作,Dataset和RDD主要的区别是:Dataset是特定域的对象集合;然而RDD是任何对象的集合。DataSet的API总是强类型的,可以利用这些模式进行优化,然而RDD却不行。
Dataset的定义中还提到了DataFrame,DataFrame是特殊的Dataset,它在编译时不会对模式进行检测。在未来版本的Spark,Dataset将会替代RDD成为我们开发编程使用的API(注意,RDD并不是会被取代,而是会作为底层的API提供给用户使用)。
二、Dataset与RDD的persist默认级别的区别
首先来看下RDD的persist默认存储级别:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
可以看出RDD的persist默认存储级别为MEMORY_ONLY,即保存在内存中。
再来看Dataset的persist:
/**
* Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
*
* @group basic
* @since 1.6.0
*/
def persist(): this.type = {
sparkSession.sharedState.cacheManager.cacheQuery(this)
this
}
可看出,自spark 1.6.0 版本引入Dataset后,Dataset的persist默认存储级别与RDD不同,为MEMORY_AND_DISK(同时保存于内存和磁盘),再深入查看:
/**
* Caches the data produced by the logical representation of the given [[Dataset]].
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
def cacheQuery(
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
val sparkSession = query.sparkSession
cachedData +=
CachedData(
planToCache,
InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize,
storageLevel,
sparkSession.sessionState.executePlan(planToCache).executedPlan,
tableName))
}
}
从 cacheQuery 方法的第3个参数 storageLevel 中可以得知,默认级别为MEMORY_AND_DISK。