bolt1通过fieldGrouping 进行多线程局部汇总,下一级blot2进行单线程保存session_id和count数到Map且进行遍历,可以得到:
Pv、UV、访问深度(按每个session_id 的浏览数)
2014-05-01 UV数(按日期统计)
既然去重,必须持久化。两种持久化数据:
1、内存(适用中小型数据)
数据结构Map
2、no-sql 分布式数据库,如Hbase(适用大型数据)
1、数据源
public class SourceSpout implements IRichSpout{ /** * 数据源Spout */ private static final long serialVersionUID = 1L; Queue<String> queue = new ConcurrentLinkedQueue<String>(); SpoutOutputCollector collector = null; String str = null; public void nextTuple() { if (queue.size() >= 0) { collector.emit(new Values(queue.poll())); } try { Thread.sleep(500) ; } catch (InterruptedException e) { e.printStackTrace(); } } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; Random random = new Random(); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; for (int i = 0; i < 20; i++) { queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]); } } catch (Exception e) { e.printStackTrace(); } } public void close() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("log")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void ack(Object msgId) { // TODO Auto-generated method stub System.out.println("spout ack:"+msgId.toString()); } public void activate() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub System.out.println("spout fail:"+msgId.toString()); } }
2、日期格式化处理类
public class FmtLogBolt implements IBasicBolt{ /** * 格式化日期 */ private static final long serialVersionUID = 1L; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date","session_id")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } String eachLog = null; public void execute(Tuple input, BasicOutputCollector collector) { eachLog=input.getStringByField("log"); if (eachLog != null && eachLog.length() > 0 ) { collector.emit(new Values(DateFmt.getCountDate(eachLog.split("\t")[2],DateFmt.date_short),eachLog.split("\t")[1])) ;// 日期, session_id } } public void cleanup() { // TODO Auto-generated method stub } }
3、多线程局部汇总深度数据
public class DeepVisitBolt implements IBasicBolt{ /** * 多线程局部汇总深度数据 */ private static final long serialVersionUID = 1L; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date_session_id","count")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple input, BasicOutputCollector collector) { String dateString =input.getStringByField("date"); String session_id = input.getStringByField("session_id"); Integer count = counts.get(dateString+"_"+session_id); if (count == null) { count = 0; } count ++ ; counts.put(dateString+"_"+session_id,count) ; collector.emit(new Values(dateString+"_"+session_id,count)) ; } public void cleanup() { // TODO Auto-generated method stub }; }
4、单线程汇总数据
public class UVSumBolt implements IBasicBolt{ /** * 单线程汇总数据 */ private static final long serialVersionUID = 1L; Map<String, Integer> counts = new HashMap<String, Integer>(); public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { cur_date = DateFmt.getCountDate("2014-01-07", DateFmt.date_short); } long beginTime = System.currentTimeMillis() ; long endTime = 0; String cur_date = null; public void execute(Tuple input, BasicOutputCollector collector) { try { endTime = System.currentTimeMillis() ; long PV = 0;// 总数 long UV = 0; // 个数,去重后 String dateSession_id = input.getString(0); Integer count = input.getInteger(1); //清空不是当天的数据 if (!dateSession_id.startsWith(cur_date) && DateFmt.parseDate(dateSession_id.split("_")[0]).after( DateFmt.parseDate(cur_date))) { cur_date = dateSession_id.split("_")[0]; counts.clear(); } counts.put(dateSession_id, count); if (endTime - beginTime >= 2000) {//两秒输出一次 // 获取word去重个数,遍历counts 的keySet,取count Iterator<String> i2 = counts.keySet().iterator(); while (i2.hasNext()) { String key = i2.next(); if (key != null) { if (key.startsWith(cur_date)) { UV++; PV += counts.get(key); } } } System.err.println("PV=" + PV + "; UV="+ UV); } } catch (Exception e) { throw new FailedException("SumBolt fail!"); } } public void cleanup() { // TODO Auto-generated method stub } }
5、topoly类
public class UVTopo { /** * topoly类 */ public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new SourceSpout(), 1); builder.setBolt("FmtLogBolt", new FmtLogBolt(), 4).shuffleGrouping("spout"); // Fields Grouping:按Field分组,比如按word来分组, 具有同样word的tuple会被分到相同的Bolts, 而不同的word则会被分配到不同的Bolts。 builder.setBolt("sumBolt", new DeepVisitBolt(),4).fieldsGrouping("FmtLogBolt", new Fields("date","session_id")); builder.setBolt("UvSum", new UVSumBolt(), 1).shuffleGrouping("sumBolt") ; Config conf = new Config() ; conf.setDebug(true); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } catch (AuthorizationException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
6、pom.xml文件
引用
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
7、日期处理类
public class DateFmt { /* * 日期处理类 */ public static final String date_long = "yyyy-MM-dd HH:mm:ss" ; public static final String date_short = "yyyy-MM-dd" ; public static SimpleDateFormat sdf = new SimpleDateFormat(date_short); public static String getCountDate(String date,String patton) { SimpleDateFormat sdf = new SimpleDateFormat(patton); Calendar cal = Calendar.getInstance(); if (date != null) { try { cal.setTime(sdf.parse(date)) ; } catch (ParseException e) { e.printStackTrace(); } } return sdf.format(cal.getTime()); } public static Date parseDate(String dateStr) throws Exception { return sdf.parse(dateStr); } public static void main(String[] args) throws Exception{ // System.out.println(DateFmt.getCountDate("2014-03-01 12:13:14", DateFmt.date_short)); System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01"))); } }
8、测试结果