1. Create a Java Maven project in IDEA, add the Spark dependency to pom.xml, and specify the main-class entry point at packaging time, so that you do not have to specify the entry class when submitting the Spark job.
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <!-- When building the jar, configure the manifest and add the lib jars to the classpath -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <classesDirectory>target/classes/</classesDirectory>
                <archive>
                    <manifest>
                        <!-- main-class entry point -->
                        <mainClass>WordCount</mainClass>
                        <!-- do not record timestamped snapshot versions in MANIFEST.MF -->
                        <useUniqueVersions>false</useUniqueVersions>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                    </manifest>
                    <manifestEntries>
                        <Class-Path>.</Class-Path>
                    </manifestEntries>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>
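With the main class recorded in the manifest, a plain Maven build produces a runnable jar. A typical invocation looks like the following (the artifact name depends on your own artifactId/version):
mvn clean package
The resulting jar under target/ then carries Main-Class: WordCount in its MANIFEST.MF, so Spark can find the entry point without an extra --class argument.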
2. Java code
/**
 * Author: hezishan
 * Date: 2018/5/23.
 * Description: simple Spark word count over a small in-memory list.
 **/
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        // Set the job name and the run mode; "local" runs Spark inside this JVM
        SparkConf conf = new SparkConf().setAppName("searchword").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> yesterdayList = new ArrayList<>();
        yesterdayList.add("油田天");
        yesterdayList.add("卡车司机");
        yesterdayList.add("房山区");
        yesterdayList.add("左良玉");
        // Build an RDD of Strings from the in-memory list
        JavaRDD<String> yesterdayData = sc.parallelize(yesterdayList);
        // Map each word to (word, 1), then sum the counts for identical words
        JavaPairRDD<String, Integer> yesterdayFreq = yesterdayData.mapToPair(word -> new Tuple2<String, Integer>(word, 1))
                .reduceByKey((a, b) -> a + b);
        System.out.println(yesterdayFreq.collect());
        sc.stop();
    }
}
The two lines below create the SparkContext. setAppName sets the Spark job name and setMaster sets the run mode; in Java the master must be set, otherwise the job fails with an error. Here "local" means local mode; for the other modes see this post: https://blog.csdn.net/gamer_gyt/article/details/51833681
SparkConf conf = new SparkConf().setAppName("searchword").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> yesterdayData = sc.parallelize(yesterdayList); parallelize takes the in-memory collection and turns it into an RDD of Strings.
JavaPairRDD<String, Integer> yesterdayFreq = yesterdayData.mapToPair(word -> new Tuple2<String, Integer>(word, 1))
.reduceByKey((a, b) -> a + b);
The -> expression is Java 8 lambda syntax. mapToPair turns each String into a (String, 1) pair, for example A -> (A, 1).
reduceByKey reduces by key, which in practice means operating on the values: here (a, b) -> a + b adds up the values that share the same key. With the sample list above every word appears once, so collect() returns four pairs that each have a count of 1; if the same word were added twice, its count would become 2.
If you are not comfortable with Java 8 lambdas, you can also write these operations as full anonymous function classes; it is often easier to start with the anonymous classes and then convert them to lambdas, as sketched below.
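For comparison, here is a minimal sketch of the same two steps written with anonymous inner classes instead of lambdas (it assumes the same imports as above plus org.apache.spark.api.java.function.PairFunction and org.apache.spark.api.java.function.Function2):
JavaPairRDD<String, Integer> yesterdayFreq = yesterdayData
        .mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) {
                // same as word -> new Tuple2<>(word, 1)
                return new Tuple2<>(word, 1);
            }
        })
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) {
                // same as (a, b) -> a + b
                return a + b;
            }
        });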
3. Here the big data platform was set up with Ambari and the Hue web UI was configured for management, so the Spark job is submitted from Hue.
Package the jar with Maven (before packaging, change the master to yarn-client; the run mode depends on your environment), then submit the jar in the Hue UI and run it. If no main class was specified at packaging time, it must be specified when the Spark job is run, otherwise you get a "no main class" error.
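For example, when the jar was built without a Main-Class entry in its manifest, the entry class has to be passed explicitly to spark-submit (the jar path here is only a placeholder):
spark-submit --class WordCount /path/to/wordcount-example.jar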