/** * 普通事务Spout */ public class MyTxSpout implements ITransactionalSpout<MyMata>{ private static final long serialVersionUID = 1L; /** * 数据源 */ Map<Long, String> dbMap = null; public MyTxSpout() { Random random = new Random(); dbMap = new HashMap<Long, String> (); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; for (long i = 0; i < 100; i++) { dbMap.put(i,hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx","log")); } public Map<String, Object> getComponentConfiguration() { return null; } public org.apache.storm.transactional.ITransactionalSpout.Coordinator<MyMata> getCoordinator(Map conf, TopologyContext context) { /** * 发射该metadata(事务tuple)到“batch emit”流 */ return new MyCoordinator(); } public org.apache.storm.transactional.ITransactionalSpout.Emitter<MyMata> getEmitter(Map conf, TopologyContext context) { /** * 逐个发射实际batch的tuple */ return new MyEmitter(dbMap); } }
2、事务Spout创建一个新的事务(元数据)metadata
2.1 元数据定义
/**
 * Metadata describing one transactional batch: the index of its first record and how many
 * records it contains.
 *
 * <p>Per the original author's notes, this object is what lets a transaction be replayed from a
 * known point. It is presumably persisted (in ZooKeeper) by Storm and serialized with Kryo.
 * That is why it must be {@link Serializable} and keep a no-arg constructor; confirm against
 * the Storm version in use.
 */
public class MyMata implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Index of the first record in the batch (inclusive). */
    private long beginPoint;

    /** Number of records (tuples) in the batch. */
    private int num;

    /** Renders as {@code beginPoint + "----" + num}, e.g. {@code "10----10"}. */
    @Override
    public String toString() {
        return getBeginPoint() + "----" + getNum();
    }

    /** @return index of the first record in the batch */
    public long getBeginPoint() {
        return beginPoint;
    }

    /** @param beginPoint index of the first record in the batch */
    public void setBeginPoint(long beginPoint) {
        this.beginPoint = beginPoint;
    }

    /** @return number of records in the batch */
    public int getNum() {
        return num;
    }

    /** @param num number of records in the batch */
    public void setNum(int num) {
        this.num = num;
    }
}
2.2 获取元数据(metadata),并逐个发射该 batch 中实际的 tuple
public class MyEmitter implements ITransactionalSpout.Emitter<MyMata> { Map<Long, String> dbMap = null; public MyEmitter(Map<Long, String> dbMap) { this.dbMap = dbMap; } //逐个发射实际batch的tuple public void emitBatch(TransactionAttempt tx, MyMata coordinatorMeta, BatchOutputCollector collector) { long beginPoint = coordinatorMeta.getBeginPoint();// 从上一个批次获得开始位置 int num = coordinatorMeta.getNum();// 从批次中获取批次数量 for (long i = beginPoint; i < num + beginPoint; i++) { if (dbMap.get(i) == null) { continue; } collector.emit(new Values(tx, dbMap.get(i))); } } public void cleanupBefore(BigInteger txid) { } public void close() { } }
3、事务 Bolt:从 Emitter 接收并处理数据,一个 batch 处理完成后,由 finishBatch 方法提交结果。
/** * 事务Bolt */ public class MyTransactionBolt extends BaseTransactionalBolt { private static final long serialVersionUID = 1L; public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { this.collector = collector; System.err.println("MyTransactionBolt prepare " + id.getTransactionId() + " attemptid" + id.getAttemptId()); } Integer count = 0; BatchOutputCollector collector; TransactionAttempt tx; // 会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。 public void execute(Tuple tuple) { tx = (TransactionAttempt) tuple.getValue(0); System.err.println( "MyTransactionBolt TransactionAttempt " + tx.getTransactionId() + " attemptid" + tx.getAttemptId()); String log = tuple.getString(1); if (log != null && log.length() > 0) { count++; } } // 批处理提交 public void finishBatch() { System.err.println("finishBatch " + count); collector.emit(new Values(tx, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx", "count")); } }
4、ICommitter:各 batch 之间强制按顺序提交
public class MyCommitter extends BaseTransactionalBolt implements ICommitter { /** * 接口Icommitter:标识IBatchBolt 或BaseTransactionalBolt是否是一个committer */ private static final long serialVersionUID = 1L; public static final String GLOBAL_KEY = "GLOBAL_KEY"; public static Map<String, DbValue> dbMap = new HashMap<String, DbValue>(); int sum = 0; TransactionAttempt id; BatchOutputCollector collector; public void execute(Tuple tuple) { sum += tuple.getInteger(1); } public void finishBatch() { DbValue value = dbMap.get(GLOBAL_KEY); DbValue newValue; if (value == null || !value.txid.equals(id.getTransactionId())) { // 更新数据库 newValue = new DbValue(); newValue.txid = id.getTransactionId(); if (value == null) { newValue.count = sum; } else { newValue.count = value.count + sum; } dbMap.put(GLOBAL_KEY, newValue); } else { newValue = value; } System.err.println("total==========================:" + dbMap.get(GLOBAL_KEY).count); // collector.emit(tuple) } public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { this.id = id; this.collector = collector; } public void declareOutputFields(OutputFieldsDeclarer declarer) { } public static class DbValue { BigInteger txid; int count = 0; } }
5、topo类
public class MyTopo { /** * 事务topo */ public static void main(String[] args) { TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId", "spoutid", new MyTxSpout(), 1); builder.setBolt("bolt1", new MyTransactionBolt(), 3).shuffleGrouping("spoutid"); builder.setBolt("committer", new MyCommitter(), 1).shuffleGrouping("bolt1"); Config conf = new Config(); conf.setDebug(true); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.buildTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } catch (AuthorizationException e) { e.printStackTrace(); } } else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.buildTopology()); } } }
6、测试结果
引用
启动一个事务:0----10
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
finishBatch 3
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
finishBatch 4
finishBatch 3
total==========================:10
启动一个事务:10----10
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
finishBatch 3
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
finishBatch 4
finishBatch 3
total==========================:20