1 Basic Hadoop program
2 Input/output formats
3 Multiple chained MapReduce passes (tried in practice and works; can it also write output to different files?)
4 Data joins:
map-side join
reduce-side join (group the records, tag each one with its source, and use the join field as the key)
distributed cache (asymmetric join where one side is small)
Bloom filter (zero false-negative rate; a bitmap that shrinks the storage footprint; a common trick for joining asymmetric tables, see the sketch after this list)
http://www.google.com.hk/ggblog/googlechinablog/2007/07/bloom-filter_7469.html
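A minimal, self-contained sketch of the Bloom-filter idea above: build the filter over the join keys of the small table, ship it to the mappers (for example through the distributed cache), and use it to drop records of the large table that cannot possibly join. The class name SimpleBloomFilter and the double-hashing scheme are illustrative assumptions, not Hadoop's built-in org.apache.hadoop.util.bloom implementation.

import java.util.BitSet;

// Hypothetical minimal Bloom filter. Semantics: mightContain() never returns false for a key
// that was added (zero false negatives), but may return true for a key that was never added
// (false positives), so it only pre-filters the large table before the real join.
public class SimpleBloomFilter {

    private final BitSet bits;
    private final int size;      // number of bits in the bitmap
    private final int numHashes; // number of hash functions

    public SimpleBloomFilter(int size, int numHashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.numHashes = numHashes;
    }

    // Derive the i-th bit position from two base hashes (double hashing).
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1; // force the second hash to be odd
        return Math.abs((h1 + i * h2) % size);
    }

    public void add(String key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(position(key, i));
        }
    }

    public boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(key, i))) {
                return false; // definitely not present
            }
        }
        return true; // present, or a false positive
    }

    public static void main(String[] args) {
        // Build the filter over the join keys of the small table ...
        SimpleBloomFilter filter = new SimpleBloomFilter(1 << 20, 5);
        filter.add("user_1");
        filter.add("user_2");
        // ... then, in the mapper over the large table, skip records that cannot join.
        System.out.println(filter.mightContain("user_1"));   // true
        System.out.println(filter.mightContain("user_999")); // usually false
    }
}

Because false positives are possible, the reduce-side join (or a later exact lookup) still has to verify the records that pass the filter.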
// Basic word-count job on the new (org.apache.hadoop.mapreduce) API.
public class WordCountDemo {

    private static final Log LOG = LogFactory.getLog(WordCountDemo.class);

    // Splits each input line into tokens and emits (token, 1).
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Sums the counts for each word.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            //LOG.error(key.toString() + ":" + result.get());
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountDemo.class);
        job.setMapperClass(TokenizerMapper.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/c:/cygwin/usr/hadoop-1.0.3/input"));
        FileOutputFormat.setOutputPath(job, new Path("/c:/cygwin/usr/hadoop-1.0.3/output"));
        long start = System.currentTimeMillis();
        boolean done = job.waitForCompletion(true);
        LOG.error("cost:" + (System.currentTimeMillis() - start));
        System.exit(done ? 0 : 1);
    }
}
// Chained pipeline ("ChainJob") built with ChainMapper/ChainReducer on the old
// (org.apache.hadoop.mapred) API. CleanMapper and PostCleanMapper are optional pre-/post-
// processing stages left commented out below; the active pipeline is TokenizerMapper -> IntSumReducer.
public class WordCountDemo2 {

    private static final Log LOG = LogFactory.getLog(WordCountDemo2.class);

    // Pre-filter: passes through only lines that contain "security".
    public static class CleanMapper extends MapReduceBase implements Mapper<Object, Text, Object, Text> {

        @Override
        public void map(Object key, Text value, OutputCollector<Object, Text> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String s = itr.nextToken();
                if (s.contains("security")) {
                    LOG.error("bingo:" + value.toString());
                    output.collect(key, value);
                    break;
                }
            }
        }
    }

    // Post-filter: keeps only counted words that contain "security".
    public static class PostCleanMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

        @Override
        public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            if (key.toString().contains("security")) {
                output.collect(key, value);
            }
        }
    }

    public static class TokenizerMapper extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class IntSumReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
            //LOG.error(key.toString() + ":" + result.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobConf job = new JobConf(conf);
        job.setJobName("ChainJob");
        job.setJarByClass(WordCountDemo2.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job,
                new Path("/c:/cygwin/usr/hadoop-1.0.3/input"),
                new Path("/c:/cygwin/usr/hadoop-1.0.3/bin"));
        FileOutputFormat.setOutputPath(job, new Path("/c:/cygwin/usr/hadoop-1.0.3/output"));

        JobConf map1Conf = new JobConf(false);
        JobConf map2Conf = new JobConf(false);
        //ChainMapper.addMapper(job, CleanMapper.class, Object.class, Text.class, Object.class, Text.class, false, map1Conf);
        ChainMapper.addMapper(job, TokenizerMapper.class, Object.class, Text.class, Text.class, IntWritable.class, true, map2Conf);
        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);
        //ChainReducer.addMapper(job, PostCleanMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, false, map1Conf);

        long start = System.currentTimeMillis();
        JobClient.runJob(job);
        LOG.error("cost:" + (System.currentTimeMillis() - start));
    }
}
// Reduce-side join built on the hadoop-datajoin contrib package: the mapper tags each record
// with its data source, records are grouped by the join key, and Reduce.combine() stitches
// together one record from each source.
public class DataJoin extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(DataJoin.class);

    public static class MapClass extends DataJoinMapperBase {

        // The tag identifies the data source; here it is the file-name prefix before '@'.
        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("@")[0];
            LOG.error("datasource:" + datasource);
            return new Text(datasource);
        }

        // The group key is the join field: the first comma-separated column.
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            LOG.error("line:" + line);
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            LOG.error("groupKey:" + groupKey);
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable();
            LOG.error("value:" + value);
            LOG.error("inputTag:" + inputTag);
            retv.setData((Writable) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }

    public static class Reduce extends DataJoinReducerBase {

        // Called for each combination of records sharing a group key; keys that do not appear
        // in at least two sources are dropped (inner join), the remaining columns are concatenated.
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            LOG.error("tags:" + tags);
            LOG.error("values:" + values);
            if (tags.length < 2)
                return null;
            String joinedStr = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0)
                    joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                LOG.error("line2:" + line);
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable();
            LOG.error("joinedStr:" + joinedStr);
            LOG.error("tags[0]:" + tags[0]);
            retv.setData(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    // Writable wrapper that carries both the record and its source tag between map and reduce.
    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        public TaggedWritable() {
            this.tag = new Text();
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void setData(Writable data) {
            this.data = data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            String dataClz = in.readUTF();
            if (this.data == null || !this.data.getClass().getName().equals(dataClz)) {
                try {
                    this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);
                } catch (ClassNotFoundException e) {
                    LOG.error("readFields", e);
                }
            }
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, DataJoin.class);
        FileInputFormat.setInputPaths(job, new Path("/c:/cygwin/usr/hadoop-1.0.3/input/*.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/c:/cygwin/usr/hadoop-1.0.3/output"));
        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
        System.exit(res);
    }
}
// Map-side join: the small side of the join is shipped to every mapper through DistributedCache,
// loaded into an in-memory Hashtable in configure(), and probed for each record of the other
// table. No reduce phase is needed (setNumReduceTasks(0)).
public class DataJoinDC extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(DataJoinDC.class);

    public static final String CACHE_LOCALFILES = "mapred.cache.localFiles";
    public static final String CACHE_FILES = "mapred.cache.files";

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoinDC(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, DataJoinDC.class);
        LOG.error("cacheFile:" + new Path("/c:/cygwin/usr/hadoop-1.0.3/input/[email protected]").toUri());
        DistributedCache.addCacheFile(new Path("/c:/cygwin/usr/hadoop-1.0.3/input/[email protected]").toUri(), conf);
        FileInputFormat.setInputPaths(job, new Path("/c:/cygwin/usr/hadoop-1.0.3/input/[email protected]"));
        FileOutputFormat.setOutputPath(job, new Path("/c:/cygwin/usr/hadoop-1.0.3/output"));
        LOG.error("CACHE_LOCALFILES:" + conf.getStrings(CACHE_LOCALFILES));
        LOG.error("CACHE_FILES:" + conf.getStrings(CACHE_FILES));
        job.setJobName("DataJoin with DistributedCache");
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0);
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.set("key.value.separator.in.input.line", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {

        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        // Load the small table into memory once per task. Here the file is read from a fixed
        // local path for local testing; on a real cluster it would be resolved through
        // DistributedCache.getLocalCacheFiles(conf).
        @Override
        public void configure(JobConf conf) {
            try {
                String line;
                String[] tokens;
                BufferedReader joinReader = new BufferedReader(
                        new FileReader("/c:/cygwin/usr/hadoop-1.0.3/input/[email protected]"));
                try {
                    while ((line = joinReader.readLine()) != null) {
                        LOG.error("line:" + line);
                        tokens = line.split(",", 2);
                        joinData.put(tokens[0], tokens[1]);
                    }
                } finally {
                    joinReader.close();
                }
            } catch (IOException e) {
                System.err.println("Exception reading DistributedCache: " + e);
            }
        }

        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            LOG.error("key:" + key);
            Set<Entry<String, String>> set = joinData.entrySet();
            for (Entry<String, String> entry : set) {
                LOG.error("key:" + entry.getKey());
                LOG.error("value:" + entry.getValue());
            }
            String joinValue = joinData.get(key.toString());
            LOG.error("joinValue:" + joinValue);
            LOG.error("value:" + value);
            if (joinValue != null) {
                output.collect(key, new Text(value.toString() + "," + joinValue));
            }
        }
    }
}
Statistics for the "try" (trial) data:
Aggregated data from the trial-item table and the item table (small volume; Hive table join)
Access log table, covering both direct visits to trial pages and traffic led from trials to other pages; used for basic UV statistics and access-path pattern statistics (Hive filtering)
Trial transaction table: transactions on the trial items plus the transactions they drive (Hive table join)
Trial user table aggregated with the uic table (Hive table join)
Trial application table, used to deduplicate applications that share the same address (MapReduce; see the sketch below)
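The address-based deduplication in the last item maps onto a single MapReduce pass: key every application record by its normalized address and keep one record per key in the reducer. A rough sketch, assuming each record is a comma-separated line whose first field is the address (the class name AddressDedup and the field layout are illustrative assumptions, not the real table schema):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AddressDedup {

    // Key each application record by its (normalized) address.
    public static class AddressMapper extends Mapper<Object, Text, Text, Text> {
        private Text address = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Assumed layout: the address is the first comma-separated column.
            String[] fields = value.toString().split(",", 2);
            address.set(fields[0].trim().toLowerCase());
            context.write(address, value);
        }
    }

    // Keep only the first application seen for each address.
    public static class DedupReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text record : values) {
                context.write(key, record);
                break; // drop the remaining duplicates for this address
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "trial application address dedup");
        job.setJarByClass(AddressDedup.class);
        job.setMapperClass(AddressMapper.class);
        job.setReducerClass(DedupReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

If a specific record should survive per address (for example the earliest application), the reducer would pick by timestamp instead of taking the first value it sees.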