Hadoop Programming Notes

1 A basic Hadoop program

2 Input and output formats

3 Chaining multiple map/reduce stages into one pipeline (tried it, works; can it also write to different output files? See the MultipleOutputs sketch after the chain-job example below.)

4 Data joins

Map-side join

Reduce-side join (group the records, tag them by source, and use the join field as the key)

Distributed cache (asymmetric join, small data set)

Bloom filter (zero false-negative rate; a bitmap that shrinks storage; a common tool for joining asymmetric tables)

http://www.google.com.hk/ggblog/googlechinablog/2007/07/bloom-filter_7469.html
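As a concrete illustration, here is a minimal sketch using Hadoop's built-in org.apache.hadoop.util.bloom.BloomFilter (the vector size, hash count, and keys below are made-up values, not from the original notes): build the filter from the small table's join keys, then let the big table's mapper drop any record whose key fails the membership test, so only candidate matches travel to the reduce-side join.

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterSketch {
	public static void main(String[] args) {
		// Illustrative sizing: a 100,000-bit vector with 5 hash functions.
		BloomFilter filter = new BloomFilter(100000, 5, Hash.MURMUR_HASH);

		// Phase 1: add every join key from the small table.
		filter.add(new Key("cust_1001".getBytes()));
		filter.add(new Key("cust_1002".getBytes()));

		// Phase 2 (in the big table's mapper): membershipTest never returns
		// false for a key that was added (no false negatives), so records
		// failing the test can be dropped safely; occasional false positives
		// merely pass a few extra records through to the reducer.
		System.out.println("cust_1001 may join: "
				+ filter.membershipTest(new Key("cust_1001".getBytes())));
		System.out.println("cust_9999 may join: "
				+ filter.membershipTest(new Key("cust_9999".getBytes())));
	}
}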

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountDemo {
	private static final Log LOG = LogFactory.getLog(WordCountDemo.class);

	public static class TokenizerMapper extends
			Mapper<Object, Text, Text, IntWritable> {

		// Emit a count of 1 per token so the reducer's sum is a true word count.
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer extends
			Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			//LOG.error(key.toString()+":"+result.get());
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "word count");
		job.setJarByClass(WordCountDemo.class);
		job.setMapperClass(TokenizerMapper.class);
		//job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.addInputPath(job, new Path(
				"/c:/cygwin/usr/hadoop-1.0.3/input"));
		FileOutputFormat.setOutputPath(job, new Path(
				"/c:/cygwin/usr/hadoop-1.0.3/output"));
		long start=System.currentTimeMillis();
		boolean done=job.waitForCompletion(true);
		LOG.error("cost:"+(System.currentTimeMillis()-start));
		System.exit(done ? 0 : 1);
	}
}
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

public class WordCountDemo2 {
	private static final Log LOG = LogFactory.getLog(WordCountDemo2.class);

	public static class CleanMapper extends MapReduceBase implements
			Mapper<Object, Text, Object, Text> {
		@Override
		public void map(Object key, Text value,
				OutputCollector<Object, Text> output, Reporter reporter)
				throws IOException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				String s = itr.nextToken();
				if (s.contains("security")) {
					LOG.error("bingo:"+value.toString());
					output.collect(key, value);
					break;
				}
			}
		}

	}

	public static class PostCleanMapper extends MapReduceBase implements
			Mapper<Text, IntWritable, Text, IntWritable> {
		@Override
		public void map(Text key, IntWritable value,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			if (key.toString().contains("security")) {
				output.collect(key, value);
			}
		}

	}
	public static class TokenizerMapper extends MapReduceBase implements
			Mapper<Object, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		@Override
		public void map(Object key, Text value,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				output.collect(word, one);
			}
		}

	}
	public static class IntSumReducer extends MapReduceBase implements
			Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		@Override
		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			result.set(sum);
			output.collect(key, result);
			//LOG.error(key.toString()+":"+result.get());
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		JobConf job = new JobConf(conf);
		job.setJobName("ChainJob");
		job.setJarByClass(WordCountDemo2.class);
		job.setInputFormat(TextInputFormat.class);
		job.setOutputFormat(TextOutputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(
				"/c:/cygwin/usr/hadoop-1.0.3/input"),new Path(
						"/c:/cygwin/usr/hadoop-1.0.3/bin"));
		FileOutputFormat.setOutputPath(job, new Path(
				"/c:/cygwin/usr/hadoop-1.0.3/output"));
		JobConf map1Conf = new JobConf(false); 
		JobConf map2Conf = new JobConf(false); 
		//ChainMapper.addMapper(job, CleanMapper.class, Object.class, Text.class, Object.class, Text.class, false, map1Conf);
		ChainMapper.addMapper(job, TokenizerMapper.class,  Object.class, Text.class, Text.class, IntWritable.class, true, map2Conf);
		JobConf reduceConf = new JobConf(false);
		ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);
		//ChainReducer.addMapper(job, PostCleanMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, false, map1Conf);
		long start=System.currentTimeMillis();
		JobClient.runJob(job);
		LOG.error("cost:"+(System.currentTimeMillis()-start));
	}
}
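On the question in the notes above of writing to different files: the old API's org.apache.hadoop.mapred.lib.MultipleOutputs supports named outputs that land in separate part files, and it can be combined with a chain job. A minimal sketch (the named outputs "high" and "low" and the threshold of 10 are invented for illustration):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

public class SplitSumReducer extends MapReduceBase implements
		Reducer<Text, IntWritable, Text, IntWritable> {
	private MultipleOutputs mos;

	@Override
	public void configure(JobConf job) {
		// The driver must declare each named output once, e.g.:
		//   MultipleOutputs.addNamedOutput(job, "high",
		//       TextOutputFormat.class, Text.class, IntWritable.class);
		//   MultipleOutputs.addNamedOutput(job, "low",
		//       TextOutputFormat.class, Text.class, IntWritable.class);
		mos = new MultipleOutputs(job);
	}

	@SuppressWarnings("unchecked")
	public void reduce(Text key, Iterator<IntWritable> values,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		int sum = 0;
		while (values.hasNext()) {
			sum += values.next().get();
		}
		// Route each word to the "high" or "low" named output; each named
		// output gets its own set of part files.
		String target = sum >= 10 ? "high" : "low";
		mos.getCollector(target, reporter).collect(key, new IntWritable(sum));
	}

	@Override
	public void close() throws IOException {
		mos.close(); // flushes and closes all named outputs
	}
}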
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool {
	private static final Log LOG = LogFactory.getLog(DataJoin.class);

	public static class MapClass extends DataJoinMapperBase {
		protected Text generateInputTag(String inputFile) {
			String datasource = inputFile.split("@")[0];
			LOG.error("datasource:" + datasource);
			return new Text(datasource);
		}

		protected Text generateGroupKey(TaggedMapOutput aRecord) {
			String line = ((Text) aRecord.getData()).toString();
			LOG.error("line:" + line);
			String[] tokens = line.split(",");
			String groupKey = tokens[0];
			LOG.error("groupKey:" + groupKey);
			return new Text(groupKey);
		}

		protected TaggedMapOutput generateTaggedMapOutput(Object value) {
			TaggedWritable retv = new TaggedWritable();
			LOG.error("value:" + value);
			LOG.error("inputTag:" + inputTag);
			retv.setData((Writable) value);
			retv.setTag(this.inputTag);
			return retv;
		}
	}

	public static class Reduce extends DataJoinReducerBase {
		protected TaggedMapOutput combine(Object[] tags, Object[] values) {
			LOG.error("tags:" + tags);
			LOG.error("values:" + values);
			if (tags.length < 2)
				return null;
			String joinedStr = "";
			for (int i = 0; i < values.length; i++) {
				if (i > 0)
					joinedStr += ",";
				TaggedWritable tw = (TaggedWritable) values[i];
				String line = ((Text) tw.getData()).toString();
				LOG.error("line2:" + line);
				String[] tokens = line.split(",", 2);
				joinedStr += tokens[1];
			}
			TaggedWritable retv = new TaggedWritable();
			LOG.error("joinedStr:" + joinedStr);
			LOG.error("tags[0]:" + tags[0]);
			retv.setData(new Text(joinedStr));
			retv.setTag((Text) tags[0]);
			return retv;
		}
	}

	public static class TaggedWritable extends TaggedMapOutput {
		private Writable data;

		public TaggedWritable() {
			this.tag = new Text();
		}

		public TaggedWritable(Writable data) {
			this.tag = new Text("");
			this.data = data;
		}

		public Writable getData() {
			return data;
		}

		public void setData(Writable data) {
			this.data = data;
		}

		public void write(DataOutput out) throws IOException {
			this.tag.write(out);
			out.writeUTF(this.data.getClass().getName());
			this.data.write(out);
		}

		public void readFields(DataInput in) throws IOException {
			this.tag.readFields(in);
			String dataClz = in.readUTF();
			if (this.data == null
					|| !this.data.getClass().getName().equals(dataClz)) {
				try {
					this.data = (Writable) ReflectionUtils.newInstance(
							Class.forName(dataClz), null);
				} catch (ClassNotFoundException e) {
					LOG.error("readFields", e);
				}
			}
			this.data.readFields(in);
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		JobConf job = new JobConf(conf, DataJoin.class);
		FileInputFormat.setInputPaths(job, new Path(
				"/c:/cygwin/usr/hadoop-1.0.3/input/*.txt"));
		FileOutputFormat.setOutputPath(job, new Path(
				"/c:/cygwin/usr/hadoop-1.0.3/output"));
		job.setJobName("DataJoin");
		job.setMapperClass(MapClass.class);
		job.setReducerClass(Reduce.class);
		job.setInputFormat(TextInputFormat.class);
		job.setOutputFormat(TextOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TaggedWritable.class);
		job.set("mapred.textoutputformat.separator", ",");
		JobClient.runJob(job);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
		System.exit(res);
	}
}
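To make the reduce-side join concrete, here is a hypothetical walk-through (the file names and rows are invented for illustration): generateInputTag tags each record with the part of its file name before the "@", and generateGroupKey groups on the first comma-separated field.

	customers@a.txt:  1,Stephanie,555-1234
	orders@a.txt:     1,B,88.25,2012-06-01

Both rows share group key 1, so combine() receives one tagged record per source, strips the key off each value with split(",", 2), and concatenates the remainders. With the output separator set to ",", the job emits:

	1,Stephanie,555-1234,B,88.25,2012-06-01

Because combine() returns null whenever tags.length < 2, a key that appears in only one source produces no output, which makes this an inner join.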
 
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoinDC extends Configured implements Tool {
	private static final Log LOG = LogFactory.getLog(DataJoinDC.class);
	public static final String CACHE_LOCALFILES = "mapred.cache.localFiles";
	public static final String CACHE_FILES = "mapred.cache.files";

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new DataJoinDC(), args);
		System.exit(res);
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		JobConf job = new JobConf(conf, DataJoinDC.class);
		LOG.equals("cacheFile:"
				+ new Path("/c:/cygwin/usr/hadoop-1.0.3/input/[email protected]")
						.toUri());
		DistributedCache.addCacheFile(new Path(
				"/c:/cygwin/usr/hadoop-1.0.3/input/[email protected]").toUri(),
				conf);
		FileInputFormat.setInputPaths(job, new Path(
				"/c:/cygwin/usr/hadoop-1.0.3/input/[email protected]"));
		FileOutputFormat.setOutputPath(job, new Path(
				"/c:/cygwin/usr/hadoop-1.0.3/output"));
		LOG.equals("CACHE_LOCALFILES:" + conf.getStrings(CACHE_LOCALFILES));
		LOG.equals("CACHE_LOCALFILES:" + conf.getStrings(CACHE_FILES));

		job.setJobName("DataJoin with DistributedCache");
		job.setMapperClass(MapClass.class);
		job.setNumReduceTasks(0);
		job.setInputFormat(KeyValueTextInputFormat.class);
		job.setOutputFormat(TextOutputFormat.class);
		job.set("key.value.separator.in.input.line", ",");
		JobClient.runJob(job);
		return 0;
	}

	public static class MapClass extends MapReduceBase implements
			Mapper<Text, Text, Text, Text> {
		private Hashtable<String, String> joinData = new Hashtable<String, String>();

		@Override
		public void configure(JobConf conf) {
			try {
				String line;
				String[] tokens;
				BufferedReader joinReader = new BufferedReader(new FileReader(
						"/c:/cygwin/usr/hadoop-1.0.3/input/[email protected]"));
				try {
					while ((line = joinReader.readLine()) != null) {
						LOG.error("line:" + line);
						tokens = line.split(",", 2);
						joinData.put(tokens[0], tokens[1]);
					}
				} finally {
					joinReader.close();
				}
			} catch (IOException e) {
				System.err.println("Exception reading DistributedCache: " + e);
			}
		}

		public void map(Text key, Text value,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
			LOG.error("key:" + key);
			Set<Entry<String, String>> set = joinData.entrySet();
			for (Entry<String, String> entry : set) {
				LOG.error("key:" + entry.getKey());
				LOG.error("value:" + entry.getValue());
			}
			String joinValue = joinData.get(key.toString());
			LOG.error("joinValue:" + joinValue);
			LOG.error("value:" + value);
			if (joinValue != null) {
				output.collect(key,
						new Text(value.toString() + "," + joinValue));
			}
		}
	}
}
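One caveat on the class above: configure() reads the side file from a hardcoded cygwin path, which only works on this single machine. On a real cluster the task should ask the framework where its local copy of the cached file landed. A sketch of a portable drop-in replacement for configure(), reusing the same joinData and LOG fields:

		@Override
		public void configure(JobConf conf) {
			try {
				// Ask the framework for the local copies of the cached files
				// instead of hardcoding a path.
				Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
				if (cacheFiles != null && cacheFiles.length > 0) {
					String line;
					BufferedReader joinReader = new BufferedReader(
							new FileReader(cacheFiles[0].toString()));
					try {
						while ((line = joinReader.readLine()) != null) {
							String[] tokens = line.split(",", 2);
							joinData.put(tokens[0], tokens[1]);
						}
					} finally {
						joinReader.close();
					}
				}
			} catch (IOException e) {
				LOG.error("Exception reading DistributedCache", e);
			}
		}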
 

"Try" (product trial) data statistics:

Aggregated data from the trial-item table and the item table (small data volume; Hive table join).

Access-log table, including logs of direct visits to the trial pages and of traffic the trials refer on to other pages; used for basic UV statistics and access-path pattern statistics (Hive filtering).

Trial-transaction table: transactions on the trial items plus the referred transactions (Hive table join).

Trial-user table aggregated with the UIC table (Hive table join).

Trial-application table, used to deduplicate applications from the same address (MapReduce; a sketch follows).
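A minimal sketch of that same-address deduplication (new API; the input layout "address,applicationId" is an assumed simplification of the trial-application table, and the driver is omitted): map each application to its address, then keep one application per address in the reducer.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class AddressDedup {
	public static class DedupMapper extends Mapper<Object, Text, Text, Text> {
		private final Text address = new Text();
		private final Text application = new Text();

		@Override
		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			// Assumed record layout: address,applicationId
			String[] fields = value.toString().split(",", 2);
			if (fields.length == 2) {
				address.set(fields[0]);
				application.set(fields[1]);
				context.write(address, application);
			}
		}
	}

	public static class DedupReducer extends Reducer<Text, Text, Text, Text> {
		@Override
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// All applications from one address arrive together;
			// keep the first and drop the duplicates.
			for (Text application : values) {
				context.write(key, application);
				break;
			}
		}
	}
}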

Reposted from hill007299.iteye.com/blog/1674720