In the previous post we tested with a Kafka console consumer. If we swap that consumer for an HBase-backed one, HBase can pull the data out of Kafka and store it.
Starting HBase requires Hadoop to be up first, and HBase also depends on ZooKeeper.
Start Hadoop: start-dfs.sh
Start HBase: start-hbase.sh
For HBase high availability, additionally start a backup master on the other nodes: hbase-daemon.sh start master
Processes on each node (the original screenshot is not reproduced here; verify with jps):
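An illustrative jps check; the node roles and PIDs below are assumptions, your assignments may differ:

[centos@s128 ~]$ jps
2113 NameNode
2310 QuorumPeerMain
2542 HMaster
2688 HRegionServer

On any node where the backup master was started, jps additionally shows an HMaster process alongside the HRegionServer.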
Create the HBase consumer:
In IDEA, add the hbase-site.xml and hdfs-site.xml files to the project resources, and externalize the configuration the same way as before:
kafka.properties:
zookeeper.connect=s128:2181,s129:2181,s130:2181
# consumer group
group.id=g4
zookeeper.session.timeout.ms=500
zookeeper.sync.time.ms=250
auto.commit.interval.ms=1000
auto.offset.reset=smallest
# Kafka topic to consume
topic=calllog
# HBase table name
table.name=ns1:calllogs
# number of hash partitions for the rowkey prefix
partition.number=100
# caller flag
caller.flag=0
# format pattern for the hash prefix
hashcode.pattern=00
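The code below relies on a PropertiesUtil helper that this post does not show. A minimal sketch, assuming kafka.properties sits on the classpath; the class name, props field, and getProp method match how they are used below, everything else is an assumption:

import java.util.Properties;

public class PropertiesUtil {
    // Loaded once from kafka.properties on the classpath
    public static Properties props = new Properties();

    static {
        try {
            props.load(PropertiesUtil.class.getClassLoader()
                    .getResourceAsStream("kafka.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static String getProp(String key) {
        return props.getProperty(key);
    }
}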
Create the HbaseDao class, which accesses HBase and performs the data operations:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.text.DecimalFormat;

/**
 * HBase data access object.
 */
public class HbaseDao {

    private DecimalFormat df = new DecimalFormat();
    private Table table = null;
    private int partitions;
    private String flag;

    public HbaseDao() {
        try {
            // Reads hbase-site.xml / hdfs-site.xml from the classpath
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            TableName name = TableName.valueOf(PropertiesUtil.getProp("table.name"));
            table = conn.getTable(name);
            df.applyPattern(PropertiesUtil.getProp("hashcode.pattern"));
            partitions = Integer.parseInt(PropertiesUtil.getProp("partition.number"));
            flag = PropertiesUtil.getProp("caller.flag");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Put one log line into HBase.
     */
    public void put(String log) {
        if (log == null || log.equals("")) {
            return;
        }
        try {
            // Parse the log line: caller,callee,callTime,callDuration
            String[] arr = log.split(",");
            if (arr != null && arr.length == 4) {
                String caller = arr[0];
                String callee = arr[1];
                String callTime = arr[2];
                callTime = callTime.replace("/", "");  // strip slashes
                callTime = callTime.replace(" ", "");  // strip spaces
                callTime = callTime.replace(":", "");  // strip colons
                String callDuration = arr[3];

                // Compute the region hash and build the rowkey
                String rowkey = genRowkey(getHashcode(caller, callTime),
                        caller, callTime, flag, callee, callDuration);

                Put put = new Put(Bytes.toBytes(rowkey));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callTime"), Bytes.toBytes(callTime));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callDuration"), Bytes.toBytes(callDuration));
                table.put(put);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String getHashcode(String caller, String callTime) {
        int len = caller.length();
        // Last four digits of the phone number
        String last4Code = caller.substring(len - 4);
        // Year and month portion of the call time (yyyyMM)
        String mon = callTime.substring(0, 6);
        // XOR the two, then mod the partition count
        int hashcode = (Integer.parseInt(mon) ^ Integer.parseInt(last4Code)) % partitions;
        return df.format(hashcode);
    }

    /**
     * Build the rowkey: hash,caller,time,flag,callee,duration
     */
    public String genRowkey(String hash, String caller, String time, String flag,
                            String callee, String duration) {
        return hash + "," + caller + "," + time + "," + flag + "," + callee + "," + duration;
    }
}
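A worked example of the rowkey hash, assuming partition.number=100 and hashcode.pattern=00 from the properties above, with an illustrative caller and call time:

import java.text.DecimalFormat;

public class HashcodeExample {
    public static void main(String[] args) {
        // For caller "13709876543" and callTime "20171225103030":
        // last4Code = "6543", mon = "201712"
        // 201712 ^ 6543 = 199295, and 199295 % 100 = 95
        int hashcode = (Integer.parseInt("201712") ^ Integer.parseInt("6543")) % 100;
        DecimalFormat df = new DecimalFormat();
        df.applyPattern("00");
        System.out.println(df.format(hashcode)); // prints 95
    }
}

The two-digit prefix spreads rows across regions while keeping all calls from the same number in the same month in one bucket.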
Create HbaseConsumer (the HBase consumer):
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * HBase consumer: pulls data from Kafka and stores it in HBase.
 */
public class HbaseConsumer {
    public static void main(String[] args) throws Exception {
        HbaseDao dao = new HbaseDao();

        // Build the consumer configuration from kafka.properties
        ConsumerConfig config = new ConsumerConfig(PropertiesUtil.props);

        // Subscribe to the topic with a single stream
        String topic = PropertiesUtil.getProp("topic");
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put(topic, new Integer(1));

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
        Map<String, List<KafkaStream<byte[], byte[]>>> msgs = connector.createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> msgList = msgs.get(topic);

        String msg = null;
        for (KafkaStream<byte[], byte[]> stream : msgList) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                // Take the raw message bytes from Kafka
                byte[] message = it.next().message();
                msg = new String(message);
                // Write the record into HBase
                dao.put(msg);
            }
        }
    }
}
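Before running the consumer, the target table has to exist in HBase (skip this if it was already created in an earlier post). The namespace, table name, and column family below come from the config and HbaseDao code above; in the hbase shell:

create_namespace 'ns1'
create 'ns1:calllogs', 'f1'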
Build the project into a jar and copy it to s128.
Since many dependency jars need to be imported first, run the following mvn command on Windows to download all of the artifact's dependencies:
----------------------------------------
mvn -DoutputDirectory=./lib -DgroupId=com.chenway -DartifactId=CallLogConsumerModule -Dversion=1.0-SNAPSHOT dependency:copy-dependencies
Copy all of the downloaded jars into the lib folder on s128.
Write the run-kafkaconsumer.sh script:
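The script body is not shown in the original post. A minimal sketch, assuming the consumer jar sits in the home directory, the dependency jars are in $HOME/lib, and HbaseConsumer has no package prefix; adjust the paths and class name to your layout:

#!/bin/bash
# Run the HBase consumer with all dependency jars on the classpath.
# Jar name, lib location, and main class are assumptions.
java -cp "$HOME/CallLogConsumerModule-1.0-SNAPSHOT.jar:$HOME/lib/*" HbaseConsumer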
Run the HBase consumer script and the data-generator script:
./run-kafkaconsumer.sh
./calllog.sh
Then open the hbase shell to check the data.
Scan the table: scan 'ns1:calllogs'
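If the table already holds many rows, limiting the scan keeps the output readable:

scan 'ns1:calllogs', {LIMIT => 10}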