方案需要考虑多线程环境,注意线程安全问题。
线程安全:多线程处理的结果和单线程一致
如下是否可行?
不可行方案:
定义 static long pv,用 synchronized 控制累加操作。synchronized 和 Lock 只在单个 JVM 内有效,在多个 JVM(多个 worker 进程)下无效。
可行方案有两个:
1、shuffleGrouping下,pv * Executor并发数
2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总
二、实现
注意:多线程下每一个bolt中的execute方法都会执行多次,类似一个while循环。
1、bolt1进行多并发(局部)汇总处理类
public class PVBolt1 implements IRichBolt{ /** * bolt1进行多并发(局部)汇总 */ OutputCollector collector = null; private static final long serialVersionUID = 1L; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } String logString; String session_id; long pv = 0; public void execute(Tuple input) { logString = input.getString(0); session_id = logString.split("\t")[1]; if(session_id !=null){ pv ++; } collector.emit(new Values(Thread.currentThread().getId(),pv)); System.err.println("threadId = "+ Thread.currentThread().getId()+"; pv="+pv); } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("threadId", "count")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
2、bolt2单线程进行全局汇总处理类
public class PVBolt2 implements IRichBolt{ /** * bolt2单线程进行全局汇总 */ private static final long serialVersionUID = 1L; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub } Map<Long,Long>counts = new HashMap<Long,Long>(); public void execute(Tuple input) { Long thread_id = input.getLong(0); Long pv = input.getLong(1); counts.put(thread_id,pv); System.err.println(" threadId="+thread_id+"-------------pv="+pv); long word_sum = 0; //获取总数,遍历counts 的values,进行sum Iterator<Long> i = counts.values().iterator() ; while(i.hasNext()) { word_sum += i.next(); } System.err.println("PVBolt2-------------pv="+word_sum+"\r"); } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
3、topology运行main类
/**
 * Builds the PV topology (spout -> bolt1 x4 -> bolt2 x1) and submits it.
 */
public class Main {

    /**
     * Submits the topology to a cluster under the name {@code args[0]} when an argument
     * is given; otherwise runs it in an in-process local cluster as "mytopology".
     *
     * @param args optional topology name for cluster submission
     */
    public static void main(String[] args) {
        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("spout", new MySpout(), 1);
        topology.setBolt("bolt1", new PVBolt1(), 4).shuffleGrouping("spout");
        topology.setBolt("bolt2", new PVBolt2(), 1).shuffleGrouping("bolt1");

        Map settings = new HashMap();
        settings.put(Config.TOPOLOGY_WORKERS, 4);

        // No arguments: run locally and return.
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("mytopology", settings, topology.createTopology());
            return;
        }

        try {
            StormSubmitter.submitTopology(args[0], settings, topology.createTopology());
        } catch (AlreadyAliveException alive) {
            alive.printStackTrace();
        } catch (InvalidTopologyException invalid) {
            invalid.printStackTrace();
        } catch (AuthorizationException denied) {
            denied.printStackTrace();
        }
    }
}
-------------------------------其它辅助类---------------------------
4、数据读取spout处理类
public class MySpout implements IRichSpout{ /** * 数据读取spout处理类 */ private static final long serialVersionUID = 1L; FileInputStream fis; InputStreamReader isr; BufferedReader br; SpoutOutputCollector collector = null; String str = null; public void nextTuple() { try { while ((str = this.br.readLine()) != null) { // 过滤动作 collector.emit(new Values(str)); // Thread.sleep(3000); //to do } } catch (Exception e) { // TODO: handle exception } } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; this.fis = new FileInputStream("track.log"); this.isr = new InputStreamReader(fis, "UTF-8"); this.br = new BufferedReader(isr); } catch (Exception e) { e.printStackTrace(); } // 打开文件 } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 发射数据格式,与bolt接收数据一致 declarer.declare(new Fields("log")); } public Map<String, Object> getComponentConfiguration() { // 与ope方法中的map对应 return null; } public void ack(Object msgId) { // TODO Auto-generated method stub } public void activate() { // TODO Auto-generated method stub } public void close() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub } }
5、pom文件引用前几篇文章
6、处理结果
引用
threadId=156-------------pv=44
PVBolt2-------------pv=44
threadId=156-------------pv=45
PVBolt2-------------pv=45
threadId=156-------------pv=46
PVBolt2-------------pv=46
threadId=156-------------pv=47
PVBolt2-------------pv=47
threadId=152-------------pv=1
PVBolt2-------------pv=48
threadId=215-------------pv=1
PVBolt2-------------pv=49
9234 [Thread-62-bolt1-executor[5 5]]
threadId = 227; pv=1
threadId=227-------------pv=1
PVBolt2-------------pv=50
PVBolt2-------------pv=44
threadId=156-------------pv=45
PVBolt2-------------pv=45
threadId=156-------------pv=46
PVBolt2-------------pv=46
threadId=156-------------pv=47
PVBolt2-------------pv=47
threadId=152-------------pv=1
PVBolt2-------------pv=48
threadId=215-------------pv=1
PVBolt2-------------pv=49
9234 [Thread-62-bolt1-executor[5 5]]
threadId = 227; pv=1
threadId=227-------------pv=1
PVBolt2-------------pv=50