// Start HiveServer2
bin/hiveserver2
// Start Beeline
bin/beeline
// Connect to HiveServer2 from inside Beeline
!connect jdbc:hive2://master.hadoop.com:10000 hadoop hadoop org.apache.hive.jdbc.HiveDriver
bin/beeline -u jdbc:hive2://hadoop-senior.ibeifeng.com:10000/default
HiveServer2 JDBC
Store the analysis results in a Hive table (result); the frontend then queries that data through DAO code over this JDBC interface.
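As a rough sketch, that query path can also be exercised from the command line; the result table name and the query below are assumptions for illustration, not from the original notes:
bin/beeline -u jdbc:hive2://master.hadoop.com:10000/default -n hadoop -p hadoop -e "select * from result limit 10;"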
================================================================
input -> map -> shuffle -> reduce -> output
Data compression
Smaller data volume:
* less local disk I/O
* less network I/O
Normally:
block -> map
10 GB -> 10 blocks
After compression:
5 GB -> 5 blocks
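A minimal sketch of turning on Snappy for intermediate (map output) data from inside Hive, assuming the native Snappy libraries are installed (see the build notes below):
set hive.exec.compress.intermediate=true;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;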
=================================================================
1) Install snappy
2) Recompile the Hadoop 2.x source with native/snappy support
mvn package -Pdist,native -DskipTests -Dtar -Drequire.snappy
/opt/modules/hadoop-2.5.0-src/target/hadoop-2.5.0/lib/native
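Native library support can then be verified with Hadoop's checknative tool; the log lines and report below come from such a check:
bin/hadoop checknative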
15/08/31 23:10:16 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native
15/08/31 23:10:16 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
Native library checking:
hadoop: true /opt/modules/hadoop-2.5.0/lib/native/libhadoop.so
zlib: true /lib64/libz.so.1
snappy: true /opt/modules/hadoop-2.5.0/lib/native/libsnappy.so.1
lz4: true revision:99
bzip2: true /lib64/libbz2.so.1
>>>>>>>>>>>>>>
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount /user/beifeng/mapreduce/wordcount/input /user/beifeng/mapreduce/wordcount/output
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount -Dmapreduce.map.output.compress=true -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec /user/beifeng/mapreduce/wordcount/input /user/beifeng/mapreduce/wordcount/output22
=================================================================
file_format:
:
| SEQUENCEFILE
| TEXTFILE -- (Default, depending on hive.default.fileformat configuration)
| RCFILE -- (Note: Available in Hive 0.6.0 and later)
| ORC -- (Note: Available in Hive 0.11.0 and later)
| PARQUET -- (Note: Available in Hive 0.13.0 and later)
| AVRO -- (Note: Available in Hive 0.14.0 and later)
| INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname
Data storage layout
* row-oriented storage (TEXTFILE, SEQUENCEFILE)
* column-oriented storage (RCFILE, ORC, PARQUET)
create table page_views(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
;
load data local inpath '/opt/datas/page_views.data' into table page_views ;
dfs -du -h /user/hive/warehouse/page_views/ ;
18.1 M /user/hive/warehouse/page_views/page_views.data
>>>>>>>orc
Create a table to test ORC file usage
create table index_test.airline (
CODE string ,
PREFIX string ,
NAME string,
LOCAL_NAME string ,
E_MAIL string,
TYPE string,
MEMBER string,
MANAGER string,
ADDRESS string,
COUNTRY string,
BSP_DATE string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
stored as Textfile;
create table page_views_orc(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc ;
insert into table page_views_orc select * from page_views ;
dfs -du -h /user/hive/warehouse/page_views_orc/ ;
2.6 M /user/hive/warehouse/page_views_orc/000000_0
>>>>>>>> parquet
create table page_views_parquet(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS PARQUET
;
insert into table page_views_parquet select * from page_views ;
dfs -du -h /user/hive/warehouse/page_views_parquet/ ;
13.1 M /user/hive/warehouse/page_views_parquet/000000_0
select session_id,count(*) cnt from page_views group by session_id order by cnt desc limit 30 ;
select session_id,count(*) cnt from page_views_orc group by session_id order by cnt desc limit 30 ;
-------
select session_id from page_views limit 30 ;
select session_id from page_views_orc limit 30 ;
select session_id from page_views_parquet limit 30 ;
========================================================
create table page_views_orc_snappy(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");
insert into table page_views_orc_snappy select * from page_views ;
dfs -du -h /user/hive/warehouse/page_views_orc_snappy/ ;
--------------
create table page_views_orc_none(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="NONE");
insert into table page_views_orc_none select * from page_views ;
dfs -du -h /user/hive/warehouse/page_views_orc_none/ ;
--------------------
set parquet.compression=SNAPPY ;
create table page_views_parquet_snappy(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS parquet;
insert into table page_views_parquet_snappy select * from page_views ;
dfs -du -h /user/hive/warehouse/page_views_parquet_snappy/ ;
Summary:
In real project development, for Hive table data use:
* storage format: ORC / Parquet
* data compression: Snappy
-------------------------------------------------------------------------
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
[AS select_statement];
CREATE EXTERNAL TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type [COMMENT col_comment], ...)]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[ROW FORMAT row_format]
set parquet.compression=SNAPPY ;
create table page_views_par_snappy
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS parquet
AS select * from page_views ;
dfs -du -h /user/hive/warehouse/page_views_par_snappy/ ;
------------------------------------
emp
dept
select e.a, e.b, d.h, d.f from
(select .... from emp where emp.filter) e
join
(select .... from dept where dept.filter) d
on (e.deptno = d.deptno);
-- filter each side in a subquery before the join; the filter can be any predicate, e.g. ... like '%xuanyu%'
=======================================================
Hive join optimization in production
Common/Shuffle/Reduce Join
The join happens in the Reduce task
Suited for: large table joined with large table
Both tables' data is read from files
Map Join
The join happens in the Map task
Suited for: small table joined with large table
* the large table's data is read from files (cid)
* the small table's data is loaded into memory (id)
DistributedCache
SMB Join (Sort-Merge-Bucket Join)
Sort first, then join
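A minimal sketch of the Hive settings that steer these join strategies (standard Hive property names; the values are only illustrative):
-- convert joins with a small enough side into map joins automatically
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=25000000;
-- enable sort-merge-bucket joins on bucketed, sorted tables
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;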
====================================================
customer
3 buckets
1st
1001 - 1101
2nd
1201 - 1401
3rd
1501 - 1901
order
1st
1001 - 1101
2nd
1201 - 1401
3rd
1501 - 1901
Both tables are bucketed on the join key with matching ranges, so bucket i of customer only has to be joined with bucket i of order (see the sketch below).
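A minimal sketch of the bucketed, sorted table an SMB join relies on (the table and column names are assumptions for illustration):
set hive.enforce.bucketing=true;
create table customer_bucketed(
cid int,
name string
)
clustered by (cid) sorted by (cid) into 3 buckets
stored as orc;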
====================================================
Hive optimization: execution plans
Inspect how each statement is executed as MapReduce
EXPLAIN select * from emp ;
EXPLAIN select deptno,avg(sal) avg_sal from emp group by deptno ;
EXPLAIN EXTENDED select deptno,avg(sal) avg_sal from emp group by deptno ;
====================================================
job1: a join b -> aa
job2: c join d -> cc
job3: aa join cc
job1 and job2 are independent, so they can run in parallel; job3 has to wait for both.
Each Map Task / Reduce Task runs in its own JVM (JVM reuse is a tuning option)
Speculative execution
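A minimal sketch of the related knobs (standard Hive/MapReduce property names; values are only illustrative):
-- run independent stages of a query in parallel
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
-- JVM reuse: one JVM runs several tasks before exiting
set mapreduce.job.jvm.numtasks=5;
-- speculative execution of slow tasks
set mapreduce.map.speculative=true;
set mapreduce.reduce.speculative=true;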
====================================================
bf_log_src
* table type
external table, partitioned table (month, day)
bf_log_ip
/month=2015-09/day=20
bf_log_page
bf_log_refer
Loading data into the tables
* src
load data ....
* bf_log_ip
load data inpath '' into table bf_log_ip partition(month='2015-09',day='20') ;
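A minimal sketch of what such an external, partitioned sub-table could look like (the column list and location are assumptions for illustration):
create external table if not exists default.bf_log_ip(
remote_addr string,
cnt bigint
)
partitioned by (month string, day string)
row format delimited fields terminated by '\t'
stored as textfile
location '/user/hive/warehouse/bf_log_ip';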
====================================================
Practice project approach
* the source table keeps the raw data
* create separate sub-tables for different business questions
* storage format: orcfile / parquet
* data compression: snappy
* map output compression: snappy
* external tables
* partitioned tables (demo)
// Create the source table
create table IF NOT EXISTS default.bf_log_src (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
stored as textfile ;
// Load data into the newly created table
load data local inpath '/opt/datas/moodle.ibeifeng.access.log' into table default.bf_log_src ;
select count(*) from bf_log_src ;
select * from bf_log_src limit 5 ;
>>>>>>>>>>>>>>>
Rebuild the table above, using a regular expression (RegexSerDe) to parse the raw log data
drop table if exists default.bf_log_src ;
create table IF NOT EXISTS default.bf_log_src (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES
(
"input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*) (\"[^ ]*\")"
)
STORED AS TEXTFILE;
load data local inpath '/opt/datas/moodle.ibeifeng.access.log' into table default.bf_log_src ;
>>>>>>>>>>>>>>>>>>>>>>>>>>>
drop table if exists default.bf_log_comm ;
create table IF NOT EXISTS default.bf_log_comm (
remote_addr string,
time_local string,
request string,
http_referer string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="SNAPPY")
;
insert into table default.bf_log_comm select remote_addr, time_local, request,http_referer from default.bf_log_src ;
select * from bf_log_comm limit 5 ;
============
Define UDFs to clean the source-table data
First UDF:
strip the surrounding quotes
add jar /opt/datas/hiveudf2.jar ;
create temporary function my_removequotes as "com.beifeng.senior.hive.udf.RemoveQuotesUDF" ;
insert overwrite table default.bf_log_comm select my_removequotes(remote_addr), my_removequotes(time_local), my_removequotes(request), my_removequotes(http_referer) from default.bf_log_src ;
select * from bf_log_comm limit 5 ;
============
Second UDF:
transform the date/time field
31/Aug/2015:00:04:37 +0800
20150831000437
add jar /opt/datas/hiveudf3.jar ;
create temporary function my_datetransform as "com.beifeng.senior.hive.udf.DateTransformUDF" ;
insert overwrite table default.bf_log_comm select my_removequotes(remote_addr), my_datetransform(my_removequotes(time_local)), my_removequotes(request), my_removequotes(http_referer) from default.bf_log_src ;
select * from bf_log_comm limit 5 ;
========================================
desc function extended substring ;
substring('Facebook', 5, 1)
'b'
Indexing starts at 1
select substring('20150831230437',9,2) hour from bf_log_comm limit 1 ;
select t.hour, count(*) cnt from
(select substring(time_local,9,2) hour from bf_log_comm ) t
group by t.hour order by cnt desc ;
----
select t.prex_ip, count(*) cnt from
(
select substring(remote_addr,1,7) prex_ip from bf_log_comm
) t
group by t.prex_ip order by cnt desc limit 20 ;
123.123 7
xx.xx 5
====================================================
196 242 3 881250949
userid   movieid   rating   unixtime
CREATE TABLE u_data (
userid INT,
movieid INT,
rating INT,
unixtime STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH '/opt/datas/ml-100k/u.data' OVERWRITE INTO TABLE u_data;
-------
CREATE TABLE u_data_new (
userid INT,
movieid INT,
rating INT,
weekday INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
add FILE /opt/datas/ml-100k/weekday_mapper.py;
INSERT OVERWRITE TABLE u_data_new
SELECT
TRANSFORM (userid, movieid, rating, unixtime) -- input from source table
USING 'python weekday_mapper.py' -- script
AS (userid, movieid, rating, weekday) --output from python
FROM u_data;
SELECT weekday, COUNT(*) FROM u_data_new GROUP BY weekday;
SELECT weekday, COUNT(1) cnt FROM u_data_new GROUP BY weekday order by cnt desc;
----
# weekday_mapper.py: reads tab-separated (userid, movieid, rating, unixtime) rows from
# stdin and replaces unixtime with the ISO weekday (1 = Monday ... 7 = Sunday).
import sys
import datetime

for line in sys.stdin:
    line = line.strip()
    userid, movieid, rating, unixtime = line.split('\t')
    weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
    print '\t'.join([userid, movieid, rating, str(weekday)])
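A quick local sanity check of the script, assuming the MovieLens file sits at the path used above:
cat /opt/datas/ml-100k/u.data | python weekday_mapper.py | head -5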