What we'll build:
The same word count as the earlier Redis integration, except that the results are written to a MySQL database instead.
The environment is the same as in the previous examples; you only need to add the storm-jdbc dependency, plus the MySQL JDBC driver:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-jdbc</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>
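The listings below write to a wc table that is never shown being created. Assuming (my assumption, not stated in the source) a word column and a count column in a database named storm, a minimal one-off setup could look like this:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// One-off setup sketch: creates the wc(word, count) table the topology writes to.
// The URL and credentials here are placeholders -- match them to your own database.
public class CreateWcTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/storm", "root", "root");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS wc ("
                    + "word VARCHAR(64) NOT NULL, "
                    + "count INT NOT NULL)");
        }
    }
}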
Implementation code:
package cn.ysjh.drpc;

import com.google.common.collect.Maps;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.*;

public class StormJDBC {

    /*
    Spout that emits a random word every 1.5 seconds
    */
    private static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector spoutOutputCollector;

        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        public static final String[] words = new String[]{"apple", "ysjh", "shjkl", "ueyowir", "tiuyh"};

        @Override
        public void nextTuple() {
            Random random = new Random();
            String word = words[random.nextInt(words.length)];
            this.spoutOutputCollector.emit(new Values(word));
            System.out.println("emitted: " + word);
            Utils.sleep(1500);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("lines"));
        }
    }

    /*
    Word-splitting Bolt
    */
    private static class SplitBolt extends BaseRichBolt {

        private OutputCollector outputCollector;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }

        /*
        Split "lines" on commas; each line here is a single word, so it passes through as-is
        */
        @Override
        public void execute(Tuple tuple) {
            String lines = tuple.getStringByField("lines");
            this.outputCollector.emit(new Values(lines));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("words"));
        }
    }

    /*
    Word-count Bolt
    */
    private static class CountBolt extends BaseRichBolt {

        private OutputCollector outputCollector;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }

        Map<String, Integer> map = new HashMap<>();

        @Override
        public void execute(Tuple tuple) {
            String words = tuple.getStringByField("words");
            Integer count = map.get(words);
            if (count == null) {
                count = 0;
            }
            count++;
            map.put(words, count);
            // emit the word together with its running total
            this.outputCollector.emit(new Values(words, map.get(words)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

        // HikariCP pool configuration; adjust the URL and credentials to your environment
        Map hikariConfigMap = Maps.newHashMap();
        hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        hikariConfigMap.put("dataSource.url", "jdbc:mysql://118.89.108.116:3306/storm");
        hikariConfigMap.put("dataSource.user", "root");
        hikariConfigMap.put("dataSource.password", "root");
        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

        // SimpleJdbcMapper reads the wc table's schema and maps the tuple
        // fields ("word", "count") onto its columns
        String tableName = "wc";
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                .withTableName(tableName)
                .withQueryTimeoutSecs(10);
        builder.setBolt("JdbcInsertBolt", userPersistanceBolt).shuffleGrouping("CountBolt");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("StormJDBC", new Config(), builder.createTopology());
    }
}
If you run the code above, you will see rows being written to MySQL continuously, but the counts never accumulate: because JdbcInsertBolt only ever issues INSERT statements, the same word shows up again and again as a new row. That is clearly not what we want, so the code needs to be improved.
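Before rewriting everything, note one lighter-weight alternative that the original does not use: keep JdbcInsertBolt but hand it an explicit upsert statement via withInsertQuery, letting MySQL's ON DUPLICATE KEY UPDATE do the accumulation. This is only a sketch; it assumes a UNIQUE index on wc.word and is a drop-in replacement for the JdbcInsertBolt wiring in the main() above:

// Sketch only: requires a UNIQUE index on wc.word, e.g.
//   ALTER TABLE wc ADD UNIQUE KEY uk_word (word);
// Also needs the import org.apache.storm.jdbc.common.Column.
// With a custom insert query, SimpleJdbcMapper must be given the column
// schema explicitly instead of a table name.
List<Column> columnSchema = Arrays.asList(
        new Column("word", java.sql.Types.VARCHAR),
        new Column("count", java.sql.Types.INTEGER));
JdbcMapper upsertMapper = new SimpleJdbcMapper(columnSchema);
JdbcInsertBolt upsertBolt = new JdbcInsertBolt(connectionProvider, upsertMapper)
        .withInsertQuery("insert into wc (word, count) values (?, ?)"
                + " on duplicate key update count = values(count)")
        .withQueryTimeoutSecs(10);
builder.setBolt("JdbcInsertBolt", upsertBolt).shuffleGrouping("CountBolt");

The original instead solves the problem with a custom bolt that checks for the row itself, shown next.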
The improved code, which does accumulate:
package cn.ysjh.drpc;

import com.google.common.collect.Maps;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.sql.Types;
import java.util.*;

public class StormJDBC {

    /*
    Spout that emits a random word every 1.5 seconds
    */
    private static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector spoutOutputCollector;

        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        public static final String[] words = new String[]{"apple", "ysjh", "shjkl", "ueyowir", "tiuyh"};

        @Override
        public void nextTuple() {
            Random random = new Random();
            String word = words[random.nextInt(words.length)];
            this.spoutOutputCollector.emit(new Values(word));
            System.out.println("emitted: " + word);
            Utils.sleep(1500);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("lines"));
        }
    }

    /*
    Word-splitting Bolt
    */
    private static class SplitBolt extends BaseRichBolt {

        private OutputCollector outputCollector;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }

        /*
        Split "lines" on commas; each line here is a single word, so it passes through as-is
        */
        @Override
        public void execute(Tuple tuple) {
            String lines = tuple.getStringByField("lines");
            this.outputCollector.emit(new Values(lines));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("words"));
        }
    }

    /*
    Word-count Bolt
    */
    private static class CountBolt extends BaseRichBolt {

        private OutputCollector outputCollector;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }

        Map<String, Integer> map = new HashMap<>();

        @Override
        public void execute(Tuple tuple) {
            String words = tuple.getStringByField("words");
            Integer count = map.get(words);
            if (count == null) {
                count = 0;
            }
            count++;
            map.put(words, count);
            // emit the word together with its running total
            this.outputCollector.emit(new Values(words, map.get(words)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word", "count"));
        }
    }

    /*
    Bolt that persists counts to MySQL: update the row if the word exists, insert otherwise
    */
    public static class MysqlCountBolt extends BaseRichBolt {

        private OutputCollector collector;
        private JdbcClient jdbcClient;
        private ConnectionProvider connectionProvider;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            Map hikariConfigMap = Maps.newHashMap();
            hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
            hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/storm");
            hikariConfigMap.put("dataSource.user", "root");
            hikariConfigMap.put("dataSource.password", "root");
            connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
            // initialize the connection pool
            connectionProvider.prepare();
            jdbcClient = new JdbcClient(connectionProvider, 30);
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Integer count = input.getIntegerByField("count");
            // check whether this word already has a row in the wc table
            List<Column> list = new ArrayList<>();
            list.add(new Column("word", word, Types.VARCHAR));
            List<List<Column>> select = jdbcClient.select("select word from wc where word = ?", list);
            if (select != null && !select.isEmpty()) {
                // word exists: overwrite its count with the latest running total
                // (string concatenation is acceptable for a demo; prefer parameterized SQL in real code)
                jdbcClient.executeSql("update wc set count = " + count + " where word = '" + word + "'");
            } else {
                // first occurrence: insert a new row
                jdbcClient.executeSql("insert into wc values('" + word + "'," + count + ")");
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        @Override
        public void cleanup() {
            // release the connection pool when the topology is killed
            connectionProvider.cleanup();
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
        builder.setBolt("MysqlCountBolt", new MysqlCountBolt()).shuffleGrouping("CountBolt");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("StormJDBC", new Config(), builder.createTopology());
    }
}
With this version the counts accumulate correctly, and the result matches what we got from the earlier Redis integration.
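To see the accumulation for yourself, you can poll the wc table while the topology runs. A hypothetical helper (plain JDBC, placeholder connection details):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical check: prints the current contents of wc. Run it a few times;
// each word's count should keep growing instead of duplicate rows piling up.
public class DumpWcTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/storm", "root", "root");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select word, count from wc")) {
            while (rs.next()) {
                System.out.println(rs.getString("word") + " -> " + rs.getInt("count"));
            }
        }
    }
}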
Note:
The mysql-connector-java dependency in the pom above must be a 5.1.x version (5.1.31 here), because the dataSourceClassName used in the code, com.mysql.jdbc.jdbc2.optional.MysqlDataSource, only exists in the 5.x driver; the class was renamed in Connector/J 8.x. My local database is MySQL 8.0, and the 5.1.31 driver failed to connect to it (MySQL 8.0 changed the default authentication plugin, which old drivers do not support), so I used a database on a remote server instead. If you run into the same problem, consider downgrading your local MySQL version.
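Alternatively, you may be able to keep MySQL 8.0 locally by upgrading the driver instead. A sketch, not tested here: bump mysql-connector-java to an 8.x release in the pom, then point Hikari at the renamed DataSource class:

// Sketch, assuming mysql-connector-java 8.x on the classpath; this replaces
// the hikariConfigMap lines shown earlier. The DataSource class was renamed
// in Connector/J 8, and the 8.x driver often requires an explicit serverTimezone.
Map hikariConfigMap = Maps.newHashMap();
hikariConfigMap.put("dataSourceClassName", "com.mysql.cj.jdbc.MysqlDataSource");
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost:3306/storm?serverTimezone=UTC");
hikariConfigMap.put("dataSource.user", "root");
hikariConfigMap.put("dataSource.password", "root");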