Spark day01
[
1.什么是Spark?与MR的区别?
Spark是开源的通用的计算框架,目的是为了使数据分析更快。MR也是计算框架。
区别?
1).MR是基于磁盘迭代,Spark是基于内存迭代。
2).Spark中有DAG有向无环图。
3).MR中只有map,reduce两个类,相当于Spark中两个算子。Spark中有三类算子(转换算子,行动算子,持久化算子)。
4).MR是细粒度资源申请,Spark是粗粒度资源申请。
2.Spark运行模式?
1).Local:在eclipse/IDEA中编写代码,在本地运行
2).Standalone:Spark自带的资源调度框架,支持分布式搭建。
3).Yarn:Hadoop生态圈内的资源调度框架。
4).Mesos:资源调度框架。
3.Spark核心RDD
1).RDD(Resilient Distributed Dateset),弹性分布式数据集。Spark底层操作数据都是基于RDD。
2).RDD五大特性:
a).RDD由一系列Partition组成。
b).函数(算子)是作用在partition上的。
c).RDD之间有依赖关系。
d).分区器是作用在K,V格式的RDD上。
e).partition提供最佳计算位置,利于处理数据的本地化。符合“计算移动,数据不移动”
3).注意:
a).sc.textFile(...)读取HDFS中文件的方法,底层调用的是MR读取HDFS中文件的方法,首先会split,每个split大小默认
与一个block大小相同,每个split与RDD中的一个partition对应。
b).什么是K,V格式的RDD?
RDD中元素是一个个的tuple2 二元组,这个RDD就是K,V格式的RDD。
c).哪里体现了RDD的弹性(容错)?
i).RDD之间有依赖关系
ii).partition个数可多可少。
d).哪里体现了RDD的分布式?
partition是分布在多个节点上的。
4.Spark代码流程?
1).val conf = new SparkConf().setMaster(...).setAppName(...)
2).val sc = new SparkContext(conf)
3).创建RDD。
4).对RDD使用Transformation类算子进行数据转换。
5).使用Action类算子触发Transformation类算子执行。
6).sc.stop()
5.Spark算子
1).Transformations ,转换算子,懒执行,需要Action类算子触发。
map/mapToPair,flatMap,filter,reduceByKey,sample,sortBy/sortByKey,groupByKey,join,leftOutJoin,rightOuterJoin,fullOuterJoin,distinct,union,intersection,subtract,repartition,coalesce,zip,zipWithIndex,mapPartitions,
mapPartitionWithIndex,cogroup,mapValues,aggreagateByKey,combineByKey
2).Action,行动算子,触发Action类算子执行。Spark应用程序中(Spark Application)有一个Action算子就有了一个job。
take,frist,foreach,count,collect,reduce,foreachPartition,countByKey,countByValue
3).持久化算子。
a).cache
默认将数据持久化到内存,cache()=persist()=persist(StorageLevel.MEMORY_ONLY)
b).persist
可以手动指定数据持久化级别。
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK
MEMORY_AND_DISK_SER
"_2"代表有副本数,尽量避免使用"DISK_ONLY"级别。
c).checkpoint
将数据可以持久化到磁盘,指定的checkpoint目录中,切断checkpointRDD之前的依赖关系,使之后的RDD依赖于checkpoint目录中的数据。需要设置checkpoint路径。
RDD lineage 非常长,每一个RDD之间逻辑复杂,计算耗时。对一个RDD进行checkpoint之前最好先cache下。
注意:
a).cache和persist注意事项:
i).cache和persist是懒执行,需要Action算子触发。
ii).对一个RDD进行cache/persist之后,可以赋值给一个变量,下次直接使用这个变量就是使用的持久化的数据。
iii).cache/persist之后不能紧跟Action类算子。
b).checkpoint执行流程:
i).Spark任务执行完成之后,会从后往前回溯,找到CcheckpointRDD做标记。
ii).回溯完成之后,重新计算标记RDD的数据,将数据放入checkpoint目录中。
iii).切断RDD之间的依赖关系。
6.Spark Standalone集群搭建
1).解压,上传
2).在../conf/slaves 中配置Worker信息
3).在../conf/spark-env.sh 中配置Master信息
SPARK_MASTER_IP
SPARK_MASTER_PORT
SPARK_WORKER_CORES
SPARK_WORKER_MEMORY
4).发送到其他节点 scp -r /spark1.6 nodex:`pwd`
5).在Master节点启动集群:../sbin/start-all.sh
7.搭建客户端
原封不动将Spark安装包复制到一台新的节点,在这个节点中../bin/spark-submit提交任务,这个节点就是客户端。
8.SparkPI 提交任务
9.创建RDD?
java:
sc.textFile(xxx,minNumPartitions)
sc.parallelize(xx,numpartition)
sc.parallelizePairs(seq[Tuple2<k,v>]) 将数据转换成K,V格式的RDD
scala:
sc.textFile(xxx,minNumPartitions)
sc.parallelize(xx,numpartition)
sc.makeRDD(xx,numpartition)
10.Spark 基于Yarn 提交任务:
在客户端 ../conf/spark-env.sh 中配置 HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
]
Spark day02
[
1.Spark基于Standalone提交任务
1).client
命令:
./spark-submit --master spark://node1:7077 --class ... jar ... 参数
./spark-submit --master spark://node1:7077 --deploy-mode client --class .. jar .. 参数
过程:
a).在客户端提交Spark应用程序,会在客户端启动Driver。
b).客户端向Master申请资源,Master找到资源返回。
c).Driver发送task。
注意:
client方式提交任务,在客户端提交多个application,客户端会为每个application都启动一个Driver,Driver与集群Worker节点有大量通信,这样会造成客户端网卡流量激增。client方式提交任务适用于程序测试,不适用于真实生产环境。在客户端可以看到task执行情况和计算结果。
2).cluster
命令:
./spark-submit --master spark://node1:7077 --deploy-mode cluster --class ... jar ... 参数
过程:
a).客户端提交application,客户端首先向Master申请启动Driver
b).Master收到请求之后,随机在一台Worker节点上启动Driver
c).Driver启动之后,向Master申请资源,Master返回资源。
d).Driver发送task.
注意:
cluster方式提交任务,Driver在集群中的随机一台Worker节点上启动,分散了client方式的网卡流量激增问题。
cluster方式适用于真实生产环境,在客户端看不到task执行情况和执行结果,要去WEBUI中去查看。
2.Spark基于Yarn提交任务
1).client
命令:
./spark-submit --master yarn --class ... jar ... ....
./spark-submit --master yarn-client --class ...jar ....
./spark-submit --master yarn --deploy-mode client --class ..jar ...
过程:
a).客户端提交application,Driver会在客户端启动
b).客户端向ResourceManager申请启动ApplicationMaster
c).ResourceManager收到请求之后,随机在一台NodeManager中启动ApplicationMaster
d).ApplicationMaster启动之后,向ResourceManager申请资源,用于启动Executor
e).ResourceManager收到请求之后,找到资源返回给ApplicationMaster
f).ApplicationMaster连接NodeManager启动Executor
g).Executor启动之后会反向注册给Driver
h).Driver发送task到Executor执行
注意:
client方式提交任务,在客户端提交多个application,客户端会为每个application都启动一个Driver,Driver与集群Worker节点有大量通信,这样会造成客户端网卡流量激增。client方式提交任务适用于程序测试,不适用于真实生产环境。在客户端可以看到task执行情况和计算结果。
总结:
Driver功能:
i).发送task
ii).监控task,回收结果
iii).申请资源
2).cluster
命令:
./spark-submit --master yarn-cluster --class ...jar .... ..
./spark-submit --master yarn --deploy-mode cluster --class ..jar ... ..
过程:
a).客户端提交Application,首先客户端向ResourceManager申请启动ApplicationMaster
b).ResourceManager收到请求之后,随机在一台NodeManager中启动ApplicationMaster,这里ApplicationMaster就相当于是Driver
c).ApplicationMaster启动之后,向ResourceManager申请资源,用于启动Executor
d).ResourceManager收到请求之后,找到资源返回给ApplicationMaster
e).ApplicationMaster连接NodeManager启动Executor
f).Executor启动之后会反向注册给ApplicationMaster(Driver)
g).Driver发送task到Executor执行
注意:
cluster方式提交任务,Driver在集群中的随机一台Worker节点上启动,分散了client方式的网卡流量激增问题。
cluster方式适用于真实生产环境,在客户端看不到task执行情况和执行结果,要去WEBUI中去查看。
总结:
ApplicationMaster的作用:
i).申请资源
ii).启动Executor
iii).任务调度
3.术语解释
任务层面:application->job->stage->task
资源层面:Master->Worker->Executor->ThreadPool
4.RDD宽窄依赖
宽依赖(shuffle):
父RDD与子RDD partition之间的关系是一对多
窄依赖:
父RDD与子RDD partition之间的关系是一对一
父RDD与子RDD partition之间的关系是多对一
5.Stage
1).Stage是由一组并行的task组成的。RDD之间依赖关系形成一个DAG有向无环图,DAG有向无环图按照RDD之间的宽窄依赖划分stage。
2).spark计算模式:
pipeline管道计算模式,相当于高阶函数展开方式计算
思想,1+1+1=3
MR:1+1=2 2+1=3
3).在管道中的数据什么时候落地?
a).shuffle write
b).对RDD持久化
4).Stage的并行度?由stage中的finalRDD中partition个数决定的。
5).如何提高stage的并行度?
reduceByKey(xx,numpartitions),groupByKey(xx,numpartitions),join(xx,numpartition)
6.Spark任务调度和资源调度
1).资源调度
a).集群启动,Worker向Master汇报资源,Master掌握了集群资源情况
b).当在客户端提交任务的时候,运行任务,new SparkContext,会创建两个对象:DAGScheduler和TaskScheduler
c).TaskScheduler向Master申请资源
d).Master找到满足资源的节点启动Executor
e).Executor启动之后,反向注册给Driver,Driver掌握了集群计算资源
2).任务调度
a).当application运行到Action算子时,触发job,开始任务调度
b).每个job中有一系列RDD的依赖关系,形成一个DAG有向无环图。
c).DAG有向无环图被DAGScheduler按照RDD之间的宽窄依赖关系切割job划分stage
d).DAGScheduler将stage以taskSet的形式提交给TaskScheduler
e).TaskScheduler遍历set,拿到一个个task,发送到Executor中ThreadPool执行
f).TaskScheduler监控task回收结果
注意:
a).TaskScheduler发送task会失败,重试3次,如果依然失败,由DAGScheduler重试stage,重试4次,如果依然失败,stage所在的job就失败了,job失败,application就失败了。
b).TaskScheduler不仅可以重试失败的task,还可以重试执行缓慢的task,这就是Spark推测执行机制,默认关闭的,对于ETL的业务一定关闭推测执行。对于数据一直执行不完,首先看有没有数据倾斜,是否开启了推测执行。
7.粗粒度资源申请和细粒度资源申请
粗粒度资源申请:
任务执行之前,先将所有的资源申请到,task执行的时候不需要自己申请资源,加快了执行速度。如果多数task执行完成,只有一个task没有执行完,那么这批申请到的资源不会被释放,只有所有的task执行完成之后才会释放所有资源。会有集群资源不能充分利用的情况。
细粒度资源申请:
任务执行之前,不会申请所有的资源,task执行时,自己申请资源,自己释放资源,任务执行就慢了,但是集群资源可以充分利用
]
Spark day03
[
1.PV&UV(统计24小时的pv,uv)
PV:page view 有重复
UV:unquie Vistor 无重复
2.spark-submit参数
--master
--class
--deploy-mode
--executor-cores
--executor-memory
--total-executor-cores
--driver-cores
--driver-memory
--files
--jars 多个jar包之间用逗号隔开
--driver-class-path 多个jar包之间用分号隔开
3.源码
资源调度源码:
结论:
1.Executor在集群中是分散启动的
2.如果提交任务不指定--executor-cores
集群会在每台Worker节点上启动一个Executor,这个Executor会使用这台节点的所有的core和1G内存
3.如果想要在一个节点上启动多个Executor,一定要指定--executor-cores,执行任务要指定--total-executor-cores
4.启动Executor不仅和core有关,还和内存有关
任务调度源码:
从Action算子看
4.二次排序
自己封装一个类,实现Comparable<SecondSortKey>接口,将要排序的字段当做属性放入类,类中实现compareTo方法,然后使用spark对Key 使用sortByKey排序。
排序的字段大于2以上的都叫二次排序。
5.分组取topN
1).原生的集合工具Collections.sort(list,new comparator<xxx>)
2).自己定义定长数组
]
Spark day04
[
1.广播变量
当在Executor端用到了Driver变量,不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。
如果使用广播变量在每个Executor端中只有一份Driver端的变量副本。
val broadcast = sc.broadCast(...)
broadcast.value()
注意:
1).不能将RDD广播出去,可以将RDD的结果广播出去
2).广播变量在Driver定义,在Exector端不可改变,在Executor端不能定义
2.累计器
相当于集群中的统筹的大变量。
自定义累计器 ,实现AccumulatorParam<xxx> 实现三个方法zero,addAccumulator,addInPlace
val accumulator = sc.accumulator(xxx,new AccumulatorParam...)
accumulator.add(xxxx)
注意:
累计器只能在Driver定义初始化,在Executor端更新,在Executor不能accumulator.value获取值。
3.Spark WEBUI
【Jobs,Stages,StorageLevel,Environment,Executors,SQL,Streaming 】
4040端口可以看到当前application中的所有的job,点击job可以看到当前job下对应的stage,点击stage出现stage下的task。
4.搭建HistoryServer
1).在客户端中../conf/spark-defaults.conf文件中配置:
//开启记录事件日志的功能
spark.eventLog.enabled true
//设置事件日志存储的目录
spark.eventLog.dir hdfs://node1:9000/spark/test
//设置HistoryServer加载事件日志的位置
spark.history.fs.logDirectory hdfs://node1:9000/spark/test
//日志优化选项,压缩日志
spark.eventLog.compress true
2).启动HistoryServer:
在../conf/sbin/start-history-server.sh
3).访问:node4:18080
5.端口:
50070:
8020:
9000:
2181:
9083:
9092:
8080:
8081:
4040:
7077:
18080:
6.Master HA
1).System File
2).Zookeeper
a).在集群的每台节点../conf/spark-env.sh 中配置:
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=node3:2181,node4:2181,node5:2181
-Dspark.deploy.zookeeper.dir=/sparkmaster0821"
b).在StandBy-Master节点中修改../conf/spark-env.sh:SPARK_MASTER_IP = node2
c).启动Zookeeper
d).在Alive-Master节点上启动集群:../sbin/start-all.sh
e).在StandBy-Master节点上启动StandBy-Master ../sbin/start-master.sh
d).测试。
注意:
a).这里的Master HA是针对的standAlone集群
b).主备切换过程中,不影响已经在集群中运行的程序
c).主备切换过程中,不能申请资源。
7.Spark shuffle
Spark1.2之前默认使用的是HashShuffle(hashPartitioner),1.2之后使用的sortShuffle(rangePartitioner)。
1).HashShuffle
a).普通机制
产生磁盘小文件的个数:
M(map task个数)*R(reduce task个数)
流程:
i).map task处理完数据之后,每个task将结果写入buffer缓存区(与reduce task个数一致),每个buffer缓冲区大小是32K
ii).每个buffer满32k,溢写磁盘,每个buffer对应磁盘一个小文件。
iii).reduce task 到不同节点拉取数据
问题:产生磁盘小文件太多?
i).写磁盘文件的对象多
ii).拉取数据读磁盘文件对象多
iii).创建对象多,容易造成gc,gc还不满足内存使用,就会OOM
OOM问题:
i).Driver端回收RDD数据
ii).Executor 端创建对象非常多,可能会有OOM(0.2 task内存)
iii).Executor 端拉取shuffle数据,如果5个task一次拉取的数据量在Executor0.2的shuffle内存中放不下
iiii).Executor端对RDD进行缓存或者广播变量的RDD数据量比较大(0.6内存)
b).合并机制
产生磁盘小文件:
C(core 的个数)*R(reduce task个数)
过程:
与普通机制一样,只是一个core内运行的task 会公用一份buffer缓存区。
2).SortShuffle
a).普通机制
产生磁盘小文件个数:
2*M(map task个数)
流程:
i).map task 将处理的结果写入一个5M的内存结构中
ii).SortShuffle中会估算这个内存结构大小,当下一次结果放不下时,会申请2*估计-当前
iii).如果申请的到内存,继续往数据结构中写数据,如果申请不到,溢写磁盘,每批次是1万条溢写,溢写过程中会有排序。
iv).溢写的数据在磁盘上最终形成两个文件:一个索引文件一个数据文件
v).reduce 拉取数据首先解析索引文件,再去拉取数据
b).bypass机制
产生磁盘小文件个数:
2*M(map task个数)
流程:
与普通的机制对比,少了排序。当reduce task个数小于spark.shuffle.sort.bypassMergeThreshold (默认200)会开启bypass机制。
8.shuffle 文件的寻址
1).MapOutputTracker:磁盘小文件
MapOutputTrackerMaster(Driver端)
MapOutputTrackerWorker(Executor端)
2).BlockManager:块管理者
BlockManagerMaster(Driver端):
DiskStore:管理磁盘数据
MemoryStroe:管理内存数据
ConnectionManager:负责连接其他BlockManager
BlockTransferService:负责拉取数据
BlockManagerSlaves(Executor端):
DiskStore:管理磁盘数据
MemoryStroe:管理内存数据
ConnectionManager:负责连接其他BlockManager
BlockTransferService:负责拉取数据
3).寻址:
a).map task处理完的数据,将结果和数据位置封装到MapStatus对象中,通过MapOutputTrackerWorker汇报给Driver中的
MapOutputTrackerMaster。Driver中掌握了数据位置。
b).reduce 端处理数据,首先向本进程中的MapOutputTrackerWorker要磁盘文件位置,再向Driver中的MapOutputTrackerMaster
要磁盘数据位置,Driver返回磁盘数据位置。
c).reduce 拿到数据位置之后,通过BlockManager中的ConnectionManager连接数据所在的节点,连接上之后,通过BlockManager
中的BlockTransferService拉取数据
d).BlockTransferService拉取数据默认启动5个task,这5个task默认一次拉取的数据量不能超过48M。
e).拉取过来的数据放在Executor端的shuffle聚合内存中(spark.shuffle.memeoryFraction 0.2),
如果5个task一次拉取的数据放不到shuffle内存中会有OOM,如果放下一次,不会有OOM,以后放不下的会放磁盘。
9.内存管理
Spark1.6之前使用的是静态内存管理,spark1.6之后使用的是统一内存管理
静态内存管理:
0.2:task运行
0.2:
0.2*0.2:预留
0.2*0.8:shuffle聚合内存
0.6:
0.1*0.6:预留
0.9*0.6:
0.6*0.9*0.2:反序列化数据
0.6*0.9*0.8:RDD缓存和广播变量
统一内存管理:
300M预留
(总-300M)*0.25:task运行
(总-300M)*0.75:
(总-300M)*0.75*0.5:shuffle聚合内存
(总-300M)*0.75*0.5:RDD缓存和广播变量
10.Shuffle 调优
1).
spark.shuffle.file.buffer 32k
spark.reducer.maxSizeInFlight 48M
spark.shuffle.io.maxRetries 3
spark.shuffle.io.retryWait 5s
spark.shuffle.memoryFraction 0.2
spark.shuffle.manager hash|sort
spark.shuffle.sort.bypassMergeThreshold 200 --sortShuffle
spark.shuffle.consolidateFiles true|false --hashShuffle
2).使用
在conf.set(k,v) 级别最高
在提交任务 ./spark-submit --conf 第二
在../conf/spark-defaults.conf中配置 第三
]
Spark day05
[
1.SparkSQL
可以使用sql对分布式的数据查询,先有Hive->Shark->SparkSQL。
要想使用SQL对分布式数据查询,首先要创建出来DataFrame,创建DataFrame使用SQLContext()
conf.set("spark.sql.shuffle.partitions", "200")----SparkSQL sql有join,groupBy 默认分区数
2. Hive on Spark & Spark on Hive
Hive on Spark:
Hive:解析,优化,存储
Spark:执行引擎
Spark on Hive:
Hive:存储
Spark:解析,优化,执行引擎
3.DataFrame
SparkSQL操作基于DataFrame,DataFrame底层是RDD<Row>,既有数据,又有列的Schema信息
4.创建DataFrame的方式
1).读取json格式的文件
a).json文件不能嵌套
b).读取的两种方式:
DataFrame df = sqlContext.read().format("json").load("./sparksql/json");
DataFrame df2 = sqlContext.read().json("sparksql/json");
c).加载过来的DataFrame 列会按照Ascii码排序
d).可以使用DataFrame的API操作DataFrame,也可以将DataFrame注册成临时表
df.registerTempTable("jtable");
2).读取json格式的RDD
3).读取普通的RDD加载成DataFrame
a).反射的方式(少)
JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
private static final long serialVersionUID = 1L;
@Override
public Person call(String line) throws Exception {
Person p = new Person();
p.setId(line.split(",")[0]);
p.setName(line.split(",")[1]);
p.setAge(Integer.valueOf(line.split(",")[2]));
return p;
}
});
DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
i).自定类要实现序列化接口
ii).自定义类的访问级别是public
iii).加载过来的DataFrame列也会按照Ascii码排序
b).动态创建Schema(多)
List<StructField> asList =Arrays.asList(
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(asList);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
i).加载过来的DataFrame列不会按照Ascii码排序
4).读取parquent文件加载成DataFrame
读取:
DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet");
load = sqlContext.read().parquet("./sparksql/parquet");
存储:
df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");
df.write().mode(SaveMode.Ignore).parquet("./sparksql/parquet");
5).读取Mysql中的数据加载成DataFrame
读取:
a).
Map<String, String> options = new HashMap<String,String>();
options.put("url", "jdbc:mysql://192.168.179.4:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "123456");
options.put("dbtable", "person");
DataFrame person = sqlContext.read().format("jdbc").options(options).load();
b).
DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "123456");
reader.option("dbtable", "score");
DataFrame score = reader.load();
存储:
Properties properties = new Properties();
properties.setProperty("user", "root");
properties.setProperty("password", "123456");
result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties);
6).读取Hive中的数据加载成DataFrame
要配置Spark on Hive,如果SparkSQL要读取数据是Hive中数据,要使用HiveContext,HiveContext是SQLContext的子类。
读取:
HiveContext hiveContext = new HiveContext(sc);
hiveContext.sql("USE spark");
DataFrame df = hiveContext.table("good_student_infos");
存储:
hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
5.配置Spark on Hive
1).在客户端创建../conf/hive-site.xml
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1:9083</value>
</property>
</configuration>
2).启动Hive,在服务端启动metaStore服务,hive --service metastore
3).spark-shell 测试
6.UDF & UDAF
1).UDF:用户自定义函数,user defined function
可以自定义类实现UDFX接口,传几个参数,对应实现UDFX
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(String t1) throws Exception {
return t1.length();
}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();
2).UDAF:用户自定义聚合函数,user defined aggreagatefunction
sqlContext.udf().register("StringCount",new UserDefinedAggregateFunction() {
/**
*
*/
private static final long serialVersionUID = 1L;
/**
* 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0);
}
/**
* 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑
* buffer.getInt(0)获取的是上一次聚合后的值
* 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合
* 大聚和发生在reduce端.
* 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算
*/
@Override
public void update(MutableAggregationBuffer buffer, Row arg1) {
buffer.update(0, buffer.getInt(0)+1);
}
/**
* 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理
* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来
* buffer1.getInt(0) : 大聚合的时候 上一次聚合后的值
* buffer2.getInt(0) : 这次计算传入进来的update的结果
* 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作
*/
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
}
/**
* 在进行聚合操作的时候所要处理的数据的结果的类型
*/
@Override
public StructType bufferSchema() {
return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bffer", DataTypes.IntegerType, true)));
}
/**
* 最后返回一个和dataType方法的类型要一致的类型,返回UDAF最后的计算结果
*/
@Override
public Object evaluate(Row row) {
return row.getInt(0);
}
/**
* 指定UDAF函数计算后返回的结果类型
*/
@Override
public DataType dataType() {
return DataTypes.IntegerType;
}
/**
* 指定输入字段的字段及类型
*/
@Override
public StructType inputSchema() {
return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));
}
/**
* 确保一致性 一般用true,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。
*/
@Override
public boolean deterministic() {
return true;
}
});
sqlContext.sql("select name ,StringCount(name) as strCount from user group by name").show();
7.开窗函数
sql中分组取topN问题。
row_number() over (partition by xxx order by xxx ) rank rank从1开始
DataFrame result = hiveContext.sql("select riqi,leibie,jine "
+ "from ("
+ "select riqi,leibie,jine,"
+ "row_number() over (partition by leibie order by jine desc) rank "
+ "from sales) t "
+ "where t.rank<=3");
]
Spark day06
[
1.SparkStreaming
1).SparkStreaming是流式处理框架,7*24小时不间断运行,微批处理。
2).与Storm的区别:
i).Storm是纯实时处理数据,SparkStreaming是微批处理数据
ii).Storm擅长处理汇总型业务,SparkStreaming擅长处理复杂的业务,可以使用SpakrCore,SparkSQL
iii).Storm可以动态调度资源,SparkStreaming1.2之后可以
v).Storm事务相对SparkStreaming比较完善
3).接受处理数据过程:
要使用一个task接受数据,SparkStreaming将一段时间内的接受来的数据当做一批次处理,这个一段时间就是BatchInterval,
将接受数据每隔BatchInterval封装到一个batch中,这个batch又被封装到一个RDD中,这个RDD又被封装到一个DStream中。
每隔batchInterval,就会产生一个DStream,后面要有task处理这个DStream。
假设产生一批次的时间是5s,集群处理这批次数据的时间是3s,集群休息了2s.
假设产生一批次的时间是5s,集群处理这批次数据的时间是8s,集群中就会有任务堆积。
参照WEBUI看有没有任务堆积。
2.SparkStreaming读取Socket端口数据
1.代码:
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountOnline");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("node5", 9999);
// JavaSparkContext sc = new JavaSparkContext(conf);
// JavaStreamingContext jsc = new JavaStreamingContext(sc,Durations.seconds(5));
// JavaSparkContext sparkContext = jsc.sparkContext();
2.注意:
* 1、local的模拟线程数必须大于等于2 因为一条线程被receiver(接受数据的线程)占用,另外一个线程是job执行
* 2、Durations时间的设置,就是我们能接受的延迟度,这个我们需要根据集群的资源情况以及
监控每一个job的执行时间来调节出最佳时间。
* 3、 创建JavaStreamingContext有两种方式 (sparkconf、sparkcontext)
* 4、业务逻辑完成后,需要有一个output operator
* 5、JavaStreamingContext.start()straming框架启动之后是不能在次添加业务逻辑
* 6、JavaStreamingContext.stop()无参的stop方法会将sparkContext一同关闭,stop(false) ,默认为true,会一同关闭
* 7、JavaStreamingContext.stop()停止之后是不能在调用start
3.SparkStreaming 算子
Transformation类:
updateStateByKey:更新SparkSteaming程序启动以来所有的key的状态。
1).要设置checkpoint
2).多久往checkpoint中写一份数据
如果batchInterval小于10s,10s写一次,如果batchInterval大于10s,那么batchInterval写一次。
reduceByKeyAndWindow:
1).
window length:窗口长度
slide length:滑动间隔
2).普通:
i).代码:
JavaPairDStream<String, Integer> searchWordCountsDStream =
searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}, Durations.seconds(15), Durations.seconds(5));
ii).不用设置checkpoint目录
3).优化:
i).代码:
JavaPairDStream<String, Integer> searchWordCountsDStream =
searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
},new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 - v2;
}
}, Durations.seconds(15), Durations.seconds(5));
ii).必须要设置checkpoint目录
transform:
1).transform 可以拿到DStream中的RDD,做DStream中RDD类型的转换
2).transform
算子内,拿到的RDD算子外,代码是在Driver端执行的,每个batchInterval执行一次,可以做到动态改变广播变量。
OutPutOperator类:
print:
foreachRDD:
1).拿到DStream中的RDD,对RDD要使用RDD的action类算子触发执行。
2).foreachRDD算子内,拿到的RDD的算子外的代码,是在Driver端执行的,可以做到动态改变广播变量。
saveAsTextFile:
saveAsHadoopFile:
saveAdObjectFile:
4.Kafka
1).kafka是分布式消息系统,生产者消费者模式,数据默认保存7天。
2).概念:
producter:
消息的生产者,自己决定往哪个partition中生产数据,i).轮循 ii).hash
consumer:
消息消费者,自己在zookeeper中维护消费者偏移量,每个消费者都有自己的消费者组,不同的消费者组消费同一个topic互不
影响。同一个消费者组内消费同一个topic,这个topic中的数据只能被消费一次。
broker:
组成kafka集群的节点,之间没有主从关系,通过zookeeper协调。一个broker可以管理多个partitioin,负责消息的读写,
存储。
topic:
一类消息/消息队列,topic是由partition组成的,有多少个?创建的时候可以指定。topic中的一个partition只能被同一个消费者组
中的一个消费者同时消费。
partition:
存储数据的文件,partition中有消息的offset,每个partition都由一个broker管理(Leader),每个partition都有副本,
有几个?创建topic的时候可以指定。
zookeeper:
存储原数据,topic,broker,partition,offset
3).集群搭建:
a).上传kafka_2.10-0.8.2.2.tgz包到三个不同节点上,解压。
b).配置../ kafka_2.10-0.8.2.2/config/server.properties文件
broker.id:Integer值 从0开始
log.dirs:真实存储数据位置
zookeeper.connect:zookeeper集群
c).启动zookeeper集群,在每个节点上启动kafka:
bin/kafka-server-start.sh config/server.properties
4).命令:
创建topic:
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic topic2017 --partitions 3 --replication-factor 3
用一台节点控制台来当kafka的生产者:
./kafka-console-producer.sh --topic topic2017 --broker-list node1:9092,node2:9092,node3:9092
用另一台节点控制台来当kafka的消费者:
./kafka-console-consumer.sh --zookeeper node3:2181,node4:2181,node5:2181 --topic topic2017
查看kafka中topic列表:
./kafka-topics.sh --list --zookeeper node3:2181,node4:2181,node5:2181
查看kafka中topic的描述:
./kafka-topics.sh --describe --zookeeper node3:2181,node4:2181,node5:2181 --topic topic2017
5).删除kafka中的数据
a).在kafka集群中删除topic,当前topic被标记成删除。在每台broker节点上删除当前这个topic对应的真实数据。
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --delete --topic t1205
b).进入zookeeper客户端,删除topic信息
rmr /brokers/topics/t1205
c).删除zookeeper中被标记为删除的topic信息
rmr /admin/delete_topics/t1205
6).leader的均衡机制
5.Driver HA
/**
* checkpoint 保存:
* 1.配置信息
* 2.DStream操作逻辑
* 3.job的执行进度
* 4.offset
*/
final String checkpointDirectory = "./checkpoint";
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
return createContext(checkpointDirectory,conf);
}
};
/**
* 获取JavaStreamingContext 先去指定的checkpoint目录中去恢复JavaStreamingContext
* 如果恢复不到,通过factory创建
*/
JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
6.SparkStreaming+kafka整合
1).Receiver模式
a).receiver模式使用zookeeper管理offset,要使用一个task接收kafka中的数据,会有丢失数据的问题,开启WAL机制将数据备份到checkpoint目录中一份,避免数据丢失,开启WAL机制之后会降低任务总体执行效率,延长时间。
b).receiver模式并行度:
spark.sparkstreaming.blockInterval 200 ,降低这个值,提高了生成DStream中的RDD的partition个数,建议blockInterval不能小于50ms
2).Direct模式
a).Direct模式spark自己管理offset,放在内存中,如果设置了checkpoint目录,那么在checkpoint中也有一份offset
b).Direct模式并行度
与读取的topic中的partition个数一致,增加topic中partition的个数就可以提高并行度
7.SparkStreaming配置项:
spark.streaming.receiver.writeAheadLog.enable 默认false没有开启
spark.streaming.blockInterval 默认200ms
spark.streaming.backpressure.enabled 默认false
spark.streaming.receiver.maxRate 默认没有设置
spark.streaming.stopGracefullyOnShutdown false
spark.streaming.kafka.maxRatePerPartition not set
8.使用zookeeper手动管理checkpoint
...
]
Spark 调优
[
1.资源调优
1).搭建集群
在spark安装包的conf下spark-env.sh
SPARK_WORKER_CORES
SPARK_WORKER_MEMORY
SPARK_WORKER_INSTANCE
2).提交任务的时候
提交命令选项:(在提交Application的时候使用选项)
--executor-cores
--executor-memory
--total-executor-cores
配置信息:(在Application的代码中设置
在Spark-default.conf中设置)
spark.executor.cores
spark.executor.memory
spark.max.cores
2.并行度调优
1).降低读取文件的block ,sc.textFile(xxx)
2).sc.textFile(xxx,minnumPartitions)
3).sc.parallelize(xx,numpartitions)
4).sc.markRDD(xx,numpartitions)
5).sc.parallelizePairs(xx,numpartitions)
6).reduceByKey(xx,numpartitions)/join(xxx,numpartitions)
7).repartition(coalesce(numparititons,true))/coalesce
8).自定义分区器
9).spark.default.parallelism 生成的RDD的默认并行度
10).spark.sql.shuffle.partitions 默认200,sql执行有shuffle类的操作的分区数
11).SparkStreaming+Receiver spark.streaming.blockInterval=200ms
12).SparkStreming+Direct 读取的topic中的partition个数一致
3.代码调优
1).避免创建重复的RDD
2).复用同一个RDD
3).对多次使用的RDD进行持久化
4).尽量避免使用shuffle类的算子 广播变量+filter/map...
5).使用map-side预聚合的shuffle操作,尽量使用有combiner的shuffle类算子。
combiner概念:
在map端,每一个map task计算完毕后进行的局部聚合
combiner好处:
a) 降低shuffle write写磁盘的数据量。
b) 降低shuffle read拉取数据量的大小。
c) 降低reduce端聚合的次数。
6).尽量使用高性能的算子
使用reduceByKey替代groupByKey
使用mapPartitions替代map
使用foreachPartition替代foreach
filter后使用coalesce减少分区数
使用repartitionAndSortWithinPartitions替代repartition与sort类操作
使用repartition和coalesce算子操作分区
7).使用广播变量
8).使用Kryo优化序列化性能
9).优化数据结构
使用基本数据类型代替字符串
使用字符串代替对象
使用数组代替集合
10).使用高性能的库fastutil
4.数据本地化
数据本地化级别:
1) PROCESS_LOCAL:
task处理的数据在本Executor进程的内存中
2) NODE_LOCAL
task处理数据在本Executor所在的节点的磁盘中
task处理的数据在本Executor所在节点的其他Executor的内存中
3) NO_PREF
task处理的数据在数据库中
4) RACK_LOCAL
task处理的数据在同机架不同节点的Executor内存中
task处理的数据在同机架不同节点的磁盘上
5) ANY
task处理的数据在其他机架上
怎么调优:
• spark.locality.wait 3s
• spark.locality.wait.process
• spark.locality.wait.node
• spark.locality.wait.rack
5.内存调优
堆内存不足造成的影响:
1) 频繁的minor gc。
2) 老年代中大量的短生命周期的对象会导致full gc。
3) gc 多了就会影响Spark的性能和运行的速度。
怎么调?
1) 提高Executor总体内存的大小
2) 降低储存内存比例或者降低聚合内存比例
6.Spark Shuffle调优
spark.shuffle.file.buffer 32k
spark.reducer.MaxSizeFlight 48M
spark.shuffle.io.maxRetries 3
spark.shuffle.io.retryWait 5s
spark.shuffle.manager hash|sort
spark.shuffle.sort.bypassMergeThreshold 200----针对SortShuffle
spark.shuffle.consolidateFiles false----针对HashShuffle
7.调节Executor的堆外内存
节点之间连接默认等待时间:
--conf spark.core.connection.ack.wait.timeout=300
堆外内存调节:
yarn下:
--conf spark.yarn.executor.memoryOverhead=2048 单位M
standalone下:
--conf spark.executor.memoryOverhead=2048 单位M
8.解决数据倾斜
1).数据倾斜
HDFS
HDFS集群中某些节点数据非常多,某些节点数据非常少
Hive
某张表中某个key对应非常多的数据,某些key对应少量的数据
Spark
RDD中某个partition对应数据量非常多,某些partition对应数据量非常少
2).解决数据倾斜
a).使用Hive ETL预处理数据
场景:Spark要读取的数据源头是Hive表(有数据倾斜),业务是要经常操作这张表
解决:在Hive中先对这张表聚合,Spark频繁操作的是聚合后的结果
b).过滤少数倾斜的Key
场景:业务中去掉少数倾斜的Key反而使结果更正确
解决:sample抽样过滤掉这些倾斜的Key
c).提高shuffle并行度
前提:有多个Key不同
d).双重聚合
将相同的key随机加前缀聚合一次,然后将聚合后的结果去前缀再聚合一次
e).将reduce join转换成map join
彻底避免shuffle操作,避免数据倾斜
前提:两个RDD要join或者两张表要join,有一个RDD比较小或者一张表比较小
解决:将小的广播到Executor+Transformation类操作 避免shuffle
f).采样倾斜key并分拆join操作
膨胀N倍
g).使用随机前缀和扩容RDD进行join
]