版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/cn_yaojin/article/details/87777175
一、maven依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<storm.version>1.2.2</storm.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<!--本地运行时不需要,打包上传到集群时运行需要,因为在集群运行时已经有storm的依赖-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<finalName>storm_demo</finalName>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.1</version>
<configuration>
<archive>
<manifest>
<!-- 程序入口 (main class entry point) -->
<!--打包命令: mvn package assembly:single 或者 mvn assembly:assembly -->
<mainClass>com.cn.storm.kafka.KafkaStormTopology</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
二、接收kafka消息的 bolt
package com.cn.storm.bolt;
import com.cn.storm.util.StaticParamters;
import org.apache.log4j.Logger;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.io.File;
import java.io.FileOutputStream;
public class Bolt1 extends BaseBasicBolt {

    /** Shared application logger (same logger category used across this demo). */
    private final static Logger logger = Logger.getLogger("adminUserLog");

    /**
     * Receives a raw Kafka message (tuple field 0, as bytes), dumps it to a local
     * file for debugging, and forwards it downstream as a ("name", "age") tuple.
     *
     * @param input     tuple whose first field holds the raw Kafka message bytes
     * @param collector used to emit the ("name", "age") tuple to the next bolt
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        byte[] binary = input.getBinary(0);
        // Fix: decode with an explicit charset; new String(bytes) would use the
        // platform default, which can differ between dev machine and cluster.
        String line = new String(binary, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(line);
        logger.error("bolt 1 --->" + line);
        try {
            // Save to a local file (convenient for inspection after deploying to the
            // Storm cluster). Fix: try-with-resources guarantees the stream is closed
            // even if write() throws — the original leaked the FileOutputStream.
            File file = new File(StaticParamters.FILE_DIR + "demo1.log");
            try (FileOutputStream fos = new FileOutputStream(file)) {
                fos.write(line.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            // Fix: log instead of silently swallowing; the file dump is best-effort,
            // so tuple processing continues regardless.
            logger.error("bolt 1 failed to write demo1.log", e);
        }
        // Forward the data to the next bolt ("age" is a fixed demo value).
        collector.emit(new Values(line, 29));
    }

    /**
     * Declares the two fields this bolt emits to the next bolt:
     * "name" (the message text) and "age" (a fixed demo integer).
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("name", "age"));
    }
}
三、另一个 bolt
package com.cn.storm.bolt;
import com.cn.storm.util.StaticParamters;
import org.apache.log4j.Logger;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
public class Bolt2 extends BaseBasicBolt {

    /** Shared application logger (same logger category used across this demo). */
    private final static Logger logger = Logger.getLogger("adminUserLog");

    /**
     * Terminal bolt: reads the ("name", "age") tuple emitted by Bolt1, prints and
     * logs it, and dumps the name to a local file for debugging.
     *
     * @param input     tuple with a String "name" field and an Integer "age" field
     * @param collector unused — this bolt emits nothing downstream
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String name = input.getStringByField("name");
        // NOTE(review): auto-unboxing throws NPE if "age" is ever absent/null —
        // acceptable here because Bolt1 always emits both fields.
        int age = input.getIntegerByField("age");
        System.out.println(name);
        System.out.println(age);
        try {
            // Fix: try-with-resources closes the stream even if write() throws —
            // the original leaked the FileOutputStream on exception.
            File file = new File(StaticParamters.FILE_DIR + "demo2.log");
            try (FileOutputStream fos = new FileOutputStream(file)) {
                fos.write(name.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            // Fix: log instead of silently swallowing; the file dump is best-effort.
            logger.error("bolt 2 failed to write demo2.log", e);
        }
        logger.error("blot 2 接收到--->" + name);
        logger.error("blot 2 时候到--->" + age);
    }

    /** This bolt is the end of the pipeline, so it declares no output fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
四、创建Topology
package com.cn.storm.kafka;
import com.cn.storm.bolt.Bolt1;
import com.cn.storm.bolt.Bolt2;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;
public class KafkaStormTopology {

    /** Shared application logger (same logger category used across this demo). */
    private final static Logger logger = Logger.getLogger("adminUserLog");

    /**
     * Builds and submits the topology: kafka spout -> Bolt1 -> Bolt2.
     * With no program arguments it runs in local mode (LocalCluster);
     * with any argument it is submitted to the Storm cluster.
     */
    public static void main(String[] args) {
        try {
            TopologyBuilder builder = new TopologyBuilder();
            KafkaSpout kafkaSpout = createKafkaSpout();
            builder.setSpout("id_kafka_spout", kafkaSpout);
            // Bolt1 consumes from the kafka spout; Bolt2 consumes from Bolt1.
            builder.setBolt("bolt1", new Bolt1()).shuffleGrouping("id_kafka_spout");
            builder.setBolt("bolt2", new Bolt2()).shuffleGrouping("bolt1");
            StormTopology topology = builder.createTopology();
            String topologyName = KafkaStormTopology.class.getSimpleName();
            // Config extends HashMap but adds typed setters for common options.
            Config config = new Config();
            if (args == null || args.length < 1) {
                // Local development mode.
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology(topologyName, config, topology);
            } else {
                config.setDebug(false);
                StormSubmitter.submitTopology(topologyName, config, topology);
            }
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            // Fix: collapse three identical catch blocks into one multi-catch and
            // use the configured logger instead of printStackTrace().
            logger.error("failed to submit topology", e);
        }
    }

    /**
     * Creates a KafkaSpout that discovers the brokers through ZooKeeper and
     * starts reading from the latest offset (no replay of old messages).
     */
    private static KafkaSpout createKafkaSpout() {
        // The kafka broker addresses are discovered under /brokers in ZooKeeper.
        String brokerZkStr = "192.168.0.109:2181";
        BrokerHosts hosts = new ZkHosts(brokerZkStr);
        String topic = "storm_demo";
        // zkRoot and id must be created in ZooKeeper by hand beforehand.
        String zkRoot = "/" + topic;
        String id = "storm_demo_log_1";
        SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
        // Start at the latest offset so previously consumed messages are not re-read.
        spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        return new KafkaSpout(spoutConf);
    }
}