通过一个简单的业务拓扑,演示storm与kafka、redis、oracle的集成。
Topology说明:
spout: 从kafka读取消息
bolt_1: 检验消息格式
bolt_2: 从redis读取类型进行匹配,将匹配未通过的消息写入redis
bolt_3: 从oracle读取指标值,当超过指标值时,将消息写入oracle,以及写入kafka。
Topology示意图
代码结构
pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<storm.version>1.1.3</storm.version>
<kafka.version>1.1.1</kafka.version>
<!-- NOTE(review): this property is empty and never referenced by any dependency
     below - either set a Jedis/redis client version and use it, or remove it. -->
<redis.client.version></redis.client.version>
<!-- NOTE(review): logback.version is declared but no logback dependency appears
     in this fragment - presumably declared elsewhere; confirm. -->
<logback.version>1.1.2</logback.version>
<ojdbc8.version>12.2.0.1</ojdbc8.version>
</properties>
<dependencies>
<!-- Storm runtime; scope "provided" because the cluster supplies storm-core. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
</dependency>
<!-- storm-kafka integration; its transitive kafka-clients is excluded so the
     explicit kafka-clients version below wins. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${storm.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Kafka client pinned to the broker-compatible version. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- Kafka server artifact (Scala 2.12); log4j bindings excluded in favor of
     the project's logback/slf4j setup. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Storm's Redis bolt support (AbstractRedisBolt, JedisPoolConfig). -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- Storm's JDBC bolt support (AbstractJdbcBolt, JdbcClient, HikariCP). -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-jdbc</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- Oracle JDBC driver. -->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc8</artifactId>
<version>${ojdbc8.version}</version>
</dependency>
</dependencies>
Kafka
kafkaSpout
kafkaSpout的作用是实时的从kafka拉取消息作为数据源。
/**
 * Builds the KafkaSpout that feeds this topology with messages pulled from Kafka.
 *
 * Offsets for the consumer are tracked by storm-kafka under the ZooKeeper node
 * "/&lt;topic&gt;". Producer settings for a downstream KafkaBolt are also placed
 * into the topology {@code Config} under "kafka.broker.properties".
 */
private void initKafkaSpout(Config config) {
    // Kafka brokers are discovered through the /brokers node in ZooKeeper.
    BrokerHosts zkHosts = new ZkHosts("localhost:2181");
    String topic = "test";
    // SpoutConfig(hosts, topic, zkRoot, id):
    //   hosts  - Kafka cluster location (via ZooKeeper)
    //   topic  - topic to consume
    //   zkRoot - ZK path where consumed offsets are recorded
    //   id     - identifier for this consumer
    SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/" + topic, "consumer-1");

    // Producer properties read by KafkaBolt from "kafka.broker.properties".
    Properties producerProps = new Properties();
    producerProps.put("metadata.broker.list", "localhost:9092");
    producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
    config.put("kafka.broker.properties", producerProps);

    // Decode the raw Kafka bytes into a single String tuple field.
    spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
    // Start from the latest offset so previously published messages are not
    // replayed when the topology first starts.
    spoutConfig.startOffsetTime = OffsetRequest.LatestTime();

    spout = new KafkaSpout(spoutConfig);
}
MessageScheme
其中使用了MessageScheme,它的作用是将消息字节码转换为String:
/**
 * Storm {@code Scheme} that converts the raw Kafka message bytes into a single
 * UTF-8 {@code String} tuple field named "msg".
 */
public class MessageScheme implements Scheme {

    @Override
    public List<Object> deserialize(ByteBuffer ser) {
        try {
            // A fresh decoder reports malformed input as an exception instead of
            // silently substituting replacement characters.
            CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
            CharBuffer charBuffer = decoder.decode(ser);
            // Restore the buffer to a readable state in case the caller re-reads it.
            ser.flip();
            return new Values(charBuffer.toString());
        } catch (CharacterCodingException ex) {
            // Narrowed from catch (Exception): only decoding can fail here.
            // NOTE(review): returning null drops the message; consider routing
            // through a real logger (SLF4J) instead of stderr.
            ex.printStackTrace();
            return null;
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
KafkaBolt
KafkaBolt的作用是,发送storm计算过程中产生的消息至kafka.
/**
 * Builds the KafkaBolt that publishes messages produced by the topology back
 * to Kafka.
 *
 * Records are routed to the fixed topic "sendTopic"; the tuple field "type"
 * becomes the record key and the field "alarm" becomes the record value.
 */
private void initKafkaProducer() {
    String brokers = "localhost:9092";
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    // acks=1: the partition leader must persist the record before the send
    // is acknowledged.
    props.put("acks", "1");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Parameterized generics instead of the raw KafkaBolt / mapper types.
    bolt = new KafkaBolt<String, String>()
            .withProducerProperties(props)
            .withTopicSelector(new DefaultTopicSelector("sendTopic"))
            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>("type", "alarm"));
}
通常使用一个KafkaProducerUtil,来发送topic消息,而不是使用storm提供的KafkaBolt.
Redis
JedisPoolConfig
org.apache.storm.redis.common.config.JedisPoolConfig
,注意包名。
/**
 * Builds the Redis bolt used to check message-type legality.
 *
 * Connects to a local Redis instance and selects logical database 3.
 */
private void initRedis() {
    // Removed unused local: AppProperties.getInstance() was fetched but never read.
    String host = "localhost";
    int port = 6379;
    int db = 3; // redis SELECT 3
    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
            .setHost(host)
            .setPort(port)
            .setDatabase(db)
            .build();
    // TypeLegalityRedisBolt extends AbstractRedisBolt.
    redisBolt = new TypeLegalityRedisBolt(jedisPoolConfig);
}
AbstractRedisBolt
/**
 * Redis bolt that checks whether a message's type is legal against data held
 * in Redis (bolt_2 in the topology).
 */
public class TypeLegalityRedisBolt extends AbstractRedisBolt {

    public TypeLegalityRedisBolt(JedisPoolConfig config) {
        super(config);
    }

    public TypeLegalityRedisBolt(JedisClusterConfig config) {
        super(config);
    }

    @Override
    protected void process(Tuple input) {
        JedisCommands jedisCommands = null;
        try {
            jedisCommands = getInstance();
            //redis do sth
        } finally {
            // Return the connection to the pool. Without this, every processed
            // tuple leaks one pooled Jedis connection and the pool is exhausted.
            if (jedisCommands != null) {
                returnInstance(jedisCommands);
            }
        }
    }
}
JDBC
ConnectionProvider
/**
 * Builds the JDBC bolt that reads threshold values from Oracle and writes
 * alarm rows back (bolt_3 in the topology).
 *
 * NOTE(review): the "xx" HikariCP placeholders must be replaced with real
 * Oracle data source settings before deployment.
 */
private void initJdbc() {
    // Typed map instead of the raw Map type.
    Map<String, Object> hikariConfigMap = Maps.newHashMap();
    hikariConfigMap.put("dataSourceClassName", "xx");
    hikariConfigMap.put("dataSource.url", "xx");
    hikariConfigMap.put("dataSource.user", "xx");
    hikariConfigMap.put("dataSource.password", "xx");
    ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
    jdbcBolt = new AlarmJdbcBolt(connectionProvider);
}
AbstractJdbcBolt
/**
 * JDBC bolt demonstrating select and insert through storm-jdbc's JdbcClient.
 */
public class AlarmJdbcBolt extends AbstractJdbcBolt {

    public AlarmJdbcBolt(ConnectionProvider connectionProviderParam) {
        super(connectionProviderParam);
    }

    @Override
    protected void process(Tuple input) {
        // Fixed SQL typo: "selct" -> "select".
        String sql = "select username from t_user where name = ?";
        String queryParam = "trump";
        // Fixed undefined variable: the original passed "qryThredsholdSql",
        // which is never declared; the query string above is what is meant.
        // Each row is a List<Column>; the full result set is List<List<Column>>.
        List<List<Column>> list = jdbcClient.select(sql,
                Lists.newArrayList(new Column("name", queryParam, Types.VARCHAR)));
        List<List<Column>> columnLists = new ArrayList<>();
        // UUID must be stored in its String form for a VARCHAR column;
        // the original passed the raw UUID object.
        columnLists.add(Lists.newArrayList(
                new Column("id", UUID.randomUUID().toString(), Types.VARCHAR),
                new Column("name", "trump", Types.VARCHAR),
                new Column("age", 67, Types.INTEGER)));
        jdbcClient.insert("t_user", columnLists);
    }
}