#Preface
Enterprise project development workflow
- Project research: technology? business?
  - Involves the product manager (who knows the business very well) and the project manager
- Requirements analysis: what to build, and what the result should look like
  - Explicit requirements: raised by the users
  - Implicit requirements
- Design
  - High-level design
  - Detailed design
  - System design
- Feature development
  - Coding
  - Testing: unit tests, CI/CD
- Testing
  - Functional testing
  - Integration testing (joint debugging)
  - Performance testing
  - User trial
- Deployment and go-live
  - Trial run: diff against the existing system, stability
  - Official launch: gray (canary) release
- Later phases
  - Phase 2, 3, 4, etc.: operations and maintenance, feature development, bug fixes (if new feature development is involved, the whole workflow above is repeated)
Categories of enterprise big data applications
- Data analytics: in-house + commercial
- Search / crawlers
- Machine learning / deep learning
- Artificial intelligence
- Offline: batch processing
- Real-time: stream processing
#Building a Big Data Development Project with Maven
Task: write a MapReduce program that cleans CDN click-log data, then load the cleaned data into a Hive external table.
First, manually generate test data:
package com.leo.hadoop.utils;

import java.io.*;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Random;

public class insertdata {

    public static void main(String[] args) throws IOException, ParseException {
        // Generate 20,000 fake log lines into a local file
        File file = new File("20190406.txt");
        Writer out = new FileWriter(file);
        int i = 0;
        while (i < 20000) {
            out.write(newData());
            out.write("\n");
            i++;
        }
        out.close();
    }

    /**
     * Take one real log line as a template, then randomize the time,
     * client IP, domain and traffic fields to produce a new record.
     */
    public static String newData() throws ParseException {
        DateFormat sourceFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        Random random = new Random();
        String log = "baidu\tCN\tA\tE\t[17/Jul/2018:17:07:50 +0800]\t2\t223.104.18.110\t-\t112.29.213.35:80\t0\tv2.go2yd.com\tGET\thttp://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4\tHTTP/1.1\t-\tbytes 13869056-13885439/25136186\tTCP_HIT/206\t112.29.213.35\tvideo/mp4\t17168\t16384\t-:0\t0\t0\t-\t-\t-\t11451601\t-\t\"JSP3/2.0.14\"\t\"-\"\t\"-\"\t\"-\"\thttp\t-\t2\tv1.go2yd.com\t0.002\t25136186\t16384\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t1531818470104-11451601-112.29.213.66#2705261172\t644514568";
        String[] split = log.split("\t");

        // Field 4: replace the access time with a random offset from midnight of the same day
        Date parse = sourceFormat.parse("17/Jul/2018:00:00:00");
        long time = parse.getTime();
        long ts = time + random.nextInt(60) * 6000 + 600000 * random.nextInt(35);
        split[4] = "[" + sourceFormat.format(ts) + " " + "+0800]";

        // Field 6: random client IP
        split[6] = random.nextInt(255) + "." + random.nextInt(255) + "." + random.nextInt(255) + "." + random.nextInt(255);

        // Field 10: random domain, v1.go2yd.com ... v4.go2yd.com
        split[10] = "v" + (random.nextInt(4) + 1) + ".go2yd.com";

        // Field 20: random traffic value
        split[20] = String.valueOf(random.nextInt(100000));

        // Re-join all 72 fields with tabs
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < 72; i++) {
            if (i == 71) {
                builder.append(split[i]);
            } else {
                builder.append(split[i]).append("\t");
            }
        }
        return builder.toString();
    }
}
One sample record:
baidu CN A E [17/Jul/2018:17:07:50 +0800] 2 223.104.18.110 - 112.29.213.35:80 0 v2.go2yd.com GET http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4 HTTP/1.1 - bytes 13869056-13885439/25136186 TCP_HIT/206 112.29.213.35 video/mp4 17168 16384 -:0 0 0 - - - 11451601 - "JSP3/2.0.14" "-" "-" "-" http - 2 v1.go2yd.com 0.002 25136186 16384 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 1531818470104-11451601-112.29.213.66#2705261172 644514568
The fields after splitting on tab:
the CDN vendor; CN = China; A (ignore); the fourth field is the level (E, M, …, or others); the time of the access; (ignore); the client IP; (ignore); the server IP; 0; the domain; the URL; -; the TCP_* field (pay attention to this one, it shows the response was served from the cache); 17168 is the traffic consumed. # Many videos are not fetched directly from the origin server but from the cache.
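Before writing the parser it helps to confirm which tab-separated index each field sits at. Below is a throwaway check, not part of the original project; the class name FieldIndexCheck is purely illustrative, and it reuses the insertdata generator above to obtain one valid 72-field record:

package com.leo.hadoop.utils;

public class FieldIndexCheck {
    public static void main(String[] args) throws Exception {
        // take one generated record and split it exactly the way the cleaner will
        String[] fields = insertdata.newData().split("\t");
        for (int i = 0; i < fields.length; i++) {
            System.out.println(i + "\t" + fields[i]);
        }
        // indices used later by the cleaner:
        // 0=cdn, 1=region, 3=level, 4=time, 6=client ip, 10=domain, 12=url, 20=traffic
    }
}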
Creating the Maven project in IDEA (start of the data cleaning)
1. For Project SDK, select your JDK installation path, tick "Create from archetype", choose maven-archetype-quickstart, then click Next.
2. Change 1.0-SNAPSHOT to 1.0, then click Next.
3. Make sure the local Maven repository points at your own repository (change it to your custom local repository), configured in:
$MAVEN_HOME/conf/settings.xml
D:\FWZ\software\apache-maven-3.3.9\maven-repository
4. Set the project name and project location.
5. Click Finish and IDEA opens the newly created project.
6. Maven needs network access to download dependencies; BUILD SUCCESS means the project was created successfully.
7. The App classes generated under both src/main and src/test can be deleted.
Add the Hadoop dependency to pom.xml:
<properties>
  <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
</properties>

<!-- Why define the hadoop.version property? So other Hadoop dependencies can reuse the same version -->
<!-- Add the CDH repository -->
<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
  </repository>
</repositories>

<!-- Add the Hadoop client dependency -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>
Question: does this dependency version have to match your cluster's Hadoop version exactly? No.
Maven packaging: fat jar (your code bundled together with all dependency jars) vs. thin jar (only your own code, without the dependency jars). A thin jar is the usual choice, and this example uses a thin jar as well.
Log-parsing code
The data-cleaning utility
1. Under com.leo.hadoop create the packages utils and mapreduce; in utils create a Java class LogUtils:
package com.leo.hadoop.utils;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

public class LogUtils {

    // source time format, e.g. 17/Jul/2018:17:07:50
    DateFormat sourceFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    // target time format, e.g. 20180717170750
    DateFormat targetFormat = new SimpleDateFormat("yyyyMMddHHmmss");

    /**
     * Parse one log line: split on \t and keep only the fields we need.
     */
    public String parse(String log) {
        String result = "";
        try {
            String[] splits = log.split("\t");
            String cdn = splits[0];        // baidu
            String region = splits[1];     // CN
            String level = splits[3];      // E
            String timeStr = splits[4];    // [17/Jul/2018:17:07:50 +0800]
            // strip the leading "[" and the trailing " +0800]", then reformat
            String time = timeStr.substring(1, timeStr.length() - 7);
            time = targetFormat.format(sourceFormat.parse(time));
            String ip = splits[6];         // 223.104.18.110
            String domain = splits[10];    // v2.go2yd.com
            String url = splits[12];       // http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4
            String traffic = splits[20];   // 16384

            // The parsed record lands in the LOCATION directory of a Hive external table,
            // so the kept fields are joined with tab again.
            StringBuilder builder = new StringBuilder();
            builder.append(cdn).append("\t")
                    .append(region).append("\t")
                    .append(level).append("\t")
                    .append(time).append("\t")
                    .append(ip).append("\t")
                    .append(domain).append("\t")
                    .append(url).append("\t")
                    .append(traffic);
            result = builder.toString();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return result;
    }
}
2. Under test, create the same utils package and a test class:
package com.leo.hadoop.utils;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestLogUtils {

    private LogUtils utils;

    @Test
    public void testLogParse() {
        String log = "baidu\tCN\tA\tE\t[17/Jul/2018:17:07:50 +0800]\t2\t223.104.18.110\t-\t112.29.213.35:80\t0\tv2.go2yd.com\tGET\thttp://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4\tHTTP/1.1\t-\tbytes 13869056-13885439/25136186\tTCP_HIT/206\t112.29.213.35\tvideo/mp4\t17168\t16384\t-:0\t0\t0\t-\t-\t-\t11451601\t-\t\"JSP3/2.0.14\"\t\"-\"\t\"-\"\t\"-\"\thttp\t-\t2\tv1.go2yd.com\t0.002\t25136186\t16384\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t-\t1531818470104-11451601-112.29.213.66#2705261172\t644514568\n";
        String result = utils.parse(log);
        System.out.println(result);
    }

    @Before
    public void setUp() {
        utils = new LogUtils();
    }

    @After
    public void tearDown() {
        utils = null;
    }
}
3. Test result:
baidu CN E 20180717170750 223.104.18.110 v2.go2yd.com http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4 16384
Process finished with exit code 0
Interview question: the difference between StringBuilder and StringBuffer
StringBuilder: not thread-safe (but faster; the right choice for single-threaded code such as the parse() method above)
StringBuffer: thread-safe (its methods are synchronized)
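A minimal sketch (not from the original notes) that makes the difference visible: two threads append to each object concurrently; the synchronized StringBuffer never loses an append, while the unsynchronized StringBuilder usually does.

public class StringBufferVsBuilder {

    private static final int N = 100000;

    public static void main(String[] args) throws InterruptedException {
        final StringBuffer buffer = new StringBuffer();    // append() is synchronized
        final StringBuilder builder = new StringBuilder(); // no synchronization

        Runnable bufferTask = new Runnable() {
            public void run() {
                for (int i = 0; i < N; i++) buffer.append("x");
            }
        };
        Runnable builderTask = new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < N; i++) builder.append("x");
                } catch (Exception e) {
                    // unsynchronized internal resizing can even throw under contention
                }
            }
        };

        Thread t1 = new Thread(bufferTask), t2 = new Thread(bufferTask);
        Thread t3 = new Thread(builderTask), t4 = new Thread(builderTask);
        t1.start(); t2.start(); t3.start(); t4.start();
        t1.join(); t2.join(); t3.join(); t4.join();

        System.out.println("StringBuffer length : " + buffer.length());   // always 200000
        System.out.println("StringBuilder length: " + builder.length());  // usually smaller: appends were lost
    }
}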
**Developing the MapReduce job**
Under mapreduce create a mapper package, and in it create the class LogETLMapper:
package com.leo.hadoop.mapreduce.mapper;

import com.leo.hadoop.utils.LogUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// We only need the cleaned value; the key is useless here, so NullWritable is used as the output key.
public class LogETLMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    /**
     * Data cleaning in the map() method of the MapReduce framework:
     * each incoming line is parsed according to our rules and written out.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // only lines with exactly 72 tab-separated fields are valid
        int length = value.toString().split("\t").length;
        if (length == 72) {
            LogUtils utils = new LogUtils();
            String result = utils.parse(value.toString());
            if (StringUtils.isNotBlank(result)) {   // skip records the parser could not handle
                context.write(NullWritable.get(), new Text(result));
            }
        }
    }
}
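LogUtils already has a unit test; the mapper itself can be tested in a similar way. A minimal sketch, assuming the MRUnit test library (org.apache.mrunit:mrunit with the hadoop2 classifier) is added to the pom as a test dependency; this test is not part of the original project:

package com.leo.hadoop.mapreduce.mapper;

import com.leo.hadoop.utils.insertdata;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Test;

import java.util.List;

public class TestLogETLMapper {

    @Test
    public void testMap() throws Exception {
        // reuse the data generator from earlier to get one valid 72-field record
        String log = insertdata.newData();

        MapDriver<LongWritable, Text, NullWritable, Text> driver =
                MapDriver.newMapDriver(new LogETLMapper());
        driver.withInput(new LongWritable(0), new Text(log));

        // run the mapper and print the cleaned record it emitted;
        // withOutput(...) plus runTest() could assert the exact output instead
        List<Pair<NullWritable, Text>> output = driver.run();
        for (Pair<NullWritable, Text> pair : output) {
            System.out.println(pair.getSecond());
        }
    }
}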
Develop a Driver class as the job entry point:
package com.leo.hadoop.mapreduce.driver;

import com.leo.hadoop.mapreduce.mapper.LogETLMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogETLDriver {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("please input 2 params: input output");
            System.exit(0);
        }
        String input = args[0];
        String output = args[1];   // e.g. output/day=20180717

        // Only needed when running locally on Windows; comment it out before packaging for the server
        // System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master");

        Configuration configuration = new Configuration();

        // If the output path already exists, delete it, otherwise the job would fail
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path(output);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setJarByClass(LogETLDriver.class);
        job.setMapperClass(LogETLMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}
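A design note, not part of the original driver: this ETL job has no aggregation step, so it could also run as a map-only job. The original keeps the default single (identity) reducer, which is why the output file seen later is named part-r-00000. A sketch of the alternative settings that would replace the two setMapOutput* lines in the driver above:

        // Map-only variant: skip the reduce phase entirely; mapper output is written
        // straight to HDFS as part-m-* files instead of part-r-00000.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);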
Local test
input is the input file directory; output/day=20180717 is the output directory.
Running the project locally on Windows has a few pitfalls, so prepare the following first:
1. Download the matching Hadoop version and configure the environment variables: HADOOP_HOME, and add %HADOOP_HOME%\bin to PATH.
2. Download the Windows runtime binaries from https://github.com/steveloughran/winutils
3. Put hadoop.dll and winutils.exe into the local %HADOOP_HOME%\bin, and also copy hadoop.dll into C:\Windows\System32.
4. Re-run the driver and you will get the result.
Server test
Package the code. Before packaging, comment out this line in the driver:
// Only needed when running locally on Windows; comment it out before packaging for the server
// System.setProperty("hadoop.home.dir", "D:/IDEAMaven/hadoop-2.6.0-cdh5.7.0");
Our own unit tests run automatically during packaging.
Upload the jar produced by the build to the server.
Upload the log file to the HDFS cluster:
[hadoop@hadoop-01 data]$ rz
rz waiting to receive.
Starting zmodem transfer. Press Ctrl+C to cancel.
Transferring 20180717.log...
100% 9573 KB 4786 KB/sec 00:00:02 0 Errors
[hadoop@hadoop-01 data]$ hdfs dfs -put /home/hadoop/data/20180717.log /data/
19/04/14 10:35:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[hadoop@hadoop-01 data]$ hdfs dfs -ls /data
19/04/14 10:35:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rw-r--r-- 1 hadoop supergroup 9803062 2019-04-14 10:35 /data/20180717.log
[hadoop@hadoop-01 data]$
Then run:
hadoop jar /home/hadoop/app/hadoop-2.6.0-cdh5.7.0/lib/g6-hadoop-1.0.jar com.leo.hadoop.mapreduce.driver.LogETLDriver /data/20180717.log /output
After the job finishes, check the output directory:
[hadoop@hadoop-01 data]$ hdfs dfs -ls /output
19/04/14 10:41:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2019-04-14 10:40 /output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 2963062 2019-04-14 10:40 /output/part-r-00000
[hadoop@hadoop-01 data]$ hadoop fs -du -s -h /output
19/04/14 10:43:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2.8 M 2.8 M /output
[hadoop@hadoop-01 data]$
Create a shell script to run this job:
#!/bin/bash
process_data=20180717
echo "step1: mapreduce etl"
# By convention the output goes into a partition directory, so day=20180717 is appended to the output path
hadoop jar /home/hadoop/data/hadoop_train-1.0-SNAPSHOT.jar com.leo.hadoop.mapreduce.driver.LogETLDriver /input /home/hadoop/data/output/day=$process_data
Then run the script:
[hadoop@hadoop-01 data]$ vim hadoop-train.sh
[hadoop@hadoop-01 data]$ chmod u+x hadoop-train.sh
[hadoop@hadoop-01 data]$ ./hadoop-train.sh
[hadoop@hadoop-01 data]$ hadoop dfs -ls /home/hadoop/data/
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2019-03-29 12:20 /home/hadoop/data/output
[hadoop@hadoop data]$ hadoop dfs -ls /home/hadoop/data/output
Found 3 items
-rw-r--r-- 1 hadoop supergroup 0 2019-03-29 11:38 /home/hadoop/data/output/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 2019-03-29 12:21 /home/hadoop/data/output/day=20180717
-rw-r--r-- 1 hadoop supergroup 72432 2019-03-29 11:38 /home/hadoop/data/output/part-r-00000
[hadoop@hadoop data]$ hadoop dfs -ls /home/hadoop/data/output/day=20180717
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2019-03-29 12:21 /home/hadoop/data/output/day=20180717/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 72432 2019-03-29 12:21 /home/hadoop/data/output/day=20180717/part-r-00000
Basic statistics with Hive
Create the external table.
Note that LOCATION must not point at the MapReduce job's output path. Why? Because the driver deletes the output path before every run, which would wipe out the table's data, so the cleaned files are moved to a separate directory instead.
create external table hadoop_access (
    cdn string,
    region string,
    level string,
    time string,
    ip string,
    domain string,
    url string,
    traffic bigint
) partitioned by (day string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/home/hadoop/data/clear';
Move the data into the directory backing the external table:
[hadoop@hadoop data]$ hadoop fs -ls /home/hadoop/data/clear
ls: `/home/hadoop/data/clear': No such file or directory
[hadoop@hadoop data]$ hadoop fs -mkdir /home/hadoop/data/clear
[hadoop@hadoop data]$ hadoop fs -ls /home/hadoop/data/clear
[hadoop@hadoop data]$ hadoop fs -mkdir -p /home/hadoop/data/clear/day=20180717
[hadoop@hadoop data]$ hadoop fs -mv /home/hadoop/data/output/day=20180717/part-r-00000 /home/hadoop/data/clear/day=20180717
[hadoop@hadoop data]$ hadoop fs -ls /home/hadoop/data/clear/day=20180717
Found 1 items
-rw-r--r-- 1 hadoop supergroup 72432 2019-03-29 12:21 /home/hadoop/data/clear/day=20180717/part-r-00000
[hadoop@hadoop data]$
Refresh the metadata (register the partition) in Hive:
hive (d6_test)> alter table hadoop_access add if not exists partition(day='20180717');
OK
Time taken: 0.2 seconds
Sum the traffic for each domain:
hive (d6_test)> select domain,sum(traffic) from hadoop_access group by domain;
Query ID = hadoop_20190414115050_17d05247-7e1f-455c-a018-ffb626e1d555
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1551521482026_0058, Tracking URL = http://10-9-140-90:18088/proxy/application_1551521482026_0058/
Kill Command = /home/hadoop/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop job -kill job_1551521482026_0058
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-04-14 12:08:25,157 Stage-1 map = 0%, reduce = 0%
2019-04-14 12:08:35,115 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.77 sec
2019-04-14 12:08:46,160 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 3.49 sec
MapReduce Total cumulative CPU time: 3 seconds 490 msec
Ended Job = job_1551521482026_0058
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 3.49 sec HDFS Read: 2970723 HDFS Write: 92 SUCCESS
Total MapReduce CPU Time Spent: 3 seconds 490 msec
OK
domain _c1
v1.go2yd.com 252434700
v2.go2yd.com 252076506
v3.go2yd.com 250212070
v4.go2yd.com 248064592
Time taken: 37.966 seconds, Fetched: 4 row(s)
hive (d6_test)>
Complete the hadoop-train.sh script, then execute it (./hadoop-train.sh):
#!/bin/bash
process_data=20180717

echo "step1: mapreduce etl"
# By convention the output goes into a partition directory, so day=$process_data is appended to the output path
hadoop jar /home/hadoop/data/hadoop_train-1.0-SNAPSHOT.jar com.leo.hadoop.mapreduce.driver.LogETLDriver /input /home/hadoop/data/output/day=$process_data

echo "step2: move the cleaned data from the job output to the hive external table directory"
hadoop fs -test -e /home/hadoop/data/clear/day=$process_data
if [ $? -eq 0 ] ;then
    hadoop fs -rm -r /home/hadoop/data/clear/day=$process_data
    hadoop fs -mkdir -p /home/hadoop/data/clear/day=$process_data
    hadoop fs -mv /home/hadoop/data/output/day=$process_data/part-r-00000 /home/hadoop/data/clear/day=$process_data
    echo "remove and mkdir dir"
else
    hadoop fs -mkdir -p /home/hadoop/data/clear/day=$process_data
    hadoop fs -mv /home/hadoop/data/output/day=$process_data/part-r-00000 /home/hadoop/data/clear/day=$process_data
    echo "mkdir dir"
fi

echo "step3: refresh the hive metadata"
hive -e "alter table hadoop_access add if not exists partition(day='$process_data')"