数据处理的并行度
1、BlockRDD的分区数
(1)通过Receiver接受数据的特点决定
(2)也可以自己通过repartition设置
2、ShuffleRDD的分区数
(1)默认的分区数为spark.default.parallelism(core的大小)
(2)通过我们自己设置决定
val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER) val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER) val lines = lines1.union(lines2) lines.repartition(100) //通过repartition设置 //处理的逻辑,就是简单的进行word count val words = lines.repartition(100).flatMap(_.split(" ")) //自己设置决定ShuffleRDD的分区数 以及分区算法,默认是core的数量 val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10)) //并发度是10个分区,根据集群资源情况调节
数据的序列化
两种需要序列化的数据:
1、输入数据
默认是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存储在executor上的内存中(以序列化的方式存储在内存中,内存不够放在DISK中)
2、Streaming操作中产生的缓存RDD
默认是以StorageLevel.MEMORY_ONLY_SER的形式存储的内存中
使用Kryo序列化机制,比Java序列化机制性能好
import com.esotericsoftware.kryo.Kryo import org.apache.spark.serializer.KryoRegistrator import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} /** * WordCount程序,Spark Streaming消费TCP Server发过来的实时数据的例子: * * 1、在master服务器上启动一个Netcat server * `$ nc -lk 9998` (如果nc命令无效的话,我们可以用yum install -y nc来安装nc) * * 2、用下面的命令在在集群中将Spark Streaming应用跑起来 * spark-submit --class com.twq.wordcount.JavaNetworkWordCount \ * --master spark://master:7077 \ * --deploy-mode client \ * --driver-memory 512m \ * --executor-memory 512m \ * --total-executor-cores 4 \ * --executor-cores 2 \ * /home/hadoop-twq/spark-course/streaming/spark-streaming-basic-1.0-SNAPSHOT.jar */ object KryoNetworkWordCount { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("KryoNetworkWordCount") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //指定spark.serializer.KryoSerializer sparkConf.set("spark.kryo.registrator", "com.twq.spark.rdd.example.ClickTrackerKryoRegistrator") // 自定义的数据类型通过Kryo序列化 val sc = new SparkContext(sparkConf) // Create the context with a 1 second batch size val ssc = new StreamingContext(sc, Seconds(1)) //如果一个batchInterval中的数据量不大,并且没有window等操作,则可以使用MEMORY_ONLY val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_ONLY_SER) //处理的逻辑,就是简单的进行word count val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) //将结果输出到控制台 wordCounts.print() //启动Streaming处理流 ssc.start() //等待Streaming程序终止 ssc.awaitTermination() } } class ClickTrackerKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo): Unit = { kryo.register(classOf[TrackerLog]) } } case class TrackerLog(id: String, name: String)
内存调优
1、需要内存大小
和transform类型有关系
数据存储的级别
2、GC
driver端和executor端都使用CMS垃圾收集器
CMS(Concurrent Mark Sweep 标记清除算法)收集器是一种以获取最短回收停顿时间为目标的收集器
(通过--driver-java-options和spark.executor.extraJavaOptions)