Spark(Java+MySQL)中的性能调优

Spark(Java+MySQL)中的性能调优

最近这段时间都在做Spark作业的性能调优,其中遇到不少问题与对spark框架的误解,所以借此机会总结一下。


目录


作业背景

这是一个财务系统中的账单计算任务,处理的数据量是十亿级的,数据库有分库的处理,使用的是MySQL 5.7,应用spark框架的2.0.2版本以JAVA作为载体进行定时任务处理。优化前没有任何数据支撑,只知道整个计算任务耗时很长,最长时达到8个小时。

优化伊始

没有数据支撑是优化的一个噩梦,因为完全没有切入点了。为了解决这个问题,我们需要添加日志输出,统计分段耗时和数据量之间的关系。这个状态下,最合适的是使用AOP的方式织入方法间耗时。然而,一方面为了保持任务的轻量级和性能,我们没有使用诸如Spring之类的框架;另一方面,为了获得多个不破坏代码结构的跨方法耗时,AOP的切入点有点难以实现,因此我自制了一个工具类PerformanceUtils。

PerformanceUtils

这个工具类的设计思路很简单,我需要很随意的定义切入点,切入点之间可能会重叠,但我需要区分指定切入点的耗时。

Created with Raphaël 2.1.2 CalcClass1 CalcClass1 PerformanceUtils PerformanceUtils CalcClass2 CalcClass2 CalcClass3 CalcClass3 记录切入点1 执行方法1 记录切入点2 执行方法2 记录切入点3 执行方法2完成 记录切入点2完成时间 执行方法1完成 记录切入点3完成时间 记录切入点1完成时间

比如上图,切入点3的开始和结束分别位于CalcClass3和CalcClass1,并且同一个类有可能有多个线程在执行。因此,我需要一个线程本地变量记录每个切入点的状态。另外,由于我需要关心的应该是耗时较高的任务执行情况,所以我需要设置一个阈值,当超过这个阈值时,方法在日志中可以给我提示。个人觉得这个工具类复用率还是挺高的,代码如下:

/**
 * 效率管理工具
 * @author william.liang
 *
 */
public class PerformanceUtils {
    private PerformanceUtils() {
    }

    private static ThreadLocal<Timer> localTimer = new ThreadLocal<>();
    private static ThreadLocal<Map<String, Timer>> localTimerMap = new ThreadLocal<>();

    /**
     * 记录开始时间
     */
    public static void start() {
        getTimer().start();
    }

    /**
     * 记录结束时间
     */
    public static void end() {
        getTimer().end();
    }

    /**
     * 计算耗时
     * @return
     */
    public static long duration() {
        return getTimer().duration();
    }

    /**
     * 清除ThreadLocal
     */
    public static void remove() {
        localTimer.remove();
        localTimerMap.remove();
    }

    /**
     * 记录开始时间
     * @param key
     */
    public static void start(String key) {
        getTimer(key).start();
    }

    /**
     * 记录结束时间
     * @param key
     */
    public static void end(String key) {
        getTimer(key).end();
    }

    /**
     * 计算耗时
     * @param key
     * @return
     */
    public static long duration(String key) {
        return getTimer(key).duration();
    }

    /**
     * 清除ThreadLocal
     * @param key
     */
    public static void remove(String key) {
        getTimerMap().remove(key);
    }

    /**
     * 当处理时间超过预定的阈值时发出警告信息
     * @param log
     * @param threshold 阈值(单位:ms)
     */
    public static void warn(Logger log, long threshold) {
        if (duration() > threshold) {
            log.warn("[Performance Warning] json= msg=任务处理时长超过设定的阈值,总时长为{}ms", duration());
        }
    }

    /**
     * 当处理时间超过预定的阈值时发出警告信息
     * @param log
     * @param key
     * @param threshold 阈值(单位:ms)
     */
    public static void warn(Logger log, String key, long threshold) {
        if (duration(key) > threshold) {
            log.warn("[Performance Warning] json= msg=任务【{}】处理时长超过设定的阈值,总时长为{}ms", key, duration(key));
        }
    }

    /**
     * 当处理时间超过预定的阈值时发出警告信息
     * @param log
     * @param threshold 阈值(单位:ms)
     * @param json 需要记录的对象json
     */
    public static void warn(Logger log, long threshold, String json) {
        if (duration() > threshold) {
            log.warn("[Performance Warning] json={} msg=任务处理时长超过设定的阈值,总时长为{}ms", json, duration());
        }
    }

    /**
     * 当处理时间超过预定的阈值时发出警告信息
     * @param log
     * @param key
     * @param threshold 阈值(单位:ms)
     * @param json 需要记录的对象json
     */
    public static void warn(Logger log, String key, long threshold, String json) {
        if (duration(key) > threshold) {
            log.warn("[Performance Warning] json={} msg=任务【{}】处理时长超过设定的阈值,总时长为{}ms", json, key, duration(key));
        }
    }

    /**
     * 记录结束时间并当处理时间超过预定的阈值时发出警告信息,最后清除
     * @param log
     * @param threshold 阈值(单位:ms)
     */
    public static void endWithWarnAndRemove(Logger log, long threshold) {
        end();
        warn(log, threshold);
        remove();
    }

    /**
     * 记录结束时间并当处理时间超过预定的阈值时发出警告信息,最后清除
     * @param log
     * @param key
     * @param threshold 阈值(单位:ms)
     */
    public static void endWithWarnAndRemove(Logger log, String key, long threshold) {
        end(key);
        warn(log, key, threshold);
        remove(key);
    }

    /**
     * 记录结束时间并当处理时间超过预定的阈值时发出警告信息,最后清除
     * @param log
     * @param threshold 阈值(单位:ms)
     * @param json 需要记录的对象json
     */
    public static void endWithWarnAndRemove(Logger log, long threshold, String json) {
        end();
        warn(log, threshold, json);
        remove();
    }

    /**
     * 记录结束时间并当处理时间超过预定的阈值时发出警告信息,最后清除
     * @param log
     * @param key
     * @param threshold 阈值(单位:ms)
     * @param json 需要记录的对象json
     */
    public static void endWithWarnAndRemove(Logger log, String key, long threshold, String json) {
        end(key);
        warn(log, key, threshold, json);
        remove(key);
    }

    private static Timer getTimer(String key) {
        Timer timer = getTimerMap().getOrDefault(key, new Timer());
        getTimerMap().putIfAbsent(key, timer);
        return timer;
    }

    private static Timer getTimer() {
        Timer timer = localTimer.get();
        if (timer == null) {
            timer = new Timer();
            localTimer.set(timer);
        }
        return timer;
    }

    private static Map<String, Timer> getTimerMap() {
        Map<String, Timer> timerMap = localTimerMap.get();
        if (timerMap == null) {
            timerMap = new HashMap<>(16);
            localTimerMap.set(timerMap);
        }
        return timerMap;
    }

    static class Timer {
        private long start;
        private long end;

        public void start() {
            start = System.currentTimeMillis();
        }

        public void end() {
            end = System.currentTimeMillis();
        }

        public long duration() {
            return end - start;
        }
    }

使用时只需要简单的用start和end方法埋点,就可以使用duration获取耗时,或者使用warn作报警,也可以使用组合方法endWithWarnAndRemove节省代码。

    public void method(){
        PerformanceUtils.start(); // 切入点,可带参数标识
        ...
        PerformanceUtils.endWithWarnAndRemove(log, 5000L); // 切入点,可带参数标识,当耗时超过5秒报警
    }

有了这个工具类,我已经能方便地获取已经埋点的方法耗时,另外通过一些处理量的统计,我发现了一些有趣的问题……

焦点问题

通过上面的工具类,我们发现最高的耗时在于账单头的计算。分析其原因可以得出经典的Spark话题:transform算子与action算子。因为我们实际的action是放在账单头的处理上的,所以他的计算量很大,耗时很长。但还有一个问题引起了我的注意——算子代码块执行时间与方法开始时间的差有点大,换句话说,我们在transform算子中的处理量过大。由于我们大量使用了Spark SQL作为我们的逻辑关联工具,所以一开始的时候,我更加关注的是它的执行计划。但偶然的一次线上问题令我注意到,在该任务执行期间,出现了大量的慢查询情况,超过200条以上的查询300秒以上的耗时。

慢查询

刚刚转向spark的我们,天真的以为spark是大进大出的处理框架,我们需要做的是把数据从MySQL抽到spark中,让他在内存中运算后把结果持久化回MySQL就OK了。而这个观点让我们想当然地认为我们不会在spark任务中遇到慢查询。然而这次意外的相遇,让我意识到必须了解一下spark的处理机制。

Spark每次运行时,会开启一个Spark UI,通过这个UI,我们可以观察到任务执行的情况,尤其是使用spark SQL的时候,相应的执行计划会显示。

根据Spark UI的物理执行计划,我看到最底端都出现了这个策略“Scan JDBCRelation”,也就是在这个处理节点上,spark是需要通过JDBC到数据库中读取数据的。而根据代码逻辑,实际产生这个读取的代码为

sparkSession.read().jdbc(url, tableName, predicates, connectionInfo);

并且这个策略后,通常带有PushedFilters,而里面的条件正式我们在Spark SQL中的where的条件,而慢查询就是出现在这个物理计划的策略执行时。

JDBC查询的组成

虽然我们知道了,执行计划中,能看到最终的读数据库的策略,但是,这个语句是如何构成的呢?

唯品里有一个专门监控数据库的工具,可以方便地看到慢查询的情况,这个在阿里云的数据云服务也是可以看得到的,另外最简单的就是使用数据库自带的slow_query_log参数相关设置查看。

从实际的慢查询最终语句与执行计划,依据代码反推得出实际语句构成如下:

Created with Raphaël 2.1.2 开始 是否有视图定义? 获取视图定义 是否有分区? 拼接分区条件 是否有SQL查询条件 拼接SQL查询条件 是否存在cache? 截断cache后的Spark SQL查询条件 根据table名和拼接条件生成最终JDBC查询SQL 结束 抛出异常 yes no yes no yes no yes no

以上流程是推测的逻辑流程,并非spark的执行流程,spark是根据实际物理执行计划执行的,而transform时就会组装和生成执行计划了。

关于cache

从上面推测的逻辑流程,我们发现,cache其实是可以截断Spark SQL的查询条件的,而我们面对的慢查询问题通常是各类的索引问题。参考美团的MySQL索引原理及慢查询优化,我发现我们的索引问题正好存在于有两个范围查询外加一个或查询。这种查询的索引按照美团的总结来说,是无法建立有效的索引的。于是我萌生出使用cache来截断另一个范围查询条件的想法。然而,当真正使用cache的时候,我们遇到了一个更加严重的问题——资源占用问题。

cache锁

根据spark的源码,cache的操作是有writeLock的:

  /**
   * Caches the data produced by the logical representation of the given [[Dataset]].
   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
   * recomputing the in-memory columnar representation of the underlying table is expensive.
   */
  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
    val planToCache = query.queryExecution.analyzed
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      cachedData +=
        CachedData(
          planToCache,
          InMemoryRelation(
            sparkSession.sessionState.conf.useCompression,
            sparkSession.sessionState.conf.columnBatchSize,
            storageLevel,
            sparkSession.sessionState.executePlan(planToCache).executedPlan,
            tableName))
    }
  }

这个writeLock会导致Driver端如果有多线程运行时,存在锁等待的情况。因此,有可能的情况下Driver端应该避免有大量的数据处理操作,数据处理应当分散到每个分区action中去。

cache级别

我们知道,spark的cache分了7个级别,一般来说,只要内存够用的情况下,我们最好还是保存到MEMORY_ONLY,其他级别基本上都会引入序列化消耗等问题。

然而使用MEMORY_ONLY时,我们需要额外注意相关的几个Spark参数配置。这些参数,都会影响到实际cache时会不会有更多的性能开销。

最后,我们要记得用完这个cache的数据后,一定要把他unpersist掉,不然内存和硬盘的资源都可能耗光,导致程序运行假死。别问我上面为什么会知道不unpersist会导致假死……

合理使用cache

如多数人在spark调优的经验中说的一样,我们应该只在该数据集需要重复计算的情况下才将其cache。因为cache在实际执行时,也是非常昂贵的操作,特别是需要序列化和持久化到硬盘的时候。

那怎么判断是否需要重复计算呢?简单地说,当你的spark SQL使用到同一个view超过1次,或者使用action算子(如foreachPartition、count等)超过1次时。

predicates

使用cache截断查询条件后,不仅存储资源消耗异常,慢查询的效果也不理想,主要原因是因为条件收束效果较差,导致查询后的数据集过大。这时候,我转向了另一个逻辑流程的步骤——predicates。

Predicates是Spark提供的JDBC API中一个分区参数,也有一个替换接口直接给出字段和上下边间值,然后按给定分区数划分。这里选择使用predicates这个接口主要是因为我们会将一下附加条件添加到分区条件中取,做到最大限度限制拉取到spark的数据数量的效果。

各位看官没有看错,我是要限制实际拉取到spark的数据数量,和我们之前的看法“大进大出”相反,我们是尽量仅读需要计算的数据,也就是从十亿级的数据中过滤出所需要的数据。因为无论spark多强大,spark集群的机器有多牛,如果数据进入了spark的内存,还是得占地方,还是得占CPU处理时间,所以我们调优的时候,要保证输入数据尽量少并包含符合所有需要计算的数据的全集合

Spark分区的本意是为了实现Map-Reduce模型,更好的实现基础算法中的分治、动态规划和贪婪;而这个刚好触及到我们过滤所需的数据的需求。为了解决慢查询的问题,我们可以建立支持获取最小集的索引,然后用边界值来获取最大最小ID,通过限制最优primary key索引来过滤所需数据。这个就类似于将Spark的另一个API中的columnName替换成我们需要过滤的条件。比方说,我们在数据来源表,需要过滤符合处理的日期的某供应商的账单,就需要过滤tx_date和vendor在符合条件的最大最小ID,然后使用对应的where条件来过滤:

where id between minId and maxId and vendor='abc' and tx_date between yesterday and today

获取的最大和最小ID中,可能存在上千万的数据,所以我们在具体分区中,可以以primary key的最佳查询量——10000为基准划分。

那么实际到达MySQL的语句应该为:

select exact_fields from source_table where id between minId and minId+10000 and vendor='abc' and tx_date between yesterday and today and other_condition_from_spark_sql
select exact_fields from source_table where id between partition_minId and partition_minId+10000 and vendor='abc' and tx_date between yesterday and today and other_condition_from_spark_sql
...(忽略中间多个分区)
select exact_fields from source_table where id between partition_minId and maxId and vendor='abc' and tx_date between yesterday and today and other_condition_from_spark_sql

因为每个分区的数据都是需要计算的数据,所以性能并没有浪费,而每个分区的查询语句因为使用的是primary key,所以速率都在十毫秒级别。全部分区加起来,就算是亿级数据,也只是10000个分区,大概十秒级的处理时间,并且数据分布相对比较均匀。至此,慢查询的问题在准确分区下解决了。

慢查询调优小结

  • 无论使用MySQL也好,其他数据源也好,我们在读取的时候,应该尽量精确定位计算所需数据;
  • 针对关系型数据库的数据源,可以通过必要数据所属查询条件建立得索引快速定位ID区段,按10000条数据进行分区;
  • 有重复使用的数据集进行需要cache,没有重复使用需要的数据集谢绝cache。

Spark计算速度优化

在解决慢查询问题后,整个账单计算大概提速了40%,所以要再优化的话,就需要继续深入复杂的运算逻辑中,优化每个算子的计算速度了。

UDF

由于我们使用了大量的Spark SQL,一些过程数据,我们就得放在UDF中处理了。这就导致了每个UDF的开销需要尽量压榨,比如说,尽早排除不符合运算条件的数据,避免不必要的运算,避免重复运算同一个UDF(听说新版本中的Catalyst可以自定义UDF解释,但不知道能否合并同样的UDF运算)。

foreachPartition算子

使用这个算子的时候,通常是我们需要把数据落库的时候。这里推荐使用SQL的批量操作,减少数据库IO。另外,如果有其他操作,也是前面的原则,尽量减少运算量。

count算子

尽量不要使用的算子,因为就算数据量很少,还是需要再默认最少200个分区中获取结果,这个是非常消耗性能的。如果有统计需要,尽量使用Accumulator,在foreachPartition的时候同时做好统计的处理。如果只是需要判断是否空结果集,可以直接使用rdd的API——isEmpty。

Rdd.isEmpty

这个API使用的方法take(n)的comment有一个很恐怖的注释,大意是这个n的数据会实际传输到driver端,所以尽量控制这个数据的大小,减少IO。然而因为多分片的原因,take方法会动态扩展搜索的分区,所以最坏情况下,可能会返回分区数量一般的数据到driver端,所以使用isEmpty时,计算控制分区数量还是空分区的情况。

Spark计算速度优化小结

  • 无论是算子还是UDF,应该尽量减少运算量,尽早排除不需要的数据,不重复计算已计算的数据;
  • 对数据库的操作仍需要注意锁和慢查询的情况。

优化进行时

截止到这篇日志完结时,我们的性能优化仍然持续执行着;而我们对spark的观念也持续的改变着;我们一直在路上。

written by [email protected]

猜你喜欢

转载自blog.csdn.net/vipshop_fin_dev/article/details/80243136