直接上数据格式、问题、代码。先按照Java来复习下。复习完再复习Scala
一、数据准备、格式介绍.
1、评分数据。数据为 UserId::MovieId::Rating::timestamp. 文件名ratings.dat
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
2、电影数据。数据为 MovieID::Title::Genres
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
3、用户数据。数据为 UserId::Gender::Age::Occupation::Zip-code
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
二、解决问题
1、打印所有电影中评分最高的前十个电影名和平均分。
解答代码:(测试完毕)
/**
 * Quick hands-on Spark demo.
 * 1. Find the ten movies with the highest average rating.
 * 2. Report each movie's average score.
 * 3. Join in the movie title for readable output.
 *
 * Input files (MovieLens "::"-delimited format):
 *   movies.dat  -> MovieID::Title::Genres
 *   ratings.dat -> UserID::MovieID::Rating::Timestamp
 */
public class Spark_Analyzer {
    public static void main(String[] args) {
        String basicPath = "spark-demo/src/main/resources/";
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("Spark_Analyzer")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        // "trace" floods the console with framework noise; WARN keeps the demo output readable.
        javaSparkContext.setLogLevel("WARN");
        try {
            JavaRDD<String> moviesRdd = javaSparkContext.textFile(basicPath + "movies.dat", 3);
            JavaRDD<String> ratingsRdd = javaSparkContext.textFile(basicPath + "ratings.dat", 3);

            // movieId -> title; split each line once instead of twice.
            JavaPairRDD<String, String> movieCache = moviesRdd.mapToPair(line -> {
                String[] split = line.split("::");
                return new Tuple2<>(split[0], split[1]);
            }).cache();

            // movieId -> (ratingSum, count), then average as a double.
            // BUG FIX: the original used integer division, which truncated every
            // average (e.g. 4.9 -> 4) and made the top-10 ranking wrong.
            JavaPairRDD<String, Double> avgRatings = ratingsRdd
                    .mapToPair(line -> {
                        String[] split = line.split("::");
                        return new Tuple2<>(split[1], new Tuple2<>(Integer.parseInt(split[2]), 1));
                    })
                    .reduceByKey((t1, t2) -> new Tuple2<>(t1._1 + t2._1, t1._2 + t2._2))
                    .mapToPair(item -> new Tuple2<>(item._1, item._2._1 / (double) item._2._2));

            // Join titles, key by average so sortByKey(false) yields highest first.
            List<Tuple2<Double, String>> top = avgRatings.join(movieCache)
                    .mapToPair(item -> new Tuple2<>(item._2._1, item._2._2))
                    .sortByKey(false)
                    .take(10);

            // Label fixed: t._2 is the movie title, not the movie id.
            top.forEach(t -> System.out.println("movie: " + t._2 + " avg: " + t._1));
        } finally {
            // Close the context even if a job fails.
            javaSparkContext.close();
        }
    }
}
附一份Maven依赖:
<!-- Shared version properties: Spark 2.4.x built against Scala 2.11. -->
<properties>
<spark.version>2.4.3</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<!-- Core Spark RDD API (JavaSparkContext, JavaRDD, JavaPairRDD). -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- NOTE(review): HBase versions are mixed below - client/server at
     0.98.12-hadoop2 but protocol/shaded-client at 1.2.6. Mixing HBase
     artifact versions usually causes runtime classpath conflicts;
     confirm this is intentional or align all four on one version. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>0.98.12-hadoop2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>0.98.12-hadoop2</version>
</dependency>
<!-- Hadoop 2.6.x stack; Spark 2.4.3 defaults to Hadoop 2.6-compatible builds. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-client</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.5</version>
</dependency>
</dependencies>
2、最受男生喜欢的Top10电影,最受女生喜欢的top10电影。(只写下女生的topN 数据中F代表女生) 经测试ok
/**
 * Top-10 movies by average rating among female viewers.
 * 1. Join ratings with user data (keyed by userId) to obtain the rater's gender.
 * 2. Keep only ratings whose user matches {@link #GENDER}, aggregate per movie.
 * 3. Join the movie title and print the ten highest average scores.
 */
public class Spark_Analyzer_Gender {

    /** Gender code from users.dat: "F" = female, "M" = male. Swap to "M" for the boys' top-10. */
    private static final String GENDER = "F";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("Spark_Analyzer_Gender")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        // WARN instead of "trace": trace output drowns the actual result.
        javaSparkContext.setLogLevel("WARN");
        try {
            String basicPath = "spark-demo/src/main/resources/";
            JavaRDD<String> userRdd = javaSparkContext.textFile(basicPath + "users.dat");
            JavaRDD<String> moviesRdd = javaSparkContext.textFile(basicPath + "movies.dat", 3);
            JavaRDD<String> ratingsRdd = javaSparkContext.textFile(basicPath + "ratings.dat", 3);

            // userId -> gender
            JavaPairRDD<String, String> userCache = userRdd.mapToPair(line -> {
                String[] split = line.split("::");
                return new Tuple2<>(split[0], split[1]);
            }).cache();

            // movieId -> title
            JavaPairRDD<String, String> movieCache = moviesRdd.mapToPair(line -> {
                String[] split = line.split("::");
                return new Tuple2<>(split[0], split[1]);
            }).cache();

            List<Tuple2<Double, String>> result = ratingsRdd
                    // userId -> Rating (keyed by userId for the gender join)
                    .mapToPair(line -> {
                        String[] split = line.split("::");
                        Rating rating = new Rating(split[0], split[1],
                                Integer.parseInt(split[2]), Long.parseLong(split[3]));
                        return new Tuple2<>(split[0], rating);
                    })
                    .join(userCache)
                    // keep only the target gender
                    .filter(item -> GENDER.equals(item._2._2))
                    // movieId -> (rating, 1) so reduceByKey can sum and count in one pass
                    .mapToPair(item -> new Tuple2<>(item._2._1.getMovieId(),
                            new Tuple2<>(item._2._1.getRating(), 1)))
                    .reduceByKey((a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2))
                    // BUG FIX: integer division truncated the average; use double.
                    // (Also removed the original's dead "sum" copy-paste of data._1.)
                    .mapToPair(item -> new Tuple2<>(item._1, item._2._1 / (double) item._2._2))
                    .join(movieCache)
                    // average -> title, keyed by average for descending sortByKey
                    .mapToPair(item -> new Tuple2<>(item._2._1, item._2._2))
                    .sortByKey(false)
                    .take(10);

            result.forEach(t -> System.out.println("movie : " + t._2 + " avg : " + t._1));
        } finally {
            // BUG FIX: the original never closed the SparkContext.
            javaSparkContext.close();
        }
    }
}
3、关于二次排序
/**
* Java版二次排序
*/
public class Spark_Analyzer_Sort {
/**
 * Secondary sort demo: order all ratings by Rating.compareTo (rating, then
 * timestamp) descending via sortByKey(false), and print the top ten.
 */
public static void main(String[] args) {
    // BUG FIX: app name was copy-pasted as "Spark_Analyzer_Gender".
    SparkConf sparkConf = new SparkConf()
            .setMaster("local")
            .setAppName("Spark_Analyzer_Sort")
            .set("spark.testing.memory", "2147480000");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    // WARN instead of "trace" so the printed result is visible.
    javaSparkContext.setLogLevel("WARN");
    try {
        String basicPath = "spark-demo/src/main/resources/";
        // Only ratings.dat is needed here; the unused users/movies RDDs were removed.
        JavaRDD<String> ratingsRdd = javaSparkContext.textFile(basicPath + "ratings.dat", 3);

        // Key each record by the composite-comparable Rating object; the value
        // keeps the movieId for printing. sortByKey(false) = highest rating
        // first, ties broken by timestamp (see Rating.compareTo).
        List<Tuple2<Rating, String>> data = ratingsRdd.mapToPair(line -> {
            String[] split = line.split("::");
            Rating key = new Rating(split[0], split[1],
                    Integer.parseInt(split[2]), Long.parseLong(split[3]));
            return new Tuple2<>(key, split[1]);
        }).sortByKey(false).take(10);

        data.forEach(t -> System.out.println("movieId : " + t._2 + " data : " + t._1));
    } finally {
        // BUG FIX: the original never closed the SparkContext.
        javaSparkContext.close();
    }
}
/**
 * One rating event from ratings.dat (UserID::MovieID::Rating::Timestamp).
 * Used as a Spark sort key, so it must be Serializable and Comparable.
 * Lombok @Data supplies getters/setters/equals/hashCode/toString
 * (the redundant @ToString was dropped - @Data already includes it).
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Rating implements Comparable<Rating>, Serializable {

    // Explicit serialVersionUID: instances travel between Spark tasks.
    private static final long serialVersionUID = 1L;

    private String userId;
    private String movieId;
    private Integer rating;    // star score, 1-5 per the sample data
    private Long timestamp;    // rating time; presumably epoch seconds - TODO confirm

    /**
     * Secondary sort: primary key is the rating, ties broken by timestamp
     * (both ascending natural order; Spark's sortByKey(false) reverses it).
     * BUG FIX: implements the generic Comparable&lt;Rating&gt; instead of the
     * raw Comparable, removing the unchecked cast from Object.
     */
    @Override
    public int compareTo(Rating other) {
        int byRating = this.rating.compareTo(other.rating);
        return byRating != 0 ? byRating : this.timestamp.compareTo(other.timestamp);
    }
}