MapReduce Data Exercise

Raw data:

qR8WRLrO2aQ:mienge:406:People & Blogs:599:2788:5:1:0:4UUEKhr6vfA:zvDPXgPiiWI:TxP1eXHJQ2Q:k5Kb1K0zVxU:hLP_mJIMNFg:tzNRSSTGF4o:BrUGfqJANn8:OVIc-mNxqHc:gdxtKvNiYXc:bHZRZ-1A-qk:GUJdU6uHyzU:eyZOjktUb5M:Dv15_9gnM2A:lMQydgG1N2k:U0gZppW_-2Y:dUVU6xpMc6Y:ApA6VEYI8zQ:a3_boc9Z_Pc:N1z4tYob0hM:2UJkU2neoBs

(note: the "&" in the category field has one space on each side)
Preprocessed data:

qR8WRLrO2aQ:mienge:406:People,Blogs:599:2788:5:1:0:4UUEKhr6vfA,zvDPXgPiiWI,TxP1eXHJQ2Q,k5Kb1K0zVxU,hLP_mJIMNFg,tzNRSSTGF4o,BrUGfqJANn8,OVIc-mNxqHc,gdxtKvNiYXc,bHZRZ-1A-qk,GUJdU6uHyzU,eyZOjktUb5M,Dv15_9gnM2A,lMQydgG1N2k,U0gZppW_-2Y,dUVU6xpMc6Y,ApA6VEYI8zQ,a3_boc9Z_Pc,N1z4tYob0hM,2UJkU2neoBs

(note: the spaces around "&" are removed and the "&" itself is replaced with ",")

The task: preprocess the raw data into the format of the sample shown above.

Looking at the raw data, we can see that fields are separated by ":"; a video can belong to more than one category, with the categories separated by "&" and a space on each side of it; and a video can likewise have multiple related videos, which are also ":"-separated. To make later analysis easier, we first reorganize and clean the data.

Concretely: join each record's categories with "," (dropping the surrounding spaces), and join the multiple related-video ids with "," as well, as the sketch below verifies.
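
Before writing the MapReduce job, the cleaning rule is easy to check in plain Java. The following sketch is illustrative only (the CleanDemo class and its shortened sample record are not part of the job below); it applies both transformations to a single line:

import java.util.Arrays;

public class CleanDemo {
    public static void main(String[] args) {
        // Shortened sample record: nine fixed fields, then related video ids
        String raw = "qR8WRLrO2aQ:mienge:406:People & Blogs:599:2788:5:1:0:"
                + "4UUEKhr6vfA:zvDPXgPiiWI:TxP1eXHJQ2Q";
        String[] sp = raw.split(":");

        // The nine fixed fields stay ":"-separated; " & " in the category
        // is replaced with "," so "People & Blogs" becomes "People,Blogs"
        String head = String.join(":", Arrays.copyOfRange(sp, 0, 9))
                .replace(" & ", ",");

        // The related video ids (field 10 onward) become ","-separated
        String related = String.join(",", Arrays.copyOfRange(sp, 9, sp.length));

        System.out.println(head + ":" + related);
        // prints: qR8WRLrO2aQ:mienge:406:People,Blogs:599:2788:5:1:0:4UUEKhr6vfA,zvDPXgPiiWI,TxP1eXHJQ2Q
    }
}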

pom file (create a Maven project):

  <!-- Dependencies -->


    <!-- Repository that hosts the CDH builds of the Hadoop jars -->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.dom4j</groupId>
            <artifactId>dom4j</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--    <verbal>true</verbal>-->
                </configuration>
            </plugin>

            <plugin>
                <!-- Bundle all dependency jars into the job jar, so a cluster run does not hit missing-jar errors -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>
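
With this pom, running "mvn clean package" produces a shaded jar under target/ that carries all dependencies (the jar's file name depends on your artifactId and version). Note that the driver below uses local Windows paths for input and output, so it can be run straight from the IDE; for a cluster run you would switch those paths to HDFS locations and submit the shaded jar with "hadoop jar", passing MapReduce.MyMapperReduce as the main class.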

MapReduce code:

package MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * Created by 一个蔡狗 on 2020/1/3.
 */
public class MyMapperReduce {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MyMapperReduceDriver");

        job.setJarByClass(MyMapperReduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("E:\\2019-传智资料2\\期末考试\\考试练习题\\video.txt"));
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("E:\\2019-传智资料2\\期末考试\\考试练习题\\video2"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }


    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            String[] sp = line.split(":");

            // Skip malformed records that do not contain the nine fixed fields
            if (sp.length < 9) {
                return;
            }

            // Rebuild the nine fixed fields, joined by ":"
            StringBuilder str = new StringBuilder();
            for (int i = 0; i < 9; i++) {
                str.append(sp[i]).append(":");
            }

            // Append the related-video ids (field 10 onward), joined by ",".
            // Starting at index 9 matters: a plain length test over all fields
            // would also re-append the video id and the category.
            for (int i = 9; i < sp.length; i++) {
                str.append(sp[i]).append(",");
            }

            // Drop the trailing separator left by the loops, then replace
            // " & " in the category with "," (this removes the spaces too)
            String cleaned = str.substring(0, str.length() - 1).replace(" & ", ",");

            // For the sample record above, this emits the video id
            // "qR8WRLrO2aQ" as key and the cleaned line shown under
            // "Preprocessed data" as value
            context.write(new Text(sp[0]), new Text(cleaned));

        }
    }


    static class MyReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            // Iterate over the values and write each cleaned record out;
            // the null value makes TextOutputFormat emit the text alone
            for (Text value : values) {
                context.write(value, null);
            }
        }
    }



}
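
A note on the division of labor: the mapper does all of the cleaning, and the reducer is a pass-through that writes each cleaned record unchanged (passing null as the value makes TextOutputFormat print the text by itself, with no trailing separator). Since records are keyed by video id, with the default single reducer the output also comes out sorted by id.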
Reposted from blog.csdn.net/bbvjx1314/article/details/103814292