一、使用kryo序列化 ----------------------------------------------- 1.为什么要使用kryo序列化方式? a.默认情况下,使用的java原生的序列化手段,ObjectInputStream/ObjectOutputStream对象输入输出流的方式来进行序列化 b.这种默认序列化机制的好处在于,处理起来比较方便;也不需要我们手动去做什么事情,只要你在算子里面使用的变量,实现了Serializable接口的即可。 但是缺点在于,默认的序列化机制的效率不高,序列化的速度比较慢;序列化以后的数据,占用的内存空间相对还是比较大。 c.Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10 所以Kryo序列化优化以后,可以让网络传输的数据变少;在集群中耗费的内存资源大大减少。 2.Kryo序列化机制,一旦启用以后,会生效的几个地方: a.算子函数中使用到的外部变量 算子函数中使用到的外部变量,使用Kryo以后:优化网络传输的性能,可以优化集群中内存的占用和消耗 b.持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER 持久化RDD,优化内存的占用和消耗;持久化RDD占用的内存越少,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC。 c.shuffle 可以优化网络传输的性能 3.如何使用? a.SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") b.在SparkConf中设置一个属性,spark.serializer,org.apache.spark.serializer.KryoSerializer类; c.Kryo之所以没有被作为默认的序列化类库的原因,主要是因为Kryo要求,如果要达到它的最佳性能的话,那么就一定要注册你自定义的类 (比如,你的算子函数中使用到了外部自定义类型的对象变量,这时,就要求必须注册你的类,否则Kryo达不到最佳性能)。 d.注册你使用到的,需要通过Kryo序列化的,一些自定义类,SparkConf.registerKryoClasses() f.项目中的使用: //1.构建Spark上下文,指定kryo序列化格式 SparkConf conf = new SparkConf() .setAppName(Constants.SPARK_APP_NAME_SESSION) .setMaster("local") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //2.注册涉及到序列化的类,有一个添一个 SparkConf conf = new SparkConf() .setAppName(Constants.SPARK_APP_NAME_SESSION) .setMaster("local") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(new Class[]{CategorySortKey.class}); 二、使用fastutil优化数据格式 --------------------------------------- 1.fastutil介绍 a.fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库, 提供了特殊类型的map、set、list和queue; b.fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来替代自己 平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,可以减小内存的占用,并且在 进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度; c.fastutil也提供了64位的array、set和list,以及高性能快速的,以及实用的IO类,来处理二进制和文本类型的文件; d.fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实现了Java的Map接口), 因此可以直接放入已有系统的任何代码中。fastutil还提供了一些JDK标准类库中没有的额外功能(比如双向迭代器)。 e.fastutil除了对象和原始类型为元素的集合,fastutil也提供引用类型的支持,但是对引用类型是使用等于号(=) 进行比较的,而不是equals()方法。 f.fastutil尽量提供了在任何场景下都是速度最快的集合类库。 2.Spark中应用fastutil的场景 如果算子函数使用了外部变量 首先,boardcast变量 其次,kryo序列化 再次,如果外部变量是比较大的集合,可以使用fastutil改写集合,从源头上减少开销 3.fastutil的使用 第一步:在pom.xml中引用fastutil的包 <dependency> <groupId>fastutil</groupId> <artifactId>fastutil</artifactId> <version>5.0.9</version> </dependency> List<Integer> => IntList 基本都是类似于IntList的格式,前缀就是集合的元素类型;特殊的就是Map,Int2IntMap,代表了key-value映射的元素类型。 除此之外,还支持object、reference。但是不支持String 三、调节本地化等待时长 ----------------------------------------- 1.数据本地化 a.PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中; 计算数据的task由executor执行,数据在executor的BlockManager中;性能最好 b.NODE_LOCAL:节点本地化,代码和数据在同一个节点中;比如说,数据作为一个HDFS block块,就在节点上, 而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中; 数据需要在进程间进行传输 c.NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分 d.RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上;数据需要通过网络在节点之间进行传输 e.ANY:数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差 对于以上5中级别,spark首选是第一种,当第一种不能实现,就执行本地化等待时长,比如3s,如果等待之后发现还是不行,就降级 以此类推 2.原理 Spark在Driver上,对Application的每一个stage的task,进行分配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partition; Spark的task分配算法,优先,会希望每个task正好分配到它要计算的数据所在的节点,这样的话,就不用在网络间传输数据; 但是呢,通常来说,有时,事与愿违,可能task没有机会分配到它的数据所在的节点,为什么呢,可能那个节点的计算资源和计算能力都满了; 所以呢,这种时候,通常来说,Spark会等待一段时间,默认情况下是3s钟(不是绝对的,还有很多种情况,对不同的本地化级别,都会去等待), 到最后,实在是等待不了了,就会选择一个比较差的本地化级别,比如说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,然后进行计算。 但是对于第二种情况,通常来说,肯定是要发生数据传输,task会通过其所在节点的BlockManager来获取数据,BlockManager发现自己本地没有数据, 会通过一个getRemote()方法,通过TransferService(网络数据传输组件)从数据所在节点的BlockManager中,获取数据,通过网络传输回task所在节点。 对于我们来说,当然不希望是类似于第二种情况的了。最好的,当然是task和数据在一个节点上,直接从本地executor的BlockManager中获取数据, 纯内存,或者带一点磁盘IO;如果要通过网络传输数据的话,那么实在是,性能肯定会下降的,大量网络传输,以及磁盘IO,都是性能的杀手。 3.我们什么时候要调节这个参数? 观察日志,spark作业的运行日志,推荐大家在测试的时候,先用client模式,在本地就直接可以看到比较全的日志。 日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL 观察大部分task的数据本地化级别 如果大多都是PROCESS_LOCAL,那就不用调节了 如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长 调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志 看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短 但是别本末倒置,本地化级别倒是提升了,但是因为大量的等待时长,spark作业的运行时间反而增加了, 那就还是不要调节了 4.如何调节? 主参数 spark.locality.wait,默认是3s;6s,10s 默认情况下,下面3个的等待时长,都是跟上面那个是一样的,都是3s spark.locality.wait.process spark.locality.wait.node spark.locality.wait.rack new SparkConf().set("spark.locality.wait", "10")
企业级Spark调优解决方案(四)之使用kryo序列化,使用fastutil,调节本地化等待时长
猜你喜欢
转载自blog.csdn.net/xcvbxv01/article/details/86557619
今日推荐
周排行