原始数据:
qR8WRLrO2aQ:mienge:406:People &
& 号 左右 各有一个空格
Blogs:599:2788:5:1:0:4UUEKhr6vfA:zvDPXgPiiWI
:TxP1eXHJQ2Q:k5Kb1K0zVxU:hLP_mJIMNFg
:tzNRSSTGF4o:BrUGfqJANn8:OVIc-mNxqHc:gdxtKvNiYXc
:bHZRZ-1A-qk
:GUJdU6uHyzU:eyZOjktUb5M
:Dv15_9gnM2A
:lMQydgG1N2k
:U0gZppW_-2Y:dUVU6xpMc6Y
:ApA6VEYI8zQ
:a3_boc9Z_Pc
:N1z4tYob0hM:2UJkU2neoBs
预处理之后的数据:
qR8WRLrO2aQ:mienge:406:People,Blogs:599:2788:5:1:0:
& 号 左右两边空格 去掉 并且 替换成 , 号
4UUEKhr6vfA
,zvDPXgPiiWI
,TxP1eXHJQ2Q
,k5Kb1K0zVxU
,hLP_mJIMNFg,tzNRSSTGF4
o,BrUGfqJANn8
,OVIc-mNxqHc
,gdxtKvNiYXc
,bHZRZ-1A-qk,GUJdU6uHyzU,eyZOjktUb5M,Dv15_9gnM2A,lMQydgG1N2k,U0gZppW_-2Y,dUVU6xpMc6Y,ApA6VEYI8zQ,a3_boc9Z_Pc,N1z4tYob0hM,2UJkU2neoBs
对原始数据进行预处理,格式为上面给出的预处理之后的示例数据。
通过观察原始数据形式,可以发现,
每个字段之间使用“:”分割,
视频可以有多个视频类别,
类别之间&符号分割,
且分割的两边有空格字符,
同时相关视频也是可以有多个,
多个相关视频也是用“:”进行分割。
为了分析数据时方便,我们首先进行数据重组清洗操作。
即:将每条数据的类别用“,”分割,同时去掉两边空格,多个“相关视频id”也使用“,”进行分割
pom 文件 : 创建 Maven 项目
<!--导包-->
<!-- 下载cdh版本jar的仓库地址 -->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-mr1-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<!-- 可以打包携带所有的jar包,提交到集群运行不用担心找不到jar包-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
MapReduce 代码:
package MapReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.util.Arrays;
/**
 * Created by 一个蔡狗 on 2020/1/3.
 *
 * MapReduce job that cleans the raw video records described above.
 *
 * Input line layout: 9 fixed fields separated by ":" (video id, uploader,
 * age, category, length, views, rate, ratings, comments) followed by a
 * variable number of related-video ids, also ":"-separated. The category
 * field may contain several categories separated by " & ".
 *
 * Output per record: the 9 fixed fields joined by ":", with " & " replaced
 * by ",", then the related-video ids joined by "," — matching the expected
 * sample output shown earlier in these notes.
 */
public class MyMApperReduce {

    // Default local paths, kept for backward compatibility with the original
    // hard-coded version; can now be overridden via command-line arguments.
    private static final String DEFAULT_INPUT = "E:\\2019-传智资料2\\期末考试\\考试练习题\\video.txt";
    private static final String DEFAULT_OUTPUT = "E:\\2019-传智资料2\\期末考试\\考试练习题\\video2";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Job.getInstance(...) replaces the deprecated new Job(conf, name).
        Job job = Job.getInstance(conf, "MyMapperReduceDriver");
        job.setJarByClass(MyMApperReduce.class);

        // args[0]/args[1] may supply input/output paths; otherwise fall back
        // to the original defaults.
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(input));

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(output));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(":");
            // Guard against malformed records: the original code used
            // Arrays.copyOfRange(sp, 0, 9), which pads short records with
            // null and would emit literal "null" tokens.
            if (fields.length < 9) {
                return;
            }

            StringBuilder sb = new StringBuilder();
            // The 9 fixed fields keep their ":" separators.
            for (int i = 0; i < 9; i++) {
                if (i > 0) {
                    sb.append(':');
                }
                sb.append(fields[i]);
            }
            // Remaining fields are related-video ids, joined by ",".
            // BUGFIX: the original second loop started at index 0 with a
            // length() >= 9 filter, which re-appended the video id (11 chars)
            // and the category field ("People & Blogs", 14 chars) after the
            // fixed fields — contradicting the expected sample output.
            for (int i = 9; i < fields.length; i++) {
                sb.append(i == 9 ? ':' : ',').append(fields[i]);
            }

            // "People & Blogs" -> "People,Blogs": drop the surrounding
            // spaces and replace "&" with ",".
            String cleaned = sb.toString().replace(" & ", ",");
            context.write(new Text(fields[0]), new Text(cleaned));
        }
    }

    static class MyReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Emit each cleaned record as the output key with no value.
            for (Text value : values) {
                context.write(value, null);
            }
        }
    }
}