Big Data with Spark (Part 1) --- Spark Overview, Modules, Installation, Usage, One-Line WordCount, API, Scala Programming, Submitting Jobs to a Spark Cluster, Script Analysis

I. Spark Overview
----------------------------------------------------------
    1.Lightning-fast cluster computing.
    2.A fast, general-purpose engine for large-scale computation.
    3.Speed: up to 100x faster than Hadoop in memory, and 10x faster for on-disk computation.
    4.Languages: Java / Scala / R / Python.
    5.Offers 80+ operators, making it easy to build parallel applications.
    6.Generality: combines SQL, stream processing, and complex analytics.
    7.Runs on Hadoop, Mesos, standalone, in the cloud, or in local mode.
    8.DAG            //directed acyclic graph

II. Spark Modules
--------------------------------------------------------
    Spark Core            //core module
    Spark SQL             //SQL
    Spark Streaming       //stream processing
    Spark MLlib           //machine learning
    Spark GraphX          //graph processing

III. Installing Spark
--------------------------------------------------------
    1.Download spark-2.1.0-bin-hadoop2.7.tgz ..
    2.Extract it ..
    3.Set the environment variables
        [/etc/profile]
        SPARK_HOME=/soft/spark
        PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

        [source]
        $>source /etc/profile
    4.Verify spark
        $>cd /soft/spark/bin
        $>./spark-shell
    5.Web UI
        http://s100:4040/

IV. Using Spark
--------------------------------------------------------
    1.Open the shell
        $>
    2.sc ==> the entry point of a Spark program; it encapsulates the entire Spark runtime environment
        scala> sc
        res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2b5ca574

V. WordCount in One Line
----------------------------------------------------------
    //load the file, returning an RDD[String] (split by line)
    scala> val rdd1 = sc.textFile("/home/ubuntu/downloads/1.txt")
    rdd1: org.apache.spark.rdd.RDD[String] = /home/ubuntu/downloads/1.txt MapPartitionsRDD[1] at textFile at <console>:24

    //split each line on ',' and flatten into a collection of single words,
    //then map each word to a (word, count) pair, then aggregate by key
    scala> val rdd2 = rdd1.flatMap(line => line.split(","))
                          .map(word => (word,1))
                          .reduceByKey(_ + _)

    //inspect the word-count result
    scala> rdd2.collect
    res2: Array[(String, Int)] = Array((tom1,1), (4,1), (14,1), (7,1), (15,1), (5,1), (tom2,1), (6,1), (tom6,1), (2,1), (16,1), (3,1), (tom3,1), (tom4,1), (17,1), (12,1), (13,1), (tom5,1), (1,1), (11,1), (tom7,1))

    //WordCount in one line
    scala> sc.textFile("/home/ubuntu/downloads/1.txt").flatMap(line => line.split(",")).map(word => (word,1)).reduceByKey(_ + _).collect

    //add a word filter that screens out the "tom" keyword
    scala> sc.textFile("/home/ubuntu/downloads/1.txt")
             .flatMap(line => line.split(","))
             .filter(!_.contains("tom"))
             .map(word => (word,1))
             .reduceByKey(_ + _)
             .collect
    res6: Array[(String, Int)] = Array((4,1), (14,1), (7,1), (15,1), (5,1), (6,1), (2,1), (16,1), (3,1), (17,1), (12,1), (13,1), (1,1), (11,1))
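    The same pipeline can be extended with other standard RDD operators; a small sketch for the spark-shell, assuming the same 1.txt input, that sorts the counts from highest to lowest (sortBy and take are part of the regular RDD API):

    //word count sorted by frequency, highest first; keep only the top 5
    scala> sc.textFile("/home/ubuntu/downloads/1.txt")
             .flatMap(line => line.split(","))
             .map(word => (word,1))
             .reduceByKey(_ + _)
             .sortBy(_._2, ascending = false)
             .take(5)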
VI. API
------------------------------------------------------
    1.[SparkContext]
        The main entry point for Spark functionality. It represents the connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables.
        Only one SparkContext can be active per JVM; stop() the active one before creating a new sc.
    2.[RDD]
        Resilient distributed dataset, the equivalent of a collection. textFile splits the file on newlines, so each element is one line.
    3.[SparkConf]
        The Spark configuration object; sets the various parameters of a Spark application as key-value pairs.

VII. Scala Programming -- Importing the Spark Libraries in IDEA to Complete WordCount
-------------------------------------------------------------
    1.Create a spark module
    2.Add Maven support
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
        </dependencies>
    3.Add Scala support
    4.Write the object -- WorldCountDemo
        import org.apache.spark.{SparkConf, SparkContext}

        object WorldCountDemo {
            def main(args: Array[String]): Unit = {
                //create the spark configuration object
                val conf = new SparkConf()
                //set the app name
                conf.setAppName("sparkwc")
                //set local mode
                conf.setMaster("local")
                //create the core object -- the context
                val sc = new SparkContext(conf)
                val rdd1 = sc.textFile("d:\\calllog.log")
                val rdd2 = rdd1.flatMap(line => line.split(","))
                val rdd3 = rdd2.map(word => (word,1))
                val rdd4 = rdd3.reduceByKey(_ + _)
                val r = rdd4.collect()
                r.foreach(e => println(e))
            }
        }
    5.Run the app and check the result
    6.Java version
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaPairRDD;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.api.java.function.FlatMapFunction;
        import org.apache.spark.api.java.function.PairFunction;
        import scala.Tuple2;
        import java.util.*;

        /**
         * WordCount, Java version
         */
        public class WorldCountDemoJava {
            public static void main(String[] args) {
                SparkConf conf = new SparkConf();
                conf.setMaster("local");
                conf.setAppName("wcjava");
                JavaSparkContext jsc = new JavaSparkContext(conf);
                JavaRDD<String> jrdd1 = jsc.textFile("d:\\calllog.log");
                //flatten each line into single words
                JavaRDD<String> jrdd2 = jrdd1.flatMap(new FlatMapFunction<String, String>() {
                    public Iterator<String> call(String s) throws Exception {
                        List<String> list = new ArrayList<String>();
                        String[] strs = s.split(",");
                        for (String ss : strs) {
                            list.add(ss);
                        }
                        return list.iterator();
                    }
                });
                //map each word to a pair (word -- (word,1))
                JavaPairRDD<String, Integer> jrdd3 = jrdd2.mapToPair(new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });
                //count the occurrences per key
                Map<String, Long> map = jrdd3.countByKey();
                Set<String> set = map.keySet();
                for (String s : set) {
                    System.out.println(s + ":" + map.get(s));
                }
            }
        }

VIII. Submitting a Job to Run on the Spark Cluster
------------------------------------------------
    1.Export the jar
    2.Copy it to the shared directory
    3.Submit and run the jar with spark-submit
      (note: the demo classes above hard-code setMaster("local") and a local input path; for submission, read the path from args(0) and drop setMaster, because values set directly on SparkConf take precedence over spark-submit flags; see the sketch after this section)
        $>spark-submit --master local --name wc --class com.spark.demo.java.WorldCountDemoJava TestSpark-1.0-SNAPSHOT.jar /home/ubuntu/downloads/1.txt
        $>spark-submit --master local --name wc --class com.spark.demo.scala.WorldCountDemoScala TestSpark-1.0-SNAPSHOT.jar /home/ubuntu/downloads/1.txt
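    Following that note, a minimal submit-friendly sketch (the object name WorldCountSubmit is illustrative, not part of the original project):

        import org.apache.spark.{SparkConf, SparkContext}

        object WorldCountSubmit {
            def main(args: Array[String]): Unit = {
                //no setMaster call here, so the --master flag passed to spark-submit takes effect
                val conf = new SparkConf().setAppName("sparkwc")
                val sc = new SparkContext(conf)
                sc.textFile(args(0))                  //input path comes from the command line
                  .flatMap(line => line.split(","))
                  .map(word => (word,1))
                  .reduceByKey(_ + _)
                  .collect()
                  .foreach(println)
                sc.stop()
            }
        }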
IX. Deploying a Spark Cluster
-----------------------------------------------
    1.local
        nothing to do!
        spark-shell --master local        //the default
    2.standalone
        a)copy the spark directory to the other hosts
        b)configure all the environment variables on the other hosts
            [/etc/profile]
            SPARK_HOME
            PATH
        c)configure the slaves file on the master node s100, and distribute it to all nodes
            [/soft/spark/conf/slaves]
            s200
            s300
            s400
        d)start the spark cluster on s100
            /soft/spark/sbin/start-all.sh
            [run the script from the sbin directory to avoid clashing with the hadoop cluster's start-all.sh]
        e)check the processes
            $>xcall.sh jps
            Master        //s100
            Worker        //s200
            Worker        //s300
            Worker        //s400
        f)web UI
            http://s100:8080/

X. Submitting a Jar Job to the Fully Distributed Spark Cluster
--------------------------------------------------------
    1.Start the hadoop HDFS cluster
        $>start-dfs.sh
    2.Put the file to be word-counted into HDFS
        $>hdfs dfs -mkdir -p /data/spark
        $>hdfs dfs -put /home/ubuntu/downloads/1.txt /data/spark
    3.Run spark-submit
        $>spark-submit --master spark://s100:7077 --name wc --class com.spark.demo.java.WorldCountDemoJava TestSpark-1.0-SNAPSHOT.jar hdfs://s500:8020/data/spark/1.txt
        $>spark-submit --master spark://s100:7077 --name wc --class com.spark.demo.scala.WorldCountDemoScala TestSpark-1.0-SNAPSHOT.jar hdfs://s500:8020/data/spark/1.txt

XI. Script Analysis
--------------------------------------------------------
    [start-all.sh]
        sbin/spark-config.sh
        sbin/start-master.sh        //starts the Master process
        sbin/start-slaves.sh        //starts the Worker processes

    [start-master.sh]
        sbin/spark-config.sh
        CLASS="org.apache.spark.deploy.master.Master"
        spark-daemon.sh start org.apache.spark.deploy.master.Master --host --port --webui-port ...

    [start-slaves.sh]
        sbin/spark-config.sh
        slaves.sh        //iterates over conf/slaves

    [slaves.sh]
        for each host in conf/slaves {
            ssh host start-slave.sh ...
        }

    [start-slave.sh]
        CLASS="org.apache.spark.deploy.worker.Worker"
        sbin/spark-config.sh
        for (( .. )) ; do
            start_instance $(( 1 + $i )) "$@"
        done

    $>cd /soft/spark/sbin
    $>./stop-all.sh                              //stop the whole spark cluster
    $>./start-all.sh                             //start the whole spark cluster
    $>./start-master.sh                          //start the master node
    $>./start-slaves.sh                          //start all worker nodes
    s400$>./start-slave.sh spark://s100:7077     //start a single worker on s400
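    With the cluster scripts in hand, a quick way to confirm the standalone cluster is healthy, assuming the master is at s100:7077 (a trivial job whose result is easy to eyeball):

        $>spark-shell --master spark://s100:7077
        scala> sc.parallelize(1 to 1000).sum()    //runs on the cluster workers; should print 500500.0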
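    spark-submit also accepts resource options on a standalone cluster; a hedged sketch with commonly used flags (the memory and core values are illustrative, not from the original setup):

        $>spark-submit --master spark://s100:7077 \
            --name wc \
            --deploy-mode client \
            --executor-memory 1g \
            --total-executor-cores 4 \
            --class com.spark.demo.scala.WorldCountDemoScala \
            TestSpark-1.0-SNAPSHOT.jar hdfs://s500:8020/data/spark/1.txt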
Reposted from blog.csdn.net/xcvbxv01/article/details/83716737