思路:Flume 将数据发送到 Kafka,Storm 从 Kafka 接收消息,并调用 web.py 服务完成中文分词。
storm代码:
stormKafka.java:
package stormHttp;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
/**
 * Entry point that wires a Kafka spout to {@link FeatureExtractBolt}.
 *
 * <p>When at least one argument is given, the topology is submitted to the cluster under the
 * name {@code args[0]} with three workers. Otherwise it is started in an in-process
 * {@link LocalCluster} that is never shut down here.
 */
public class stormKafka {

    /** Kafka topic the spout consumes. */
    private static final String TOPIC = "badou_storm_kafka_test";

    /** ZooKeeper root under which the spout keeps its state. */
    private static final String ZK_ROOT = "/badou_storm_kafka_test";

    /** Identifier of this spout's state under {@link #ZK_ROOT}. */
    private static final String SPOUT_ID = "kafkaSpout";

    /** ZooKeeper connect string used to discover the Kafka brokers. */
    private static final String ZK_CONNECT = "master:2181";

    public static void main(String[] args) throws Exception {
        KafkaSpout kafkaReader = newKafkaSpout();

        TopologyBuilder wiring = new TopologyBuilder();
        wiring.setSpout("spout", kafkaReader, 2);
        wiring.setBolt("feature_extract", new FeatureExtractBolt()).shuffleGrouping("spout");

        Config stormConf = new Config();
        stormConf.setDebug(false);

        boolean submitToCluster = args != null && args.length > 0;
        if (!submitToCluster) {
            // Local mode: cap task parallelism and run in-process. No shutdown is issued,
            // so the topology keeps running until the process is stopped.
            stormConf.setMaxTaskParallelism(3);
            new LocalCluster().submitTopology("kafka", stormConf, wiring.createTopology());
            return;
        }
        stormConf.setNumWorkers(3);
        StormSubmitter.submitTopology(args[0], stormConf, wiring.createTopology());
    }

    /** Builds a spout that decodes each Kafka message as a single string field, with {@code forceFromStart} enabled. */
    private static KafkaSpout newKafkaSpout() {
        BrokerHosts zk = new ZkHosts(ZK_CONNECT);
        SpoutConfig spoutConf = new SpoutConfig(zk, TOPIC, ZK_ROOT, SPOUT_ID);
        spoutConf.forceFromStart = true;
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        return new KafkaSpout(spoutConf);
    }
}
HttpclientDemo.java:
package stormHttp;
import java.io.IOException;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
/**
 * Stand-alone smoke test for the web.py word-segmentation service.
 *
 * <p>Sends one sample sentence to {@code http://192.168.87.10:8808/?content=...} and prints
 * the HTTP status line, the reported content length and the decoded response body.
 */
public class HttpclientDemo {
    public static void main(String[] args) throws Exception {
        String content = "中秋节快乐";
        System.out.println(content);
        // Percent-encode the query value: raw text containing spaces, '&', '#' or '%' would make
        // HttpGet throw IllegalArgumentException or silently truncate the parameter.
        HttpGet httpGet = new HttpGet("http://192.168.87.10:8808/?content="
                + java.net.URLEncoder.encode(content, "UTF-8"));
        CloseableHttpClient httpClient = HttpClients.createDefault();
        try {
            CloseableHttpResponse response = httpClient.execute(httpGet);
            try {
                HttpEntity entity = response.getEntity();
                System.out.println(response.getStatusLine());
                if (entity != null) {
                    System.out.println("Response content length: " + entity.getContentLength());
                    // Use UTF-8 only when the response declares no charset. The previous
                    // ISO-8859-1 -> UTF-8 round-trip corrupted the text whenever the server
                    // did declare one.
                    String responseString = EntityUtils.toString(entity, "UTF-8");
                    System.out.println(responseString);
                }
            } finally {
                // Release the connection even if reading the body fails.
                response.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
FeatureExtractBolt.java:
package stormHttp;
import java.io.IOException;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
/**
 * Storm bolt that sends each incoming message to the web.py word-segmentation service and
 * prints the segmented result to stdout.
 *
 * <p>The bolt declares no output fields and emits nothing; the segmentation result is only
 * printed. HTTP failures are logged and the tuple is otherwise processed normally.
 */
public class FeatureExtractBolt extends BaseBasicBolt {

    /** Segmentation endpoint used by the no-arg constructor. */
    private static final String DEFAULT_SERVICE_URL = "http://192.168.87.10:8808/";

    /** Base URL of the segmentation service; the query string {@code ?content=...} is appended. */
    private final String serviceUrl;

    /**
     * HTTP client reused across tuples instead of being rebuilt for every message. It is
     * transient because the bolt instance is serialized with the topology and the client is
     * not serializable; it is created lazily on first use and closed in {@link #cleanup()}.
     */
    private transient CloseableHttpClient httpClient;

    /** Creates a bolt that talks to the default service at {@code http://192.168.87.10:8808/}. */
    public FeatureExtractBolt() {
        this(DEFAULT_SERVICE_URL);
    }

    /**
     * Creates a bolt that talks to the given service.
     *
     * @param serviceUrl base URL of the segmentation service, e.g. {@code http://host:8808/}
     * @throws IllegalArgumentException if {@code serviceUrl} is null
     */
    public FeatureExtractBolt(String serviceUrl) {
        if (serviceUrl == null) {
            throw new IllegalArgumentException("serviceUrl must not be null");
        }
        this.serviceUrl = serviceUrl;
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple);
        String content = tuple.getString(0);
        System.out.println(content);
        if (content == null) {
            // Nothing to segment; skip the HTTP call.
            return;
        }
        try {
            // Percent-encode the message: Kafka payloads are arbitrary text, and an unencoded
            // space or '&' would make HttpGet throw or truncate the query parameter.
            HttpGet httpGet = new HttpGet(serviceUrl + "?content="
                    + java.net.URLEncoder.encode(content, "UTF-8"));
            CloseableHttpResponse response = client().execute(httpGet);
            try {
                HttpEntity entity = response.getEntity();
                System.out.println(response.getStatusLine());
                if (entity != null) {
                    System.out.println("Response content length: " + entity.getContentLength());
                    // Use UTF-8 only when the response declares no charset. The previous
                    // ISO-8859-1 -> UTF-8 round-trip corrupted the text whenever the server
                    // did declare one.
                    String responseString = EntityUtils.toString(entity, "UTF-8");
                    System.out.println(responseString);
                }
            } finally {
                // Return the pooled connection even if reading the body fails.
                response.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes the shared HTTP client, if one was created. */
    @Override
    public void cleanup() {
        if (httpClient != null) {
            try {
                httpClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            httpClient = null;
        }
    }

    /** Returns the shared HTTP client, creating it on first use. */
    private CloseableHttpClient client() {
        if (httpClient == null) {
            httpClient = HttpClients.createDefault();
        }
        return httpClient;
    }

    /** Declares no output streams: this bolt does not emit tuples. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }
}
启动 Storm 拓扑:带一个参数运行时,以该参数作为拓扑名提交到集群;不带参数时以本地模式(LocalCluster)运行。
启动kafka
./bin/kafka-server-start.sh config/server.properties
启动flume
启动 web.py 分词服务(代码中访问的地址为 http://192.168.87.10:8808/):
测试及结果: