上一讲 我们讲过了 BaseTransactionalSpout 如何实现 Tweet Transactional Topology, 这讲使用多Partition redis。
对一个spout来说,从一个分区集合中读取批次是很常见的需求。接着上面的例子,你可能有多个redis数据库,而tweets可能会分别保存在这些redis数据库里。通过实现IPartitionedTransactionalSpout,Storm提供了一些工具,用来管理每个分区的状态并保证批次重播的能力。
1. TweetsPartitionRedis 采用Jedis[]模拟多个分区
package com.john.learn.storm.transaction.tweets.analytics.redis; import java.math.BigInteger; import java.util.Arrays; import java.util.Collections; import java.util.List; import org.apache.storm.shade.org.apache.commons.lang.StringUtils; import org.apache.storm.utils.Utils; import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.TweetRedisCommiterBolt; import com.john.learn.storm.transaction.tweets.analytics.redis.spout.TransactionMetadata; import com.john.learn.storm.transaction.tweets.analytics.redis.spout.TweetsPartitionedTransactionalSpoutCoordinator; import clojure.main; import redis.clients.jedis.Jedis; public class TweetsPartitionRedis { public static final String NEXT_READ_POSTION = "Tweets.Redis.NEXT_READ_POSTION"; public static final String NEXT_WRITE_POSTION = "Tweets.Redis.NEXT_WRITE_POSTION"; public static final int Redis_Partition_Number = 4; private Jedis[] jedis; public TweetsPartitionRedis(String hostName, int port, String password) { // 模拟多个Redis , 此处使用Redis Databases jedis = new Jedis[Redis_Partition_Number]; for (int i = 0; i < Redis_Partition_Number; i++) { Jedis _jedis = new Jedis(hostName, port, 10000); if (!StringUtils.isEmpty(password)) { _jedis.auth(password); } _jedis.select(i); jedis[i] = _jedis; } } public TweetsPartitionRedis(String hostName, int port) { this(hostName, port, null); } public List<String> getMessages(int partition, long from, int quantity) { if (quantity == 0) { return Collections.EMPTY_LIST; } Jedis jedis = this.getJedis(partition); String[] keys = new String[quantity]; for (int i = 0; i < quantity; i++) { keys[i] = MESSAGE_ID_KEY + "." 
+ (from + i); } return jedis.mget(keys); } public String get(int partition, String key) { Jedis jedis = this.getJedis(partition); return jedis.get(key); } public void close() { for (Jedis _jedis : jedis) { try { _jedis.disconnect(); } catch (Exception e) { } } } public int getRedisPartition() { return this.jedis.length; } public void clear() { for (Jedis _jedis : jedis) { String[] keys = _jedis.keys("Tweets.*").toArray(new String[0]); if (keys.length > 0) { _jedis.del(keys); } _jedis.del(NEXT_WRITE_POSTION); _jedis.del(NEXT_READ_POSTION); _jedis.del(TweetRedisCommiterBolt.LAST_COMMITED_TRANSACTION); } } public void addMessage(int partition, String message) { Jedis jedis = this.getJedis(partition); long index = jedis.incr(NEXT_WRITE_POSTION); jedis.set(MESSAGE_ID_KEY + "." + index, message); } public Jedis getJedis(int partition) { return this.jedis[partition % this.jedis.length]; } public long getNextWrite(int partition) { Jedis jedis = this.getJedis(partition); String position = jedis.get(NEXT_WRITE_POSTION); if (position == null) { return 1; } return Long.valueOf(position) + 1; } public long getNextRead(int partition) { Jedis jedis = this.getJedis(partition); String position = jedis.get(NEXT_READ_POSTION); if (position == null) { return 1; } return Long.valueOf(position); } public void setNextRead(int partition, long position) { Jedis jedis = this.getJedis(partition); jedis.set(NEXT_READ_POSTION, String.valueOf(position)); } public long getAvailableToRead(int partition, long current) { long items = getNextWrite(partition) - current; return items > 0 ? 
items : 0; } /** * 模拟继续发送 * * @param args */ public static void main2(String[] args) throws InterruptedException { TweetsPartitionRedis tweetsRedis = new TweetsPartitionRedis("10.10.103.188", 6379, "UATRedisAuth"); tweetsRedis.clear(); int maxCount = 10; int count = 0; int partition = 0; while (count < maxCount) { for (int i = 0; i < 100; i++) { long tx = System.currentTimeMillis(); tweetsRedis.addMessage(partition++, "@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system."); } count++; } for (String key : tweetsRedis.getJedis(0).keys("Tweets.Redis.*.Frequency")) { System.out.println("-------------" + key + "-------------"); System.out.println(tweetsRedis.getJedis(0).hgetAll(key)); } } public static void main(String[] args) { TweetsPartitionRedis tweetsRedis = new TweetsPartitionRedis("127.0.0.1", 6379, "test"); // 你可以清空数据,从来 // tweetsRedis.clear(); int partition = 0; tweetsRedis.addMessage(partition++, "Hi @Tom @Simith 1. I want to #USA# and #Hometown# city."); tweetsRedis.addMessage(partition++, "Hi @John @david @vivian 2. I want to #China# and #BeiJing# city."); tweetsRedis.addMessage(partition++, "@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system."); tweetsRedis.addMessage(partition++, "Hi @david @vivian 4. 
I want to #China# and #BeiJing# city."); System.out.println("------------- Test TweetsTransactionalSpoutCoordinator Start-------------"); // TweetsTransactionalSpoutCoordinator transactionalSpoutCoordinator = new // TweetsTransactionalSpoutCoordinator(); // // TransactionMetadata transactionMetadata = null; // // while (transactionalSpoutCoordinator.isReady()) { // // transactionMetadata = // transactionalSpoutCoordinator.initializeTransaction(BigInteger.valueOf(1), // transactionMetadata); // // System.out.println("SpoutCoordinator Initialize Transaction Meta: " + // transactionMetadata); // // } System.out.println("------------- Test TweetsTransactionalSpoutCoordinator End-------------"); } private static final String MESSAGE_ID_KEY = "Tweets.Message"; }
2. TweetsPartitionedTransactionalSpout
package com.john.learn.storm.transaction.tweets.analytics.redis.spout; import java.util.Map; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BasePartitionedTransactionalSpout; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.topology.base.BaseTransactionalSpout; import org.apache.storm.transactional.ITransactionalSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import com.john.learn.storm.transaction.tweets.analytics.redis.TweetsPartitionRedis; public class TweetsPartitionedTransactionalSpout extends BasePartitionedTransactionalSpout<TransactionMetadata> { private static final long serialVersionUID = 1L; @Override public Coordinator getCoordinator(Map config, TopologyContext context) { return new TweetsPartitionedTransactionalSpoutCoordinator(); } @Override public Emitter<TransactionMetadata> getEmitter(Map config, TopologyContext context) { return new TweetsPartitionedTransactionalSpoutEmitter(); } @Override public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) { /** * Refer to Emitter collector.emit(new Values(transactionAttempt, * String.valueOf(tweetId), tweet)); */ fieldsDeclarer.declare(new Fields("txid", "tweetId", "tweet")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } private SpoutOutputCollector collector; }
3. TweetsPartitionedTransactionalSpoutCoordinator
package com.john.learn.storm.transaction.tweets.analytics.redis.spout; import java.math.BigInteger; import org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout.Coordinator; import com.john.learn.storm.transaction.tweets.analytics.redis.TweetsPartitionRedis; public class TweetsPartitionedTransactionalSpoutCoordinator implements Coordinator { public TweetsPartitionedTransactionalSpoutCoordinator() { this.tweetsRedis = new TweetsPartitionRedis("127.0.0.1", 6379, "test"); } @Override public void close() { tweetsRedis.close(); } @Override public boolean isReady() { //check 是否还有新的数据 for (int i = 0; i < numPartitions(); i++) { if(tweetsRedis.getAvailableToRead(i, tweetsRedis.getNextRead(i))>0) { return true; } } return false; } @Override public int numPartitions() { return TweetsPartitionRedis.Redis_Partition_Number; } private transient TweetsPartitionRedis tweetsRedis; }
在这个例子里,协调器很简单。numPartitions方法,告诉Storm一共有多少分区。而且你要注意,不要返回任何元数据。对于IPartitionedTransactionalSpout,元数据由分发器直接管理。
4. TweetsPartitionedTransactionalSpoutEmitter
package com.john.learn.storm.transaction.tweets.analytics.redis.spout; import java.math.BigInteger; import org.apache.storm.coordination.BatchOutputCollector; import org.apache.storm.shade.org.apache.commons.lang.StringUtils; import com.john.learn.storm.transaction.tweets.analytics.redis.TweetsPartitionRedis; import org.apache.storm.transactional.TransactionAttempt; import org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout.Emitter; import org.apache.storm.tuple.Values; public class TweetsPartitionedTransactionalSpoutEmitter implements Emitter<TransactionMetadata> { public TweetsPartitionedTransactionalSpoutEmitter() { this.tweetsRedis = new TweetsPartitionRedis("10.10.103.188", 6379, "UATRedisAuth"); } @Override public void emitPartitionBatch(TransactionAttempt transactionAttempt, BatchOutputCollector collector, int partition, TransactionMetadata metadata) { if (metadata.quantity <= 0) { return; } long tweetId = metadata.from; for (String tweet : tweetsRedis.getMessages(partition, metadata.from, metadata.quantity)) { if (StringUtils.isEmpty(tweet)) { continue; } collector.emit(new Values(transactionAttempt, String.valueOf(tweetId), tweet)); tweetId++; } } @Override public TransactionMetadata emitPartitionBatchNew(TransactionAttempt transactionAttempt, BatchOutputCollector batchOutputCollector, int partition, TransactionMetadata lastPartitioonMeta) { long nextRead = 0; if (lastPartitioonMeta == null) { nextRead = tweetsRedis.getNextRead(partition); } else { nextRead = lastPartitioonMeta.from + lastPartitioonMeta.quantity; // 非常巧妙,保存上一次数据位置, 如果emitPartitionBatch 调用失败,将会重新读取数据 tweetsRedis.setNextRead(partition, nextRead); } long quantity = tweetsRedis.getAvailableToRead(partition, nextRead); quantity = Math.min(quantity, TRANSACTION_MAX_SIZE); TransactionMetadata metadata = new TransactionMetadata(nextRead, (int) quantity); emitPartitionBatch(transactionAttempt, batchOutputCollector, partition, metadata); return metadata; } @Override public void 
close() { tweetsRedis.close(); } private transient TweetsPartitionRedis tweetsRedis; private static final int TRANSACTION_MAX_SIZE = 1000; }
这里有两个重要的方法:emitPartitionBatchNew 和 emitPartitionBatch。emitPartitionBatchNew 从Storm接收分区参数,该参数决定应该从哪个分区读取批次。在这个方法中,先决定要获取哪些tweets,生成相应的元数据对象,再调用emitPartitionBatch发送批次,最后返回元数据对象;该元数据对象会在方法返回时立即被保存到zookeeper。
Storm会为每一个分区发送相同的事务ID,表示一个事务贯穿了所有数据分区。通过emitPartitionBatch读取分区中的tweets,并向拓扑分发批次。如果批次处理失败了,Storm将会调用emitPartitionBatch利用保存下来的元数据重复这个批次。
5. TweetRedisCommiterBolt
package com.john.learn.storm.transaction.tweets.analytics.redis.bolt; import java.util.HashMap; import java.util.Map; import org.apache.storm.coordination.BatchOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseTransactionalBolt; import org.apache.storm.transactional.ICommitter; import org.apache.storm.transactional.TransactionAttempt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import com.john.learn.storm.transaction.tweets.analytics.redis.TweetsPartitionRedis; import redis.clients.jedis.Transaction; public class TweetRedisCommiterBolt extends BaseTransactionalBolt implements ICommitter { @Override public void prepare(Map config, TopologyContext context, BatchOutputCollector collector, TransactionAttempt transactionAttempt) { this.transactionAttempt = transactionAttempt; sourceCounterMapCounters = new HashMap<>(); // 避免重复初始化redis 连接 if (tweetsRedis == null) { this.tweetsRedis = new TweetsPartitionRedis("127.0.0.1", 6379, "test"); } hasMessage = false; } /** * topologyBuilder.setBolt("Users-Splitter", new UserSplitterBolt(), 4) * .shuffleGrouping("TweetsTransactionalSpout"); * * topologyBuilder.setBolt("TopicTags-Splitter", new TopicTagSplitterBolt(), 4) * .shuffleGrouping("TweetsTransactionalSpout"); * * topologyBuilder.setBolt("User-TopicTag-Join", new UserTopicTagJoinBolt(), 1) * .fieldsGrouping("Users-Splitter", "Users", new Fields("tweetId")) * .fieldsGrouping("TopicTags-Splitter", "TopicTags", new Fields("tweetId")); * */ @Override public void execute(Tuple tuple) { String source = tuple.getSourceComponent(); if (source.equalsIgnoreCase("Users-Splitter")) { count("Tweets.Redis.Users.Frequency", tuple.getStringByField("user"), 1); return; } if (source.equalsIgnoreCase("TopicTags-Splitter")) { count("Tweets.Redis.TopicTags.Frequency", tuple.getStringByField("topicTag"), 1); return; } if 
(source.equalsIgnoreCase("User-TopicTag-Join")) { count("Tweets.Redis.UserTopicTags.Frequency", tuple.getStringByField("user") + ":" + tuple.getStringByField("topicTag"), tuple.getIntegerByField("count")); } } private void count(String sourceCounterMapKey, String counterTag, Integer count) { hasMessage = true; if (!sourceCounterMapCounters.containsKey(sourceCounterMapKey)) { sourceCounterMapCounters.put(sourceCounterMapKey, new HashMap<>()); } Map<String, Integer> counters = sourceCounterMapCounters.get(sourceCounterMapKey); Integer prevTotalCount = counters.get(counterTag); if (prevTotalCount == null) { prevTotalCount = 0; } counters.put(counterTag, prevTotalCount + count); } @Override public void finishBatch() { String lastCommitTransaction = tweetsRedis.get(0, LAST_COMMITED_TRANSACTION); System.out.println( "this.transactionAttempt.getTransactionId() ======== " + this.transactionAttempt.getTransactionId()); System.out.println("lastCommitTransaction ===== " + lastCommitTransaction); if(hasMessage==false) { return; } if (String.valueOf(this.transactionAttempt.getTransactionId()).equals(lastCommitTransaction)) { return; } Transaction multi = tweetsRedis.getJedis(0).multi(); multi.set(LAST_COMMITED_TRANSACTION, String.valueOf(transactionAttempt.getTransactionId())); for (String sourceCounterKey : sourceCounterMapCounters.keySet()) { Map<String, Integer> sourceTotalMap = sourceCounterMapCounters.get(sourceCounterKey); for (String counterTag : sourceTotalMap.keySet()) { multi.hincrBy(sourceCounterKey, counterTag, sourceTotalMap.get(counterTag)); } } multi.exec(); printResult(); } private void printResult() { System.out.println("-------------printResult-------------"); for (String key : tweetsRedis.getJedis(0).keys("Tweets.Redis.*.Frequency")) { System.out.println("-------------" + key + "-------------"); System.out.println(tweetsRedis.getJedis(0).hgetAll(key)); } } @Override public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) { } private Map<String, 
Map<String, Integer>> sourceCounterMapCounters; private TransactionAttempt transactionAttempt; private static TweetsPartitionRedis tweetsRedis; private boolean hasMessage; public static final String LAST_COMMITED_TRANSACTION = "Tweets.Redis.LAST_COMMIT"; private static final long serialVersionUID = 1L; }
注意:我们使用Redis中 database=0 的数据库保存最终的统计结果。hasMessage 用于标记本批次是否有数据需要处理。
如果 TweetsPartitionedTransactionalSpoutCoordinator 的 isReady() 方法没有加入"是否还有新数据"的检查、而是始终返回 true,那么这里最好借助 hasMessage 跳过空批次,以提高系统性能。
6. TweetsPartitionedTransactionalTopology
package com.john.learn.storm.transaction.tweets.analytics.redis; import java.text.SimpleDateFormat; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.transactional.TransactionalTopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.TopicTagSplitterBolt; import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.TweetRedisCommiterBolt; import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.UserSplitterBolt; import com.john.learn.storm.transaction.tweets.analytics.redis.bolt.UserTopicTagJoinBolt; import com.john.learn.storm.transaction.tweets.analytics.redis.spout.TweetsPartitionedTransactionalSpout; public class TweetsPartitionedTransactionalTopology { public static void main(String[] args) { TransactionalTopologyBuilder topologyBuilder = new TransactionalTopologyBuilder("TweetsTransactionalTopology", "TweetsTransactionalSpout", new TweetsPartitionedTransactionalSpout()); topologyBuilder.setBolt("Users-Splitter", new UserSplitterBolt(), 4) .shuffleGrouping("TweetsTransactionalSpout"); topologyBuilder.setBolt("TopicTags-Splitter", new TopicTagSplitterBolt(), 4) .shuffleGrouping("TweetsTransactionalSpout"); topologyBuilder.setBolt("User-TopicTag-Join", new UserTopicTagJoinBolt(), 8) .fieldsGrouping("Users-Splitter", "Users", new Fields("tweetId")) .fieldsGrouping("TopicTags-Splitter", "TopicTags", new Fields("tweetId")); topologyBuilder.setBolt("Tweet-RedisCommiter", new TweetRedisCommiterBolt()) .globalGrouping("User-TopicTag-Join").globalGrouping("Users-Splitter", "Users") .globalGrouping("TopicTags-Splitter", "TopicTags"); // Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TweetsTransactionalTopology", conf, topologyBuilder.buildTopology()); // Utils.sleep(5000); // // cluster.killTopology("TiwtterReachTopology"); // // cluster.shutdown(); } }
7. 运行结果
a. 先启动 TweetsPartitionedTransactionalTopology 中main 方法
b. 启动 TweetsPartitionRedis main 方法
// Seed script (run after the topology is up): publishes four sample tweets,
// spread round-robin over partitions 0..3 by the incrementing partition id.
public static void main(String[] args) {
    TweetsPartitionRedis tweetsRedis = new TweetsPartitionRedis("10.10.103.188", 6379, "UATRedisAuth");
    // You can clear all data and start over from scratch.
    tweetsRedis.clear();
    int partition = 0;
    tweetsRedis.addMessage(partition++, "Hi @Tom @Simith 1. I want to #USA# and #Hometown# city.");
    tweetsRedis.addMessage(partition++, "Hi @John @david @vivian 2. I want to #China# and #BeiJing# city.");
    tweetsRedis.addMessage(partition++, "@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.");
    tweetsRedis.addMessage(partition++, "Hi @david @vivian 4. I want to #China# and #BeiJing# city.");
}
执行结果
-------------printResult------------- -------------Tweets.Redis.UserTopicTags.Frequency------------- {@alex:#computation#=1, @vivian:#Storm#=1, @Simith:#computation#=1, @Tom:#realtime#=1, @Tom:#China#=1, @david:#USA#=1, @alex:#China#=1, @John:#Hometown#=1, @alex:#Storm#=1, @david:#Hometown#=1, @Simith:#USA#=1, @vivian:#Hometown#=1, @Simith:#Storm#=1, @John:#computation#=1, @alex:#USA#=1, @alex:#realtime#=1, @david:#China#=1, @alex:#Hometown#=1, @Simith:#realtime#=1, @John:#China#=1, @John:#realtime#=1, @david:#realtime#=1, @david:#BeiJing#=1, @Simith:#Hometown#=1, @Tom:#computation#=1, @vivian:#realtime#=1, @Tom:#BeiJing#=1, @david:#computation#=1, @alex:#BeiJing#=1, @John:#BeiJing#=1, @vivian:#BeiJing#=1, @Tom:#USA#=1, @Tom:#Storm#=1, @Simith:#BeiJing#=1, @vivian:#China#=1, @vivian:#USA#=1, @John:#Storm#=1, @Simith:#China#=1, @vivian:#computation#=1, @david:#Storm#=1, @John:#USA#=1, @Tom:#Hometown#=1} -------------Tweets.Redis.Users.Frequency------------- {@david=2, @John=2, @vivian=2, @Tom=1, @Simith=1, @alex=1} -------------Tweets.Redis.TopicTags.Frequency-------------

接下来,注释 main 方法,打开 main2 方法,模拟继续发送:
/**
 * Simulates continuous publishing: writes 1000 copies of the same tweet
 * (10 rounds x 100 messages) spread over the partitions, then dumps the
 * frequency hashes from database 0.
 *
 * @param args unused
 */
public static void main(String[] args) throws InterruptedException {
    TweetsPartitionRedis tweetsRedis = new TweetsPartitionRedis("10.10.103.188", 6379, "UATRedisAuth");
    tweetsRedis.clear();
    int maxCount = 10;
    int count = 0;
    int partition = 0;
    while (count < maxCount) {
        for (int i = 0; i < 100; i++) {
            long tx = System.currentTimeMillis(); // unused — kept as in the original
            tweetsRedis.addMessage(partition++, "@John @alex 3. Apache #Storm# is a free and open source distributed #realtime# #computation# system.");
        }
        count++;
    }
    // Inspect the aggregated results written by the committer bolt (db 0).
    for (String key : tweetsRedis.getJedis(0).keys("Tweets.Redis.*.Frequency")) {
        System.out.println("-------------" + key + "-------------");
        System.out.println(tweetsRedis.getJedis(0).hgetAll(key));
    }
}
-------------printResult------------- -------------Tweets.Redis.UserTopicTags.Frequency------------- {@alex:#Storm#=289, @alex:#realtime#=289, @John:#realtime#=289, @alex:#computation#=289, @John:#computation#=289, @John:#Storm#=289} -------------Tweets.Redis.Users.Frequency------------- {@John=995, @alex=995} -------------Tweets.Redis.TopicTags.Frequency------------- {#computation#=995, #Storm#=995, #realtime#=995}