电信项目:
一、idea项目构建
1、安装jdk并配置环境变量
2、安装maven,解压离线仓库,并设置settings
conf目录下的setttings.xml文件复制到离线仓库的m2目录下,并修改mirror标签以及离线仓库路径。
设置idea工具的maven选项,涉及到4个地方:work offline,以及3个maven设置吧。注意留意:override选项。
3、新建ct主项目目录(相当于eclipse的workset)
一个项目对应一个文件夹,举例:
workspace:
ct:(新建moduel)
ct_producer:
该项目的各种包
ct_analysis:
该项目的各种包
4、新建ct_producer 模块,用于数据生产代码的编写或构建
** 构建该项目选择maven,ct项目下所有的模块(module)都是maven工程。(maven要是用3.3.9的)
5、设置常用选项
取消idea自动打开之前项目的功能(搜索reopen,关闭相关标签即可)
设置字体大小(Editor——font——size)进行设置
设置字符编码:搜索:encoding,3个位置全部改为utf-8
自动导包以及自动提示设置(搜索auto,设置自动导包为ask,代码自动提示为first letter)
尖叫提示:
idea-setttings设置的是当前项目的配置(只针对当前项目生效)
idea-file-others-default settings设置的是全局默认配置(也就是说,以后新建项目都是按照这个默认配置)
二、数据生产
1、新建Producer.java
** 初始化联系人集合用于随机数据使用
** 随机两个电话号码
** 随机通话建立的时间,返回String,格式:yyyy-MM-dd HH:mm:ss
** 随机通话持续时间
** 将产生的数据写入到本地磁盘中(日志文件)
- 技术点
获得list集合的随机下标
获得主叫电话以及姓名
int indexcaller=(int)(Math.random()*phoneList.size());
获得被叫的电话以及姓名,如果主叫等于被叫就继续循环,不等于就对被叫进行赋值
获得随机日期
//将带格式的日期转换成毫秒数
Date startdata=formateTime.parse(startTime);
获得某时间区间范围内的日期
long radomdata=startdata.getTime()+(long)((enddata.getTime()-startdata.getTime())*Math.random());
路径是传入进去的args[0]
三、数据消费(数据存储)
flume:cloudera公司研发
适合下游数据消费者不多的情况;
适合数据安全性要求不高的操作;
适合与Hadoop生态圈对接的操作。
kafka:linkedin公司研发·
适合数据下游消费众多的情况;
适合数据安全性要求较高的操作(支持replication);
1、安装运行zookeeper
2、安装配置kafka,此时我使用的版本是2.10-0821
** 修改server.properties
3、启动kafka集群
启动kafka集群,分别在3台机器上执行:
/home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /home/admin/modules/kafka_2.10-0.8.2.1/config/server.properties
创建kafka主题:calllog
/home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper linux01:2181 --topic calllog --create --replication-factor 1 --partitions 4
** 查看主题列表
$ /home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper linux01:2181 --list
启动kafka控制台消费者,用于测试
$ /home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper linux01:2181 --topic calllog --from-beginning
4、配置flume,用于监听实时产生的数据文件
** 创建flume的job配置文件
尖叫提示:由于在配置flume的过程中,涉及到了数据监听读取方式的操作“tail -F -c +0”,即每次读取完整的文件,所以修改了java代码中,输出流的写出方式为:非追加,即覆盖文件。
启动flume服务,注意,该语句需要进如flume跟目录下执行
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobs/flume-kafka.conf
5、生产日志
6、使用Java KafkaAPI读取Kafka中缓存的数据
** 通过https://mvnrepository.com/网站找到你需要使用的依赖
** 导入依赖
** 建立包结构
** 创建kafka消费者类,同时创建配置文件以及配置文件的工具类
** 实现了将生产的日志实时读取到控制台
7、成功拿到数据之后,使用Java HBaseAPI将数据放入Hbase表中
由于数据支持实时查询所以选择将数据存放在hbase表中,而hive是低延迟
** 拿到一条数据,我要把这条数据放到Hbase表中
** 创建工具类创建命名空间配置文件,表,等
** 创建DAO实现数据存放功能
** 思路没有捋清楚:
1、创建命名空间
2、创建表(不要先不要添加协处理器)(注意,需要预分区)
3、创建rowkey生成方法,创建预分区
3.1rowkey是按照字典存储,因此设置rowkey时,要充分利用排序特点,将经常一起读取的数据存储到一块,将最近可能会被访问的数据放到一块。
3.2预先创建一些空的Regions,这样当数据写入HBase的时候,会按照 Region分区情况,在进群内做数据的负载均衡。(保证数据在region中均衡分配)预分区返回的是一个字节二维数组
- 创建预分区号生成方法
将电话号码的后四位与年月的后六位进行异或除以分区数,这样设计使数据在region分布更均匀。
5、在HBaseDAO中的构造方法里,初始化命名空间,初始化表(注意判断表是否存在)
6、在HBaseDAO中创建put方法,用于存放数据
7、在kafka取得数据时,使用HbaseDAO的实例化对象,调用put方法,将数据存入即可。
Hbase的API使用流程:
- 首先需要创建HbaseAdmin表的操作对象,使表进入到编辑模式
- 初始化表的对象加载Configuration配置文件
- admin.tableExists(tableName) 判断表是否存在
- NamespaceDescriptor ns = NamespaceDescriptor
.create(namespace)创建命名空间描述
admin.createNamespace(ns);创建命名空间
5.HTableDescriptor tableDescriptor创建表的描述
admin.createTable(tableDescriptor, getSplitKeys(regions));创建表
6.通过配置文件拿到值创建表的预分区
7.需要添加的内容
//rowKey 主叫 被叫 通话时间 通话时间的毫秒表示 通话时长 主叫被叫的标志
rowKey的设计要用到经常用到的数据rowkey是按照字典存储,因此设置rowkey时,要充分利用排序特点,将经常一起读取的数据存储到一块,将最近可能会被访问的数据放到一块。
//rowkey的设计 分区号 主叫 通话时间 被叫 主叫标志 通话时长
8.添加协处理器(插入一条被叫数据)
在admin.createTable之前将协处理器添加进去
tableDescriptor.addCoprocessor("hbase.CalleeWriteObserver");
协处理器在put的同时将添加协处理器的内容
协处理器操作也是需要得到表明,添加数据、与当前操作表进行比较、放入数据等操作
9、写入数据到HBase中注意事项
** 先确保表是成功创建的
** 检查Hbase各个节点均为正常,通过浏览器60010查看
** maven导包,不要导错了,不要重复导包,不要导错版本
** 代码逻辑执行顺序要注意
** 超时时间设置:
*** kafka根目录下的config目录下,修改server.properties文件对于zookeeper超时时间的限定。
*** 项目的resoureces目录下的kafka.properties文件中,关于zookeeper超时的设置。
以上两个值,设置都稍大一些,比如50000
10、优化数据存储方案:使用协处理器
1、同一条数据,存储两遍。
rowkey:实现了针对某个人,查询该人范围时间内的所有通话记录
分区号 + call1 + dateTime + call2 + flag + duration
15837312345_20170101000000_13733991234
2、讨论使用协处理器的原因
3、操作过程
** 创建协处理器类:CalleeWriteObserver extends BaseRegionObserver
** 覆写postPut方法
** 编写代码一大堆(实现将被叫数据存入)
** 创建表方法中:表述器中添加你成功创建的协处理器
** 修改resources中hbase-site.xml配置文件
** 打包
** 将包分别放于3台机器中的hbase根目录中的lib目录下(群发)
** 3台机器中的hbase-site.xml文件注册协处理器(群发)
<property>
<name>hbase.coprocessor.region.classes</name>
<value>com.china.coprocessor.CalleeWriteObserver</value>
</property
** 重启hbase
** 测试
** 如果测试成功,记得把表初始化一下
运行语句:
1.启动yarn
2.启动hadoop
3.启动hbase
4.启动kafka
(1)启动kafka的集群
bin/kafka-server-start.sh -daemon config/server.properties
(2)创建kefka分区
bin/kafka-topics.sh --zookeeper had01:2181 --topic calllog --create --replication-factor 1 --partitions 3
(3)查看分区
bin/kafka-topics.sh --zookeeper had01:2181 --list
(4)启动消费者
bin/kafka-console-consumer.sh --zookeeper had01:2181 --topic calllog --from-beginning
5.启动flum
bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/app/flume-1.8.0/calllog/flum-calllog-kafka.conf
6.开启kafka消费者
java -cp /datas/ct_consumer-1.0-SNAPSHOT.jar:/datas/mvn/* kafka.HBaseConsumer
7.java生产者
java -cp ct_producer-1.0-SNAPSHOT.jar producer.ProductLog /home/hadoop/call/calllog.csv
注意:运行的时候需要将idea的依赖包导出来放到运行目录下并指定
11、打包,执行消费数据方法
方式一:
java -Djava.ext.dirs=./lib/ -cp ct_consumer.jar com.china.kafka.HBaseConsumer
方式二:
windows:java -cp ./lib/*;ct_consumer.jar com.china.kafka.HBaseConsumer
linux:java -cp ./lib/*:ct_consumer.jar com.china.kafka.HBaseConsumer
idea打包,并提交到linux执行
** 打包,并将打好的包以及第三方依赖整个拷贝出来
** 上传该文件夹(ct_consumer_jar)到linux中
** 运行:java -Djava.ext.dirs=./lib/ -cp ct_consumer.jar com.china.kafka.HBaseConsumer
6、某个用户,传入指定的时间范围,查询该时间范围内的该用户的所有通话记录(包含主叫和被叫)
15837312345 2017-01-01 2017-05-01
rowkey:01_15837312345_20171102181630_13737312345_1_0180
** scan.setStartRow
2017-01-01
** scan.setStopRow
2017-02-01
** 组装rowkey
01_15837312345_201711
01_15837312345_201712
7、Filter测试讲解
四、数据分析
类的结构以及功能:
1.mapper----reduce------runner
mapper端的输出-------kv设计类
key的组成分为两个类联系人+日期------组合(有父类)电话号码+时间
valve:通话次数+通话时长(有父类)
2.runner类输入hbase------输出mysql----
3.MySQLOutputFormat类实现主表数据的插入
输出的形式1_1 1 1 次数 通话时长
4.DimensionConverterImpl:获取联系人信息以及时间的信息的id
为了避免从mysql中重复拿数据所以需要用到lruCache 缓存类,将常用的数据放到缓存中,查找id的话首先在缓存中进行查找,缓存中没有的到mysql中进行查找。mysql中没有的话就将数据插入然后再次进行查找最后返回id信息。
- 连接数据库的工具类
连接数据库,关闭数据库
- LRUCache工具类
类的具体实现
1、统计所有用户每月通话记录(通话次数,通话时长)
2、统计所有用户每月通话记录(通话次数,通话时长)
3、导入Mysql建表语句(db_telecom.sql)
4、新建项目,构建包结构,创建能够想到的需要使用的类,不需要任何实现
(1)JDBCutils:
本类是jdbc连接的工具类负责打开数据库的连接关闭数据库的连接
(2)JDBCinstance:
不让其他对象过多的建立数据库的连接
- base文件夹主要是kv的接口类方便向上转型以及向下转型
- key中存放key的分类以及key的结合的类
kv.key.ContactDimension
private String telephone;
private String name;
kv.key.DateDimension
private String year;
private String month;
private String day;
kv.key.CommDimension
private ContactDimension contactDimension=new ContactDimension();
private DateDimension dateDimension=new DateDimension();
value类封装的是通话时长以及通话次数的属性
- mapper类的解析
在mapper阶段主要分析的是key的值继承tablemapper类从hbase中加载数据
过滤掉被调数据,使用主调数据的rowkey进行操作
将主叫数据年份、年月、年月日+通话时间
rowkey的被叫数据年份、年月、年月日+通话时间
分别通过context对象传递给reduce的阶段
context(date主叫年+contact,通话时长)
context(date主叫月+contact,通话时长)
context(date主叫日+contact,通话时长)
context(date被叫年+contact,通话时长)
context(date被叫月+contact,通话时长)
context(date被叫日+contact,通话时长)
- reduce类的实现
每个人的通话次数以及通话时长进行汇总
- runner类的实现
a.输入的路径为hbase表
b.输出的路径是mysql表
c.runner类继承的是Tool类,可以实现配置文件的set方法以及get方法
d.实现的是HBaseConfiguration.create(conf)
e.run方法的描述
f.创建job以及job运行的主类
g.设置inputstream路径
获得操作表的admin操作对象
判断表是否存在,不存在则抛出异常,存在则扫描这张表的数据
//初始化mapper
TableMapReduceUtil.initTableMapperJob("ns_ct:calllog", scan,
CountDurctionMapper.class, CommDimension.class, Text.class,
job, true);
h.设置job的reduce
i.设置outputformat
关联到MySQLoutputformat类
return job.waitForCompletion(true) ? 0 : 1;
j.执行run方法,args为mysql-connect工具
int status = ToolRunner.run(new countDurationRunner(), args);
(8)MySQLOutputFormat类的实现
继承OutputFormat
- 定义输出对象
OutputCommitter
- 初始化jdbc连接器,建立作业
RecordWriter<CommDimension, CountDurationValue> getRecordWriter(TaskAttemptContext context)
return new MysqlRecordWriter(conn);
- MysqlRecordWriter对象的实现
本类实现向数据库写数据
获得要写入数据库的所有的属性
时间id以及联系人的id的获取要通过DimensionConverterImpl类来实现
- 书写sql语句
- 将sql语句加载到preparedStatement对象中(执行数据库操作的接口)
设置preparedStatement对象的属性
preparedStatement.addBatch();批量执行sql语句
- 关闭数据库的操作
g.执行作业的提交getOutputCommitter()重写此方法
(9)返回联系人以及时间的id的类操作
a. 产生本类的日志文件
private static final Logger logger = LoggerFactory.getLogger(DimensionConverterImpl.class);
b. 对象线程化,每个线程管理自己的jdbc
private ThreadLocal<Connection> threadLocalConnection = new ThreadLocal<>();
c. 构建内存缓存对象
private LRUCache lruCache = new LRUCache(3000);
d.获得时间对象或者联系人对象的toString的形式
String cacheKey = genCacheKey(dimension);
e. 缓存中的数据是以键值对的形式存在的
lruCache.containsKey(cacheKey)
判断是否包含key对应的值,如果包含的话则返回对应的值id
不包含的话则获得查询的sql的语句集合,包含插入的以及查询的sql的语句
f. 先执行查询操作
将语句放入preparedStatement 对象
设置参数,setArguments(preparedStatement, dimension);
返回结果,如果结果不为空返回对应的id如果为空继续执行
g. 返回的结果为空
执行插入操作后,再次执行对应的查询语句,返回对应的id
(10)问题总结:
1、ComDimension构造方法中,实例化时间维度和联系人维度对象。
2、MySQLOutputformat的close方法中,没有关闭资源,关闭:JDBCUtil.close(conn, preparedStatement, null);
3、Runner,Mapper,Reducer中的泛型,不要使用抽象类
4、DimensionConverter中的genSQL方法写反了,需要调换位置。
5、DimensionConverter中设置JVM退出时,关闭资源,如下:Runtime.getRuntime().addShutdownHook(new Thread(() ->
JDBCUtil.close(threadLocal.get(), null, null)));
6、Mysql的url连接一定要是具体的主机名或者IP地址
7、DimensionConverter中的close方法关闭数据库连接
8、调试时,打包jar,上传到linux,拔掉网线,进行测试。
9、数据库连接到底何时关闭,要梳理清楚。解决 Too many connections;
MySQLOutputformat -- MysqlRecordWriter -- DimensionConverter
10、mysql-driver包没有导入成功
执行语句
启动hadoop
启动zookeeper
启动日志检测
mr-jobhistory-daemon.sh start historyserver
启动hbase
/opt/app/hadoop-2.7.2/bin/yarn jar /datas/ct_analysis-1.0-SNAPSHOT.jar runner.countDurationRunner -libjars ./mysql-connector-java-5.1.27-bin.jar
此执行的方法是将mysql-connector-java-5.1.27-bin.jar打包到程序中后可以这样执行
/opt/app/hadoop-2.7.2/bin/yarn jar /datas/ct_analysis.jar runner.countDurationRunner
四、数据展示
1、展示数据所需要的字段都有哪些:
call_sum, call_duration_sum, telephone, name, year, month, day
2、通话通话次数与通话时长,展示用户关系。
3、通过表格,展示一个人当年所有的话单信息。
4.ssm框架,用到了spring相当于配置文件、和springMVC框架相当于controller层,dao层相当于mybatis层
类的实现:
(1)bean类的实现
QueryInfo,将前台传来的值封装成的类
bean.CallLog,将后台拿出来的值封装成的类
- 前台拿到数据,通过post的方式进行提交name+对应的值
由web.xml文件-----applicationContext.xml文件-----此文件中包含对应的需要扫描的包------同时此文件中还包括对应的返回路径的时候对应的路径
- 此时前端的数据到了controller层
前台的结果通过参数封装在 QueryInfo queryInfo对象中
将前端的数据封装为HashMap集合用于给sql语句进行传值
获得dao层的对象
判断前端返回过来的结果是否符合数据要求,不符合则提示输入错误
调用callLogDAO类(dao层)进行数据的结果查询
返回list<callLog>集合
获得月份/日期的集合
获得次数集合
获得通话时长集合
将数据通过model对象加载到另一个页面
- CallLogDAO类的实现
getcallloglist(HashMap<String,String> paramsMap)
可以给命名参数设置值
private NamedParameterJdbcTemplate namedParameterJdbcTemplate ;
参数传过来一个HashMap的集合
判断传过来的日期是符合用户全年查询还是全月的查询
并组装sql语句
BeanPropertyRowMapper<CallLog> beanPropertyRowMapper = new BeanPropertyRowMapper<>(CallLog.class);
List<CallLog> list = namedParameterJdbcTemplate.query(sql, paramsMap, beanPropertyRowMapper);
通过这两句返回结果集
前端通过${requestScope.name}参数进行拿值进行处理
五.Kafka问题修复:
初始化zookeeper
IDEA常用快捷键:
alt + enter:智能修复
ctrl + alt + v:自动生成当前对象名
ctrl + alt + t:自动呼出包裹菜单
ctrl + o:呼出覆写菜单
ctrl + alt + l:格式化代码
ctrl + shift + enter:自动补全当前行代码缺失的符号
IDEA方式打包工程:
File--project stru-- arti--点击加号--copy to the output....--ok--ok--build--rebuild