#!/bin/bash
### Usage: nohup sh rc_CPData4.sh --copy 120 &
### Copies/updates HDFS tables between clusters with hadoop distcp.
### Guard: exit when the argument count is wrong or another instance of this
### script is already running (avoids duplicate launches).
if [ $# -ne 2 ] || [ "$(ps x | grep -v grep | grep -c rc_CPData4.sh)" -gt 2 ]; then
    echo "args are Incorrect"
    exit
fi
### Provinces whose data will be synchronized.
#export HADOOP_USER_NAME="hdfs"
### NOTE(review): without 'export', HADOOP_USER_NAME is NOT inherited by the
### hadoop child processes below — confirm the disabled export is intentional.
HADOOP_USER_NAME="hdfs"
declare -a provinces=("beijing" "shanghai" "guangdong")
### Parse and validate the submitted arguments:
###   $1 - command type, must be "--copy" or "--update"
###   $2 - distcp map count, must be greater than 100
command_type=$1
mapcount=$2
### Fixed: the original joined the two inequality tests with '||', which is
### always true for any value, so only the mapcount check was effective and an
### invalid command with mapcount > 100 slipped through validation.
### (Also fixed the "--updata" typo in the error message.)
if { [ "$command_type" != "--copy" ] && [ "$command_type" != "--update" ]; } || (( mapcount <= 100 )); then
    echo "args are Incorrect,--copy or --update and mapcount more than 100"
    exit
fi
### distcp job settings: submit to the ss_deploy queue with a task timeout of
### 36000. NOTE(review): mapreduce.task.timeout is in milliseconds, so this is
### only 36 seconds — confirm 3600000 (1 h) was not intended.
DMRsetup="-Dmapreduce.job.queuename=ss_deploy -Dmapreduce.task.timeout=36000"
### Root HDFS paths for source and target tables.
### NOTE(review): both currently point at the same namenode/path ("xxx" looks
### like a scrubbed placeholder) — confirm the real source/target endpoints.
SOURCE_ROOT_DIR="hdfs://xxx:8020/serv/smartsteps/raw/events"
TARGET_ROOT_DIR="hdfs://xxx:8020/serv/smartsteps/raw/events"
### Existence-check helper: prints "success" when the given HDFS path exists,
### "failed" otherwise (including when the hadoop command itself fails).
###   $1 - HDFS path to test
### Fixed: ${1} was unquoted, which word-splits/globs paths containing spaces
### or glob characters.
ret_Result()
{
    if hadoop fs -test -e "$1"; then
        echo "success"
    else
        echo "failed"
    fi
}
### Outer loop: one pass per configured province.
for p in "${provinces[@]}"
do
    ### Inner loop: one "workspace|table" record per line of ./list.txt;
    ### lines containing '#' are treated as comments and skipped.
    ### Fixed: the original `for line in $(cat …)` word-split on any
    ### whitespace inside a record; a read loop consumes whole lines.
    while IFS= read -r line
    do
        workspace=$(echo "$line" | cut -d '|' -f1)
        table=$(echo "$line" | cut -d '|' -f2)
        source_workspace="${SOURCE_ROOT_DIR}/${p}${workspace}"
        source_table="${source_workspace}${table}"
        target_workspace="${TARGET_ROOT_DIR}/${p}${workspace}"
        target_table="${target_workspace}${table}"
        source_table_flag=$(ret_Result "$source_table")
        ### "--copy" mode: copy the table when the source exists.
        if [ "$command_type" == "--copy" ] && [ "$source_table_flag" == "success" ]; then
            target_workspace_flag=$(ret_Result "$target_workspace")
            target_table_flag=$(ret_Result "$target_table")
            ### Target table already present: skip the copy.
            if [ "$target_workspace_flag" == "success" ] && [ "$target_table_flag" == "success" ]; then
                echo "-----table already exist,skip copy --${target_table}"
            elif [ "$target_table_flag" == "failed" ]; then
                ### Ensure the target workspace directory exists first.
                if [ "$target_workspace_flag" == "failed" ]; then
                    echo "-----creat workspace_dir ${target_workspace}"
                    hadoop fs -mkdir -p "$target_workspace"
                else
                    echo "-----workspace_dir already exist ${target_workspace}"
                fi
                echo "-----start Synchronous data,table name: ${table}"
                echo "-----from  ${source_table} to ${target_workspace}"
                ### DMRsetup intentionally unquoted: it must word-split into
                ### two separate -D options.
                hadoop distcp ${DMRsetup} -m "$mapcount" "$source_table" "$target_workspace"
                ### On distcp failure: delete leftover temp files, then retry,
                ### choosing -update vs plain copy based on whether a partial
                ### target table now exists.
                if [ $? -ne 0 ]; then
                    source_table_flag=$(ret_Result "$source_table")
                    target_table_flag=$(ret_Result "$target_table")
                    echo "-----Synchronous failed,del files :target_workspace/.distcp.tmp*,start updating it "
                    echo "-----start update table: ${target_table}"
                    hadoop fs -rm "${target_workspace}/.distcp.tmp*"
                    if [ "$source_table_flag" == "success" ] && [ "$target_table_flag" == "success" ]; then
                        ### A partial copy landed: resume with -update.
                        hadoop distcp ${DMRsetup} -update -m "$mapcount" "$source_table" "$target_table"
                        if [ $? -ne 0 ]; then
                            echo "-----update failed,final------"
                            hadoop fs -rm "${target_table}/.distcp.tmp*"
                        fi
                    elif [ "$source_table_flag" == "success" ] && [ "$target_table_flag" == "failed" ]; then
                        ### Nothing landed: retry the plain copy.
                        echo "-----update table ,use Synchronous"
                        hadoop distcp ${DMRsetup} -m "$mapcount" "$source_table" "$target_workspace"
                        if [ $? -ne 0 ]; then
                            echo "-----update failed,final-----"
                            hadoop fs -rm "${target_workspace}/.distcp.tmp*"
                        fi
                    fi
                else
                    echo "=====Synchronous finished======"
                fi
            fi
        ### "--update" mode: incremental update when the source table exists.
        elif [ "$command_type" == "--update" ] && [ "$source_table_flag" == "success" ]; then
            ### NOTE(review): the "failed" branches below are unreachable for
            ### source_table_flag — this arm is only entered when it is
            ### "success" and the re-check rarely changes that. Kept as-is.
            source_table_flag=$(ret_Result "$source_table")
            target_table_flag=$(ret_Result "$target_table")
            if [ "$source_table_flag" == "failed" ]; then
                echo "-----source_table not exist-----"
            fi
            if [ "$target_table_flag" == "failed" ]; then
                echo "-----target_table not exist-----"
            fi
            ### An update requires both source and target tables to exist.
            if [ "$source_table_flag" == "success" ] && [ "$target_table_flag" == "success" ]; then
                echo "start update data ${source_table} to ${target_workspace}"
                hadoop distcp ${DMRsetup} -update -m "$mapcount" "$source_table" "$target_table"
                ### Clean up temp files after a failed update.
                if [ $? -ne 0 ]; then
                    hadoop fs -rm "${target_table}/.distcp.tmp*"
                    echo "-----update failed"
                else
                    echo "-----update finished"
                fi
            else
                echo "-----${target_table} already not exist,can't update data----"
            fi
        else
            echo "-----source table not exist, $source_table"
        fi
        #echo $workspace
        #echo $source_workspace
        #echo $target_workspace
    done < <(grep -v "#" ./list.txt)
done
### NOTE(review): everything from here down duplicates the first half of this
### file verbatim, so the whole sync job runs TWICE per invocation. Confirm
### whether this second copy is an accidental paste and should be removed.
### nohup sh rc_CPData4.sh --copy 120 &
### Guard: exit when the argument count is wrong or another instance of this
### script is already running (avoids duplicate launches).
if [ $# -ne 2 ] || [ "$(ps x | grep -v grep | grep -c rc_CPData4.sh)" -gt 2 ]; then
    echo "args are Incorrect"
    exit
fi
### Provinces whose data will be synchronized.
#export HADOOP_USER_NAME="hdfs"
### NOTE(review): without 'export', HADOOP_USER_NAME is NOT inherited by the
### hadoop child processes below — confirm the disabled export is intentional.
HADOOP_USER_NAME="hdfs"
declare -a provinces=("beijing" "shanghai" "guangdong")
### Parse and validate the submitted arguments:
###   $1 - command type, must be "--copy" or "--update"
###   $2 - distcp map count, must be greater than 100
command_type=$1
mapcount=$2
### Fixed: the original joined the two inequality tests with '||', which is
### always true for any value, so only the mapcount check was effective and an
### invalid command with mapcount > 100 slipped through validation.
### (Also fixed the "--updata" typo in the error message.)
if { [ "$command_type" != "--copy" ] && [ "$command_type" != "--update" ]; } || (( mapcount <= 100 )); then
    echo "args are Incorrect,--copy or --update and mapcount more than 100"
    exit
fi
### distcp job settings: submit to the ss_deploy queue with a task timeout of
### 36000. NOTE(review): mapreduce.task.timeout is in milliseconds, so this is
### only 36 seconds — confirm 3600000 (1 h) was not intended.
DMRsetup="-Dmapreduce.job.queuename=ss_deploy -Dmapreduce.task.timeout=36000"
### Root HDFS paths for source and target tables.
### NOTE(review): both currently point at the same namenode/path ("xxx" looks
### like a scrubbed placeholder) — confirm the real source/target endpoints.
SOURCE_ROOT_DIR="hdfs://xxx:8020/serv/smartsteps/raw/events"
TARGET_ROOT_DIR="hdfs://xxx:8020/serv/smartsteps/raw/events"
### Existence-check helper: prints "success" when the given HDFS path exists,
### "failed" otherwise (including when the hadoop command itself fails).
###   $1 - HDFS path to test
### Fixed: ${1} was unquoted, which word-splits/globs paths containing spaces
### or glob characters.
ret_Result()
{
    if hadoop fs -test -e "$1"; then
        echo "success"
    else
        echo "failed"
    fi
}
### NOTE(review): this loop is a verbatim duplicate of the sync loop in the
### first half of the file — confirm it is an accidental paste.
### Outer loop: one pass per configured province.
for p in "${provinces[@]}"
do
    ### Inner loop: one "workspace|table" record per line of ./list.txt;
    ### lines containing '#' are treated as comments and skipped.
    ### Fixed: the original `for line in $(cat …)` word-split on any
    ### whitespace inside a record; a read loop consumes whole lines.
    while IFS= read -r line
    do
        workspace=$(echo "$line" | cut -d '|' -f1)
        table=$(echo "$line" | cut -d '|' -f2)
        source_workspace="${SOURCE_ROOT_DIR}/${p}${workspace}"
        source_table="${source_workspace}${table}"
        target_workspace="${TARGET_ROOT_DIR}/${p}${workspace}"
        target_table="${target_workspace}${table}"
        source_table_flag=$(ret_Result "$source_table")
        ### "--copy" mode: copy the table when the source exists.
        if [ "$command_type" == "--copy" ] && [ "$source_table_flag" == "success" ]; then
            target_workspace_flag=$(ret_Result "$target_workspace")
            target_table_flag=$(ret_Result "$target_table")
            ### Target table already present: skip the copy.
            if [ "$target_workspace_flag" == "success" ] && [ "$target_table_flag" == "success" ]; then
                echo "-----table already exist,skip copy --${target_table}"
            elif [ "$target_table_flag" == "failed" ]; then
                ### Ensure the target workspace directory exists first.
                if [ "$target_workspace_flag" == "failed" ]; then
                    echo "-----creat workspace_dir ${target_workspace}"
                    hadoop fs -mkdir -p "$target_workspace"
                else
                    echo "-----workspace_dir already exist ${target_workspace}"
                fi
                echo "-----start Synchronous data,table name: ${table}"
                echo "-----from  ${source_table} to ${target_workspace}"
                ### DMRsetup intentionally unquoted: it must word-split into
                ### two separate -D options.
                hadoop distcp ${DMRsetup} -m "$mapcount" "$source_table" "$target_workspace"
                ### On distcp failure: delete leftover temp files, then retry,
                ### choosing -update vs plain copy based on whether a partial
                ### target table now exists.
                if [ $? -ne 0 ]; then
                    source_table_flag=$(ret_Result "$source_table")
                    target_table_flag=$(ret_Result "$target_table")
                    echo "-----Synchronous failed,del files :target_workspace/.distcp.tmp*,start updating it "
                    echo "-----start update table: ${target_table}"
                    hadoop fs -rm "${target_workspace}/.distcp.tmp*"
                    if [ "$source_table_flag" == "success" ] && [ "$target_table_flag" == "success" ]; then
                        ### A partial copy landed: resume with -update.
                        hadoop distcp ${DMRsetup} -update -m "$mapcount" "$source_table" "$target_table"
                        if [ $? -ne 0 ]; then
                            echo "-----update failed,final------"
                            hadoop fs -rm "${target_table}/.distcp.tmp*"
                        fi
                    elif [ "$source_table_flag" == "success" ] && [ "$target_table_flag" == "failed" ]; then
                        ### Nothing landed: retry the plain copy.
                        echo "-----update table ,use Synchronous"
                        hadoop distcp ${DMRsetup} -m "$mapcount" "$source_table" "$target_workspace"
                        if [ $? -ne 0 ]; then
                            echo "-----update failed,final-----"
                            hadoop fs -rm "${target_workspace}/.distcp.tmp*"
                        fi
                    fi
                else
                    echo "=====Synchronous finished======"
                fi
            fi
        ### "--update" mode: incremental update when the source table exists.
        elif [ "$command_type" == "--update" ] && [ "$source_table_flag" == "success" ]; then
            ### NOTE(review): the "failed" branches below are unreachable for
            ### source_table_flag — this arm is only entered when it is
            ### "success" and the re-check rarely changes that. Kept as-is.
            source_table_flag=$(ret_Result "$source_table")
            target_table_flag=$(ret_Result "$target_table")
            if [ "$source_table_flag" == "failed" ]; then
                echo "-----source_table not exist-----"
            fi
            if [ "$target_table_flag" == "failed" ]; then
                echo "-----target_table not exist-----"
            fi
            ### An update requires both source and target tables to exist.
            if [ "$source_table_flag" == "success" ] && [ "$target_table_flag" == "success" ]; then
                echo "start update data ${source_table} to ${target_workspace}"
                hadoop distcp ${DMRsetup} -update -m "$mapcount" "$source_table" "$target_table"
                ### Clean up temp files after a failed update.
                if [ $? -ne 0 ]; then
                    hadoop fs -rm "${target_table}/.distcp.tmp*"
                    echo "-----update failed"
                else
                    echo "-----update finished"
                fi
            else
                echo "-----${target_table} already not exist,can't update data----"
            fi
        else
            echo "-----source table not exist, $source_table"
        fi
        #echo $workspace
        #echo $source_workspace
        #echo $target_workspace
    done < <(grep -v "#" ./list.txt)
done