粒子物理前传:使用Spark统计能谱数据
由此我将撰写之前的“粒子物理三部曲”的前传,前传中会和当时粒子物理系列中的技术相呼应。现在我们需要对粒子物理的能谱数据进行分类,构建算法分析其对应元素。那么第一步我们需要一个大数据开发工具。我选择了Spark。
本文中使用的技术一览
- sbt构建工具
- 利用python解析数据
- Spark
- Docker镜像
为什么使用Sbt?因为SBT是一个现代化的构建工具。依赖管理还算出色,对Scala语言的支持比较好。
为什么使用Docker?开箱即用,对于对性能要求不高的环境下使用上爬让开是个免去部署麻烦的选择。
获取Spark的Docker镜像并运行
我建议读者使用Docker,这样你不需要受环境部署的困扰。虽然我在本文实际上使用的是自己笔记本的Spark。
我们使用Github和DockerHub上很火的一个docker镜像。
[root@master ]# docker pull sequenceiq/spark:1.6.0
[root@master ]# docker run -it -p 8088:8088 -p 8042:8042 -p 4040:4040 -h sandbox sequenceiq/spark:1.6.0 bash
这样你相当于开了台沙盒虚拟机跑spark,运行一个计算PI值的示例。你可以使用下列命令来跑spark或退出关闭。
[root@master ]# spark-shell \
--master yarn-client \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1
... ...
scala> sc.parallelize(1 to 1000).count()
... ...
[root@master ]# docker kill <ContainerID>;
[root@master ]# docker rm <ContainerID>;
篇幅原因更多安装配置可以自行看官方文档。
利用Sbt新建项目
我现在想要建立一个命为NucData(核素数据)的统计项目。
[root@master ]# mkdir NucData;cd NucData;mkdir src;mkdir src/main;mkdir src/main/scala;
填写项目文件,依赖信息。
[root@master ]# vim pom.sbt
name := "NucData"
version := "1.0"
scalaVersion := "2.11.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2"
注意版本信息。
编写主程序,统计一下为0的道址数目。
[root@master ]# vim src/main/scala/AppData.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object AppData{
def main(args:Array[String]){
val dataFile = "./Co60.csv";
val conf = new SparkConf().setAppName("AppData");
val sc = new SparkContext(conf);
val logData = sc.textFile(dataFile,2).cache();
val num = logData.filter(line=>line.contains("0")).count();
println("道址为0 :%s".format(num))
}
}
现在下载依赖包,构建项目。
[root@master ]# sbt update
[root@master ]# sbt package
是不是很像Maven。
用Python正则表达式转换数据
我之前的博文里,说道Geant4的能谱数据可以写入xml文件,为了方便分析,我编写了一个python程序来将其转化为csv格式方便分析。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import re
import csv
def xml2str(path_xml):
fh = open(path_xml, "r")
fh_str = fh.read()
return fh_str
def getTurple(xml_str):
ValueAndCnt_r = re.compile(r'<Double_t (.*)/>')
ValueAndCnt = ValueAndCnt_r.findall(xml_str)
cnt_r = re.compile(r'cnt="(.*?)"')
val_r = re.compile(r'v="(.*?)"')
xml_tur = []
for m in ValueAndCnt:
if 'cnt=' in m:
xml_tur.append(( float(val_r.findall(m)[0]),int(cnt_r.findall(m)[0]) ))
else:
xml_tur.append(( float(val_r.findall(m)[0]),1 ))
return xml_tur
def getArr(xml_tur):
Arr = []
for m in xml_tur:
Arr+=[m[0]]*m[1]
if len(Arr)>=1500:break
return np.array(Arr[0:1500])*10
def test():
path_xml = 'Example.xml'
Arr = getArr(getTurple(xml2str(path_xml)))
print len(np.array(Arr)*10)
print np.array(Arr)*10
print len(Arr)
def writeToCsv(Arr,way):
csvFile = open(way,'w') # 设置newline,否则两行之间会空一行
writer = csv.writer(csvFile)
for m in Arr:
writer.writerow([str(m)])
csvFile.close()
if __name__ == '__main__':
#test()
writeToCsv(getArr(getTurple(xml2str('Example.xml'))),'./Co60.csv')
提交到Spark
现在我们使用spark运行编译好的程序,目前我们的项目是这样。
[root@master ]# ls
Co60.csv pom.sbt project src target
现在提交任务到Spark:
[root@master ]# /usr/lib/spark/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --class "AppData" --master local target/scala-2.11/nucdata_2.11-1.0.jar
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/12/05 21:53:15 INFO SparkContext: Running Spark version 2.1.0
17/12/05 21:53:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
... ...
道址为0 :304
... ...
接下来,就应该考虑特征提取和数据分析的具体算法了。