一、Spark集群的测试
当我们安装好Spark后,需要对Spark做一些测试,看新安装的Spark集群是否可以使用,这个时候有如下几种方法验证和测试:
①使用jps命令查看下进程,在主节点上org.apache.spark.deploy.master.Master进程会启动,从节点上则会启动org.apache.spark.deploy.worker.Worker进程,如果启动了history server,进程中还会有org.apache.spark.deploy.history.HistoryServer进程,如果都存在这可以说明Spark集群启动没有问题;
②访问下web页面,看能否正常访问,访问spark的ui界面http://${master.ip}:8080/,访问spark history server的ui界面http://${history.server.ip}:18080/,如果启动了spark-shell,则可以访问Spark Jobs的WEB页面http://${master.ip}:4040/;
③也可以提交下Spark自带的示例代码,Spark Pi,提交后看是否可以正常运行,例如Spark 2.1.2安装成功后,会在安装目录下$SPARK_HOME/examples/jars/目录下有一个spark-examples_2.11-2.1.2.jar的jar包,提交次jar运行,以测试集群是否可用,因为是测试,所以以standalone-client模式提交Spark Pi:
[root@cdh4 spark]# ./bin/spark-submit --master spark://cdh4:7077 --deploy-mode client \
--executor-memory 2g --total-executor-cores 2 --driver-memory 2G \
--class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.2.jar 100
试,所以以st成功运行,结果如下,表示集群安装成功
同时还可以查看Spark UI,如下
二、准备Spark Pi源码
1、首先我们本地需要创建一个Spark项目,我们可以用Maven来创建,相对详细的创建过程可以参考我的另一篇博客https://blog.csdn.net/github_39577257/article/details/81151137
2、打开Spark安装目录,进入如下目录
$SPARK_HOME/examples/src/main/scala/org/apache/spark/examples/
在这个目录下我们会看到有很多官方提供的源码示例,本次主要分析SparkPi.scala的源码,因此将源码SparkPi.scala打开复制到我们创建的Spark项目。
三、源码
在本地可以创建一个SparkPIDemo 的Scala类,继承App,这样这个类中写的代码可以直接运行,相当于写在了main方法中,
将源码复制进来,方便查看
package com.yore.spark
import scala.math.random
import org.apache.spark.sql.SparkSession
/**
*
* 求圆周率的方法是--利用概率求圆周率
* 假设有一个单位正四边形,内有个一个内切圆,
* 那么落入圆内的概率就是S圆/S正
*
* Created by yore on 2017-12-14 15:06
*/
object SparkPIDemo extends App {
/**
*
* Spark源码中的计算pi的测试代码
*
* @author yore
* @param args 传入的参数,切片数
* @return void
*/
def SparkPi(args: Array[String]): Unit ={
val spark = SparkSession
.builder
.master("local[2]")
.appName("Spark Pi")
.getOrCreate()
val slices = if (args.length > 0) args(0).toInt else 2
// avoid overflow
val n = math.min(100000L * slices, Int.MaxValue).toInt
println("$$$\t" + slices + "\t" + n)
val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / (n - 1))
spark.stop()
}
/*
* 2 200000 3.1396556982784913
* 20 2000000 3.143369571684786
* 200 20000000 3.141911757095588
*
*/
SparkPi(Array("2000"));
}
四、图解分析
首先我们先不用看代码,首先如果是我们,我们该如何去计算π呢?圆周率是一个无穷不循环小数,Spark中用到的是一个很巧的方法来近似的得到圆周率π—利用概率求圆周率。
假设有一个单位正四边形,内有一个内接圆,如下图:
设一个点a(x,y),x在区间[-1,1],y也在区间[-1,1],那么随机生成一个a点,落入内切圆的概率P为:
然后我们假设生成了n-1个点,有m个点落入园内,那么落入内切圆的概率P为:
联立以上两等式,可得:
所以圆周率π为:
然后我们再分析下Spark Pi的主要代码
//切片数,判断传入的参数,如果有值,将值转成int型,否则设置成默认值2
val slices = if (args.length > 0) args(0).toInt else 2
// 定义n值,控制迭代的次数,它的值是我们的分片数的10W倍,但是为了防止超过Int最大值,最大值不超过Int的最大值
val n = math.min(100000L * slices, Int.MaxValue).toInt
// 通过sparkContext的parallelize方法生成RDD,序列值为1直到n(不包括n,也就是有n-1个值),并行度为分片数,
// 然后进行map运算,此算子会对RDD每个值进行一次计算,每次计算时随机生成一个x和y值,
val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
// 随机生成一个[-1,1)的值,random生成[0,1)的double值
val x = random * 2 - 1
val y = random * 2 - 1
// 如果生成的点到原点的距离小于1,则落在园内,返回1,否则返回0
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _) //reduce统计落入园内的点的个数
// 则π的值大致为4*count/(n-1),打印结果
println("Pi is roughly " + 4.0 * count / (n - 1))
// 停止底层的SparkContext
spark.stop()