版权声明:本文为博主原创文章,想转就转,知识,谁学到就是谁的 https://blog.csdn.net/rain_web/article/details/82767531
使用MapReduce对日志数据进行简单的清理和总结
首先使用MapReduce对日志进行分割,将time,ip,url提取出来,再用reduce进行整合,根据ip地址的出现次数,打印到hdfs中。在整合中我使用了bean结构来存储数据,bean实现了WritableComparable接口。
使用时先将BaiduLog和LogBean两个类导入项目,并配置相应的Maven依赖,然后导出项目的jar到虚拟机中,将日志文件上传到HDFS中,使用命令运行
hadoop jar rain-hadoop-1.0-SNAPSHOT.jar com.rain.mapreduce.BaiduLog /data/baidu.log /data/log/clean5
BaiduLog.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * MapReduce job that cleans Baidu access logs: the mapper extracts (time, ip, url)
 * from each log line containing the interceptor marker, and the reducer counts
 * occurrences per IP and writes (count, sample record) to HDFS.
 *
 * Usage: hadoop jar &lt;jar&gt; BaiduLog &lt;input path&gt; &lt;output path&gt;
 */
public class BaiduLog {

    /**
     * Extracts time, ip and url from matching log lines and emits them keyed
     * by IP address.
     */
    public static class BaiduLogMapper extends Mapper<LongWritable, Text, Text, LogBean> {

        // Literal marker identifying the log lines we care about. Matched with
        // indexOf rather than String.split, because the text contains the regex
        // metacharacters '(' and ')' and split() interprets its argument as a regex.
        private static final String MARKER = "(cn.baidu.core.inteceptor.LogInteceptor:55)";

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String log = value.toString();
            int pos = log.indexOf(MARKER);
            if (pos == -1) {
                return; // not an interceptor line — skip
            }
            String head = log.substring(0, pos);
            String tail = log.substring(pos + MARKER.length());
            // Guard against truncated lines instead of throwing StringIndexOutOfBounds.
            if (head.length() < 10) {
                return;
            }
            // Characters 1..9 of the prefix hold the date portion of the timestamp.
            String time = head.substring(1, 10);
            String[] fields = tail.split("\t");
            if (fields.length < 3) {
                return; // malformed line — skip rather than crash the task
            }
            String ip = fields[1];
            String url = fields[2];
            // Some lines carry a literal "null" in the url slot; the real url follows.
            if ("null".equals(url)) {
                if (fields.length < 4) {
                    return;
                }
                url = fields[3];
            }
            context.write(new Text(ip), new LogBean(time, ip, url));
        }
    }

    /**
     * Counts how many records each IP produced and emits (count, first record)
     * so the output shows the frequency alongside one sample entry.
     */
    public static class BaiduLogReducer extends Reducer<Text, LogBean, IntWritable, Text> {

        @Override
        protected void reduce(Text key, Iterable<LogBean> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            String sample = "";
            for (LogBean bean : values) {
                if (count == 0) {
                    sample = bean.toString(); // keep only the first bean as a sample
                }
                count++;
            }
            context.write(new IntWritable(count), new Text(sample));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: BaiduLog <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "baidu-log-clean");
        job.setJarByClass(BaiduLog.class);
        job.setMapperClass(BaiduLogMapper.class);
        job.setReducerClass(BaiduLogReducer.class);
        // NOTE: the reducer cannot double as a combiner here — its output types
        // (IntWritable, Text) differ from the map output types (Text, LogBean).

        // BUG FIX: the original declared only setOutputKeyClass(Text)/
        // setOutputValueClass(LogBean), i.e. the *map* output types. Because map
        // and reduce output types differ, they must be declared separately or the
        // reduce phase fails with a type mismatch at runtime.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LogBean.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
LogBean.java
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
/**
 * Writable value type holding one cleaned log record: (time, ip, url).
 * Serialized with writeUTF/readUTF, so all three fields must be non-null
 * whenever the bean is written.
 */
public class LogBean implements WritableComparable<LogBean> {

    private String time;
    private String ip;
    private String url;

    /** No-arg constructor required by Hadoop's reflection-based deserialization. */
    public LogBean() {
        super();
    }

    public LogBean(String time, String ip, String url) {
        this.time = time;
        this.ip = ip;
        this.url = url;
    }

    @Override
    public String toString() {
        // BUG FIX: the original concatenated ' ' (a space character) where the
        // closing single quote '\'' was intended, producing "time='2018-09-21 "
        // with no closing quote in the job output.
        return "LogBean{" +
                "time='" + time + '\'' +
                ", ip='" + ip + '\'' +
                ", url='" + url + '\'' +
                '}';
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    /**
     * Orders beans by time, then ip, then url.
     *
     * BUG FIX: the original always returned 0, which violates the Comparable
     * contract and makes every bean compare as "equal" — dangerous for a
     * WritableComparable that Hadoop may use as a sort key.
     */
    @Override
    public int compareTo(LogBean o) {
        int c = this.time.compareTo(o.time);
        if (c == 0) {
            c = this.ip.compareTo(o.ip);
        }
        if (c == 0) {
            c = this.url.compareTo(o.url);
        }
        return c;
    }

    /** equals/hashCode added so they are consistent with compareTo. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof LogBean)) {
            return false;
        }
        LogBean other = (LogBean) obj;
        return Objects.equals(time, other.time)
                && Objects.equals(ip, other.ip)
                && Objects.equals(url, other.url);
    }

    @Override
    public int hashCode() {
        return Objects.hash(time, ip, url);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(time);
        out.writeUTF(ip);
        out.writeUTF(url);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        time = in.readUTF();
        ip = in.readUTF();
        url = in.readUTF();
    }
}