一、在map端表合并(DistributedCache分布式缓存)
1.适用场景
适合用于关联表中有小表的情形。
可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行合并并输出最终结果,可以大大提高合并操作的并发速度,加快处理速度。
2.程序分析
-
加载缓存数据:job.addCacheFile(new URI("file://d:/pd.txt"));
-
map端join的逻辑不需要reduce阶段,设置reducetask数量为0:job.setNumReduceTask(0),言外之意MapReduce可以不需要reduce阶段,但是必须有Map阶段
-
在setup()中读取缓存文件,一行一行的读取文件,并缓存到集合中
-
在map()读取需要合并的表,与缓冲拼接。
3.编写Mapper代码
public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> pdMap = new HashMap<>();
Text k = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//读取缓存文件
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "UTF-8"));
String line;
while (StringUtils.isNotEmpty(line = reader.readLine())) {
//切割数据
String[] fields = line.split("\t");
//将缓存插入到集合中
pdMap.put(fields[0], fields[1]);
}
//关闭流
reader.close();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行数据
String line = value.toString();
//切割数据
String[] fields = line.split("\t");
//获取pid
String pid = fields[1];
//根据pid获取pname
String pName = pdMap.get(pid);
//拼接
line = line + "\t" + pName;
k.set(line);
//输出
context.write(k, NullWritable.get());
}
}
4.编写Driver代码
public class DistributedCacheDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
//获取job信息
Configuration entries = new Configuration();
Job job = Job.getInstance(entries);
//获取驱动jar包
job.setJarByClass(DistributedCacheDriver.class);
//设置使用的Mapper
job.setMapperClass(DistributedCacheMapper.class);
//设置Reducer的个数为0
job.setNumReduceTasks(0);
//设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//设置加载缓存文件
job.addCacheFile(new URI("file://d:/pd.txt"));
//设置输入输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//执行
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
二、ETL数据清洗
1.概述
在运行核心业务MapReduce程序之前,往往要对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行mapper程序,不需要运行Reducer程序。
三、数据清洗实例
1.需求
-
对web访问日志的个字段识别切分
-
取出日志中不合法的记录
-
根据统计需求,生成各类访问请求过滤数据
2.编写bean代码
public class LogBean{
/**
* 客户端ip地址
*/
private String remoteAddr;
/**
* 用户名称
*/
private String remoteUser;
/**
* 访问时间和访问时区
*/
private String timeLocal;
/**
* 请求的url和http协议
*/
private String request;
/**
* 请求状态
*/
private String status;
/**
* 发送给客户端文件主题内容大小
*/
private String bodyBytesSent;
/**
* 从哪个页面链接访问过来的
*/
private String httpReferer;
/**
* 客户浏览器的相关信息
*/
private String httpUserAgent;
/**
* 判断数据是否合法
*/
private boolean valid = true;
public String getRemoteAddr() {
return remoteAddr;
}
public void setRemoteAddr(String remoteAddr) {
this.remoteAddr = remoteAddr;
}
public String getRemoteUser() {
return remoteUser;
}
public void setRemoteUser(String remoteUser) {
this.remoteUser = remoteUser;
}
public String getTimeLocal() {
return timeLocal;
}
public void setTimeLocal(String timeLocal) {
this.timeLocal = timeLocal;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBodyBytesSent() {
return bodyBytesSent;
}
public void setBodyBytesSent(String bodyBytesSent) {
this.bodyBytesSent = bodyBytesSent;
}
public String getHttpReferer() {
return httpReferer;
}
public void setHttpReferer(String httpReferer) {
this.httpReferer = httpReferer;
}
public String getHttpUserAgent() {
return httpUserAgent;
}
public void setHttpUserAgent(String httpUserAgent) {
this.httpUserAgent = httpUserAgent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
@Override
public String toString() {
return "LogBean{" +
"remoteAddr='" + remoteAddr + '\'' +
", remoteUser='" + remoteUser + '\'' +
", timeLocal='" + timeLocal + '\'' +
", request='" + request + '\'' +
", status='" + status + '\'' +
", bodyBytesSent='" + bodyBytesSent + '\'' +
", httpReferer='" + httpReferer + '\'' +
", httpUserAgent='" + httpUserAgent + '\'' +
", valid=" + valid +
'}';
}
}
3.编写Mapper代码
public class LogMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取1行
String line = value.toString();
//分析日志是否合法
LogBean bean = pressLog(line);
if (!bean.isValid()){
return;
}
k.set(bean.toString());
//输出
context.write(k,NullWritable.get());
}
private LogBean pressLog(String line) {
LogBean logBean = new LogBean();
//截取
String[] fields = line.split(" ");
if (fields.length>1){
//封装数据
logBean.setRemoteAddr(fields[0]);
logBean.setRemoteUser(fields[1]);
logBean.setTimeLocal(fields[3].substring(1));
logBean.setRequest(fields[6]);
logBean.setStatus(fields[8]);
logBean.setBodyBytesSent(fields[9]);
logBean.setHttpReferer(fields[10]);
if (fields.length>12) {
logBean.setHttpUserAgent(fields[11]+""+fields[12]);
}else {
logBean.setHttpUserAgent(fields[11]);
}
//大于400,HTTP错误
if (Integer.parseInt(logBean.getStatus())>=400){
logBean.setValid(false);
}
}else {
logBean.setValid(false);
}
return logBean;
}
}
4.编写Driver代码
public class LogDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//获取job信息
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//加载jar包
job.setJarByClass(LogDriver.class);
//关联mapper
job.setMapperClass(LogMapper.class);
//设置最终输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//设置输入输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//执行
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
四、计数器应用
Hadoop为每个作业维护若干内置计数器,以描述多项指标。列入,某些计数器记录已处理的字节数和记录数,使用户可以监控已处理的输入数据量和已产生的输出数据量。
1.采用枚举的方式统计计数
enum MyCounter(MALFORORMED,NORMAL)
//对枚举定义的自定义计数器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
2.采用计数器组、计数器名称的方式统计
context.getCounter("counterGroup","countera").increment(1);
组名和计数器名称随意起,但最好有意义。
3.计数寄过在程序运行后的控制台上可以查看。
4.改写ETL数据清理实例的map方法,添加计数器
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取1行
String line = value.toString();
//分析日志是否合法
LogBean bean = pressLog(line);
if (!bean.isValid()){
context.getCounter("map","false").increment(1);
return;
}
context.getCounter("map","true").increment(1);
k.set(bean.toString());
//输出
context.write(k,NullWritable.get());
}