废话不多说 直接说案例,当然以下的案例只是入门教程,大神请绕道吧
使用的是spark-1.6.1-bin-without-hadoop.tgz 没有编译hadoop的 所以需要配置hadoop安装目录 这里使用的2.7.2
需要安装hadoop ,怎么安装去百度或者加我qq群:278947305 联系我
这里没有使用到集群,甚至没有使用到linux,(就是这么厉害),直接在window上跑,感觉跟大数据没关系,这里主要考虑代码,如果你想在集群中使用显然是最好的,前提是你有好的机器,可以使用以下的代码打包以后放入到spark集群中去使用
这里使用是scala2.10.5编写,如果你想使用Java来写自然也可以,不过代码就多了
案例一、单词统计(hellWord)
package org.zw.wc import org.apache.spark.SparkConf import org.apache.spark.SparkContext /** * 单词统计 */ object WcApp { def main(args: Array[String]) { System.setProperty("hadoop.home.dir", "D:/bigData/hadoop-2.7.2"); val conf = new SparkConf(); conf.setMaster("local[4]"); conf.setAppName("WCAPP") val sc = new SparkContext(conf); //设置日志级别 sc.setLogLevel("WARN") //加载数据 val lines = sc.textFile("D:/bigData/wc/data.txt"); //得到第一行的数据 println(lines.first()) //打印并发线程数据 val add = lines.map(x => { val tname = Thread.currentThread().getName(); println(tname + ":" + x); x } ) // val words = lines.flatMap { x => x.split(" ") } // val count = words.map(w => (w, 1)).reduceByKey { case (x, y) => x + y }; println(add.count()) //输出路径 // count.saveAsTextFile("D:/bigData/wc/resc7") } }数据集类型data.txt
hello hh htkl
hh bb kk kk jy hg bfd kk llgg f ujj ll ll
在集群使用把最后异性的注释打开,改成hdfs上的目录即可,统计结果就会在hdfs上报存
案例二、计算年龄平均值
使用随机数生成数据集
import java.io.File import scala.util.Random import java.io.FileWriter /** * @author Administrator */ object SampleDataFileGenerator { def main(args: Array[String]) { val writer = new FileWriter(new File("D:\\bigData\\agetest\\data.txt"), false) val rand = new Random() for (i <- 1 to 10000000) { writer.write(i + " " + rand.nextInt(100)) writer.write(System.getProperty("line.separator")) } writer.flush() writer.close() } }编程过程:先得到总行数,通过分隔符得到年龄值统计 然后再算平均值
object AvgAgeCalculator { def main(args: Array[String]) { //hadoop的安装目录 System.setProperty("hadoop.home.dir", "D:/bigData/hadoop-2.7.2"); // if (args.length < 1) { // println("Usage:AvgAgeCalculator datafile") // System.exit(1) // } val conf = new SparkConf() conf.setMaster("local[4]") conf.setAppName("Spark Exercise:Average Age Calculator") val sc = new SparkContext(conf) //设置日志级别 sc.setLogLevel("WARN") val dataFile = sc.textFile("D:\\bigData\\agetest\\data.txt", 5); //行数 val count = dataFile.count() //分隔后的第2个值 val ageData = dataFile.map(line => line.split(" ")(1)) //统计所有年龄 val totalAge = ageData.map(age => Integer.parseInt( String.valueOf(age))).collect().reduce((a, b) => a + b) println("Total Age:" + totalAge + ";Number of People:" + count) //平均年龄 val avgAge: Double = totalAge.toDouble / count.toDouble println("Average Age is " + avgAge) } }需要注意的是,要指定线程数,不然会报错
这些只是简单的案例,用来熟悉api,复杂的就要使用其他手段了,