1. Flume configuration
tail-hdfs.conf
# tail-hdfs.conf
# Collect data with the tail command and sink it to HDFS.
# Start command:
# bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
########
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/log/test.log
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type of the generated files; the default is SequenceFile. Use DataStream to write plain text.
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
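To exercise the agent end to end, something has to keep appending lines to the tailed file. A throwaway generator sketch (the message format is an arbitrary placeholder; the log path is the one configured above); afterwards the rolled files can be listed with hadoop fs -ls /flume/events/:
#!/bin/bash
# Append one test line per second to the file watched by the exec source
while true; do
  echo "test log line `date +%s`" >> /home/hadoop/log/test.log
  sleep 1
done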
2. Move data to the preprocessing working directory
movetopreworkdir.sh
#!/bin/bash
#
# ===========================================================================
# Script name : movetopreworkdir.sh
# Description : move files into the preprocessing working directory
# Input       : run date
# Target path : /data/weblog/preprocess/input
# Source path : /data/flumedata/
# ===========================================================================
#set java env
export JAVA_HOME=/home/hadoop/apps/jdk1.7.0_51
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
#set hadoop env
export HADOOP_HOME=/home/hadoop/apps/hadoop-2.6.1
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH
# Directory where the log files collected by Flume are stored
log_flume_dir=/data/flumedata/
# Working directory of the preprocessing job
log_pre_input=/data/weblog/preprocess/input
# Work out the date information (yesterday)
day_01=`date -d'-1 day' +%Y-%m-%d`
syear=`date --date=$day_01 +%Y`
smonth=`date --date=$day_01 +%m`
sday=`date --date=$day_01 +%d`
# List the log directory and check whether there are files to move
files=`hadoop fs -ls $log_flume_dir | grep $day_01 | wc -l`
if [ $files -gt 0 ]; then
  hadoop fs -mv ${log_flume_dir}/${day_01} ${log_pre_input}
  echo "successfully moved ${log_flume_dir}/${day_01} to ${log_pre_input} ....."
fi
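The collection and move steps are meant to run once a day. A minimal crontab sketch for scheduling movetopreworkdir.sh (the install path, schedule and log location are assumptions, not from the original post):
# Assumed crontab entry: run the move script shortly after midnight every day
30 0 * * * /home/hadoop/bin/movetopreworkdir.sh >> /home/hadoop/logs/movetopreworkdir.log 2>&1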
3. Run the preprocessing jobs
log_preprocess.sh
#!/bin/bash
# ===========================================================================
# Script name : log_preprocess.sh
# Description : preprocess the raw logs
# Input       : run date
# Target path : /data/weblog/preprocess/output
# Source path : /data/weblog/preprocess/input
# ===========================================================================
#set java env
export JAVA_HOME=/home/hadoop/apps/jdk1.7.0_51
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
#set hadoop env
export HADOOP_HOME=/home/hadoop/apps/hadoop-2.6.1
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH
# Main class of the preprocessing job
preprocess_class="com.learn.bigdata.hive.mr.pre.WeblogPreProcess"
# Main class of the preprocessing job that outputs only valid records
pre_valid_class="com.learn.bigdata.hive.mr.pre.WeblogPreValid"
# Directory holding the logs to be processed
log_pre_input=/data/weblog/preprocess/input
# Output directory for the preprocessing (raw) results
log_pre_output=/data/weblog/preprocess/output
# Output directory for the preprocessing (valid) results
log_pre_valid_output=/data/weblog/preprocess/valid_output
# Work out the date information
day_01=`date -d'-1 day' +%Y-%m-%d`
syear=`date --date=$day_01 +%Y`
smonth=`date --date=$day_01 +%m`
sday=`date --date=$day_01 +%d`
# List the input directory and check whether there is a directory for the day to process (e.g. 2016-03-18)
files=`hadoop fs -ls $log_pre_input | grep $day_01 | wc -l`
if [ $files -gt 0 ]; then
  # Submit the MR job
  echo "running.. hadoop jar weblog.jar $preprocess_class $log_pre_input/$day_01 $log_pre_output/$day_01"
  hadoop jar weblog.jar $preprocess_class $log_pre_input/$day_01 $log_pre_output/$day_01
fi
# Capture the exit code before echo overwrites $?
ret=$?
echo "raw preprocessing exit code: $ret"
if [ $ret -eq 0 ]; then
  # Submit the MR job
  echo "running.. hadoop jar weblog.jar $pre_valid_class $log_pre_input/$day_01 $log_pre_valid_output/$day_01"
  hadoop jar weblog.jar $pre_valid_class $log_pre_input/$day_01 $log_pre_valid_output/$day_01
fi
# If a job fails,
# send an email or SMS so that someone can intervene manually.
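A sketch of that alert hook, placed right after a hadoop jar call; the mail command and the recipient address are assumptions, not part of the original scripts:
# Hypothetical alert: notify an operator when the job just run returned a non-zero exit code
if [ $? -ne 0 ]; then
  echo "weblog preprocessing failed for $day_01" | mail -s "weblog ETL alert" admin@example.com
fi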
log_click.sh
#!/bin/bash
# ===========================================================================
# Script name  : log_click.sh
# Description  : build the click-stream model data
# Input        : run date
# Target paths : /data/weblog/preprocess/click_pv_out, /data/weblog/preprocess/click_visit_out
# Source path  : /data/weblog/preprocess/output
# ===========================================================================
#set java env
export JAVA_HOME=/home/hadoop/apps/jdk1.7.0_51
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
#set hadoop env
export HADOOP_HOME=/home/hadoop/apps/hadoop-2.6.1
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH
# Main class of the click-stream pageviews model job
click_pv_class="com.learn.bigdata.hive.mr.ClickStreamThree"
# Input directory of the pageviews model job, i.e. the preprocessing output directory
log_pre_output=/data/weblog/preprocess/output
# Output directory of the click-stream pageviews model job
click_pvout=/data/weblog/preprocess/click_pv_out
# Main class of the click-stream visit model job
click_visit_class="com.learn.bigdata.hive.mr.ClickStreamVisit"
# Input directory of the visit model job, i.e. the pageviews model output directory $click_pvout
# Output directory of the click-stream visit model job
click_vstout=/data/weblog/preprocess/click_visit_out
# Work out the date information
day_01=`date -d'-1 day' +%Y-%m-%d`
syear=`date --date=$day_01 +%Y`
smonth=`date --date=$day_01 +%m`
sday=`date --date=$day_01 +%d`
# List the input directory and check whether there is a directory for the day to process (e.g. 2016-03-18)
files=`hadoop fs -ls $log_pre_output | grep $day_01 | wc -l`
if [ $files -gt 0 ]; then
  # Submit the MR job
  echo "running.. hadoop jar weblog.jar $click_pv_class $log_pre_output/$day_01 $click_pvout/$day_01"
  hadoop jar weblog.jar $click_pv_class $log_pre_output/$day_01 $click_pvout/$day_01
fi
# Capture the exit code before echo overwrites $?
ret=$?
echo "pageviews job exit code: $ret"
if [ $ret -eq 0 ]; then
  # Submit the MR job
  echo "running.. hadoop jar weblog.jar $click_visit_class $click_pvout/$day_01 $click_vstout/$day_01"
  hadoop jar weblog.jar $click_visit_class $click_pvout/$day_01 $click_vstout/$day_01
fi
4. Load data into the ODS layer
load_ods_table.sh
#!/bin/bash
# ===========================================================================
# Script name   : load_ods_table.sh
# Description   : load data into the ODS tables
# Input         : run date
# Source paths  : /data/weblog/preprocess/output, /data/weblog/preprocess/click_pv_out, /data/weblog/preprocess/click_visit_out
# Target tables : shizhan.ods_weblog_origin, shizhan.ods_click_pageviews, shizhan.ods_click_visit
# ===========================================================================
#set java env
export JAVA_HOME=/home/hadoop/apps/jdk1.7.0_51
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
#set hadoop env
export HADOOP_HOME=/home/hadoop/apps/hadoop-2.6.1
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH
# Work out the date information
day_01=`date -d'-1 day' +%Y-%m-%d`
syear=`date --date=$day_01 +%Y`
smonth=`date --date=$day_01 +%m`
sday=`date --date=$day_01 +%d`
# Output directory of the preprocessing (raw) results
log_pre_output=/data/weblog/preprocess/output
# Output directory of the click-stream pageviews model job
click_pvout="/data/weblog/preprocess/click_pv_out"
# Output directory of the click-stream visit model job
click_vstout="/data/weblog/preprocess/click_visit_out"
# Target Hive tables
ods_weblog_origin="shizhan.ods_weblog_origin"
ods_click_pageviews="shizhan.ods_click_pageviews"
ods_click_visit="shizhan.ods_click_visit"
# Load the raw data into $ods_weblog_origin
HQL_origin="load data inpath '$log_pre_output/$day_01' into table $ods_weblog_origin partition(datestr='$day_01')"
echo $HQL_origin
/home/hadoop/apps/hive/bin/hive -e "$HQL_origin"
# Load the click-stream pageviews data into $ods_click_pageviews
HQL_pvs="load data inpath '$click_pvout/$day_01' into table $ods_click_pageviews partition(datestr='$day_01')"
echo $HQL_pvs
/home/hadoop/apps/hive/bin/hive -e "$HQL_pvs"
# Load the click-stream visit data into $ods_click_visit
HQL_vst="load data inpath '$click_vstout/$day_01' into table $ods_click_visit partition(datestr='$day_01')"
echo $HQL_vst
/home/hadoop/apps/hive/bin/hive -e "$HQL_vst"
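Note that load data inpath moves (rather than copies) the HDFS directory into the table's partition location, so the source paths are empty afterwards. A quick verification sketch for the day just loaded ($day_01 in the script above); these checks are not part of the original script:
/home/hadoop/apps/hive/bin/hive -e "show partitions shizhan.ods_weblog_origin"
/home/hadoop/apps/hive/bin/hive -e "select count(*) from shizhan.ods_weblog_origin where datestr='$day_01'"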
5. ETL scripts
etl_detail.sh
#!/bin/bash
# . /home/anjianbing/soft/functions/wait4FlagFile.sh
# ===========================================================================
# Script name  : etl_detail.sh
# Description  : build the detail wide table
# Input        : run date
# Target table : zs.ods_weblog_detail
# Source table : zs.ods_weblog_origin
# ===========================================================================
### 1. Load parameters
exe_hive="/home/hadoop/apps/hive/bin/hive"
if [ $# -eq 1 ]
then
day_01=`date --date="${1}" +%Y-%m-%d`
else
day_01=`date -d'-1 day' +%Y-%m-%d`
fi
syear=`date --date=$day_01 +%Y`
smonth=`date --date=$day_01 +%m`
sday=`date --date=$day_01 +%d`
TARGET_DB=zs
TARGET_TABLE=ods_weblog_detail
### 2. Define the HQL to execute
HQL="
insert into table zs.ods_weblog_detail partition(datestr='$day_01')
select c.valid,c.remote_addr,c.remote_user,c.time_local,
substring(c.time_local,0,10) as daystr,
substring(c.time_local,12) as tmstr,
substring(c.time_local,6,2) as month,
substring(c.time_local,9,2) as day,
substring(c.time_local,12,2) as hour,
c.request,
c.status,
c.body_bytes_sent,
c.http_referer,
c.ref_host,
c.ref_path,
c.ref_query,
c.ref_query_id,
c.http_user_agent
from
(SELECT
a.valid,
a.remote_addr,
a.remote_user,a.time_local,
a.request,a.status,a.body_bytes_sent,a.http_referer,a.http_user_agent,b.ref_host,b.ref_path,b.ref_query,b.ref_query_id
FROM zs.ods_weblog_origin a
LATERAL VIEW
parse_url_tuple(regexp_replace(http_referer, \"\\\"\", \"\"), 'HOST', 'PATH','QUERY', 'QUERY:id') b
as ref_host, ref_path, ref_query, ref_query_id) c
"
# Execute the HQL
$exe_hive -e "$HQL"
# Error handling
# If the job fails, send an email or SMS.
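To make the substring logic above concrete, this is what each expression returns for a sample time_local value of '2013-09-18 06:49:18' (the sample value is illustrative only, and the query assumes a Hive version that allows SELECT without FROM):
/home/hadoop/apps/hive/bin/hive -e "
select substring('2013-09-18 06:49:18',0,10),  -- daystr: 2013-09-18
       substring('2013-09-18 06:49:18',12),    -- tmstr : 06:49:18
       substring('2013-09-18 06:49:18',6,2),   -- month : 09
       substring('2013-09-18 06:49:18',9,2),   -- day   : 18
       substring('2013-09-18 06:49:18',12,2)   -- hour  : 06
"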
etl_pvs_hour.sh
#!/bin/bash
# . /home/anjianbing/soft/functions/wait4FlagFile.sh
# ===========================================================================
# Script name  : etl_pvs_hour.sh
# Description  : aggregate page views (PV) per hour
# Input        : run date
# Target table : zs.dw_pvs_hour
# Source table : zs.ods_weblog_detail
# ===========================================================================
### 1. Load parameters
exe_hive="/home/hadoop/apps/hive/bin/hive"
if [ $# -eq 1 ]
then
day_01=`date --date="${1}" +%Y-%m-%d`
else
day_01=`date -d'-1 day' +%Y-%m-%d`
fi
syear=`date --date=$day_01 +%Y`
smonth=`date --date=$day_01 +%m`
sday=`date --date=$day_01 +%d`
TARGET_DB=zs
TARGET_TABLE=dw_pvs_hour
HQL="
insert into table zs.dw_pvs_hour partition(datestr='$day_01')
select a.month as month,a.day as day,a.hour as hour,count(1) as pvs from zs.ods_weblog_detail a
where a.datestr='$day_01' group by a.month,a.day,a.hour;
"
# Execute the HQL
$exe_hive -e "$HQL"
6. Export the results with Sqoop
sqoop_export.sh
#!/bin/bash
if [ $# -eq 1 ]
then
cur_date=`date --date="${1}" +%Y-%m-%d`
else
cur_date=`date -d'-1 day' +%Y-%m-%d`
fi
echo "cur_date:"${cur_date}
year=`date --date=$cur_date +%Y`
month=`date --date=$cur_date +%m`
day=`date --date=$cur_date +%d`
table_name=""
table_columns=""
hadoop_dir=/user/rd/bi_dm/app_user_experience_d/year=${year}/month=${month}/day=${day}
mysql_db_pwd=biall_pwd2015
mysql_db_name=bi_tag_all
echo 'sqoop start'
$SQOOP_HOME/bin/sqoop export \
--connect "jdbc:mysql://hadoop03:3306/biclick" \
--username $mysql_db_name \
--password $mysql_db_pwd \
--table $table_name \
--columns $table_columns \
--fields-terminated-by '\001' \
--export-dir $hadoop_dir
echo 'sqoop end'
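Since sqoop export fails when the HDFS source directory is missing, a defensive check could be added before the export call; this test is a sketch, not part of the original script:
# Skip the export when the source directory does not exist in HDFS
hadoop fs -test -d $hadoop_dir
if [ $? -ne 0 ]; then
  echo "export dir $hadoop_dir not found, skipping sqoop export"
  exit 1
fi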
7. Data warehouse DDL scripts
-- Data warehouse DDL
-- ODS raw (source-aligned) table
drop table if exists ods_weblog_origin;
create table ods_weblog_origin(
valid string,
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
http_referer string,
http_user_agent string)
partitioned by (datestr string)
row format delimited
fields terminated by '\001';
-- ODS click-stream pageviews table
drop table if exists ods_click_pageviews;
create table ods_click_pageviews(
Session string,
remote_addr string,
-- (an additional column could be added here: user string)
time_local string,
request string,
visit_step string,
page_staylong string,
http_referer string,
http_user_agent string,
body_bytes_sent string,
status string)
partitioned by (datestr string)
row format delimited
fields terminated by '\001';
-- ODS click-stream visit table
drop table if exists ods_click_visit;
create table ods_click_visit(
session string,
remote_addr string,
inTime string,
outTime string,
inPage string,
outPage string,
referal string,
pageVisits int)
partitioned by (datestr string);
-- ETL detail wide table
drop table ods_weblog_detail;
create table ods_weblog_detail(
valid string, --valid flag
remote_addr string, --client IP
remote_user string, --remote user
time_local string, --full access timestamp
daystr string, --access date
timestr string, --access time
month string, --access month
day string, --access day
hour string, --access hour
request string, --requested URL
status string, --response code
body_bytes_sent string, --bytes sent
http_referer string, --referer URL
ref_host string, --referer host
ref_path string, --referer path
ref_query string, --referer query string
ref_query_id string, --value of the referer query parameter id
http_user_agent string --client user agent
)
partitioned by(datestr string);
-- Time dimension table
create table v_time(year string,month string,day string,hour string)
row format delimited
fields terminated by ',';
load data local inpath '/home/hadoop/v_time.txt' into table v_time;
-- Hourly PV statistics table
drop table dw_pvs_hour;
create table dw_pvs_hour(month string,day string,hour string,pvs bigint) partitioned by(datestr string);
-- Daily average PV per user
drop table dw_avgpv_user_d;
create table dw_avgpv_user_d(
day string,
avgpv string);
-- Referer PV statistics table (hourly granularity)
drop table zs.dw_pvs_referer_h;
create table zs.dw_pvs_referer_h(referer_url string,referer_host string,month string,day string,hour string,pv_referer_cnt bigint) partitioned by(datestr string);
-- Hourly top-N referer hosts by PV
drop table zs.dw_pvs_refhost_topn_h;
create table zs.dw_pvs_refhost_topn_h(
hour string,
toporder string,
ref_host string,
ref_host_cnts string
) partitioned by(datestr string);
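The DDL above does not show how dw_avgpv_user_d gets loaded. A hedged sketch of one way to populate it from ods_weblog_detail, assuming both tables live in the zs database and treating remote_addr as the user key (both assumptions, not stated in the original post); 2016-03-18 is the sample date used earlier:
/home/hadoop/apps/hive/bin/hive -e "
insert into table zs.dw_avgpv_user_d
select '2016-03-18' as day, avg(t.pvs) as avgpv
from (select remote_addr, count(1) as pvs
      from zs.ods_weblog_detail
      where datestr='2016-03-18'
      group by remote_addr) t;
"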