/**
 * 问题:
 * 在对数据进行训练时,Rating 要求的字段类型是 (int, int, double),但现有数据的类型是
 * (long, string, double)。直接使用 toInt 转换依然会出错:user_id 的 long 值超出了 int 的
 * 最大值(Long.toInt 会截断得到错误的值),而 news_number 是非数字字符串,无法转换成 int。
 *
 * 解决思路:
 * 创建两张映射表,将 long / string 类型的字段映射成 int 范围内的连续序号(使用 row_number()
 * 生成自增序列),然后用映射后的字段进行训练;训练完成后,再将映射字段转换回原有字段,然后进行推荐。
 */
原始评分数据:
user_id (bigint) | news_number (string) | rate (double) |
142110036 | ZXW1245848266 | 0.86 |
user_id 映射表:
user_id (bigint) | user_id_mapping (int) |
142110036 | 1 |
152110037 | 2 |
.................... | n |
news_number 映射表:
news_number (string) | news_number_mapping (int) |
ZXW1245848267 | 1 |
ZXW1245848268 | 2 |
........................... | n |
第一步:编写 ImpalaJdbcDao
在 hive 中新建两张映射表 userid_mapping、newsNumber_mapping
/**
 * Data-access helpers that (re)build the id-mapping tables used to turn
 * non-Int ids into dense Int-range sequence numbers.
 *
 * The two Hive tables must exist beforehand:
 * (1)
 * create table userid_mapping(user_id bigint, user_id_mapping bigint);
 * (2)
 * create table newsNumber_mapping(news_number string, news_number_mapping bigint);
 *
 * Column names must be `user_id_mapping` / `news_number_mapping`, because the
 * rating query reads exactly those columns.
 *
 * NOTE(review): `getConnection()` is defined elsewhere (not shown); presumably it
 * returns a `java.sql.Connection` to Impala — confirm.
 */
object ImpalaJdbcDao {

  /** Rebuilds `userId_mapping`: each distinct user_id gets a sequence number 1..n, ordered by user_id. */
  def insertUserIDMapping(): Unit =
    executeStatement(
      """
        |insert overwrite table userId_mapping
        |select
        |  user_id,
        |  ROW_NUMBER() OVER(order by user_id) user_id_mapping
        |from
        |  (select distinct user_id from user_browse_action) t
        |""".stripMargin)

  /**
   * Rebuilds `newsNumber_mapping`: each distinct news_number gets a sequence number 1..n.
   * Ordering is by news_number — the only column the subquery exposes.
   */
  def insertNewsNumberMapping(): Unit =
    executeStatement(
      """
        |insert overwrite table newsNumber_mapping
        |select
        |  news_number,
        |  ROW_NUMBER() OVER(order by news_number) news_number_mapping
        |from
        |  (select distinct news_number from user_browse_action) t
        |""".stripMargin)

  /** Executes one SQL statement on a fresh connection, always closing the statement and connection. */
  private def executeStatement(sql: String): Unit = {
    val conn = getConnection()
    try {
      val ps = conn.prepareStatement(sql)
      try {
        ps.execute()
      } finally {
        ps.close()
      }
    } finally {
      conn.close()
    }
  }
}

/** Forwarder kept for any caller that used the original misspelled object name. */
@deprecated("misspelled name; use ImpalaJdbcDao", "")
object ImpalaJdbcDap {
  def insertUserIDMapping(): Unit = ImpalaJdbcDao.insertUserIDMapping()
  def insertNewsNumberMapping(): Unit = ImpalaJdbcDao.insertNewsNumberMapping()
}
第二步:编写调用类 BatchNewsStreaming.scala
// In BatchNewsStreaming.scala: build the id-mapping tables, then compute ratings on the mapped ids.
/*
 * Build the user_id mapping table (userid_mapping), e.g.:
 *
 * user_id      user_id_mapping
 * 120040203    1
 * 120040208    2
 * 120040209    3
 * .........    ....
 * 128272742    n
 */
ImpalaJdbcDao.insertUserIDMapping()
/*
 * Build the news_number mapping table (newsNumber_mapping), e.g.:
 *
 * news_number     news_number_mapping
 * zxm352719321    1
 * zxm352719323    2
 * zxm354371932    3
 * .........       ....
 * zxm354371543    n
 */
ImpalaJdbcDao.insertNewsNumberMapping()

// Compute per-(user, news) ratings on the mapped ids.
// oper_type is scored 2 -> 1, 3 -> 2, anything else -> 0, then summed per pair.
// NOTE(review): the meaning of oper_type codes 2/3 is not visible here — confirm
// against the user_browse_action schema.
val sql =
  """
    |select
    |  t.user_id,
    |  t.news_number,
    |  sum(t.oper_rating) as rate
    |from
    |  (select
    |     user_id,
    |     news_number,
    |     case oper_type
    |       when 2 then 1
    |       when 3 then 2
    |       else 0 end
    |     as oper_rating
    |   from
    |     (
    |       select
    |         t2.news_number_mapping as news_number,
    |         t3.user_id_mapping as user_id,
    |         t1.oper_type
    |       from
    |         user_browse_action t1,
    |         newsNumber_mapping t2,
    |         userId_mapping t3
    |       where
    |         t1.news_number = t2.news_number
    |         and t1.user_id = t3.user_id
    |     ) t4
    |  ) t
    |group by
    |  t.user_id,
    |  t.news_number
    |""".stripMargin

// The aggregation runs inside Impala: the JDBC source is given the query
// as an aliased subquery through the `dbtable` option.
val userRatingDf = sess.read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", s"($sql) as result")
  .option("driver", "com.cloudera.impala.jdbc41.Driver")
  .load()
userRatingDf.show(30)
println("计算评分数据结束")

/**
 * Build the recommendation model.
 */
println("开始模型数据准备")
val userRatingRdd = userRatingDf.rdd
val ratings = userRatingRdd.map { row =>
  // Columns are (user_id mapping, news_number mapping, rate), read as Long
  // (presumably BIGINT from ROW_NUMBER()/sum()). Thanks to the mapping the ids fit
  // in Int; Math.toIntExact throws instead of silently wrapping if that ever breaks.
  Rating(Math.toIntExact(row.getLong(0)), Math.toIntExact(row.getLong(1)), row.getLong(2).toDouble)
}
println("结束模型数据准备")