39. Big Data Journey — Website Traffic Statistics Project: Data Storage (HBase)

Copyright notice: all rights reserved by 零零天. https://blog.csdn.net/qq_39188039/article/details/86589062

Storing Data in HBase


FluxInfo code:
This is a JavaBean that wraps the fields of a tuple before it is written to HBase; each access record becomes one instance.
One thing to note: when a row is inserted into the HBase table, a row key must be supplied. The row key format we use is sstime_uvid_ssid_random. Because the key starts with sstime (a 13-digit millisecond timestamp), later bolts can range-scan by time; the random suffix (0-99) keeps two records written in the same millisecond from colliding.

package cn.tarena.domain;
 
public class FluxInfo {
private String url;
private String urlname;
private String uvid;
private String ssid;
private String sscount;
private String sstime;
private String cip;
 
//--row key used when writing to HBase: sstime_uvid_ssid_random(0~99)
public String getRK(){
return sstime+"_"+uvid+"_"+ssid+"_"+(int)(Math.random()*100);
}
 
 
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUrlname() {
return urlname;
}
public void setUrlname(String urlname) {
this.urlname = urlname;
}
public String getUvid() {
return uvid;
}
public void setUvid(String uvid) {
this.uvid = uvid;
}
public String getSsid() {
return ssid;
}
public void setSsid(String ssid) {
this.ssid = ssid;
}
public String getSscount() {
return sscount;
}
public void setSscount(String sscount) {
this.sscount = sscount;
}
public String getSstime() {
return sstime;
}
public void setSstime(String sstime) {
this.sstime = sstime;
}
public String getCip() {
return cip;
}
public void setCip(String cip) {
this.cip = cip;
}
 
}

HBaseBolt code example:
This class is a Bolt component, the most downstream bolt in the topology; it writes the data into HBase.

package cn.tarena.weblog;
 
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import cn.tarena.dao.HBaseDao;
import cn.tarena.domain.FluxInfo;
 
/*
 * The most downstream bolt in the topology: persists each incoming tuple to HBase via HBaseDao.
 */
public class HBaseBolt extends BaseRichBolt{
 
private OutputCollector collector = null;
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
 
this.collector = collector;
}
 
@Override
public void execute(Tuple input) {
try {
 
FluxInfo fi = new FluxInfo();
fi.setUrl(input.getStringByField("url"));
fi.setUrlname(input.getStringByField("urlname"));
fi.setUvid(input.getStringByField("uvid"));
fi.setSsid(input.getStringByField("ssid"));
fi.setSstime(input.getStringByField("sstime"));
fi.setSscount(input.getStringByField("sscount"));
fi.setCip(input.getStringByField("cip"));
 
//--custom DAO helper class; handles persistence and the other DAO-layer operations
HBaseDao.getHbaseDao().saveToHbase(fi);
collector.ack(input);
} catch (Exception e) {
e.printStackTrace();
collector.fail(input);
}
 
 
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
 
 
}
 
}

HBaseDao code example:

package cn.tarena.dao;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
 
import cn.tarena.domain.FluxInfo;
 
public class HBaseDao {
 
private static HBaseDao hbaseDao = new HBaseDao();
private HBaseDao() {
}
 
public static HBaseDao getHbaseDao(){
return hbaseDao;
}
 
/**
 * Write one log record to hbase.
 * @param fi bean wrapping the log fields
 */
public void saveToHbase(FluxInfo fi){
HTable tab = null;
try {
//1. Create the hbase configuration object
Configuration conf = new Configuration();
//--defaults are fine for the rest; at minimum the zookeeper quorum must be set, since the client looks up hbase metadata through zookeeper
conf.set("hbase.zookeeper.quorum",
"192.168.150.137:2181,192.168.150.138:2181,192.168.150.139:2181");
 
//2. Create the HTable object
tab = new HTable(conf,"flux".getBytes());
 
//3. Put the data into the table
Put put = new Put(fi.getRK().getBytes());
put.add("cf1".getBytes(), "url".getBytes(), fi.getUrl().getBytes());
put.add("cf1".getBytes(), "urlname".getBytes(), fi.getUrlname().getBytes());
put.add("cf1".getBytes(), "uvid".getBytes(), fi.getUvid().getBytes());
put.add("cf1".getBytes(), "ssid".getBytes(), fi.getSsid().getBytes());
put.add("cf1".getBytes(), "sscount".getBytes(), fi.getSscount().getBytes());
put.add("cf1".getBytes(), "sstime".getBytes(), fi.getSstime().getBytes());
put.add("cf1".getBytes(), "cip".getBytes(), fi.getCip().getBytes());
tab.put(put);
 
} catch (IOException e) {
e.printStackTrace();
 
} finally {
//4. Close the table connection
if(tab != null){
try {
tab.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
 

Note: to empty the hbase table, run truncate 'table_name' in the hbase shell.
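Note also that the flux table with column family cf1 must already exist before HBaseBolt can write to it (it was presumably created earlier in the series). If it is missing, a one-off helper along the following lines can create it. This is an illustrative sketch using the same old-style HBase client API as the DAO code above, not code from the original post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class FluxTableInit {
    public static void main(String[] args) throws Exception {
        // Same minimal configuration as HBaseDao: only the zookeeper quorum is set explicitly.
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum",
                "192.168.150.137:2181,192.168.150.138:2181,192.168.150.139:2181");

        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("flux")) {
            // Create the flux table with the single column family cf1 used by HBaseBolt/HBaseDao.
            HTableDescriptor desc = new HTableDescriptor("flux".getBytes());
            desc.addFamily(new HColumnDescriptor("cf1"));
            admin.createTable(desc);
        }
        admin.close();
    }
}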

Reworking PvBolt, UvBolt and VvBolt


Business logic
For the pv, uv and vv statistics, the behavior we want is:
1) Every time a user makes a visit, Storm receives one record.
2) pv handling: set pv=1 on every record; the values are not accumulated right away.
3) uv handling: look up the record's uvid in HBase for the current day. If no earlier record is found, set uv=1 on this record; if one is found, set uv=0.
4) vv handling: based on the record's sscount: if sscount=0, set vv=1, otherwise vv=0.
5) None of the four steps above aggregates the metrics; instead, every record is written to the database together with its per-record metric values.
The point of this design is that total pv, uv and the other metrics for any time window can then be obtained from the database with a simple aggregation, as sketched below.
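For example, once the per-record rows are in MySQL (the tongji table created in the "Storing Data in MySQL" section below), the totals for one day can be obtained with a plain SUM query. The sketch below is illustrative rather than code from the original article; the JDBC URL and credentials mirror the MySqlDao example further down, and the date range is made up:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class TongjiQueryDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.150.137:3306/weblog", "root", "root");
             PreparedStatement ps = conn.prepareStatement(
                     "select sum(pv), sum(uv), sum(vv), sum(newip), sum(newcust) "
                   + "from tongji where date between ? and ?")) {
            // Illustrative window: one whole day.
            ps.setTimestamp(1, Timestamp.valueOf("2019-01-20 00:00:00"));
            ps.setTimestamp(2, Timestamp.valueOf("2019-01-20 23:59:59"));
            try (ResultSet rs = ps.executeQuery()) {
                if (rs.next()) {
                    System.out.println("pv=" + rs.getLong(1) + " uv=" + rs.getLong(2)
                            + " vv=" + rs.getLong(3) + " newip=" + rs.getLong(4)
                            + " newcust=" + rs.getLong(5));
                }
            }
        }
    }
}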

PvBolt code example:

import java.util.List;
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
 
public class PvBolt extends BaseRichBolt{
 
private OutputCollector collector;
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
 
}
 
@Override
public void execute(Tuple input) {
try {
List<Object> values=input.getValues();
 
//--set pv=1 on this record
values.add(1);
//--anchored emit: the output tuple is anchored to the input tuple
collector.emit(input,values);
collector.ack(input);
 
} catch (Exception e) {
collector.fail(input);
e.printStackTrace();
 
}
 
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("url","urlname","uvid","ssid","sscount","sstime","cip","pv"));
 
}
 
}

UvBolt code example:

package cn.tarena.store.weblog;
 
import java.util.Calendar;
import java.util.List;
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import cn.tarena.dao.HBaseDao;
import cn.tarena.domain.FluxInfo;
 
public class UvBolt extends BaseRichBolt{
private OutputCollector collector;
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
 
}
 
@Override
public void execute(Tuple input) {
try {
//--get the uvid
String uvid = input.getStringByField("uvid");
//--get the sstime
long endTime = Long.parseLong(input.getStringByField("sstime"));
//--roll the log time back to 00:00 of the same day to get the start of the day
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(endTime);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE,0);
calendar.set(Calendar.SECOND,0);
calendar.set(Calendar.MILLISECOND,0);
long beginTime = calendar.getTimeInMillis();

//--query hbase for records between 00:00 today and the current log time whose uvid equals this uvid
//--(the random row-key suffix is 1 or 2 digits, hence \d{1,2})
String regex="^\\d{13}_"+uvid+"_\\d{10}_\\d{1,2}$";
List<FluxInfo> list = HBaseDao.getHbaseDao().queryByRange((beginTime+"").getBytes(), (endTime+"").getBytes(), regex);

//--if nothing is found, this uvid appears for the first time today, so uv=1; otherwise uv=0
int uv = list.size() == 0 ? 1 : 0;

//--emit the result
List<Object> values = input.getValues();
values.add(uv);
collector.emit(input,values);
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
e.printStackTrace();
}
 
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("url","urlname","uvid","ssid","sscount","sstime","cip","pv","uv"));
 
}
 
}

VvBolt code example:

public class VvBolt extends BaseRichBolt {
private OutputCollector collector = null;
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
 
@Override
public void execute(Tuple input) {
try {
String sscount = input.getStringByField("sscount");
int vv = "0".equals(sscount) ? 1 : 0;
List<Object> values = input.getValues();
values.add(vv);
collector.emit(input,values);
collector.ack(input);
} catch (Exception e) {
e.printStackTrace();
collector.fail(input);
}
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("url","urlname","uvid","ssid","sscount","sstime","cip","pv","uv","vv"));
}
 
}

HBaseDao code example (now extended with the queryByRange method):

package cn.tarena.dao;
 
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
 
import cn.tarena.domain.FluxInfo;
 
public class HBaseDao {
 
private static HBaseDao hbaseDao = new HBaseDao();
private HBaseDao() {
}
 
public static HBaseDao getHbaseDao(){
return hbaseDao;
}
 
public void saveToHbase(FluxInfo fi){
//omitted here; see the earlier example
}
 
/**
 * Query data from hbase by row-key range.
 * @param start start row key (inclusive)
 * @param stop stop row key (exclusive)
 * @param rk_regex regular expression that the row keys must match
 * @return the matching records
 */
public List<FluxInfo> queryByRange(byte [] start,byte [] stop,String rk_regex){
HTable tab = null;
try {
//1. Build the configuration object
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", 
"192.168.150.137:2181,192.168.150.138:2181,192.168.150.139:2181");
 
//2. Get the table object
tab = new HTable(conf, "flux".getBytes());
 
//3. Scan the data
Scan scan = new Scan();
scan.setStartRow(start);
scan.setStopRow(stop);
Filter filter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(rk_regex));
 
scan.setFilter(filter);
 
//4. Iterate over the results
List<FluxInfo> list = new ArrayList<>();
ResultScanner rs = tab.getScanner(scan);
Iterator<Result> it = rs.iterator();
 
while(it.hasNext()){
Result r = it.next();
FluxInfo fi = new FluxInfo();
fi.setUrl(new String(r.getValue("cf1".getBytes(), "url".getBytes())));
fi.setUrlname(new String(r.getValue("cf1".getBytes(), "urlname".getBytes())));
fi.setUvid(new String(r.getValue("cf1".getBytes(), "uvid".getBytes())));
fi.setSsid(new String(r.getValue("cf1".getBytes(), "ssid".getBytes())));
fi.setSscount(new String(r.getValue("cf1".getBytes(), "sscount".getBytes())));
fi.setSstime(new String(r.getValue("cf1".getBytes(), "sstime".getBytes())));
fi.setCip(new String(r.getValue("cf1".getBytes(), "cip".getBytes())));
list.add(fi);
}
return list;
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
if(tab != null){
try {
tab.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
 

WeblogTopology code example:

public class WeblogTopology {
 
public static void main(String[] args) throws Exception {
Config conf=new Config();
//configure the zookeeper cluster list
BrokerHosts hosts=new ZkHosts("192.168.150.137:2181,"
+ "192.168.150.138:2181,"
+ "192.168.150.139:2181");
 
SpoutConfig sconf=new SpoutConfig(hosts,"weblog","/weblog","info");
 
sconf.scheme=new SchemeAsMultiScheme(new StringScheme());
 
 
//--configure the Kafka message source: through this Spout, Storm consumes data from Kafka
KafkaSpout spout=new KafkaSpout(sconf);
 
 
TopologyBuilder builder=new TopologyBuilder();
PrintBolt printBolt=new PrintBolt();
ClearBolt clearBolt=new ClearBolt();
PvBolt pvBolt=new PvBolt();
UvBolt uvBolt=new UvBolt();
VvBolt vvBolt=new VvBolt();
 
HBaseBolt hbaseBolt=new HBaseBolt();
 
 
//--wire up the data flow
builder.setSpout("Kafka_Spout", spout);
builder.setBolt("Clear_Bolt", clearBolt).shuffleGrouping("Kafka_Spout");
builder.setBolt("Pv_Bolt", pvBolt).globalGrouping("Clear_Bolt");
builder.setBolt("Uv_Bolt", uvBolt).globalGrouping("Pv_Bolt");
builder.setBolt("Vv_Bolt", vvBolt).globalGrouping("Uv_Bolt");
builder.setBolt("HBase_Bolt", hbaseBolt).globalGrouping("Vv_Bolt");
builder.setBolt("Print_Bolt", printBolt).globalGrouping("Vv_Bolt");
 
 
StormTopology toplogy=builder.createTopology();
LocalCluster cluster=new LocalCluster();
 
 
cluster.submitTopology("Weblog_Topology",conf,toplogy);
 
 
 
 
}
 
}

NewIpBolt+NewCustBolt


NewIpBolt marks whether a record comes from a new IP: if the record's cip has never appeared in HBase before, newip=1, otherwise newip=0. NewCustBolt does the same for uvid to produce newcust. Both rely on a new queryByField method added to HBaseDao (shown further below).

NewIpBolt code example:

public class NewIpBolt extends BaseRichBolt {
 
private OutputCollector collector = null;
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
 
@Override
public void execute(Tuple input) {
try {
//1. Get the cip of the current log record
String cip = input.getStringByField("cip");
//2. Query hbase by cip
List<FluxInfo> list = HBaseDao.getHbaseDao().queryByField("cip", cip);
//3. If a match is found, newip is 0, otherwise 1
int newip = list.size() == 0 ? 1 : 0;
//4. Emit the data (anchored to the input tuple)
List<Object> values = input.getValues();
values.add(newip);
collector.emit(input,values);
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
e.printStackTrace();
}
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("url","urlname","uvid","ssid","sscount","sstime","cip","pv","uv","vv","newip"));
}
 
}
 

NewCustBolt code example:

public class NewCustBolt extends BaseRichBolt {
 
private OutputCollector collector = null;
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
 
@Override
public void execute(Tuple input) {
try {
//1. Get the uvid of the current log record
String uvid = input.getStringByField("uvid");
//2. Query hbase by uvid
List<FluxInfo> list = HBaseDao.getHbaseDao().queryByField("uvid", uvid);
//3. If a match is found, newcust is 0, otherwise 1
int newcust = list.size() == 0 ? 1 : 0;
//4. Emit the data (anchored to the input tuple)
List<Object> values = input.getValues();
values.add(newcust);
collector.emit(input,values);
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
e.printStackTrace();
}
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("url","urlname","uvid","ssid","sscount","sstime","cip","pv","uv","vv","newip","newcust"));
}
 
}

HBaseDao code example (now extended with the queryByField method):

public class HBaseDao {
 
private static HBaseDao hbaseDao = new HBaseDao();
private HBaseDao() {
}
 
public static HBaseDao getHbaseDao(){
return hbaseDao;
}
 
public void saveToHbase(FluxInfo fi){
//code omitted; see the earlier example
}
 
public List<FluxInfo> queryByRange(byte [] start,byte [] stop,String rk_regex){
//code omitted; see the earlier example
}
 
public List<FluxInfo> queryByField(String field,String value){
HTable tab = null;
try {
//1. Build the configuration object
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
 
//2. Get the table object
tab = new HTable(conf, "flux".getBytes());
 
//3. Scan the data (note: a value filter like this means a full-table scan)
Scan scan = new Scan();
Filter filter = new SingleColumnValueFilter("cf1".getBytes(), field.getBytes(), CompareOp.EQUAL, value.getBytes());
scan.setFilter(filter);
 
//4. Iterate over the results
List<FluxInfo> list = new ArrayList<>();
ResultScanner rs = tab.getScanner(scan);
Iterator<Result> it = rs.iterator();
while(it.hasNext()){
Result r = it.next();
FluxInfo fi = new FluxInfo();
fi.setUrl(new String(r.getValue("cf1".getBytes(), "url".getBytes())));
fi.setUrlname(new String(r.getValue("cf1".getBytes(), "urlname".getBytes())));
fi.setUvid(new String(r.getValue("cf1".getBytes(), "uvid".getBytes())));
fi.setSsid(new String(r.getValue("cf1".getBytes(), "ssid".getBytes())));
fi.setSscount(new String(r.getValue("cf1".getBytes(), "sscount".getBytes())));
fi.setSstime(new String(r.getValue("cf1".getBytes(), "sstime".getBytes())));
fi.setCip(new String(r.getValue("cf1".getBytes(), "cip".getBytes())));
list.add(fi);
}
return list;
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
if(tab != null){
try {
tab.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

WeblogTopology code example:

public class WeblogTopology {
 
public static void main(String[] args) throws Exception {
Config conf=new Config();
 
 
BrokerHosts hosts=new ZkHosts("192.168.150.137:2181,"
+ "192.168.150.138:2181,"
+ "192.168.150.139:2181");
 
SpoutConfig sconf=new SpoutConfig(hosts,"weblog","/weblog","info");
 
sconf.scheme=new SchemeAsMultiScheme(new StringScheme());
 
 
 
KafkaSpout spout=new KafkaSpout(sconf);
 
 
TopologyBuilder builder=new TopologyBuilder();
PrintBolt printBolt=new PrintBolt();
ClearBolt clearBolt=new ClearBolt();
PvBolt pvBolt=new PvBolt();
UvBolt uvBolt=new UvBolt();
VvBolt vvBolt=new VvBolt();
NewIpBolt newipBolt=new NewIpBolt();
NewCustBolt newcustBolt=new NewCustBolt();
 
HBaseBolt hbaseBolt=new HBaseBolt();
 
 
builder.setSpout("Kafka_Spout", spout);
builder.setBolt("Clear_Bolt", clearBolt).shuffleGrouping("Kafka_Spout");
builder.setBolt("Pv_Bolt", pvBolt).globalGrouping("Clear_Bolt");
builder.setBolt("Uv_Bolt", uvBolt).globalGrouping("Pv_Bolt");
builder.setBolt("Vv_Bolt", vvBolt).globalGrouping("Uv_Bolt");
builder.setBolt("NewIp_Bolt", newipBolt).globalGrouping("Vv_Bolt");
builder.setBolt("NewCust_Bolt", newcustBolt).globalGrouping("NewIp_Bolt");
builder.setBolt("HBase_Bolt", hbaseBolt).globalGrouping("NewCust_Bolt");
builder.setBolt("Print_Bolt", printBolt).globalGrouping("NewCust_Bolt");
 
 
StormTopology toplogy=builder.createTopology();
LocalCluster cluster=new LocalCluster();
 
 
cluster.submitTopology("Weblog_Topology",conf,toplogy);
 
 
}
 
}

Storing Data in MySQL


Database table creation statements:
Log in with: mysql -uroot -proot

1)create database weblog;
2)use weblog;
3)create table tongji(date timestamp,pv int,uv int,vv int,newip int,newcust int);

Note: don't forget to add the MySQL driver jar (mysql-connector-java) to the project before running the code.

TongjiInfo code example:
This class wraps a record's basic info plus its per-record metric values (pv, uv, etc., e.g. uv=1, vv=0).

import java.sql.Timestamp;
 
public class TongjiInfo {
private Timestamp sstime; 
private int pv;
private int uv;
private int vv;
private int newip;
private int newcust;
 
public Timestamp getSstime() {
return sstime;
}
public void setSstime(Timestamp sstime) {
this.sstime = sstime;
}
public int getPv() {
return pv;
}
public void setPv(int pv) {
this.pv = pv;
}
public int getUv() {
return uv;
}
public void setUv(int uv) {
this.uv = uv;
}
public int getVv() {
return vv;
}
public void setVv(int vv) {
this.vv = vv;
}
public int getNewip() {
return newip;
}
public void setNewip(int newip) {
this.newip = newip;
}
public int getNewcust() {
return newcust;
}
public void setNewcust(int newcust) {
this.newcust = newcust;
}
}
 

MysqlBolt code example:

import java.sql.Timestamp;
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import cn.tarena.dao.MySqlDao;
import cn.tarena.domain.TongjiInfo;
 
public class MysqlBolt extends BaseRichBolt{
 
 
private OutputCollector collector = null;
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
 
@Override
public void execute(Tuple input) {
try {
TongjiInfo info = new TongjiInfo();
Timestamp stamp = new Timestamp(Long.parseLong(input.getStringByField("sstime")));
info.setSstime(stamp);
info.setPv(input.getIntegerByField("pv"));
info.setUv(input.getIntegerByField("uv"));
info.setVv(input.getIntegerByField("vv"));
info.setNewip(input.getIntegerByField("newip"));
info.setNewcust(input.getIntegerByField("newcust"));
MySqlDao.getMySqlDao().flushData(info);
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
e.printStackTrace();
}
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
 
}
 
}
 

MySqlDao code example:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
 
import cn.tarena.domain.TongjiInfo;
 
 
public class MySqlDao {
private static MySqlDao mySqlDao = new MySqlDao();
private MySqlDao(){}
 
public static MySqlDao getMySqlDao(){
return mySqlDao;
}
 
public void flushData(TongjiInfo info){
Connection conn = null;
PreparedStatement ps = null;
try {
//1. Register the JDBC driver
Class.forName("com.mysql.jdbc.Driver");
//2. Get the database connection
conn = DriverManager.getConnection("jdbc:mysql://192.168.150.137:3306/weblog","root","root");
//3. Create the prepared statement (the table name must match the one created above: tongji)
ps = conn.prepareStatement("insert into tongji values (?,?,?,?,?,?)");
ps.setTimestamp(1, info.getSstime());
ps.setInt(2, info.getPv());
ps.setInt(3, info.getUv());
ps.setInt(4, info.getVv());
ps.setInt(5, info.getNewip());
ps.setInt(6, info.getNewcust());
 
//4. Execute the insert
ps.executeUpdate();
 
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
//--release resources
try {
if(ps != null) ps.close();
if(conn != null) conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
 

WeblogTopology code example:

public static void main(String[] args) throws Exception {
Config conf=new Config();
 
BrokerHosts hosts=new ZkHosts("192.168.150.137:2181,"
+ "192.168.150.138:2181,"
+ "192.168.150.139:2181");
 
SpoutConfig sconf=new SpoutConfig(hosts,"weblog","/weblog","info");
 
sconf.scheme=new SchemeAsMultiScheme(new StringScheme());
 
KafkaSpout spout=new KafkaSpout(sconf);
 
TopologyBuilder builder=new TopologyBuilder();
PrintBolt printBolt=new PrintBolt();
ClearBolt clearBolt=new ClearBolt();
PvBolt pvBolt=new PvBolt();
UvBolt uvBolt=new UvBolt();
VvBolt vvBolt=new VvBolt();
NewIpBolt newipBolt=new NewIpBolt();
NewCustBolt newcustBolt=new NewCustBolt();
 
HBaseBolt hbaseBolt=new HBaseBolt();
MysqlBolt mysqlBolt=new MysqlBolt();
 
builder.setSpout("Kafka_Spout", spout);
builder.setBolt("Clear_Bolt", clearBolt).shuffleGrouping("Kafka_Spout");
builder.setBolt("Pv_Bolt", pvBolt).globalGrouping("Clear_Bolt");
builder.setBolt("Uv_Bolt", uvBolt).globalGrouping("Pv_Bolt");
builder.setBolt("Vv_Bolt", vvBolt).globalGrouping("Uv_Bolt");
builder.setBolt("NewIp_Bolt", newipBolt).globalGrouping("Vv_Bolt");
builder.setBolt("NewCust_Bolt", newcustBolt).globalGrouping("NewIp_Bolt");
builder.setBolt("HBase_Bolt", hbaseBolt).globalGrouping("NewCust_Bolt");
builder.setBolt("Print_Bolt", printBolt).globalGrouping("NewCust_Bolt");
builder.setBolt("Mysql_Bolt", mysqlBolt).globalGrouping("NewCust_Bolt");
 
StormTopology toplogy=builder.createTopology();
LocalCluster cluster=new LocalCluster();
 
cluster.submitTopology("Weblog_Topology",conf,toplogy);
 
}
 
 

Visit the page to generate traffic and run the test: rows with the per-record metric values should start appearing in the MySQL tongji table (for example, check with select * from tongji; in the mysql client) as well as in the HBase flux table.

Storm pseudo-real-time processing: the tick mechanism


By returning TOPOLOGY_TICK_TUPLE_FREQ_SECS from getComponentConfiguration(), a bolt asks Storm to deliver a system "tick" tuple to it at a fixed interval; this is what drives the periodic (pseudo-real-time) statistics later on. The two demo bolts below simply print a message every 5 and 7 seconds respectively.

TickBolt_1 code:

public class TickBolt_1 extends BaseRichBolt{
 
@Override
public Map<String, Object> getComponentConfiguration() {
Config config=new Config();
//--ask Storm to deliver a tick tuple to this bolt every 5 seconds
config.put(config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,5);
return config;
}
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
 
 
}
 
@Override
public void execute(Tuple input) {
System.out.println("tick1");
 
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
 
}
 
 
 
}

TickBolt_2 code:

public class TickBolt_2 extends BaseRichBolt{
 
@Override
public Map<String, Object> getComponentConfiguration() {
Config config=new Config();
config.put(config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,7);
return config;
}
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
 
 
}
 
@Override
public void execute(Tuple input) {
System.out.println("tick2");
 
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
 
}
 
 
 
}

TickTopology code:

public class TickTopology {
 
 
public static void main(String[] args) {
Config config=new Config();
//config.put(config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,1);
TickBolt_1 tickBolt1=new TickBolt_1();
TickBolt_2 tickBolt2=new TickBolt_2();
 
TopologyBuilder builder=new TopologyBuilder();
 
builder.setBolt("tick1", tickBolt1);
builder.setBolt("tick2", tickBolt2);
 
StormTopology topology=builder.createTopology();
 
LocalCluster cluster=new LocalCluster();
cluster.submitTopology("Tick_Topology",config,topology);
}
}
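TickBolt_1 and TickBolt_2 have no upstream component, so every tuple they receive is a tick tuple. A bolt that also receives normal data tuples has to tell the two apart; a common helper for that looks like the illustrative sketch below (it uses Storm's standard Constants class and is not part of the original article):

import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

public class TickUtil {

    // A tick tuple is emitted by the system component on the dedicated tick stream.
    public static boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }
}

Inside execute(), such a bolt runs its periodic logic when isTickTuple(input) is true and its normal per-tuple logic otherwise.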

Pseudo-real-time business queries


The small topology below recomputes three session metrics every 10 seconds over the most recent 15 minutes of data in HBase: bounce rate (br), average session duration (avgtime) and average session depth (avgdeep). TimeBolt fires on every tick and emits the current time; each downstream bolt queries HBase for the 15-minute window, computes its metric and appends it to the tuple.

TimeBolt code example:

public class TimeBolt extends BaseRichBolt {
 
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
return conf;
}
 
private OutputCollector collector = null;
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
 
@Override
public void execute(Tuple input) {
long time = System.currentTimeMillis();
collector.emit(input,new Values(time));
collector.ack(input);
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("time"));
}
 
}

BrBolt code example:

public class BrBolt extends BaseRichBolt{
 
private OutputCollector collector = null;
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
 
@Override
public void execute(Tuple input) {
//1. Use the tick trigger time as the end of the statistics window
long stop = input.getLongByField("time");
//2. Go back 15 minutes from the trigger time as the start of the window
long start = stop - 1000 * 60 *15;
//3. Query hbase for all records in this time range
List<FluxInfo> list = HBaseDao.getHbaseDao().queryByRange((start+"").getBytes(), (stop+"").getBytes(), "^.*$");
//4. Compute br: a session with exactly one record counts as a bounce; br = bounces / sessions, rounded to 4 decimals
Map<String,Integer> map = new HashMap<>();
for(FluxInfo fi : list){
String ssid = fi.getSsid();
map.put(ssid, map.containsKey(ssid) ? map.get(ssid) + 1 : 1);
}
 
int ssCount = map.size();
int brCount = 0;
for(Map.Entry<String, Integer>entry : map.entrySet()){
if(entry.getValue() == 1)brCount++;
}
 
double br = 0;
if(ssCount != 0){
br = Math.round(brCount * 10000.0 / ssCount)/10000.0;
}
//5. Emit the data
List<Object> values = input.getValues();
values.add(br);
collector.emit(input,values);
collector.ack(input);
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("time","br"));
}
 
}

AvgTimeBolt code example:

public class AvgTimeBolt extends BaseRichBolt {
 
private OutputCollector collector = null;
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
 
@Override
public void execute(Tuple input) {
//1. Use the tick trigger time as the end of the statistics window
long stop = input.getLongByField("time");
//2. Go back 15 minutes from the trigger time as the start of the window
long start = stop - 1000 * 60 *15;
//3. Query hbase for all records in this time range
List<FluxInfo> list = HBaseDao.getHbaseDao().queryByRange((start+"").getBytes(), (stop+"").getBytes(), "^.*$");
//4. Compute avgtime: per session, stay time = max(sstime) - min(sstime); avgtime averages this over all sessions
Map<String,List<FluxInfo>> map = new HashMap<>();
for(FluxInfo fi : list){
String ssid = fi.getSsid();
if(map.containsKey(ssid)){
map.get(ssid).add(fi);
}else{
List<FluxInfo> fiList = new ArrayList<>();
fiList.add(fi);
map.put(ssid, fiList);
}
}
 
int ssCount = map.size();
long useTime = 0;
for(Map.Entry<String, List<FluxInfo>> entry : map.entrySet()){
String ssid = entry.getKey();
List<FluxInfo> fiList = entry.getValue();
long maxTime = Long.MIN_VALUE;
long minTime = Long.MAX_VALUE;
for(FluxInfo fi : fiList){
long ssTime = Long.parseLong(fi.getSstime());
if(ssTime >= maxTime){
maxTime = ssTime;
}
if(ssTime <= minTime){
minTime = ssTime;
}
}
useTime += maxTime - minTime;
}
 
double avgTime = 0;
if(ssCount != 0){
avgTime = Math.round(useTime * 10000.0 / ssCount)/10000.0;
}
 
//5. Emit the data
List<Object> values = input.getValues();
values.add(avgTime);
collector.emit(input,values);
collector.ack(input);
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("time","br","avgtime"));
}
 
}
 
AvgDeepBolt code example:
public class AvgDeepBolt extends BaseRichBolt {
 
private OutputCollector collector = null;
 
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
 
@Override
public void execute(Tuple input) {
//1. Use the tick trigger time as the end of the statistics window
long stop = input.getLongByField("time");
//2. Go back 15 minutes from the trigger time as the start of the window
long start = stop - 1000 * 60 *15;
//3. Query hbase for all records in this time range
List<FluxInfo> list = HBaseDao.getHbaseDao().queryByRange((start+"").getBytes(), (stop+"").getBytes(), "^.*$");
//4. Compute avgdeep: per session, depth = number of distinct pages visited; avgdeep averages this over all sessions
Map<String,Set<String>> map = new HashMap<>();
for(FluxInfo fi : list){
String ssid = fi.getSsid();
if(map.containsKey(ssid)){
Set<String> set = map.get(ssid);
set.add(fi.getUrlname());
}else{
Set<String> set = new HashSet<>();
set.add(fi.getUrlname());
map.put(ssid, set);
}
}
 
int ssCount = map.size();
int deep = 0;
for(Map.Entry<String, Set<String>> entry : map.entrySet()){
deep += entry.getValue().size();
}
 
double avgDeep = 0;
if(ssCount != 0){
avgDeep = Math.round(deep * 10000.0 / ssCount)/10000.0;
}
 
//5. Emit the data
List<Object> values = input.getValues();
values.add(avgDeep);
collector.emit(input,values);
collector.ack(input);
}
 
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("time","br","avgtime","avgdeep"));
}
 
}
 
WeblogTopology code example:
public class WeblogTopology {
 
public static void main(String[] args) throws Exception {
Config conf=new Config();
 
BrokerHosts hosts=new ZkHosts("192.168.150.137:2181,"
+ "192.168.150.138:2181,"
+ "192.168.150.139:2181");
 
SpoutConfig sconf=new SpoutConfig(hosts,"weblog","/weblog","info");
 
sconf.scheme=new SchemeAsMultiScheme(new StringScheme());
 
KafkaSpout spout=new KafkaSpout(sconf);
 
TopologyBuilder builder=new TopologyBuilder();
PrintBolt printBolt=new PrintBolt();
ClearBolt clearBolt=new ClearBolt();
PvBolt pvBolt=new PvBolt();
UvBolt uvBolt=new UvBolt();
VvBolt vvBolt=new VvBolt();
NewIpBolt newipBolt=new NewIpBolt();
NewCustBolt newcustBolt=new NewCustBolt();
 
HBaseBolt hbaseBolt=new HBaseBolt();
MysqlBolt mysqlBolt=new MysqlBolt();
 
 
builder.setSpout("Kafka_Spout", spout);
builder.setBolt("Clear_Bolt", clearBolt).shuffleGrouping("Kafka_Spout");
builder.setBolt("Pv_Bolt", pvBolt).globalGrouping("Clear_Bolt");
builder.setBolt("Uv_Bolt", uvBolt).globalGrouping("Pv_Bolt");
builder.setBolt("Vv_Bolt", vvBolt).globalGrouping("Uv_Bolt");
builder.setBolt("NewIp_Bolt", newipBolt).globalGrouping("Vv_Bolt");
builder.setBolt("NewCust_Bolt", newcustBolt).globalGrouping("NewIp_Bolt");
builder.setBolt("HBase_Bolt", hbaseBolt).globalGrouping("NewCust_Bolt");
builder.setBolt("Print_Bolt", printBolt).globalGrouping("NewCust_Bolt");
builder.setBolt("Mysql_Bolt", mysqlBolt).globalGrouping("NewCust_Bolt");
 
//1. Create the tick components
TimeBolt timeBolt = new TimeBolt();
BrBolt brBolt = new BrBolt();
AvgTimeBolt avgTimeBolt = new AvgTimeBolt();
AvgDeepBolt avgDeepBolt = new AvgDeepBolt();
PrintBolt tickPrintBolt = new PrintBolt();
//2. Create a separate builder for the tick topology
TopologyBuilder tickBuilder = new TopologyBuilder();
//3. Wire up the tick topology
tickBuilder.setBolt("Time_Bolt", timeBolt);
tickBuilder.setBolt("Br_Bolt", brBolt).shuffleGrouping("Time_Bolt");
tickBuilder.setBolt("Avg_Time_Bolt", avgTimeBolt).shuffleGrouping("Br_Bolt");
tickBuilder.setBolt("Avg_Deep_Bolt", avgDeepBolt).shuffleGrouping("Avg_Time_Bolt");
tickBuilder.setBolt("Tick_Print_Bolt", tickPrintBolt).shuffleGrouping("Avg_Deep_Bolt");
 
StormTopology toplogy=builder.createTopology();
StormTopology tickTopology = tickBuilder.createTopology();
LocalCluster cluster=new LocalCluster();
 
 
cluster.submitTopology("Weblog_Topology",conf,toplogy);
cluster.submitTopology("Tick_Topology", conf, tickTopology);
 
 
 
}
 
}

Previous post: 38. Big Data Journey — Website Traffic Statistics Project: Real-Time Business System (Kafka, Storm, HBase)
