Storm集成Redis
一、 新建项目
二、 导入Pom依赖
<!-- Dependencies and build settings for the Storm + Redis example topology. -->
<dependencies>
    <!-- JUnit, test scope only. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <!-- Storm core API. With "provided" scope Maven compiles against it but does not
         bundle it into the assembled jar; presumably the cluster supplies storm-core
         at runtime (confirm for your deployment). -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.0</version>
        <scope>provided</scope>
        <!-- Alternative scope kept from the original; presumably for running outside a cluster. -->
        <!--<scope>compile</scope>-->
    </dependency>
    <!-- Storm/Redis integration module. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-redis</artifactId>
        <version>1.1.0</version>
    </dependency>
    <!-- Storm/Kafka integration module. NOTE(review): not referenced by the code shown
         in this document; confirm whether it is still needed. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Builds a single "jar-with-dependencies" artifact during the package phase,
             with the topology class recorded as the manifest main class. -->
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.simon.storm.URLCountTopology</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Compile sources as Java 8 and target Java 8 bytecode. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
三、 定义Spout
继承 BaseRichSpout,重写 nextTuple、open 和 declareOutputFields 方法。
public static class URLCountSpout extends BaseRichSpout { //模拟数据 String[] phones = { "xiaomi", "apple", "huawei", "moto", "nokia", "sansung", "LG" ,"htc","Iphone", "dell", "huasuo", "hongqi", "lianxiang", "lenovo","shunfeng","meituan", "alibaba", "wangyi", "qq", "luoji", "leise","nongfu", "yibao","rongyao", "oppo", "vivo"}; private SpoutOutputCollector collector; // 从外部数据源(比如kafka消息缓存中)获取url数据,源源不断获取 @Override public void nextTuple() { //休眠一下 Utils.sleep(500); //模拟产生数据 int index = new Random().nextInt(20); String phone = phones[index]; // 将拿到的数据封装为tuple发送出去 //collector在open方法中,需要设置为全局 // 一个tuple中可以封装多个数据(类似list的元素) // collector.emit(new Values("value1","value2","value3")); String url = phone + UUID.randomUUID(); collector.emit(new Values(url)); } @Override public void open(Map config, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 为发出去的tuple中的每一个字段定义一个名字 // declarer.declare(new Fields("field1","field2","field3")); declarer.declare(new Fields("url")); } }
四、定义Bolt
public static class URLCountRedisBolt extends BaseRichBolt{ private Jedis jedis = null; @Override public void prepare(Map conifg, TopologyContext context, OutputCollector collector) { jedis = new Jedis("10.10.56.138",6379); } @Override public void execute(Tuple tuple) { String word = tuple.getString(0); //截取url中的网站名字 if(word != null){ //将url写入到redis的一个zset集合中 // 参数1: zset的主键 参数2:要增加的分数 参数3:url中的网站部分——site jedis.zincrby("kafkawords", 1, word); } } /** * 没有后续组件了,所以不需要定义输出tuple的schema */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
五、定义Topology主函数
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { TopologyBuilder builder = new TopologyBuilder(); //2、添加spout和bolt builder.setSpout("urlspout", new URLCountSpout()); builder.setBolt("rediscountbolt", new URLCountRedisBolt()).shuffleGrouping("urlspout"); StormTopology topology = builder.createTopology(); Config config = new Config(); config.setNumWorkers(8); //集群模式 StormSubmitter.submitTopology("topwords",config,topology); }
六、打包
七、上传
八、运行jar
./storm jar /usr/apps/testsource/storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.simon.storm.URLCountTopology myTest
九、运行成功
十、查看Redis