代码地址 :
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/myInputFormat
需求:
现有一些原始日志需要做增强解析处理,流程:
1、 从原始日志文件中读取数据
2、 根据日志中的一个URL字段到外部知识库中获取信息增强到原始日志
3、 如果成功增强,则输出到增强结果目录;如果增强失败,则抽取原始数据中URL字段输出到待爬清单目录
分析:
程序的关键点是要在一个mapreduce程序中根据数据的不同输出类型结果到不同的目录,这类灵活的输出需求可以通过自定义的OutputFormat来实现。
DBLoader : 连接数据库,从数据库中将字典数据缓存出来
package com.thp.bigdata.logEnhance;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* 数据库连接
* java.sql包中的接口,它是sun公司为了简化,统一对数据库的操作,定义了一套java操作数据库的规范,
* 由各个数据库公司自己实现,数据库有mysql oracle等,
* 而com.mysql.jdbc包中的类是mysql自己实现规范接口的类,
* 不同的数据库有不同的实现,为了能够只写一套代码,实现跨数据库使用,
* 书写传统jdbc需要导入的包就使用java.sql包,而不用考虑具体的实现类。
* @author 汤小萌
*
*/
public class DBLoader {
/**
* 从数据库中将url对应的内容全部放到HashMap中进行缓存
*/
public static void dbLoader(Map<String, String> urlContentMap) {
Connection con = null;
Statement st = null;
ResultSet rs = null;
try {
Class.forName("com.mysql.jdbc.Driver");
con = DriverManager.getConnection("jdbc:mysql://localhost:3306/urldb", "root", "root");
st = con.createStatement();
rs = st.executeQuery("select url, content from url_rule");
while(rs.next()) {
urlContentMap.put(rs.getString(1), rs.getString(2));
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
} finally {
if(rs != null) {
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
rs = null;
}
}
if(st != null) {
try {
st.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
st = null;
}
}
if(con != null) {
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
con = null;
}
}
}
}
// 测试数据库连接成功
public static void main(String[] args) {
HashMap<String, String> urlContentMap = new HashMap<String, String>();
dbLoader(urlContentMap);
// Set<Entry<String, String>> entrySet = urlContentMap.entrySet();
for(Entry<String, String> entrySet : urlContentMap.entrySet()) {
System.out.println(entrySet.getKey() + " : " + entrySet.getValue());
}
}
}
自定义的OutputFormat : 可以实现跟数据类型的不同向不同的目录输出:
package com.thp.bigdata.logEnhance;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 日志增强的自定义OutputFormat
* 根据数据的不同输出类型到不同的输出目录
* @author 汤小萌
*
*/
public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(job.getConfiguration());
Path enhancePath = new Path("f:/enhancelog/output_en/log.txt");
Path tocrawlPath = new Path("f:/enhancelog/output_crw/url.txt");
FSDataOutputStream enhanceOS = fs.create(enhancePath);
FSDataOutputStream tocrawlOS = fs.create(tocrawlPath);
return new EnhanceRecordWriter(enhanceOS, tocrawlOS);
}
/**
* 这个RecordWriter类才是真正往外写文件的
* 需要往这个类的构造方法中传递输出流,在getRecordWriter()方法中就要进行构建这两个输出流
* @author 汤小萌
*
*/
static class EnhanceRecordWriter extends RecordWriter<Text, NullWritable> {
FSDataOutputStream enhanceOS = null;
FSDataOutputStream tocrawlOS = null;
public EnhanceRecordWriter(FSDataOutputStream enhanceOS, FSDataOutputStream tocrawlOS) {
super();
this.enhanceOS = enhanceOS;
this.tocrawlOS = tocrawlOS;
}
// 往外写文件的逻辑
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String dataLine = key.toString();
if(dataLine.contains("tocrawl")) {
// 如果写的数据里面包含 "tocrawl",那么就是不完全的数据,需要写入待爬清单文件
tocrawlOS.write(dataLine.getBytes());
} else {
// 如果写的数据没有包含"tocrawl",就说明写出的数据是增强日志,那么就需要写入增强日志的文件
enhanceOS.write(dataLine.getBytes());
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if(enhanceOS != null) {
enhanceOS.close();
}
if(tocrawlOS != null) {
tocrawlOS.close();
}
}
}
}
MapReduce 执行 :
package com.thp.bigdata.logEnhance;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
/**
* 日志增强:
* 写入不同的文件
* @author 汤小萌
*
*/
public class LogEnhance {
static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> urlContentMap = new HashMap<String, String>();
Text k = new Text();
NullWritable v = NullWritable.get();
/**
* 从数据库中加载数据到HashMap缓存下来
*/
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
DBLoader.dbLoader(urlContentMap);
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 获取一个计数器 - (这个计数器是全局的计数器) 记录不合法的日志行数,组名,计数器名称
Counter counter = context.getCounter("malFormed", "malFormedCounter");
String line = value.toString();
String[] fields = line.split("\t");
try {
String url = fields[28];
System.out.println(url);
String content_tag = urlContentMap.get(url);
// System.out.println(content_tag);
if(content_tag == null) { // 从知识库中根据对应的url查询的内容为空,
k.set(url + "\t" + "tocrawl" + "\n"); // 自定义的输出流没有包装,不能换行
context.write(k, v);
} else {
k.set(line + "\t" + content_tag + "\n");
System.out.println(k.toString());
context.write(k, v);
}
} catch (Exception e) {
// 有的数据可能是不合法的,长度不够,不完整的数据
e.printStackTrace();
System.err.println("数据不合法");
counter.increment(1);
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogEnhance.class);
job.setMapperClass(LogEnhanceMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 要控制不同的文件内容写往不同的目标路径,采用自定义的OutputStream
job.setOutputFormatClass(LogEnhanceOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path("f:/enhancelog/input"));
// 尽管我们在自定义的OutputFormat里面已经设置好了输出的路径
// 但是在FileOutputFormat中,必须输出一个_success文件,所以还需要设置输出path
FileOutputFormat.setOutputPath(job, new Path("f:/enhancelog/output"));
// 现在只是做日志的清洗,还不需要reduce task
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@Test
public void test() {
// String str = "1374609560.11 1374609560.16 1374609560.16 1374609560.16 110 5 8615038208365 460023383869133 8696420056841778 2 460 0 14615 54941 10.188.77.252 61.145.116.27 35020 80 6 cmnet 1 221.177.218.34 221.177.217.161 221.177.218.34 221.177.217.167 ad.veegao.com http://ad.veegao.com/veegao/iris.action Apache-HttpClient/UNAVAILABLE (java 1.4) POST 200 593 310 4 3 0 0 4 3 0 0 0 0 http://ad.veegao.com/veegao/iris.action 5903903079251243019 5903903103500771339 5980728";
String str = "1374609557.12 1374609557.15 1374609557.15 1374609557.74 110 5 8615093268715 460023934411519 3588660433773101 2 460 0 14822 29343 10.188.77.164 223.203.194.156 42384 80 6 cmnet 1 221.177.218.41 221.177.217.161 221.177.218.41 221.177.217.167 ugc.moji001.com http://ugc.moji001.com/sns/GetNewestShare/100/489?UserID=42958568&Platform=Android&Version=10023802&BaseOSVer=10&PartnerKey=5007&Model=GT-S7500&Device=phone&VersionType=1&TS= Apache-HttpClient/UNAVAILABLE (java 1.4) GET 200 421 363 3 2 0 0 3 2 0 0 0 0 http://ugc.moji001.com/sns/GetNewestShare/100/489?UserID=42958568&Platform=Android&Version=10023802&BaseOSVer=10&PartnerKey=5007&Model=GT-S7500&Device=phone&VersionType=1&TS= 5903903047315243019 5903903087191863307 5980488";
String[] fields = str.split("\t");
int count = 1;
for(String field : fields) {
System.out.println(count + " > " + field);
count++;
}
}
}
日志数据:
https://pan.baidu.com/s/1xzlfGQ8R67bsDsTqTsOcrQ
数据库中的字典数据的sql文件:
https://pan.baidu.com/s/1SrExNEebLBVZqtr-MasjLA