版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Simon_09010817/article/details/83619423
编写包含主函数的 Topology 启动类
package com.simon.storm.kafka;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.MultiScheme;
import org.apache.storm.spout.RawMultiScheme;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
/**
* Created by Simon on 2018/11/1.
*/
/**
 * Topology entry point: wires the old (storm-kafka) KafkaSpout to a simple
 * printing bolt and runs the topology in a LocalCluster.
 */
public class OldKafkaSpout {
    /** Kafka topic to consume. */
    public static String topic = "test";
    /** ZooKeeper connect string handed to ZkHosts for broker discovery. */
    public static String zkHosts = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181";
    /** ZK path under which the spout stores consumer offsets. */
    public static String zkRoot = "/simon/offsets";
    /** ZK server hostnames (no ports) used for offset storage. */
    public static String zkAddress = "192.168.1.101,192.168.1.102,192.168.1.103";
    public static int zkPort = 2181;
    /** Max total message bytes requested per Kafka FetchRequest. */
    public static int fetchSizeBytes = 1048576;
    /** Socket timeout (ms) for connections to Kafka brokers. */
    public static int socketTimeoutMs = 30000;
    /** How long (ms) the consumer waits when the broker has no new messages. */
    public static int fetchMaxWait = 30000;
    /** Read-buffer size of the SimpleConsumer's SocketChannel. */
    public static int bufferSizeBytes = 1048576;
    /** Fall back to startOffsetTime when the requested offset no longer exists in Kafka. */
    public static boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    /** Metrics aggregation interval in seconds. */
    public static int metricsTimeBucketSizeInSecs = 60;

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // Broker discovery via ZooKeeper.
        BrokerHosts brokerHosts = new ZkHosts(zkHosts);

        // Random consumer-group id: every run starts a fresh offset path under zkRoot.
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, UUID.randomUUID().toString());

        // ZK servers/port used by the spout to persist offsets.
        spoutConfig.zkServers = Arrays.asList(zkAddress.split(","));
        spoutConfig.zkPort = zkPort;

        // Fetch/transport tuning (see field docs above).
        spoutConfig.fetchSizeBytes = fetchSizeBytes;
        spoutConfig.socketTimeoutMs = socketTimeoutMs;
        spoutConfig.fetchMaxWait = fetchMaxWait;
        spoutConfig.bufferSizeBytes = bufferSizeBytes;

        // Recover from out-of-range offsets by restarting from startOffsetTime.
        spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = useStartOffsetTimeIfOffsetOutOfRange;
        spoutConfig.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs;

        // Start position: honor offsets already stored in ZK, otherwise begin at the
        // latest message. To force a replay from the beginning use:
        //   spoutConfig.ignoreZkOffsets = true;
        //   spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
        spoutConfig.ignoreZkOffsets = false;
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

        // How far the spout may lag before dropping intermediate messages:
        // spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
        // Failed-tuple retry limit:
        // spoutConfig.retryLimit = 0;

        // Deserialize the raw Kafka byte[] payload into a single string field.
        // NOTE(review): MessageScheme is declared in com.simon.storm.schema in this
        // article but is used here without an import — add
        // `import com.simon.storm.schema.MessageScheme;` (or move the class into
        // this package) for the file to compile.
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        // BUGFIX: the spout was registered as "kafkaSpoutLCCont" while the bolt
        // subscribed to "kafkaSpout", so topology submission would fail with an
        // unknown-component error. Use one consistent component id.
        builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("KafkaSpoutBolt", new KafkaSpoutBolt()).localOrShuffleGrouping("kafkaSpout");

        Config config = new Config();
        // Supervisor restarts a worker that misses heartbeats for this long.
        // NOTE(review): the key is in *seconds* but 600000 looks like a millisecond
        // value (~7 days as seconds) — confirm the intended unit.
        config.put("supervisor.worker.timeout.secs", 600000);
        // Storm <-> ZooKeeper session timeout.
        // NOTE(review): 1200000000 ms (~14 days) far exceeds typical ZK limits — verify.
        config.put("storm.zookeeper.session.timeout", 1200000000);

        // Verbose debug logging — only sensible for LocalCluster runs.
        config.setDebug(true);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("OldKafkaSpout", config, builder.createTopology());
        Utils.sleep(Long.MAX_VALUE);
        localCluster.shutdown();
    }
}
自定义消息转码类 MessageScheme
package com.simon.storm.schema;
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* Created by Administrator on 2018/1/17.
*/
/**
 * Scheme that decodes a Kafka message payload (raw bytes) into a single
 * UTF-8 string tuple field named {@value #STRING_SCHEME_KEY}.
 */
public class MessageScheme implements Scheme {
    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
    public static final String STRING_SCHEME_KEY = "str";

    /** Decodes the payload buffer into a one-field tuple. */
    @Override
    public List<Object> deserialize(ByteBuffer byteBuffer) {
        return new Values(deserializeString(byteBuffer));
    }

    /**
     * Decodes the remaining bytes of {@code bytes} as a UTF-8 string without
     * consuming the buffer's position.
     *
     * @param bytes payload buffer; position/limit delimit the message
     * @return the decoded string
     */
    public static String deserializeString(ByteBuffer bytes) {
        if (bytes.hasArray()) {
            int base = bytes.arrayOffset();
            // BUGFIX: previously used new String(byte[], int, int) without a
            // charset, which decodes with the platform default — inconsistent
            // with the UTF-8 decoding in the non-array branch. Always use UTF-8.
            return new String(bytes.array(), base + bytes.position(), bytes.remaining(), UTF8_CHARSET);
        } else {
            // Direct buffers have no backing array; copy out first.
            return new String(Utils.toByteArray(bytes), UTF8_CHARSET);
        }
    }

    /** Declares the single output field. */
    @Override
    public Fields getOutputFields() {
        return new Fields(STRING_SCHEME_KEY);
    }
}
编写逻辑处理类Bolt
package com.simon.storm.kafka;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
/**
* Created by Simon on 2018/10/23.
*/
/**
 * Terminal bolt: prints the first (string) field of each incoming tuple
 * to stdout and emits nothing downstream.
 */
public class KafkaSpoutBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // Print-only sink; the tuple's single field is the decoded message.
        System.out.println(tuple.getString(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // No output fields — this bolt ends the stream.
    }
}