Spark部分的复习

Spark day01
[
    1.什么是Spark?与MR的区别?
        Spark是开源的通用的计算框架,目的是为了使数据分析更快。MR也是计算框架。
        区别?
            1).MR是基于磁盘迭代,Spark是基于内存迭代。
            2).Spark中有DAG有向无环图。
            3).MR中只有map,reduce两个类,相当于Spark中两个算子。Spark中有三类算子(转换算子,行动算子,持久化算子)。
            4).MR是细粒度资源申请,Spark是粗粒度资源申请。
            
    2.Spark运行模式?
        1).Local:在eclipse/IDEA中编写代码,在本地运行
        2).Standalone:Spark自带的资源调度框架,支持分布式搭建。
        3).Yarn:Hadoop生态圈内的资源调度框架。
        4).Mesos:资源调度框架。
        
    3.Spark核心RDD
        1).RDD(Resilient Distributed Dateset),弹性分布式数据集。Spark底层操作数据都是基于RDD。
        2).RDD五大特性:
            a).RDD由一系列Partition组成。
            b).函数(算子)是作用在partition上的。
            c).RDD之间有依赖关系。
            d).分区器是作用在K,V格式的RDD上。
            e).partition提供最佳计算位置,利于处理数据的本地化。符合“计算移动,数据不移动”
        3).注意:
            a).sc.textFile(...)读取HDFS中文件的方法,底层调用的是MR读取HDFS中文件的方法,首先会split,每个split大小默认
                与一个block大小相同,每个split与RDD中的一个partition对应。
            b).什么是K,V格式的RDD?
                RDD中元素是一个个的tuple2 二元组,这个RDD就是K,V格式的RDD。
            c).哪里体现了RDD的弹性(容错)?
                i).RDD之间有依赖关系
                ii).partition个数可多可少。
            d).哪里体现了RDD的分布式?
                partition是分布在多个节点上的。
                
    4.Spark代码流程?
        1).val conf = new SparkConf().setMaster(...).setAppName(...)
        2).val sc = new SparkContext(conf)
        3).创建RDD。
        4).对RDD使用Transformation类算子进行数据转换。
        5).使用Action类算子触发Transformation类算子执行。
        6).sc.stop()
    
    5.Spark算子
        1).Transformations ,转换算子,懒执行,需要Action类算子触发。
            map/mapToPair,flatMap,filter,reduceByKey,sample,sortBy/sortByKey,groupByKey,join,leftOutJoin,rightOuterJoin,fullOuterJoin,distinct,union,intersection,subtract,repartition,coalesce,zip,zipWithIndex,mapPartitions,
            mapPartitionWithIndex,cogroup,mapValues,aggreagateByKey,combineByKey
        2).Action,行动算子,触发Action类算子执行。Spark应用程序中(Spark Application)有一个Action算子就有了一个job。
            take,frist,foreach,count,collect,reduce,foreachPartition,countByKey,countByValue
        3).持久化算子。
            a).cache
                默认将数据持久化到内存,cache()=persist()=persist(StorageLevel.MEMORY_ONLY)
            b).persist
                可以手动指定数据持久化级别。
                MEMORY_ONLY
                MEMORY_ONLY_SER
                MEMORY_AND_DISK
                MEMORY_AND_DISK_SER
                "_2"代表有副本数,尽量避免使用"DISK_ONLY"级别。
            c).checkpoint
                将数据可以持久化到磁盘,指定的checkpoint目录中,切断checkpointRDD之前的依赖关系,使之后的RDD依赖于checkpoint目录中的数据。需要设置checkpoint路径。
                RDD lineage 非常长,每一个RDD之间逻辑复杂,计算耗时。对一个RDD进行checkpoint之前最好先cache下。
                    
            注意:
            a).cache和persist注意事项:
                i).cache和persist是懒执行,需要Action算子触发。
                ii).对一个RDD进行cache/persist之后,可以赋值给一个变量,下次直接使用这个变量就是使用的持久化的数据。
                iii).cache/persist之后不能紧跟Action类算子。
            b).checkpoint执行流程:
                i).Spark任务执行完成之后,会从后往前回溯,找到CcheckpointRDD做标记。
                ii).回溯完成之后,重新计算标记RDD的数据,将数据放入checkpoint目录中。
                iii).切断RDD之间的依赖关系。
                
    6.Spark Standalone集群搭建
        1).解压,上传
        2).在../conf/slaves 中配置Worker信息
        3).在../conf/spark-env.sh 中配置Master信息
            SPARK_MASTER_IP
            SPARK_MASTER_PORT
            SPARK_WORKER_CORES
            SPARK_WORKER_MEMORY
        4).发送到其他节点 scp -r /spark1.6 nodex:`pwd`
        5).在Master节点启动集群:../sbin/start-all.sh
        
    7.搭建客户端
        原封不动将Spark安装包复制到一台新的节点,在这个节点中../bin/spark-submit提交任务,这个节点就是客户端。
            
    8.SparkPI 提交任务    
    9.创建RDD?
        java:
            sc.textFile(xxx,minNumPartitions)
            sc.parallelize(xx,numpartition)
            sc.parallelizePairs(seq[Tuple2<k,v>]) 将数据转换成K,V格式的RDD
        scala:
            sc.textFile(xxx,minNumPartitions)
            sc.parallelize(xx,numpartition)
            sc.makeRDD(xx,numpartition)
    10.Spark 基于Yarn 提交任务:
        在客户端 ../conf/spark-env.sh 中配置 HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
]

Spark day02
[
    1.Spark基于Standalone提交任务
        1).client
            命令:
                ./spark-submit --master spark://node1:7077 --class ... jar ... 参数
                ./spark-submit --master spark://node1:7077 --deploy-mode client --class .. jar .. 参数
            过程:
                a).在客户端提交Spark应用程序,会在客户端启动Driver。
                b).客户端向Master申请资源,Master找到资源返回。
                c).Driver发送task。
            注意:
                client方式提交任务,在客户端提交多个application,客户端会为每个application都启动一个Driver,Driver与集群Worker节点有大量通信,这样会造成客户端网卡流量激增。client方式提交任务适用于程序测试,不适用于真实生产环境。在客户端可以看到task执行情况和计算结果。
                
        2).cluster
            命令:
                ./spark-submit --master spark://node1:7077 --deploy-mode cluster --class ... jar ... 参数
            过程:
                a).客户端提交application,客户端首先向Master申请启动Driver
                b).Master收到请求之后,随机在一台Worker节点上启动Driver
                c).Driver启动之后,向Master申请资源,Master返回资源。
                d).Driver发送task.
            注意:
                cluster方式提交任务,Driver在集群中的随机一台Worker节点上启动,分散了client方式的网卡流量激增问题。
                cluster方式适用于真实生产环境,在客户端看不到task执行情况和执行结果,要去WEBUI中去查看。
    2.Spark基于Yarn提交任务
        1).client
            命令:
                ./spark-submit --master yarn --class ... jar ... ....
                ./spark-submit --master yarn-client --class ...jar .... 
                ./spark-submit --master yarn --deploy-mode client --class ..jar ... 
            过程:
                a).客户端提交application,Driver会在客户端启动
                b).客户端向ResourceManager申请启动ApplicationMaster
                c).ResourceManager收到请求之后,随机在一台NodeManager中启动ApplicationMaster
                d).ApplicationMaster启动之后,向ResourceManager申请资源,用于启动Executor
                e).ResourceManager收到请求之后,找到资源返回给ApplicationMaster
                f).ApplicationMaster连接NodeManager启动Executor
                g).Executor启动之后会反向注册给Driver
                h).Driver发送task到Executor执行
            注意:
                client方式提交任务,在客户端提交多个application,客户端会为每个application都启动一个Driver,Driver与集群Worker节点有大量通信,这样会造成客户端网卡流量激增。client方式提交任务适用于程序测试,不适用于真实生产环境。在客户端可以看到task执行情况和计算结果。
            总结:
                Driver功能:
                    i).发送task
                    ii).监控task,回收结果
                    iii).申请资源
        2).cluster
            命令:
                ./spark-submit --master yarn-cluster --class ...jar .... .. 
                ./spark-submit --master yarn --deploy-mode cluster --class ..jar ... .. 
            过程:
                a).客户端提交Application,首先客户端向ResourceManager申请启动ApplicationMaster
                b).ResourceManager收到请求之后,随机在一台NodeManager中启动ApplicationMaster,这里ApplicationMaster就相当于是Driver
                c).ApplicationMaster启动之后,向ResourceManager申请资源,用于启动Executor
                d).ResourceManager收到请求之后,找到资源返回给ApplicationMaster
                e).ApplicationMaster连接NodeManager启动Executor
                f).Executor启动之后会反向注册给ApplicationMaster(Driver)
                g).Driver发送task到Executor执行
            注意:
                cluster方式提交任务,Driver在集群中的随机一台Worker节点上启动,分散了client方式的网卡流量激增问题。
                cluster方式适用于真实生产环境,在客户端看不到task执行情况和执行结果,要去WEBUI中去查看。
            总结:
                ApplicationMaster的作用:
                    i).申请资源
                    ii).启动Executor
                    iii).任务调度
    
    3.术语解释
        任务层面:application->job->stage->task
        资源层面:Master->Worker->Executor->ThreadPool
    
    4.RDD宽窄依赖
        宽依赖(shuffle):
            父RDD与子RDD partition之间的关系是一对多
        窄依赖:
            父RDD与子RDD partition之间的关系是一对一
            父RDD与子RDD partition之间的关系是多对一
    
    5.Stage
        1).Stage是由一组并行的task组成的。RDD之间依赖关系形成一个DAG有向无环图,DAG有向无环图按照RDD之间的宽窄依赖划分stage。
        2).spark计算模式:
            pipeline管道计算模式,相当于高阶函数展开方式计算
                思想,1+1+1=3 
                MR:1+1=2 2+1=3
        3).在管道中的数据什么时候落地?
            a).shuffle write
            b).对RDD持久化
        4).Stage的并行度?由stage中的finalRDD中partition个数决定的。
        5).如何提高stage的并行度?
            reduceByKey(xx,numpartitions),groupByKey(xx,numpartitions),join(xx,numpartition)
            
    6.Spark任务调度和资源调度
        1).资源调度
            a).集群启动,Worker向Master汇报资源,Master掌握了集群资源情况
            b).当在客户端提交任务的时候,运行任务,new SparkContext,会创建两个对象:DAGScheduler和TaskScheduler
            c).TaskScheduler向Master申请资源
            d).Master找到满足资源的节点启动Executor
            e).Executor启动之后,反向注册给Driver,Driver掌握了集群计算资源
        2).任务调度
            a).当application运行到Action算子时,触发job,开始任务调度
            b).每个job中有一系列RDD的依赖关系,形成一个DAG有向无环图。
            c).DAG有向无环图被DAGScheduler按照RDD之间的宽窄依赖关系切割job划分stage
            d).DAGScheduler将stage以taskSet的形式提交给TaskScheduler
            e).TaskScheduler遍历set,拿到一个个task,发送到Executor中ThreadPool执行
            f).TaskScheduler监控task回收结果
            
        注意:
            a).TaskScheduler发送task会失败,重试3次,如果依然失败,由DAGScheduler重试stage,重试4次,如果依然失败,stage所在的job就失败了,job失败,application就失败了。
            b).TaskScheduler不仅可以重试失败的task,还可以重试执行缓慢的task,这就是Spark推测执行机制,默认关闭的,对于ETL的业务一定关闭推测执行。对于数据一直执行不完,首先看有没有数据倾斜,是否开启了推测执行。
    
    7.粗粒度资源申请和细粒度资源申请
        粗粒度资源申请:
            任务执行之前,先将所有的资源申请到,task执行的时候不需要自己申请资源,加快了执行速度。如果多数task执行完成,只有一个task没有执行完,那么这批申请到的资源不会被释放,只有所有的task执行完成之后才会释放所有资源。会有集群资源不能充分利用的情况。
        细粒度资源申请:
            任务执行之前,不会申请所有的资源,task执行时,自己申请资源,自己释放资源,任务执行就慢了,但是集群资源可以充分利用
    ]        
        
Spark day03
[
    1.PV&UV(统计24小时的pv,uv)
        PV:page view 有重复
        UV:unquie Vistor 无重复
        
    2.spark-submit参数
        --master
        --class
        --deploy-mode 
        --executor-cores
        --executor-memory
        --total-executor-cores
        --driver-cores
        --driver-memory
        --files
        --jars 多个jar包之间用逗号隔开
        --driver-class-path 多个jar包之间用分号隔开
        
    3.源码
        资源调度源码:
            结论:
                1.Executor在集群中是分散启动的
                2.如果提交任务不指定--executor-cores
                   集群会在每台Worker节点上启动一个Executor,这个Executor会使用这台节点的所有的core和1G内存
                3.如果想要在一个节点上启动多个Executor,一定要指定--executor-cores,执行任务要指定--total-executor-cores
                4.启动Executor不仅和core有关,还和内存有关
                
        任务调度源码:
            从Action算子看
    
    4.二次排序
        自己封装一个类,实现Comparable<SecondSortKey>接口,将要排序的字段当做属性放入类,类中实现compareTo方法,然后使用spark对Key 使用sortByKey排序。
        排序的字段大于2以上的都叫二次排序。
    5.分组取topN
        1).原生的集合工具Collections.sort(list,new comparator<xxx>)
        2).自己定义定长数组
]        
        
Spark day04
[
    1.广播变量
        当在Executor端用到了Driver变量,不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。
        如果使用广播变量在每个Executor端中只有一份Driver端的变量副本。
        val broadcast = sc.broadCast(...)
        broadcast.value()
        注意:
            1).不能将RDD广播出去,可以将RDD的结果广播出去
            2).广播变量在Driver定义,在Exector端不可改变,在Executor端不能定义
    2.累计器
        相当于集群中的统筹的大变量。
        自定义累计器 ,实现AccumulatorParam<xxx>  实现三个方法zero,addAccumulator,addInPlace
        val accumulator = sc.accumulator(xxx,new AccumulatorParam...)
        accumulator.add(xxxx)
        注意:
            累计器只能在Driver定义初始化,在Executor端更新,在Executor不能accumulator.value获取值。
            
    3.Spark WEBUI
        【Jobs,Stages,StorageLevel,Environment,Executors,SQL,Streaming 】
        4040端口可以看到当前application中的所有的job,点击job可以看到当前job下对应的stage,点击stage出现stage下的task。
    
    4.搭建HistoryServer
        1).在客户端中../conf/spark-defaults.conf文件中配置:
            //开启记录事件日志的功能
            spark.eventLog.enabled            true
            //设置事件日志存储的目录
            spark.eventLog.dir                hdfs://node1:9000/spark/test
            //设置HistoryServer加载事件日志的位置
            spark.history.fs.logDirectory    hdfs://node1:9000/spark/test
            //日志优化选项,压缩日志
            spark.eventLog.compress         true
        2).启动HistoryServer:
            在../conf/sbin/start-history-server.sh
        3).访问:node4:18080
    5.端口:
        50070:
        8020:
        9000:
        2181:
        9083:
        9092:
        8080:
        8081:
        4040:
        7077:
        18080:
    
    6.Master HA
        1).System File
        2).Zookeeper
            a).在集群的每台节点../conf/spark-env.sh 中配置:
                export SPARK_DAEMON_JAVA_OPTS="
                -Dspark.deploy.recoveryMode=ZOOKEEPER
                -Dspark.deploy.zookeeper.url=node3:2181,node4:2181,node5:2181 
                -Dspark.deploy.zookeeper.dir=/sparkmaster0821"
            b).在StandBy-Master节点中修改../conf/spark-env.sh:SPARK_MASTER_IP = node2
            c).启动Zookeeper
            d).在Alive-Master节点上启动集群:../sbin/start-all.sh
            e).在StandBy-Master节点上启动StandBy-Master ../sbin/start-master.sh
            d).测试。
        注意:
            a).这里的Master HA是针对的standAlone集群
            b).主备切换过程中,不影响已经在集群中运行的程序
            c).主备切换过程中,不能申请资源。
            
    7.Spark shuffle
        Spark1.2之前默认使用的是HashShuffle(hashPartitioner),1.2之后使用的sortShuffle(rangePartitioner)。
        1).HashShuffle
            a).普通机制
                产生磁盘小文件的个数:
                    M(map task个数)*R(reduce task个数)
                流程:
                    i).map task处理完数据之后,每个task将结果写入buffer缓存区(与reduce task个数一致),每个buffer缓冲区大小是32K
                    ii).每个buffer满32k,溢写磁盘,每个buffer对应磁盘一个小文件。
                    iii).reduce  task 到不同节点拉取数据
                问题:产生磁盘小文件太多?
                    i).写磁盘文件的对象多
                    ii).拉取数据读磁盘文件对象多
                    iii).创建对象多,容易造成gc,gc还不满足内存使用,就会OOM
                        OOM问题:
                            i).Driver端回收RDD数据
                            ii).Executor 端创建对象非常多,可能会有OOM(0.2 task内存)
                            iii).Executor 端拉取shuffle数据,如果5个task一次拉取的数据量在Executor0.2的shuffle内存中放不下
                            iiii).Executor端对RDD进行缓存或者广播变量的RDD数据量比较大(0.6内存)
                    
            b).合并机制
                产生磁盘小文件:
                    C(core 的个数)*R(reduce task个数)
                过程:
                    与普通机制一样,只是一个core内运行的task 会公用一份buffer缓存区。
                                        
        2).SortShuffle
            a).普通机制
                产生磁盘小文件个数:
                    2*M(map task个数)
                流程:
                    i).map task 将处理的结果写入一个5M的内存结构中
                    ii).SortShuffle中会估算这个内存结构大小,当下一次结果放不下时,会申请2*估计-当前 
                    iii).如果申请的到内存,继续往数据结构中写数据,如果申请不到,溢写磁盘,每批次是1万条溢写,溢写过程中会有排序。
                    iv).溢写的数据在磁盘上最终形成两个文件:一个索引文件一个数据文件
                    v).reduce 拉取数据首先解析索引文件,再去拉取数据
            b).bypass机制
                产生磁盘小文件个数:
                    2*M(map task个数)
                流程:
                    与普通的机制对比,少了排序。当reduce task个数小于spark.shuffle.sort.bypassMergeThreshold (默认200)会开启bypass机制。
    
    8.shuffle 文件的寻址
        1).MapOutputTracker:磁盘小文件
            MapOutputTrackerMaster(Driver端)                
            MapOutputTrackerWorker(Executor端)
        2).BlockManager:块管理者
            BlockManagerMaster(Driver端):
                DiskStore:管理磁盘数据
                MemoryStroe:管理内存数据
                ConnectionManager:负责连接其他BlockManager
                BlockTransferService:负责拉取数据
            BlockManagerSlaves(Executor端):
                DiskStore:管理磁盘数据
                MemoryStroe:管理内存数据
                ConnectionManager:负责连接其他BlockManager
                BlockTransferService:负责拉取数据
        3).寻址:
            a).map task处理完的数据,将结果和数据位置封装到MapStatus对象中,通过MapOutputTrackerWorker汇报给Driver中的
                MapOutputTrackerMaster。Driver中掌握了数据位置。
            b).reduce  端处理数据,首先向本进程中的MapOutputTrackerWorker要磁盘文件位置,再向Driver中的MapOutputTrackerMaster
                要磁盘数据位置,Driver返回磁盘数据位置。
            c).reduce 拿到数据位置之后,通过BlockManager中的ConnectionManager连接数据所在的节点,连接上之后,通过BlockManager
                中的BlockTransferService拉取数据
            d).BlockTransferService拉取数据默认启动5个task,这5个task默认一次拉取的数据量不能超过48M。
            e).拉取过来的数据放在Executor端的shuffle聚合内存中(spark.shuffle.memeoryFraction 0.2),
               如果5个task一次拉取的数据放不到shuffle内存中会有OOM,如果放下一次,不会有OOM,以后放不下的会放磁盘。
                
    9.内存管理
        Spark1.6之前使用的是静态内存管理,spark1.6之后使用的是统一内存管理
        静态内存管理:
            0.2:task运行                
            0.2:
                0.2*0.2:预留
                0.2*0.8:shuffle聚合内存
            0.6:
                0.1*0.6:预留
                0.9*0.6:
                    0.6*0.9*0.2:反序列化数据
                    0.6*0.9*0.8:RDD缓存和广播变量
        统一内存管理:
            300M预留
            (总-300M)*0.25:task运行                
            (总-300M)*0.75:
                (总-300M)*0.75*0.5:shuffle聚合内存
                (总-300M)*0.75*0.5:RDD缓存和广播变量
    10.Shuffle 调优
        1).
            spark.shuffle.file.buffer 32k
            spark.reducer.maxSizeInFlight 48M
            spark.shuffle.io.maxRetries 3
            spark.shuffle.io.retryWait 5s
            spark.shuffle.memoryFraction 0.2
            spark.shuffle.manager hash|sort
            spark.shuffle.sort.bypassMergeThreshold 200 --sortShuffle
            spark.shuffle.consolidateFiles true|false  --hashShuffle
        2).使用
            在conf.set(k,v) 级别最高
            在提交任务 ./spark-submit --conf 第二
            在../conf/spark-defaults.conf中配置 第三
]
        
Spark day05
[
    1.SparkSQL
        可以使用sql对分布式的数据查询,先有Hive->Shark->SparkSQL。
        要想使用SQL对分布式数据查询,首先要创建出来DataFrame,创建DataFrame使用SQLContext()
        conf.set("spark.sql.shuffle.partitions", "200")----SparkSQL sql有join,groupBy 默认分区数
    2. Hive on Spark & Spark on Hive 
        Hive on Spark:
            Hive:解析,优化,存储
            Spark:执行引擎
        Spark on Hive:
            Hive:存储
            Spark:解析,优化,执行引擎
            
    3.DataFrame
        SparkSQL操作基于DataFrame,DataFrame底层是RDD<Row>,既有数据,又有列的Schema信息
    
    4.创建DataFrame的方式
        1).读取json格式的文件
            a).json文件不能嵌套
            b).读取的两种方式:
                DataFrame df = sqlContext.read().format("json").load("./sparksql/json");
                DataFrame df2 = sqlContext.read().json("sparksql/json");
            c).加载过来的DataFrame 列会按照Ascii码排序
            d).可以使用DataFrame的API操作DataFrame,也可以将DataFrame注册成临时表
                df.registerTempTable("jtable");
            
        2).读取json格式的RDD
        
        3).读取普通的RDD加载成DataFrame
            a).反射的方式(少)    
                JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Person call(String line) throws Exception {
                        Person p = new Person();
                        p.setId(line.split(",")[0]);
                        p.setName(line.split(",")[1]);
                        p.setAge(Integer.valueOf(line.split(",")[2]));
                        return p;
                    }
                });
                DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
                i).自定类要实现序列化接口
                ii).自定义类的访问级别是public
                iii).加载过来的DataFrame列也会按照Ascii码排序
            b).动态创建Schema(多)
                List<StructField> asList =Arrays.asList(
                    DataTypes.createStructField("id", DataTypes.StringType, true),
                    DataTypes.createStructField("name", DataTypes.StringType, true),
                    DataTypes.createStructField("age", DataTypes.IntegerType, true)
                );
                
                StructType schema = DataTypes.createStructType(asList);
                
                DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
                
                i).加载过来的DataFrame列不会按照Ascii码排序
        
        4).读取parquent文件加载成DataFrame
            读取:
                DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet");
                load = sqlContext.read().parquet("./sparksql/parquet");
            存储:
                df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");
                df.write().mode(SaveMode.Ignore).parquet("./sparksql/parquet");
            
        5).读取Mysql中的数据加载成DataFrame
            读取:
                a).
                    Map<String, String> options = new HashMap<String,String>();
                    options.put("url", "jdbc:mysql://192.168.179.4:3306/spark");
                    options.put("driver", "com.mysql.jdbc.Driver");
                    options.put("user", "root");
                    options.put("password", "123456");
                    options.put("dbtable", "person");
                    
                    DataFrame person = sqlContext.read().format("jdbc").options(options).load();
                b).
                    DataFrameReader reader = sqlContext.read().format("jdbc");
                    reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark");
                    reader.option("driver", "com.mysql.jdbc.Driver");
                    reader.option("user", "root");
                    reader.option("password", "123456");
                    reader.option("dbtable", "score");
                    DataFrame score = reader.load();
            存储:
                Properties properties = new Properties();
                properties.setProperty("user", "root");
                properties.setProperty("password", "123456");
                result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties);
                
        6).读取Hive中的数据加载成DataFrame
            要配置Spark on Hive,如果SparkSQL要读取数据是Hive中数据,要使用HiveContext,HiveContext是SQLContext的子类。
            读取:
                HiveContext hiveContext = new HiveContext(sc);
                hiveContext.sql("USE spark");
                DataFrame df = hiveContext.table("good_student_infos");
            存储:
                hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
                goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
    5.配置Spark on Hive
        1).在客户端创建../conf/hive-site.xml
            <configuration>
               <property>
                    <name>hive.metastore.uris</name>
                    <value>thrift://node1:9083</value>
               </property>
            </configuration>
        2).启动Hive,在服务端启动metaStore服务,hive --service metastore
        3).spark-shell 测试

    6.UDF & UDAF
        1).UDF:用户自定义函数,user defined function
            可以自定义类实现UDFX接口,传几个参数,对应实现UDFX
            sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Integer call(String t1) throws Exception {
                    return t1.length();
                }
            }, DataTypes.IntegerType);
            sqlContext.sql("select name ,StrLen(name) as length from user").show();
            
        2).UDAF:用户自定义聚合函数,user defined aggreagatefunction
            sqlContext.udf().register("StringCount",new UserDefinedAggregateFunction() {
            
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
                
                /**
                 * 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果
                 */
                @Override
                public void initialize(MutableAggregationBuffer buffer) {
                    buffer.update(0, 0);
                }
                
                /**
                 * 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑
                 * buffer.getInt(0)获取的是上一次聚合后的值
                 * 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合 
                 * 大聚和发生在reduce端.
                 * 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算
                 */
                @Override
                public void update(MutableAggregationBuffer buffer, Row arg1) {
                    buffer.update(0, buffer.getInt(0)+1);
                    
                }
                /**
                 * 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理
                 * 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来
                 * buffer1.getInt(0) : 大聚合的时候 上一次聚合后的值       
                 * buffer2.getInt(0) : 这次计算传入进来的update的结果
                 * 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作
                 */
                @Override
                public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
                    buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
                }
                /**
                 * 在进行聚合操作的时候所要处理的数据的结果的类型
                 */
                @Override
                public StructType bufferSchema() {
                    return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bffer", DataTypes.IntegerType, true)));
                }
                /**
                 * 最后返回一个和dataType方法的类型要一致的类型,返回UDAF最后的计算结果
                 */
                @Override
                public Object evaluate(Row row) {
                    return row.getInt(0);
                }
                /**
                 * 指定UDAF函数计算后返回的结果类型
                 */
                @Override
                public DataType dataType() {
                    return DataTypes.IntegerType;
                }
                /**
                 * 指定输入字段的字段及类型
                 */
                @Override
                public StructType inputSchema() {
                    return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));
                }
                /**
                 * 确保一致性 一般用true,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。
                 */
                @Override
                public boolean deterministic() {
                    return true;
                }
                
            });
            
            sqlContext.sql("select name ,StringCount(name) as strCount from user group by name").show();
                
    7.开窗函数
        sql中分组取topN问题。
        row_number() over (partition by xxx order by xxx ) rank rank从1开始
            DataFrame result = hiveContext.sql("select riqi,leibie,jine "
                                + "from ("
                                    + "select riqi,leibie,jine,"
                                    + "row_number() over (partition by leibie order by jine desc) rank "
                                    + "from sales) t "
                            + "where t.rank<=3");
]

Spark day06
[
    1.SparkStreaming
        1).SparkStreaming是流式处理框架,7*24小时不间断运行,微批处理。
        2).与Storm的区别:
            i).Storm是纯实时处理数据,SparkStreaming是微批处理数据
            ii).Storm擅长处理汇总型业务,SparkStreaming擅长处理复杂的业务,可以使用SpakrCore,SparkSQL
            iii).Storm可以动态调度资源,SparkStreaming1.2之后可以
            v).Storm事务相对SparkStreaming比较完善
        3).接受处理数据过程:
            要使用一个task接受数据,SparkStreaming将一段时间内的接受来的数据当做一批次处理,这个一段时间就是BatchInterval,
            将接受数据每隔BatchInterval封装到一个batch中,这个batch又被封装到一个RDD中,这个RDD又被封装到一个DStream中。
            每隔batchInterval,就会产生一个DStream,后面要有task处理这个DStream。
            假设产生一批次的时间是5s,集群处理这批次数据的时间是3s,集群休息了2s.
            假设产生一批次的时间是5s,集群处理这批次数据的时间是8s,集群中就会有任务堆积。
            参照WEBUI看有没有任务堆积。
    2.SparkStreaming读取Socket端口数据
        1.代码:
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountOnline");
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
            JavaReceiverInputDStream<String> lines = jsc.socketTextStream("node5", 9999);
            
            //        JavaSparkContext sc = new JavaSparkContext(conf);
            //        JavaStreamingContext jsc = new JavaStreamingContext(sc,Durations.seconds(5));
            //        JavaSparkContext sparkContext = jsc.sparkContext();
        2.注意:
         * 1、local的模拟线程数必须大于等于2 因为一条线程被receiver(接受数据的线程)占用,另外一个线程是job执行
         * 2、Durations时间的设置,就是我们能接受的延迟度,这个我们需要根据集群的资源情况以及
                监控每一个job的执行时间来调节出最佳时间。
         * 3、 创建JavaStreamingContext有两种方式 (sparkconf、sparkcontext)
         * 4、业务逻辑完成后,需要有一个output operator
         * 5、JavaStreamingContext.start()straming框架启动之后是不能在次添加业务逻辑
         * 6、JavaStreamingContext.stop()无参的stop方法会将sparkContext一同关闭,stop(false) ,默认为true,会一同关闭
         * 7、JavaStreamingContext.stop()停止之后是不能在调用start   
         
    3.SparkStreaming 算子
        Transformation类:
            updateStateByKey:更新SparkSteaming程序启动以来所有的key的状态。
                1).要设置checkpoint
                2).多久往checkpoint中写一份数据
                    如果batchInterval小于10s,10s写一次,如果batchInterval大于10s,那么batchInterval写一次。
            reduceByKeyAndWindow:
                1).
                    window length:窗口长度
                    slide length:滑动间隔
                2).普通:
                    i).代码:
                        JavaPairDStream<String, Integer> searchWordCountsDStream = 
                            searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {

                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public Integer call(Integer v1, Integer v2) throws Exception {
                                        return v1 + v2;
                                    }
                        }, Durations.seconds(15), Durations.seconds(5)); 
                        
                    ii).不用设置checkpoint目录
                3).优化:
                    i).代码:
                        JavaPairDStream<String, Integer> searchWordCountsDStream = 
                         searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {

                            private static final long serialVersionUID = 1L;

                            @Override
                            public Integer call(Integer v1, Integer v2) throws Exception {
                                return v1 + v2;
                            }
                            
                        },new Function2<Integer, Integer, Integer>() {

                            private static final long serialVersionUID = 1L;

                            @Override
                            public Integer call(Integer v1, Integer v2) throws Exception {
                                return v1 - v2;
                            }
                            
                        }, Durations.seconds(15), Durations.seconds(5));    
                    ii).必须要设置checkpoint目录
            transform:
                1).transform 可以拿到DStream中的RDD,做DStream中RDD类型的转换
                2).transform 
                    算子内,拿到的RDD算子外,代码是在Driver端执行的,每个batchInterval执行一次,可以做到动态改变广播变量。
        OutPutOperator类:
            print:
            foreachRDD:
                1).拿到DStream中的RDD,对RDD要使用RDD的action类算子触发执行。
                2).foreachRDD算子内,拿到的RDD的算子外的代码,是在Driver端执行的,可以做到动态改变广播变量。
            saveAsTextFile:
            saveAsHadoopFile:
            saveAdObjectFile:
    
    4.Kafka
        1).kafka是分布式消息系统,生产者消费者模式,数据默认保存7天。
        2).概念:
            producter:
                消息的生产者,自己决定往哪个partition中生产数据,i).轮循 ii).hash
                
            consumer:
                消息消费者,自己在zookeeper中维护消费者偏移量,每个消费者都有自己的消费者组,不同的消费者组消费同一个topic互不
                影响。同一个消费者组内消费同一个topic,这个topic中的数据只能被消费一次。
            
            broker:
                组成kafka集群的节点,之间没有主从关系,通过zookeeper协调。一个broker可以管理多个partitioin,负责消息的读写,
                存储。
            
            topic:
                一类消息/消息队列,topic是由partition组成的,有多少个?创建的时候可以指定。topic中的一个partition只能被同一个消费者组
                中的一个消费者同时消费。
                
            partition:
                存储数据的文件,partition中有消息的offset,每个partition都由一个broker管理(Leader),每个partition都有副本,
                有几个?创建topic的时候可以指定。
            
            zookeeper:
                存储原数据,topic,broker,partition,offset
        
        3).集群搭建:
            a).上传kafka_2.10-0.8.2.2.tgz包到三个不同节点上,解压。
            b).配置../ kafka_2.10-0.8.2.2/config/server.properties文件
                broker.id:Integer值 从0开始
                log.dirs:真实存储数据位置
                zookeeper.connect:zookeeper集群
            c).启动zookeeper集群,在每个节点上启动kafka:
                bin/kafka-server-start.sh   config/server.properties
        
        4).命令:
            创建topic:
                ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181  --create --topic topic2017 --partitions 3 --replication-factor 3
                
            用一台节点控制台来当kafka的生产者:
                ./kafka-console-producer.sh  --topic  topic2017 --broker-list node1:9092,node2:9092,node3:9092
                
            用另一台节点控制台来当kafka的消费者:
                ./kafka-console-consumer.sh --zookeeper node3:2181,node4:2181,node5:2181 --topic topic2017
            
            查看kafka中topic列表:
                ./kafka-topics.sh  --list --zookeeper node3:2181,node4:2181,node5:2181
            
            查看kafka中topic的描述:
                ./kafka-topics.sh --describe --zookeeper node3:2181,node4:2181,node5:2181  --topic topic2017
            
        5).删除kafka中的数据    
            a).在kafka集群中删除topic,当前topic被标记成删除。在每台broker节点上删除当前这个topic对应的真实数据。
                ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --delete --topic t1205
            b).进入zookeeper客户端,删除topic信息
                rmr /brokers/topics/t1205
            c).删除zookeeper中被标记为删除的topic信息
                rmr /admin/delete_topics/t1205
            
        6).leader的均衡机制
    
    5.Driver HA
        /**
         *  checkpoint 保存:
         *        1.配置信息
         *        2.DStream操作逻辑
         *        3.job的执行进度
         *      4.offset
         */
        final String checkpointDirectory = "./checkpoint";
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {  
                return createContext(checkpointDirectory,conf);
            }
        };
        /**
         * 获取JavaStreamingContext 先去指定的checkpoint目录中去恢复JavaStreamingContext
         * 如果恢复不到,通过factory创建
         */
        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
        
    6.SparkStreaming+kafka整合
        1).Receiver模式
            a).receiver模式使用zookeeper管理offset,要使用一个task接收kafka中的数据,会有丢失数据的问题,开启WAL机制将数据备份到checkpoint目录中一份,避免数据丢失,开启WAL机制之后会降低任务总体执行效率,延长时间。
            b).receiver模式并行度:
                spark.sparkstreaming.blockInterval 200 ,降低这个值,提高了生成DStream中的RDD的partition个数,建议blockInterval不能小于50ms
        2).Direct模式
            a).Direct模式spark自己管理offset,放在内存中,如果设置了checkpoint目录,那么在checkpoint中也有一份offset
            b).Direct模式并行度
                与读取的topic中的partition个数一致,增加topic中partition的个数就可以提高并行度
        
    7.SparkStreaming配置项:
        spark.streaming.receiver.writeAheadLog.enable  默认false没有开启
        spark.streaming.blockInterval  默认200ms
        spark.streaming.backpressure.enabled  默认false
        spark.streaming.receiver.maxRate  默认没有设置
        spark.streaming.stopGracefullyOnShutdown    false
        spark.streaming.kafka.maxRatePerPartition    not set
    8.使用zookeeper手动管理checkpoint
        ...
]    
        
Spark 调优
[
    1.资源调优
        1).搭建集群
            在spark安装包的conf下spark-env.sh 
            SPARK_WORKER_CORES
            SPARK_WORKER_MEMORY
            SPARK_WORKER_INSTANCE
        2).提交任务的时候
            提交命令选项:(在提交Application的时候使用选项)
                --executor-cores
                --executor-memory
                --total-executor-cores
            配置信息:(在Application的代码中设置
                        在Spark-default.conf中设置)
                spark.executor.cores
                spark.executor.memory
                spark.max.cores
                
    2.并行度调优
        1).降低读取文件的block ,sc.textFile(xxx)
        2).sc.textFile(xxx,minnumPartitions)
        3).sc.parallelize(xx,numpartitions)
        4).sc.markRDD(xx,numpartitions)
        5).sc.parallelizePairs(xx,numpartitions)
        6).reduceByKey(xx,numpartitions)/join(xxx,numpartitions)
        7).repartition(coalesce(numparititons,true))/coalesce
        8).自定义分区器
        9).spark.default.parallelism  生成的RDD的默认并行度
        10).spark.sql.shuffle.partitions  默认200,sql执行有shuffle类的操作的分区数
        11).SparkStreaming+Receiver spark.streaming.blockInterval=200ms
        12).SparkStreming+Direct 读取的topic中的partition个数一致
    
    3.代码调优
        1).避免创建重复的RDD
        2).复用同一个RDD
        3).对多次使用的RDD进行持久化
        4).尽量避免使用shuffle类的算子 广播变量+filter/map...
        5).使用map-side预聚合的shuffle操作,尽量使用有combiner的shuffle类算子。
            combiner概念:
                在map端,每一个map task计算完毕后进行的局部聚合
            combiner好处:
            a)    降低shuffle write写磁盘的数据量。
            b)    降低shuffle read拉取数据量的大小。
            c)    降低reduce端聚合的次数。
        6).尽量使用高性能的算子
            使用reduceByKey替代groupByKey
            使用mapPartitions替代map
            使用foreachPartition替代foreach
            filter后使用coalesce减少分区数
            使用repartitionAndSortWithinPartitions替代repartition与sort类操作
            使用repartition和coalesce算子操作分区
        7).使用广播变量
        8).使用Kryo优化序列化性能
        9).优化数据结构    
            使用基本数据类型代替字符串
            使用字符串代替对象
            使用数组代替集合
        10).使用高性能的库fastutil
            
    4.数据本地化
        数据本地化级别:
            1)    PROCESS_LOCAL:
                task处理的数据在本Executor进程的内存中
            2)    NODE_LOCAL
                task处理数据在本Executor所在的节点的磁盘中
                task处理的数据在本Executor所在节点的其他Executor的内存中
            3)    NO_PREF
                task处理的数据在数据库中
            4)    RACK_LOCAL
                task处理的数据在同机架不同节点的Executor内存中
                task处理的数据在同机架不同节点的磁盘上
            5)    ANY
                task处理的数据在其他机架上
        怎么调优:
            • spark.locality.wait 3s 
            • spark.locality.wait.process
            • spark.locality.wait.node
            • spark.locality.wait.rack

            
    5.内存调优
        堆内存不足造成的影响:
            1)    频繁的minor gc。
            2)    老年代中大量的短生命周期的对象会导致full gc。
            3)    gc 多了就会影响Spark的性能和运行的速度。
        怎么调?
            1)    提高Executor总体内存的大小
            2)    降低储存内存比例或者降低聚合内存比例
    
    6.Spark Shuffle调优
        spark.shuffle.file.buffer 32k
        spark.reducer.MaxSizeFlight 48M
        spark.shuffle.io.maxRetries 3
        spark.shuffle.io.retryWait 5s
        spark.shuffle.manager hash|sort
        spark.shuffle.sort.bypassMergeThreshold 200----针对SortShuffle
        spark.shuffle.consolidateFiles false----针对HashShuffle
    
    7.调节Executor的堆外内存
        节点之间连接默认等待时间:
            --conf spark.core.connection.ack.wait.timeout=300
        堆外内存调节:
            yarn下:
                --conf  spark.yarn.executor.memoryOverhead=2048 单位M
            standalone下:
                --conf  spark.executor.memoryOverhead=2048 单位M
    
    8.解决数据倾斜
        1).数据倾斜
            HDFS
                HDFS集群中某些节点数据非常多,某些节点数据非常少
            Hive
                某张表中某个key对应非常多的数据,某些key对应少量的数据
            Spark
                RDD中某个partition对应数据量非常多,某些partition对应数据量非常少
        2).解决数据倾斜
            a).使用Hive ETL预处理数据
                场景:Spark要读取的数据源头是Hive表(有数据倾斜),业务是要经常操作这张表
                解决:在Hive中先对这张表聚合,Spark频繁操作的是聚合后的结果
            b).过滤少数倾斜的Key
                场景:业务中去掉少数倾斜的Key反而使结果更正确
                解决:sample抽样过滤掉这些倾斜的Key
            c).提高shuffle并行度
                前提:有多个Key不同
            d).双重聚合
                将相同的key随机加前缀聚合一次,然后将聚合后的结果去前缀再聚合一次
            e).将reduce join转换成map join 
                彻底避免shuffle操作,避免数据倾斜
                前提:两个RDD要join或者两张表要join,有一个RDD比较小或者一张表比较小
                解决:将小的广播到Executor+Transformation类操作 避免shuffle
            f).采样倾斜key并分拆join操作
                膨胀N倍
            g).使用随机前缀和扩容RDD进行join
]        

猜你喜欢

转载自blog.csdn.net/wyqwilliam/article/details/81394695