Spark(Java+MySQL)中的性能调优
最近这段时间都在做Spark作业的性能调优,其中遇到不少问题与对spark框架的误解,所以借此机会总结一下。
目录
作业背景
这是一个财务系统中的账单计算任务,处理的数据量是十亿级的,数据库有分库的处理,使用的是MySQL 5.7,应用spark框架的2.0.2版本以JAVA作为载体进行定时任务处理。优化前没有任何数据支撑,只知道整个计算任务耗时很长,最长时达到8个小时。
优化伊始
没有数据支撑是优化的一个噩梦,因为完全没有切入点了。为了解决这个问题,我们需要添加日志输出,统计分段耗时和数据量之间的关系。这个状态下,最合适的是使用AOP的方式织入方法间耗时。然而,一方面为了保持任务的轻量级和性能,我们没有使用诸如Spring之类的框架;另一方面,为了获得多个不破坏代码结构的跨方法耗时,AOP的切入点有点难以实现,因此我自制了一个工具类PerformanceUtils。
PerformanceUtils
这个工具类的设计思路很简单,我需要很随意的定义切入点,切入点之间可能会重叠,但我需要区分指定切入点的耗时。
比如上图,切入点3的开始和结束分别位于CalcClass3和CalcClass1,并且同一个类有可能有多个线程在执行。因此,我需要一个线程本地变量记录每个切入点的状态。另外,由于我需要关心的应该是耗时较高的任务执行情况,所以我需要设置一个阈值,当超过这个阈值时,方法在日志中可以给我提示。个人觉得这个工具类复用率还是挺高的,代码如下:
/**
* 效率管理工具
* @author william.liang
*
*/
public class PerformanceUtils {
private PerformanceUtils() {
}
private static ThreadLocal<Timer> localTimer = new ThreadLocal<>();
private static ThreadLocal<Map<String, Timer>> localTimerMap = new ThreadLocal<>();
/**
* 记录开始时间
*/
public static void start() {
getTimer().start();
}
/**
* 记录结束时间
*/
public static void end() {
getTimer().end();
}
/**
* 计算耗时
* @return
*/
public static long duration() {
return getTimer().duration();
}
/**
* 清除ThreadLocal
*/
public static void remove() {
localTimer.remove();
localTimerMap.remove();
}
/**
* 记录开始时间
* @param key
*/
public static void start(String key) {
getTimer(key).start();
}
/**
* 记录结束时间
* @param key
*/
public static void end(String key) {
getTimer(key).end();
}
/**
* 计算耗时
* @param key
* @return
*/
public static long duration(String key) {
return getTimer(key).duration();
}
/**
* 清除ThreadLocal
* @param key
*/
public static void remove(String key) {
getTimerMap().remove(key);
}
/**
* 当处理时间超过预定的阈值时发出警告信息
* @param log
* @param threshold 阈值(单位:ms)
*/
public static void warn(Logger log, long threshold) {
if (duration() > threshold) {
log.warn("[Performance Warning] json= msg=任务处理时长超过设定的阈值,总时长为{}ms", duration());
}
}
/**
* 当处理时间超过预定的阈值时发出警告信息
* @param log
* @param key
* @param threshold 阈值(单位:ms)
*/
public static void warn(Logger log, String key, long threshold) {
if (duration(key) > threshold) {
log.warn("[Performance Warning] json= msg=任务【{}】处理时长超过设定的阈值,总时长为{}ms", key, duration(key));
}
}
/**
* 当处理时间超过预定的阈值时发出警告信息
* @param log
* @param threshold 阈值(单位:ms)
* @param json 需要记录的对象json
*/
public static void warn(Logger log, long threshold, String json) {
if (duration() > threshold) {
log.warn("[Performance Warning] json={} msg=任务处理时长超过设定的阈值,总时长为{}ms", json, duration());
}
}
/**
* 当处理时间超过预定的阈值时发出警告信息
* @param log
* @param key
* @param threshold 阈值(单位:ms)
* @param json 需要记录的对象json
*/
public static void warn(Logger log, String key, long threshold, String json) {
if (duration(key) > threshold) {
log.warn("[Performance Warning] json={} msg=任务【{}】处理时长超过设定的阈值,总时长为{}ms", json, key, duration(key));
}
}
/**
* 记录结束时间并当处理时间超过预定的阈值时发出警告信息,最后清除
* @param log
* @param threshold 阈值(单位:ms)
*/
public static void endWithWarnAndRemove(Logger log, long threshold) {
end();
warn(log, threshold);
remove();
}
/**
* 记录结束时间并当处理时间超过预定的阈值时发出警告信息,最后清除
* @param log
* @param key
* @param threshold 阈值(单位:ms)
*/
public static void endWithWarnAndRemove(Logger log, String key, long threshold) {
end(key);
warn(log, key, threshold);
remove(key);
}
/**
* 记录结束时间并当处理时间超过预定的阈值时发出警告信息,最后清除
* @param log
* @param threshold 阈值(单位:ms)
* @param json 需要记录的对象json
*/
public static void endWithWarnAndRemove(Logger log, long threshold, String json) {
end();
warn(log, threshold, json);
remove();
}
/**
* 记录结束时间并当处理时间超过预定的阈值时发出警告信息,最后清除
* @param log
* @param key
* @param threshold 阈值(单位:ms)
* @param json 需要记录的对象json
*/
public static void endWithWarnAndRemove(Logger log, String key, long threshold, String json) {
end(key);
warn(log, key, threshold, json);
remove(key);
}
private static Timer getTimer(String key) {
Timer timer = getTimerMap().getOrDefault(key, new Timer());
getTimerMap().putIfAbsent(key, timer);
return timer;
}
private static Timer getTimer() {
Timer timer = localTimer.get();
if (timer == null) {
timer = new Timer();
localTimer.set(timer);
}
return timer;
}
private static Map<String, Timer> getTimerMap() {
Map<String, Timer> timerMap = localTimerMap.get();
if (timerMap == null) {
timerMap = new HashMap<>(16);
localTimerMap.set(timerMap);
}
return timerMap;
}
static class Timer {
private long start;
private long end;
public void start() {
start = System.currentTimeMillis();
}
public void end() {
end = System.currentTimeMillis();
}
public long duration() {
return end - start;
}
}
使用时只需要简单的用start和end方法埋点,就可以使用duration获取耗时,或者使用warn作报警,也可以使用组合方法endWithWarnAndRemove节省代码。
public void method(){
PerformanceUtils.start(); // 切入点,可带参数标识
...
PerformanceUtils.endWithWarnAndRemove(log, 5000L); // 切入点,可带参数标识,当耗时超过5秒报警
}
有了这个工具类,我已经能方便地获取已经埋点的方法耗时,另外通过一些处理量的统计,我发现了一些有趣的问题……
焦点问题
通过上面的工具类,我们发现最高的耗时在于账单头的计算。分析其原因可以得出经典的Spark话题:transform算子与action算子。因为我们实际的action是放在账单头的处理上的,所以他的计算量很大,耗时很长。但还有一个问题引起了我的注意——算子代码块执行时间与方法开始时间的差有点大,换句话说,我们在transform算子中的处理量过大。由于我们大量使用了Spark SQL作为我们的逻辑关联工具,所以一开始的时候,我更加关注的是它的执行计划。但偶然的一次线上问题令我注意到,在该任务执行期间,出现了大量的慢查询情况,超过200条以上的查询300秒以上的耗时。
慢查询
刚刚转向spark的我们,天真的以为spark是大进大出的处理框架,我们需要做的是把数据从MySQL抽到spark中,让他在内存中运算后把结果持久化回MySQL就OK了。而这个观点让我们想当然地认为我们不会在spark任务中遇到慢查询。然而这次意外的相遇,让我意识到必须了解一下spark的处理机制。
Spark每次运行时,会开启一个Spark UI,通过这个UI,我们可以观察到任务执行的情况,尤其是使用spark SQL的时候,相应的执行计划会显示。
根据Spark UI的物理执行计划,我看到最底端都出现了这个策略“Scan JDBCRelation”,也就是在这个处理节点上,spark是需要通过JDBC到数据库中读取数据的。而根据代码逻辑,实际产生这个读取的代码为
sparkSession.read().jdbc(url, tableName, predicates, connectionInfo);
并且这个策略后,通常带有PushedFilters,而里面的条件正式我们在Spark SQL中的where的条件,而慢查询就是出现在这个物理计划的策略执行时。
JDBC查询的组成
虽然我们知道了,执行计划中,能看到最终的读数据库的策略,但是,这个语句是如何构成的呢?
唯品里有一个专门监控数据库的工具,可以方便地看到慢查询的情况,这个在阿里云的数据云服务也是可以看得到的,另外最简单的就是使用数据库自带的slow_query_log参数相关设置查看。
从实际的慢查询最终语句与执行计划,依据代码反推得出实际语句构成如下:
以上流程是推测的逻辑流程,并非spark的执行流程,spark是根据实际物理执行计划执行的,而transform时就会组装和生成执行计划了。
关于cache
从上面推测的逻辑流程,我们发现,cache其实是可以截断Spark SQL的查询条件的,而我们面对的慢查询问题通常是各类的索引问题。参考美团的MySQL索引原理及慢查询优化,我发现我们的索引问题正好存在于有两个范围查询外加一个或查询。这种查询的索引按照美团的总结来说,是无法建立有效的索引的。于是我萌生出使用cache来截断另一个范围查询条件的想法。然而,当真正使用cache的时候,我们遇到了一个更加严重的问题——资源占用问题。
cache锁
根据spark的源码,cache的操作是有writeLock的:
/**
* Caches the data produced by the logical representation of the given [[Dataset]].
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
def cacheQuery(
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
val sparkSession = query.sparkSession
cachedData +=
CachedData(
planToCache,
InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize,
storageLevel,
sparkSession.sessionState.executePlan(planToCache).executedPlan,
tableName))
}
}
这个writeLock会导致Driver端如果有多线程运行时,存在锁等待的情况。因此,有可能的情况下Driver端应该避免有大量的数据处理操作,数据处理应当分散到每个分区action中去。
cache级别
我们知道,spark的cache分了7个级别,一般来说,只要内存够用的情况下,我们最好还是保存到MEMORY_ONLY,其他级别基本上都会引入序列化消耗等问题。
然而使用MEMORY_ONLY时,我们需要额外注意相关的几个Spark参数配置。这些参数,都会影响到实际cache时会不会有更多的性能开销。
最后,我们要记得用完这个cache的数据后,一定要把他unpersist掉,不然内存和硬盘的资源都可能耗光,导致程序运行假死。别问我上面为什么会知道不unpersist会导致假死……
合理使用cache
如多数人在spark调优的经验中说的一样,我们应该只在该数据集需要重复计算的情况下才将其cache。因为cache在实际执行时,也是非常昂贵的操作,特别是需要序列化和持久化到硬盘的时候。
那怎么判断是否需要重复计算呢?简单地说,当你的spark SQL使用到同一个view超过1次,或者使用action算子(如foreachPartition、count等)超过1次时。
predicates
使用cache截断查询条件后,不仅存储资源消耗异常,慢查询的效果也不理想,主要原因是因为条件收束效果较差,导致查询后的数据集过大。这时候,我转向了另一个逻辑流程的步骤——predicates。
Predicates是Spark提供的JDBC API中一个分区参数,也有一个替换接口直接给出字段和上下边间值,然后按给定分区数划分。这里选择使用predicates这个接口主要是因为我们会将一下附加条件添加到分区条件中取,做到最大限度限制拉取到spark的数据数量的效果。
各位看官没有看错,我是要限制实际拉取到spark的数据数量,和我们之前的看法“大进大出”相反,我们是尽量仅读需要计算的数据,也就是从十亿级的数据中过滤出所需要的数据。因为无论spark多强大,spark集群的机器有多牛,如果数据进入了spark的内存,还是得占地方,还是得占CPU处理时间,所以我们调优的时候,要保证输入数据尽量少并包含符合所有需要计算的数据的全集合。
Spark分区的本意是为了实现Map-Reduce模型,更好的实现基础算法中的分治、动态规划和贪婪;而这个刚好触及到我们过滤所需的数据的需求。为了解决慢查询的问题,我们可以建立支持获取最小集的索引,然后用边界值来获取最大最小ID,通过限制最优primary key索引来过滤所需数据。这个就类似于将Spark的另一个API中的columnName替换成我们需要过滤的条件。比方说,我们在数据来源表,需要过滤符合处理的日期的某供应商的账单,就需要过滤tx_date和vendor在符合条件的最大最小ID,然后使用对应的where条件来过滤:
where id between minId and maxId and vendor='abc' and tx_date between yesterday and today
获取的最大和最小ID中,可能存在上千万的数据,所以我们在具体分区中,可以以primary key的最佳查询量——10000为基准划分。
那么实际到达MySQL的语句应该为:
select exact_fields from source_table where id between minId and minId+10000 and vendor='abc' and tx_date between yesterday and today and other_condition_from_spark_sql
select exact_fields from source_table where id between partition_minId and partition_minId+10000 and vendor='abc' and tx_date between yesterday and today and other_condition_from_spark_sql
...(忽略中间多个分区)
select exact_fields from source_table where id between partition_minId and maxId and vendor='abc' and tx_date between yesterday and today and other_condition_from_spark_sql
因为每个分区的数据都是需要计算的数据,所以性能并没有浪费,而每个分区的查询语句因为使用的是primary key,所以速率都在十毫秒级别。全部分区加起来,就算是亿级数据,也只是10000个分区,大概十秒级的处理时间,并且数据分布相对比较均匀。至此,慢查询的问题在准确分区下解决了。
慢查询调优小结
- 无论使用MySQL也好,其他数据源也好,我们在读取的时候,应该尽量精确定位计算所需数据;
- 针对关系型数据库的数据源,可以通过必要数据所属查询条件建立得索引快速定位ID区段,按10000条数据进行分区;
- 有重复使用的数据集进行需要cache,没有重复使用需要的数据集谢绝cache。
Spark计算速度优化
在解决慢查询问题后,整个账单计算大概提速了40%,所以要再优化的话,就需要继续深入复杂的运算逻辑中,优化每个算子的计算速度了。
UDF
由于我们使用了大量的Spark SQL,一些过程数据,我们就得放在UDF中处理了。这就导致了每个UDF的开销需要尽量压榨,比如说,尽早排除不符合运算条件的数据,避免不必要的运算,避免重复运算同一个UDF(听说新版本中的Catalyst可以自定义UDF解释,但不知道能否合并同样的UDF运算)。
foreachPartition算子
使用这个算子的时候,通常是我们需要把数据落库的时候。这里推荐使用SQL的批量操作,减少数据库IO。另外,如果有其他操作,也是前面的原则,尽量减少运算量。
count算子
尽量不要使用的算子,因为就算数据量很少,还是需要再默认最少200个分区中获取结果,这个是非常消耗性能的。如果有统计需要,尽量使用Accumulator,在foreachPartition的时候同时做好统计的处理。如果只是需要判断是否空结果集,可以直接使用rdd的API——isEmpty。
Rdd.isEmpty
这个API使用的方法take(n)的comment有一个很恐怖的注释,大意是这个n的数据会实际传输到driver端,所以尽量控制这个数据的大小,减少IO。然而因为多分片的原因,take方法会动态扩展搜索的分区,所以最坏情况下,可能会返回分区数量一般的数据到driver端,所以使用isEmpty时,计算控制分区数量还是空分区的情况。
Spark计算速度优化小结
- 无论是算子还是UDF,应该尽量减少运算量,尽早排除不需要的数据,不重复计算已计算的数据;
- 对数据库的操作仍需要注意锁和慢查询的情况。
优化进行时
截止到这篇日志完结时,我们的性能优化仍然持续执行着;而我们对spark的观念也持续的改变着;我们一直在路上。
written by [email protected]